手撸XXL-JOB(四)——远程调用定时任务

Java Socket网络编程

网络编程是Java编程中的重要组成部分,包括服务端和客户端两部分内容。Socket是Java网络编程的基本组件之一,用于在应用程序之间提供双向通信,Socket提供了一种标准的接口,允许应用程序通过网络发送和接收数据,在Java中,Socket可以分为客户端Socket和服务端Socket两种类型。
客户端Socket:客户端 Socket 用于与服务端 Socket 进行通信。客户端 Socket 通过指定服务端的 IP 地址和端口号,连接到服务端 Socket,然后发送数据到服务端 Socket。
服务端Socket:服务端 Socket 用于接收来自客户端 Socket 的连接请求,并在连接成功后,与客户端 Socket 进行通信。服务端 Socket 首先需要创建一个 ServerSocket 对象,并通过 bind 方法绑定到一个本地端口,然后等待客户端 Socket 的连接请求。
下面是Socket的一个示例:
服务端:

package org.example.demo1;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;public class Server {public static void main(String[] args) {try {ServerSocket serverSocket = new ServerSocket(8000);System.out.println("Server started, waiting for client...");Socket socket = serverSocket.accept();System.out.println("Client connected.");BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream(), true);String message;while ((message = in.readLine()) != null) {System.out.println("Client:" + message);out.println("Server received message:" + message);}in.close();out.close();socket.close();serverSocket.close();} catch (IOException e) {e.printStackTrace();}}
}

客户端:

package org.example.demo1;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;public class Client {public static void main(String[] args) {try {Socket socket = new Socket("localhost", 8000);System.out.println("Connected to server.");BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream(), true);BufferedReader consoleIn = new BufferedReader(new InputStreamReader(System.in));String message;while ((message = consoleIn.readLine()) != null) {out.println(message);System.out.println("Server:" + in.readLine());}consoleIn.close();in.close();out.close();socket.close();} catch (IOException e) {e.printStackTrace();}}
}

首先启动服务端,然后启动客户端,在客户端的控制台,输入数据,服务端能接收到数据并返回对应的响应。
image.png
image.png

远程调用定时任务

首先,我们创建两个模块,core模块包含yang-job的一些核心内容,比如IJobExecutor执行器、JobExecuteRequest执行器请求等;client模块依赖core模块,并封装和socket客户端调用相关的一些内容。
然后创建一个sample1模块,用于演示。
image.png

core模块

image.png
core目前定义了定时任务执行类和其入参、出参等信息,其中,YangJobTransferDTO包含任务类路径和任务请求,如下所示:

package com.yang.job.dto;import com.yang.job.execute.YangJobExecuteRequest;import java.io.Serializable;public class YangJobTransferDTO implements Serializable {private String className;private YangJobExecuteRequest yangJobExecuteRequest;public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public YangJobExecuteRequest getYangJobExecuteRequest() {return yangJobExecuteRequest;}public void setYangJobExecuteRequest(YangJobExecuteRequest yangJobExecuteRequest) {this.yangJobExecuteRequest = yangJobExecuteRequest;}
}
client模块

image.png
client模块定义了客户端所需要的一些类,其中,YangJob为注解类,对于每一个定时任务,需要加上YangJob注解,才能被正确调用。

package com.yang.job.client.annotations;import java.lang.annotation.*;@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface YangJob {
}

YangJobClientProperty为配置信息类,目前需要两个配置信息,客户端socket的ip和端口号

package com.yang.job.client.configuration;import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class YangJobClientProperty {@Value("${yang-job.executor.port}")private Integer port;@Value("${yang-job.executor.ip}")private String ip;public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}
}

YangJobClientPostProcessor在SpringBoot加载完毕后,扫描bean,将实现IYongJobExecutor的bean,注册到YangJobClientManager的map中,方便后续调用

package com.yang.job.client.schema;import com.yang.job.client.annotations.YangJob;
import com.yang.job.client.YangJobClientManager;
import com.yang.job.execute.IYangJobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;public class YangJobClientPostProcessor implements BeanPostProcessor {public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (!(bean instanceof IYangJobExecutor)) {return bean;}YangJob annotation = bean.getClass().getAnnotation(YangJob.class);if (annotation == null) {return bean;}YangJobClientManager.putJobExecutor(bean.getClass().getName(), (IYangJobExecutor) bean);return bean;}
}

