【优化】XXLJOB修改为使用虚拟线程

【优化】XXLJOB修改为使用虚拟线程

在项目中新建下面这几个类(包路径、类名与 xxl-job 源码保持一致),源码请从项目所依赖的 xxl-job 版本中获取。

主要改动:将 `new Thread(...)` 改为创建虚拟线程,例如:

Thread.ofVirtual().name("VT").unstarted(runnable)

以下代码基于 xxl-job 2.3.0 版本;其他版本请参照同样的思路,修改对应版本的源码。

<!--        定时任务--><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version></dependency>

XxlJobExecutor
package com.xxl.job.core.executor;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** Created by xuxueli on 2016/3/2 21:14.*/
public class XxlJobExecutor  {private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);// ---------------------- param ----------------------private String adminAddresses;private String accessToken;private String appname;private String address;private String ip;private int port;private String logPath;private int logRetentionDays;public void setAdminAddresses(String adminAddresses) {this.adminAddresses = adminAddresses;}public void setAccessToken(String accessToken) {this.accessToken = accessToken;}public void setAppname(String appname) {this.appname = appname;}public void setAddress(String address) {this.address = address;}public void setIp(String ip) {this.ip = ip;}public void setPort(int port) {this.port = port;}public void setLogPath(String logPath) {this.logPath = logPath;}public void setLogRetentionDays(int logRetentionDays) {this.logRetentionDays = logRetentionDays;}// ---------------------- start + stop ----------------------public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);}public void destroy(){// destory executor-serverstopEmbedServer();// destory jobThreadRepositoryif (jobThreadRepository.size() > 0) {for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");// wait for job thread push result to callback queueif (oldJobThread != null) {try {oldJobThread.join();} catch (InterruptedException e) {logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);}}}jobThreadRepository.clear();}jobHandlerRepository.clear();// destory 
JobLogFileCleanThreadJobLogFileCleanThread.getInstance().toStop();// destory TriggerCallbackThreadTriggerCallbackThread.getInstance().toStop();}// ---------------------- admin-client (rpc invoker) ----------------------private static List<AdminBiz> adminBizList;private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}// ---------------------- executor-server (rpc provider) ----------------------private EmbedServer embedServer = null;private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. 
To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}private void stopEmbedServer() {// stop provider factoryif (embedServer != null) {try {embedServer.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}}// ---------------------- job handler repository ----------------------private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}// ---------------------- job thread repository ----------------------private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);Thread.ofVirtual().name("VT_XXLJOB").start(newJobThread);
//        newJobThread.start();
//        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public static JobThread removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread = jobThreadRepository.remove(jobId);if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();return oldJobThread;}return null;}public static JobThread loadJobThread(int jobId){JobThread jobThread = jobThreadRepository.get(jobId);return jobThread;}}

EmbedServer
package com.xxl.job.core.server;import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2020-04-11 21:25*/
public class EmbedServer {private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);private ExecutorBiz executorBiz;private Thread thread;public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = Thread.ofVirtual().name("VT_EmbedServer").unstarted(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ExecutorService bizThreadPool = Executors.newVirtualThreadPerTaskExecutor();//                ThreadPoolExecutor bizThreadPool2 = new ThreadPoolExecutor(
//                        0,
//                        200,
//                        60L,
//                        TimeUnit.SECONDS,
//                        new LinkedBlockingQueue<Runnable>(2000),
//                        new ThreadFactory() {
//                            @Override
//                            public Thread newThread(Runnable r) {
//                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
//                            }
//                        },
//                        new RejectedExecutionHandler() {
//                            @Override
//                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
//                            }
//                        });try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {if (e instanceof InterruptedException) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} else {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);}} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}public void stop() throws Exception {// destroy server threadif (thread!=null && thread.isAlive()) {thread.interrupt();}// stop registrystopRegistry();logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");}// ---------------------- registry ----------------------/*** netty_http** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2015-11-24 22:25:15*/public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz 
executorBiz;private String accessToken;
//        private ThreadPoolExecutor bizThreadPool;private ExecutorService bizThreadPool;public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ExecutorService bizThreadPool) {this.executorBiz = executorBiz;this.accessToken = accessToken;this.bizThreadPool = bizThreadPool;}@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);String requestData = msg.content().toString(CharsetUtil.UTF_8);String uri = msg.uri();HttpMethod httpMethod = msg.method();boolean keepAlive = HttpUtil.isKeepAlive(msg);String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invokebizThreadPool.execute(new Runnable() {@Overridepublic void run() {// do invokeObject responseObj = process(httpMethod, uri, requestData, accessTokenReq);// to jsonString responseJson = GsonTool.toJson(responseObj);// write responsewriteResponse(ctx, keepAlive, responseJson);}});}private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken!=null&& accessToken.trim().length()>0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {if ("/beat".equals(uri)) {return executorBiz.beat();} else if ("/idleBeat".equals(uri)) {IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);} else if ("/run".equals(uri)) {TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return 
executorBiz.run(triggerParam);} else if ("/kill".equals(uri)) {KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);} else if ("/log".equals(uri)) {LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}/*** write response*/private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write responseFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().close();      // beat 3N, close if idlelogger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");} else {super.userEventTriggered(ctx, evt);}}}// ---------------------- registry ----------------------public void startRegistry(final String appname, final String address) {// start 
registryExecutorRegistryThread.getInstance().start(appname, address);}public void stopRegistry() {// stop registryExecutorRegistryThread.getInstance().toStop();}}

ExecutorRegistryThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 17/3/2.*/
public class ExecutorRegistryThread {private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);private static ExecutorRegistryThread instance = new ExecutorRegistryThread();public static ExecutorRegistryThread getInstance(){return instance;}private Thread registryThread;private volatile boolean toStop = false;public void start(final String appname, final String address){// validif (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}registryThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry removetry {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for 
(AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});registryThread.setDaemon(true);registryThread.setName("VT_xxl-job, executor ExecutorRegistryThread");registryThread.start();}public void toStop() {toStop = true;// interrupt and waitif (registryThread != null) {registryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}}

JobLogFileCleanThread
package com.xxl.job.core.thread;import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** job file clean thread** @author xuxueli 2017-12-29 16:23:43*/
public class JobLogFileCleanThread {private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);private static JobLogFileCleanThread instance = new JobLogFileCleanThread();public static JobLogFileCleanThread getInstance(){return instance;}private Thread localThread;private volatile boolean toStop = false;public void start(final long logRetentionDays){// limit min valueif (logRetentionDays < 3 ) {return;}localThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDaysFile[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!=null && childDirs.length>0) {// todayCalendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate = todayCal.getTime();for (File childFile: childDirs) {// validif (!childFile.isDirectory()) {continue;}if (childFile.getName().indexOf("-") == -1) {continue;}// file create dateDate logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");}});localThread.setDaemon(true);localThread.setName("VT_xxl-job, executor JobLogFileCleanThread");localThread.start();}public void toStop() {toStop = true;if (localThread == null) {return;}// interrupt and waitlocalThread.interrupt();try 
{localThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}

JobThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;/*** handler thread* @author xuxueli 2016-1-16 19:52:47*/
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;private IJobHandler handler;private LinkedBlockingQueue<TriggerParam> triggerQueue;private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_IDprivate volatile boolean toStop = false;private String stopReason;private boolean running = false;    // if running jobprivate int idleTimes = 0;			// idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());}public IJobHandler getHandler() {return handler;}/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.debug(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}/*** kill job thread** @param stopReason*/public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}/*** is running job* @return*/public boolean isRunningOrHasQueue() {return running || triggerQueue.size()>0;}@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 
0;triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = Thread.ofVirtual().name("VT_XXL").start(futureTask);
//                            futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just executehandler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// commonmTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is 
killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.debug(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}

TriggerCallbackThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.job.core.util.JdkSerializeTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 16/7/22.*/
public class TriggerCallbackThread {private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);private static TriggerCallbackThread instance = new TriggerCallbackThread();public static TriggerCallbackThread getInstance(){return instance;}/*** job results callback queue*/private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();public static void pushCallBack(HandleCallbackParam callback){getInstance().callBackQueue.add(callback);logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());}/*** callback thread*/private Thread triggerCallbackThread;private Thread triggerRetryCallbackThread;private volatile boolean toStop = false;public void start() {// validif (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callbacktriggerCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// normal callbackwhile(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread 
destory.");}});triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("VT_xxl-job, executor TriggerCallbackThread");triggerCallbackThread.start();// retrytriggerRetryCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while(!toStop){try {retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");}});triggerRetryCallbackThread.setDaemon(true);triggerRetryCallbackThread.start();}public void toStop(){toStop = true;// stop callback, interrupt and waitif (triggerCallbackThread != null) {    // support empty admin addresstriggerCallbackThread.interrupt();try {triggerCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop retry, interrupt and waitif (triggerRetryCallbackThread != null) {triggerRetryCallbackThread.interrupt();try {triggerRetryCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}/*** do callback, will retry if error* @param callbackParamList*/private void doCallback(List<HandleCallbackParam> callbackParamList){boolean callbackRet = false;// callback, will retry if errorfor (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");callbackRet = true;break;} else {callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);}} catch (Exception e) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());}}if (!callbackRet) 
{appendFailCallbackFile(callbackParamList);}}/*** callback log*/private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){for (HandleCallbackParam callbackParam: callbackParamList) {String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());XxlJobContext.setXxlJobContext(new XxlJobContext(-1,null,logFileName,-1,-1));XxlJobHelper.log(logContent);}}// ---------------------- fail-callback file ----------------------private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){// validif (callbackParamList==null || callbackParamList.size()==0) {return;}// append filebyte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));if (callbackLogFile.exists()) {for (int i = 0; i < 100; i++) {callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));if (!callbackLogFile.exists()) {break;}}}FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);}private void retryFailCallbackFile(){// validFile callbackLogPath = new File(failCallbackFilePath);if (!callbackLogPath.exists()) {return;}if (callbackLogPath.isFile()) {callbackLogPath.delete();}if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {return;}// load and clear file, retryfor (File callbaclLogFile: callbackLogPath.listFiles()) {byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);// avoid empty fileif(callbackParamList_bytes == null || 
callbackParamList_bytes.length < 1){callbaclLogFile.delete();continue;}List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);callbaclLogFile.delete();doCallback(callbackParamList);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/294659.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ElasticSearch 数据分片