YangJobClientManager负责监听端口和管理定时任务的执行,它会监听我们配置的yang-job.execute.port端口号,然后当接收到消息时,将消息转为入参,并取出对应的定时任务执行类,执行对应的代码。

package com.yang.job.client;import com.alibaba.fastjson.JSONObject;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.dto.ResultT;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class YangJobClientManager {private static Map<String, IYangJobExecutor> className2JobExecutorMap = new ConcurrentHashMap<>();private YangJobClientPropertyDTO yangJobClientPropertyDTO;private ServerSocket serverSocket;public YangJobClientManager(YangJobClientPropertyDTO yangJobClientPropertyDTO) {this.yangJobClientPropertyDTO = yangJobClientPropertyDTO;}public void init() {Integer port = this.yangJobClientPropertyDTO.getPort();try {this.serverSocket = new ServerSocket(port);} catch (IOException e) {throw new RuntimeException(e);}System.out.println("init success============");new Thread(() -> {while (true) {try {Socket socket = serverSocket.accept();BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);String params = bufferedReader.readLine();YangJobTransferDTO yangJobTransferDTO = JSONObject.parseObject(params, YangJobTransferDTO.class);System.out.println(yangJobTransferDTO);String className = yangJobTransferDTO.getClassName();YangJobExecuteRequest yangJobExecuteRequest = yangJobTransferDTO.getYangJobExecuteRequest();IYangJobExecutor jobExecutor = getJobExecutor(className);if (jobExecutor != null) {ResultT response = jobExecutor.execute(yangJobExecuteRequest);printWriter.println(JSONObject.toJSONString(response));}bufferedReader.close();printWriter.close();socket.close();} catch (IOException e) {e.printStackTrace();}if (serverSocket.isClosed() || serverSocket == null) {break;}}}).start();}public void shutdown() {if (this.serverSocket != null) {try {this.serverSocket.close();} catch (IOException e) {throw new RuntimeException(e);}}}public YangJobClientPropertyDTO getYangJobPropertyDTO() {return this.yangJobClientPropertyDTO;}public static void putJobExecutor(String className, IYangJobExecutor iJobExecutor) {className2JobExecutorMap.put(className, iJobExecutor);}public static IYangJobExecutor getJobExecutor(String className) {return className2JobExecutorMap.get(className);}}

YangJobClientContext为客户端的上下文,负责监听SpringBoot刷新消息和关闭消息,并执行对应的操作。

package com.yang.job.client;import com.yang.job.client.utils.SpringContextUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;public class YangJobClientContext implements ApplicationListener<ApplicationContextEvent> {private static YangJobClientContext instance;private ApplicationContext applicationContext;@Overridepublic void onApplicationEvent(ApplicationContextEvent event) {if (event instanceof ContextRefreshedEvent) {System.out.println("刷新了=========");YangJobClientContext.instance = this;instance.applicationContext = applicationContext;init();} else if (event instanceof ContextClosedEvent) {System.out.println("销毁了=========");shutdown();}}private void init() {YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);yangJobClientManager.init();}private void shutdown() {YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);yangJobClientManager.shutdown();}
}

YangJobClientConfiguration为配置类,负责对YangJobClientPostProcessor、YangJobClientManager和YangJobClientContext的统一配置管理。

package com.yang.job.client.configuration;import com.yang.job.client.YangJobClientManager;
import com.yang.job.client.YangJobClientContext;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.client.schema.YangJobClientPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class YangJobClientConfiguration {@Autowiredprivate YangJobClientProperty yangJobClientProperty;@Beanpublic YangJobClientPostProcessor yangJobPostProcessor() {return new YangJobClientPostProcessor();}@Beanpublic YangJobClientManager yangJobClientManager() {YangJobClientPropertyDTO yangJobClientPropertyDTO = new YangJobClientPropertyDTO();yangJobClientPropertyDTO.setIp(yangJobClientProperty.getIp());yangJobClientPropertyDTO.setPort(yangJobClientProperty.getPort());return new YangJobClientManager(yangJobClientPropertyDTO);}@Beanpublic YangJobClientContext yangJobContext() {return new YangJobClientContext();}
}

最后,为了使引入client依赖的应用,能自动装配我们提供的bean,我们在resources目录下创建META-INF目录,在该目录下创建spring.factories文件,文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.yang.job.client.utils.SpringContextUtils,\com.yang.job.client.configuration.YangJobClientProperty,\com.yang.job.client.configuration.YangJobClientConfiguration
sample1

我们创建一个sample1项目,引入spring-boot-starter-web依赖和yang-client,yang-core的依赖

  <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.yang</groupId><artifactId>yang-job-core</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.yang</groupId><artifactId>yang-job-client</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies>

创建启动类

package com.yang.job.sample1;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class YangJobSample1App {public static void main(String[] args) {SpringApplication.run(YangJobSample1App.class, args);}
}

创建一个任务类:

package com.yang.job.sample1.task;import com.yang.job.client.annotations.YangJob;
import com.yang.job.dto.ResultT;
import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;@Component
@YangJob
public class TestTask1 implements IYangJobExecutor {@Overridepublic ResultT execute(YangJobExecuteRequest yangJobExecuteRequest) {System.out.println("开启定时任务了,入参为:" + yangJobExecuteRequest);return ResultT.success();}
}

添加配置文件,因为client模块的YangJobClientProperty需要有yang-job.executor.port和yang-job.executor.ip这两个配置,如果我们的配置文件中,缺少这些配置,会导致报错,无法启动项目。

spring:application:name: YangJobSample1App
yang-job:executor:port: 9999ip: 127.0.0.1
server:port: 8001
测试

我们先启动刚才的sample1项目,然后执行下列代码,来远程调用TestTask1方法执行类。

 public static void main(String[] args) {try {Socket socket = new Socket("127.0.0.1", 9999);System.out.println("链接成功=============");PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();yangJobExecuteRequest.setJobId("1");yangJobExecuteRequest.addParam("num", "1");YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();yangJobTransferDTO.setClassName("com.yang.job.sample1.task.TestTask1");yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));System.out.println("response:" + bufferedReader.readLine());bufferedReader.close();printWriter.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

执行结果如下,说明我们能成功地进行远程调用。
image.png
image.png

添加远程任务

domain层

在上一篇文章中,我们操作的任务,都是本地任务,现在我们需要对远程任务进行操作,为了区分任务类型,我们首先在domain层添加一个任务类型枚举

package com.yang.job.admin.domain.enums;public enum JobTypeEnum {LOCAL("local", "本地任务"),REMOTE("remote", "远程任务");private String name;private String description;JobTypeEnum(String name, String description) {this.name = name;this.description = description;}public String getName() {return name;}public String getDescription() {return description;}public static JobTypeEnum getJobTypeByName(String name) {for (JobTypeEnum value : values()) {if (value.getName().equals(name)) {return value;}}return null;}
}

然后修改YangJobModel,添加上任务类型枚举和远程任务信息