一、ElasticSearch 分片 ElasticSearch集群中有许多个节点(Node),每一个节点实例就是一个实例;数据分布在分片之间。集群的容量和性能主要取决于分片如何在节点上如何分配。将数据分片是为了提高可处理的容量和易于进行水平扩展,为分片做副本…

【已解决】使用fastjson返回给echarts的时候怎么不忽略null值?

问题复现: 在使用fastjson的将对象序列化成json字符串的时候,如果对象属性为null的话,会被忽略掉的。有时候,这些null值还是很有用的。比如我们在使用echarts的时候,返回给前端就不能因为null而忽略,如果忽…

前端开发基于Qunee绘制网络拓扑图总结-01

节点、连线添加label标签: 当需要在节点或者连线上添加图标、文字等醒目标识时,可添加label标签 自定义事件控制label标签的显示、隐藏: 外部点击事件控制某些自定义标识显、隐 showHideLableUI(edge, visible) {let uis = edge.bindingUIs…

揭秘2024年最新骨传导耳机排行榜,全面解析骨传导耳机排行榜品牌

随着科技的飞速发展,人们对音质和舒适度的需求也在不断提高。骨传导耳机作为一种独特的耳机类型,近年来逐渐受到了消费者的关注。它通过将声音通过骨骼传导,而不是传统的耳道传递,既能保证音质,又能避免长时间佩戴耳机…

使用教程之【SkyWant.[2304]】路由器操作系统,破解移动【Netkeeper】校园网【小白篇】

许多高校目前饱受Netkeeper认证的痛苦,普通路由器无法使用, 教你利用SkyWant的Netkeeper认证软件来使你的SkyWant路由器顺利认证上网,全宿舍又可以合作共赢了! 步骤一:正确连接网线,插电开机 正确连接网…

vscode | python | remote-SSH | Debug 配置 + CLIP4Clip实验记录

安装Extension 本地安装Remote-SSH、python 远程服务器上安装Python 难点:主机和远程服务器上安装Python扩展失败,可能是网络、代理等原因导致解决方法: 主机在官方网站下载Python扩展:https://marketplace.visualstudio.com/it…

idea中使用wsl作为启动项目的环境,便于linux环境下的测试

在idea中使用wsl作为启动项目的系统环境,首先安装wsl,这里不做过多的介绍,可以直接去微软官网看教程,也可以自己搜教程。 在wsl中安装jdk,linux中直接用命令安装openjdk即可 sudo apt-get update sudo apt install ope…

【愚公系列】2023年12月 HarmonyOS应用开发者高级认证(完美答案)

🏆 作者简介,愚公搬代码 🏆《头衔》:华为云特约编辑,华为云云享专家,华为开发者专家,华为产品云测专家,CSDN博客专家,CSDN商业化专家,阿里云专家博主…

轻松搭建知识付费小程序:让知识传播更便捷

明理信息科技saas知识付费平台 在当今数字化时代,知识付费已经成为一种趋势,越来越多的人愿意为有价值的知识付费。然而,公共知识付费平台虽然内容丰富,但难以满足个人或企业个性化的需求和品牌打造。同时,开发和维护…

LeetCode刷题--- 目标和

个人主页:元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏 力扣递归算法题 http://t.csdnimg.cn/yUl2I 【C】 http://t.csdnimg.cn/6AbpV 数据结构与算法 http://t.csdnimg.cn/hKh2l 前言:这个专栏主要讲述递归递归、搜…

【华为鸿蒙系统学习】- 如何利用鸿蒙系统进行App项目开发|自学篇

🌈个人主页: Aileen_0v0 🔥热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 💫个人格言:"没有罗马,那就自己创造罗马~" 目录 创建鸿蒙第一个App项目 项目创建 工程目录区 预览区 运行Hello World 基本工程目录 ws:工…

XPM_CDC_PULSE

MACRO_GROUP: XPM MACRO_SUBGROUP: XPM_CDC 1、Introduction 此宏将源时钟域中的脉冲同步到目标时钟域。源时钟域中任何大小的脉冲,如果正确启动,将生成单个目标时钟周期大小的脉冲。 为了正确操作,输入数据必须由目标时钟采样两次或…