package com.yang.job.admin.domain.model;import com.yang.job.admin.client.dto.common.BusinessException;
import com.yang.job.admin.client.dto.common.ErrorCode;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.event.SaveJobPostEvent;
import com.yang.job.admin.domain.event.SubmitJobPostEvent;
import com.yang.job.admin.domain.event.UpdateJobPostEvent;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.event.EventCenter;
import com.yang.job.admin.infra.job.YangJobManager;
import com.yang.job.admin.infra.job.request.YangJobSubmitParam;
import com.yang.job.admin.infra.utils.CronUtils;
import com.yang.job.admin.infra.utils.SpringContextUtils;
import lombok.Data;import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Data
public class YangJobModel implements Serializable {private Integer jobId;private String jobName;private String description;private String cron;private String executeClassPath;private Runnable runnable;private JobExecuteStrategyEnum executeStrategy;private JobTypeEnum jobType;private RemoteExecutorMessage remoteExecutorMessage;private Integer enable;private Integer open;private Date createTime;private Date updateTime;private Map<String, String> featureMap = new HashMap<>();private Map<String, String> executeParamMap = new HashMap<>();public boolean isEnable() {if (this.enable == null) {return false;}return this.enable == 1;}public boolean isOpen() {if (!isEnable()) {return false;}if (this.open == null) {return false;}return this.open == 1;}public boolean isClose() {return !isOpen();}public boolean isLocalJob() {return JobTypeEnum.LOCAL == this.jobType;}public boolean isRemoteJob() {return JobTypeEnum.REMOTE == this.jobType;}public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) {if (jobExecuteStrategyEnum == null) {throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST);}this.executeStrategy = jobExecuteStrategyEnum;}public void submitJob() {YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam();YangJobManager yangJobManager = getYangJobManager();yangJobManager.submitJob(yangJobSubmitParam);// 提交任务后,发送提交任务后置事件SubmitJobPostEvent submitJobPostEvent = new SubmitJobPostEvent(yangJobSubmitParam);getEventCenter().postEvent(submitJobPostEvent);}public void cancelJob() {YangJobManager yangJobManager = getYangJobManager();yangJobManager.cancelJob(this.jobId);}private YangJobSubmitParam convert2YangJobSubmitParam() {YangJobSubmitParam yangJobBuildParam = new YangJobSubmitParam();yangJobBuildParam.setJobId(this.jobId);yangJobBuildParam.setRunnable(this.runnable);ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(this.cron, ZonedDateTime.now());ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(this.cron, nextExecutionTime);long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();yangJobBuildParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);yangJobBuildParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);yangJobBuildParam.setJobExecuteStrategy(this.executeStrategy);return yangJobBuildParam;}public void postSaveJobEvent() {SaveJobPostEvent saveJobPostEvent = new SaveJobPostEvent(this.jobId);getEventCenter().asyncPostEvent(saveJobPostEvent);}public void postUpdateJobEvent() {UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);getEventCenter().asyncPostEvent(updateJobPostEvent);}public void postDeleteJobEvent() {UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);getEventCenter().asyncPostEvent(updateJobPostEvent);}private YangJobManager getYangJobManager() {return SpringContextUtils.getBeanOfType(YangJobManager.class);}private EventCenter getEventCenter() {return SpringContextUtils.getBeanOfType(EventCenter.class);}}

远程任务信息类:

package com.yang.job.admin.domain.valueobject;import lombok.Data;import java.io.Serializable;@Data
public class RemoteExecutorMessage implements Serializable {private String ip;private Integer port;
}

接着我们添加一个features枚举,用于记录映射features字段中各个key表示的含义,因为我们现在表的设计中没有任务类型字段和远程信息相关的字段,所以会将这些信息添加到features字段中

package com.yang.job.admin.domain.enums;public enum JobModelFeatureEnum {JOB_TYPE("jobType", "任务类型"),REMOTE_EXECUTOR_IP("executorIp", "执行器ip"),REMOTE_EXECUTOR_PORT("executorPort", "执行器端口"),REMOTE_EXECUTOR_MESSAGE("r_executor_m", "远程执行器的信息");private String name;private String description;JobModelFeatureEnum(String name, String description) {this.name = name;this.description = description;}public String getName() {return name;}public String getDescription() {return description;}
}
client层

我们修改原先的NewYangJobCommand类,加上任务类型属性

package com.yang.job.admin.client.dto.command;import lombok.Data;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;@Data
public class NewYangJobCommand implements Serializable {private String jobName;private String description;private String cron;private String executeStrategy;private String jobType;private String executeClassPath;private Integer open;private Map<String, String> params = new HashMap<>();
}

然后修改YangJobDTO类,也加上jobType属性

package com.yang.job.admin.client.dto;import lombok.Data;import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Data
public class YangJobDTO implements Serializable {private Integer jobId;private String jobName;private String description;private String cron;private String executeStrategy;private String executeClassPath;private String jobType;private Integer enable;private Integer open;private Date createTime;private Date updateTime;private Map<String, String> featureMap = new HashMap<>();private Map<String, String> executeParamMap = new HashMap<>();
}
application层

接着修改YangJobApplicationService类的convertYangJobModel方法,将jobType任务类型和远程任务信息添加到YangJobModel中

 private YangJobModel convert2YangJobModel(NewYangJobCommand newYangJobCommand) {String jobType = newYangJobCommand.getJobType();JobTypeEnum jobTypeEnum = JobTypeEnum.getJobTypeByName(jobType);if (jobType == null) {throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);}YangJobModel yangJobModel = new YangJobModel();yangJobModel.setJobName(newYangJobCommand.getJobName());yangJobModel.setDescription(newYangJobCommand.getDescription());yangJobModel.setCron(newYangJobCommand.getCron());yangJobModel.setOpen(newYangJobCommand.getOpen());yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(newYangJobCommand.getExecuteStrategy()));yangJobModel.setExecuteClassPath(newYangJobCommand.getExecuteClassPath());yangJobModel.setExecuteParamMap(newYangJobCommand.getParams());yangJobModel.setJobType(jobTypeEnum);if (jobTypeEnum == JobTypeEnum.REMOTE) {String ip = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_IP.getName());String port = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_PORT.getName());if (ip == null || port == null) {throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);}RemoteExecutorMessage remoteExecutorMessage = new RemoteExecutorMessage();remoteExecutorMessage.setIp(ip);remoteExecutorMessage.setPort(Integer.valueOf(port));yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);} else {if (yangJobModel.getExecuteClassPath() == null || yangJobModel.getExecuteClassPath().isEmpty()) {throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);}try {Class.forName(yangJobModel.getExecuteClassPath());} catch (ClassNotFoundException e) {e.printStackTrace();throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);}}return yangJobModel;}
infra层

最后修改基础设施层,首先修改YangJobModelConvertor类,将RemoteMessage和JobType转化到features中,以及从features中取出

package com.yang.job.admin.infra.gatewayimpl.repository.convertor;import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobModelFeatureEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.model.YangJobModel;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.data.YangJobData;
import com.yang.job.admin.infra.job.thread.RemoteJobExecuteThread;
import com.yang.job.admin.infra.utils.FeaturesUtils;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class YangJobModelConvertor {public YangJobData convert2Data(YangJobModel yangJobModel) {if (yangJobModel == null) {return null;}YangJobData yangJobData = new YangJobData();yangJobData.setJobId(yangJobModel.getJobId());yangJobData.setJobName(yangJobModel.getJobName());yangJobData.setDescription(yangJobModel.getDescription());yangJobData.setCron(yangJobModel.getCron());yangJobData.setExecuteClassPath(yangJobModel.getExecuteClassPath());yangJobData.setEnable(yangJobModel.getEnable());yangJobData.setOpen(yangJobModel.getOpen());yangJobData.setCreateTime(yangJobModel.getCreateTime());yangJobData.setUpdateTime(yangJobModel.getUpdateTime());Map<String, String> featureMap = yangJobModel.getFeatureMap();featureMap.put(JobModelFeatureEnum.JOB_TYPE.getName(), yangJobModel.getJobType().getName());featureMap.put(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName(), JSONObject.toJSONString(yangJobModel.getRemoteExecutorMessage()));yangJobData.setFeatures(FeaturesUtils.convert2Features(featureMap));yangJobData.setExecuteParams(FeaturesUtils.convert2Features(yangJobModel.getExecuteParamMap()));yangJobData.setExecuteStrategy(yangJobModel.getExecuteStrategy().getName());return yangJobData;}public YangJobModel convert2Model(YangJobData yangJobData) {if (yangJobData == null) {return null;}YangJobModel yangJobModel = new YangJobModel();yangJobModel.setJobId(yangJobData.getJobId());yangJobModel.setDescription(yangJobData.getDescription());yangJobModel.setCron(yangJobData.getCron());yangJobModel.setJobName(yangJobData.getJobName());yangJobModel.setExecuteClassPath(yangJobData.getExecuteClassPath());yangJobModel.setEnable(yangJobData.getEnable());yangJobModel.setOpen(yangJobData.getOpen());yangJobModel.setCreateTime(yangJobData.getCreateTime());yangJobModel.setUpdateTime(yangJobData.getUpdateTime());yangJobModel.setFeatureMap(FeaturesUtils.convert2FeatureMap(yangJobData.getFeatures()));yangJobModel.setExecuteParamMap(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));JobExecuteStrategyEnum executeStrategy = JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy());if (executeStrategy == null) {throw new RuntimeException("执行策略有误!");}JobTypeEnum jobType = JobTypeEnum.getJobTypeByName(yangJobModel.getFeatureMap().get(JobModelFeatureEnum.JOB_TYPE.getName()));yangJobModel.setJobType(jobType);String remoteMessageStr = yangJobModel.getFeatureMap().get(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName());RemoteExecutorMessage remoteExecutorMessage = JSONObject.parseObject(remoteMessageStr, RemoteExecutorMessage.class);yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);yangJobModel.setExecuteStrategy(executeStrategy);yangJobModel.setRunnable(buildRunnable(yangJobModel));return yangJobModel;}private Runnable buildRunnable(YangJobModel yangJobModel) {if (yangJobModel.isLocalJob()) {String executeClassPath = yangJobModel.getExecuteClassPath();try {Class<?> aClass = Class.forName(executeClassPath);if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {throw new RuntimeException("该类必须实现IYangJobExecutor接口");}IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);Runnable runnable = () -> executor.execute(yangJobExecuteRequest);return runnable;} catch (InstantiationException | IllegalAccessException e) {e.printStackTrace();} catch (ClassNotFoundException e) {System.out.println(String.format("%s 类路径对应的类不存在", executeClassPath));e.printStackTrace();}} else {RemoteExecutorMessage remoteExecutorMessage = yangJobModel.getRemoteExecutorMessage();String executeClassPath = yangJobModel.getExecuteClassPath();YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();yangJobTransferDTO.setClassName(executeClassPath);YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);return new RemoteJobExecuteThread(remoteExecutorMessage, yangJobTransferDTO);}return null;}private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobModel yangJobModel) {YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();yangJobExecuteRequest.setJobId(yangJobModel.getJobId().toString());yangJobExecuteRequest.setParams(yangJobModel.getExecuteParamMap());return yangJobExecuteRequest;}
}

然后添加一个RemoteJobExecuteThread类,该类实现runnable接口,当我们的任务类型为远程调用时,其YangJobModel的runnable属性为remoteJobExecuteThread类

package com.yang.job.admin.infra.job.thread;import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.core.dto.YangJobTransferDTO;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;public class RemoteJobExecuteThread implements Runnable {private YangJobTransferDTO yangJobTransferDTO;private RemoteExecutorMessage remoteExecutorMessage;public RemoteJobExecuteThread(RemoteExecutorMessage remoteExecutorMessage, YangJobTransferDTO yangJobTransferDTO) {this.remoteExecutorMessage = remoteExecutorMessage;this.yangJobTransferDTO = yangJobTransferDTO;}@Overridepublic void run() {try {String ip = remoteExecutorMessage.getIp();Integer port = remoteExecutorMessage.getPort();Socket socket = new Socket(ip, port);PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));bufferedReader.close();printWriter.close();socket.close();} catch (UnknownHostException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
}
测试

我们先启动之前的sample1项目,然后启动yang-job-admin,调用http://localhost:8080/job添加任务,请求体如下:

{"jobName": "RemoteJobExecutor",
"description":"RemoteJobExecutor","cron": "0/10 * * * * ?","executeStrategy": "withFixedDelay","executeClassPath": "com.yang.job.sample1.task.TestTask1",
"open":1,
"jobType":"remote",
"params":{
"executorIp":"127.0.0.1",
"executorPort":"9999"
}
}

image.png
添加成功后,我们查看Sample1项目的控制台,可以看到,每10秒,这个TestTask1任务会被调用一次
image.png

参考文章

https://www.yihuo.tech/programming/server-stack/exploring-the-java-network-programming-paradigm-socket-udp-nio-and-netty-in-focus/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/700678.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目实施方案:多点异地机动车典型系统试验状态可视监控系统

目录 一、需求分析 1.1项目背景 1.2项目概述 二、系统优势 2.1兼容性能力强 2.2接入协议多样 2.3并发能力强 2.3.1 单平台参数 2.3.2 多平台性能参数 2.4 系统稳定性 三、建设目标 3.1安全性 3.2可扩展性 3.3易用性 3.4兼容性 3.5 响应能力 四、系统整体解决方…

Nodejs笔记2

模块化 模块化初体验 模块暴露数据 导入模块 fs 写绝对路径 require写相对路径不会受到影响 ./../不能省略 js 和json文件后缀可以省略 如果存在 命名相同的js和json文件&#xff0c;优先导入js文件 导入文件夹时的情况 require导入模块的基本流程 commonJS模块…

上位机图像处理和嵌入式模块部署(树莓派4b和电源供给)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 前面&#xff0c;我们说过pc电脑和嵌入式设备&#xff0c;两者都可以实现相同的软件功能。但是和pc相比较&#xff0c;嵌入式设备不仅价格更便宜&a…

24深圳杯C题18页高质量论文+可执行代码+图表

比赛题目的完整版思路可执行代码数据参考论文都会在第一时间更新上传的&#xff0c;大家可以参考我往期的资料&#xff0c;所有的资料数据以及到最后更新的参考论文都是一次付费后续免费的。注意&#xff1a;&#xff08;建议先下单占坑&#xff0c;因为随着后续我们更新资料数…

129.哈希表:有效的字母异位词(力扣)

242. 有效的字母异位词 - 力扣&#xff08;LeetCode&#xff09; 题目描述 代码解决以及思路 这个方法的时间复杂度为O(N)&#xff0c;其中N是字符串的长度&#xff0c;空间复杂度为O(1)&#xff08;因为辅助数组的大小是固定的26&#xff09;。 class Solution { public:bo…

智能终端RK3568主板在智慧公交条形屏项目的应用,支持鸿蒙,支持全国产化

基于AIoT-3568A的智慧公交条形屏&#xff0c;可支持公交线路动态展示&#xff0c;语音到站提醒&#xff0c;减少过乘、漏乘的情况&#xff0c;有效提高了公交服务效率和质量&#xff0c;为乘客提供了更舒适、更安全和更方便的出行体验&#xff0c;为城市的发展增添了新的活力。…

在idea中使用vue

一、安装node.js 1、在node.js官网&#xff08;下载 | Node.js 中文网&#xff09;上下载适合自己电脑版本的node.js压缩包 2、下载完成后进行解压并安装&#xff0c;一定要记住自己的安装路径 一直点击next即可&#xff0c;这部选第一个 3、安装成功后&#xff0c;按住winR输入…

嵌入式科普(16)c语言函数参数的传递方式

目录 一、概述 二、C函数参数 2.1 一张图讲清 2.2 按数据类型分类&#xff1a; 2.2.1 基本数据类型参数&#xff1a; 2.2.2 数组参数&#xff1a; 2.2.3 结构体参数&#xff1a; 2.2.4 指针参数&#xff1a; 2.2.5 函数指针参数&#xff1a; 2.3 按传递方式分类&…

具身触觉社区| “大咖面对面”第一期活动顺利举行

4月27日&#xff0c;由中国人工智能学会认知系统与信息处理专委会组织的“具身触觉社区”第一期“大咖面对面”分享活动顺利举行&#xff0c;我们邀请到了美国麻省理工学院&#xff08;MIT&#xff09;博士、视触觉传感器的奠基人、GelSight指尖传感器发明人李瑞老师为社区带来…

原生小程序开发如何使用 tailwindcss

原生小程序开发如何使用 tailwindcss 原生小程序开发如何使用 tailwindcss 前言什么是 weapp-tailwindcss ?0. 准备环境以及小程序项目1. 安装与配置 tailwindcss 0. 使用包管理器安装 tailwindcss1. 在项目目录下创建 postcss.config.js 并注册 tailwindcss2. 配置 tailwind…

SpringBoot项目的项目部署全过程

一、前端 安装nginx 1.将提前准备好的nginx的安装包上传到Linux中/opt目录下(我用的是Xftp) 2.解压 2.1:在xshell中解压该文件: tar -zxvf nginx-1.20.1.tar.gz 2.2:进入解压后的目录 cd nginx-1.20.1/ 2.3:安装需要的依赖 yum -y install zlib zlib-devel openssl openssl-de…

HTML特殊字符

特殊字符 有特殊含义的字符成为字符实体 对于有特殊含义的字符,需要通过转移字符来表示 <span> <br><a href"http://www.atguigu.com">我 爱 前端</a> <br>&amp;amp; 效果