
XxlJob Task Scheduling: Flow and Internals

2024-01-19 16:47 | Source: compiled from the web

Preface

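In the previous article we looked at how executors register with, deregister from, and communicate with the scheduling center (the admin). Next we turn to XxlJob's core logic: task scheduling.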

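Anyone who has used XxlJob knows that once a scheduled job is configured in the XxlJob admin console, it is triggered according to the configured schedule. You can also trigger it manually, once.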

This article is therefore split into two parts: triggering a job once, and scheduled triggering.

Triggering a job once

The scheduling center sends a run request to the executor

When you run a job once from the console, you can see that the request goes to the /jobinfo/trigger endpoint:


```java
@Controller
@RequestMapping("/jobinfo")
public class JobInfoController {

    @RequestMapping("/trigger")
    @ResponseBody
    //@PermissionLimit(limit = false)
    public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
        // force cover job param
        if (executorParam == null) {
            executorParam = "";
        }
        // hand the trigger over to the trigger thread pool
        JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
        return ReturnT.SUCCESS;
    }
}
```

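The controller does only minimal parameter handling (a null executorParam becomes an empty string), then calls JobTriggerPoolHelper.trigger, which delegates to JobTriggerPoolHelper#addTrigger: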

```java
// com.xxl.job.admin.core.thread.JobTriggerPoolHelper#addTrigger
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // choose a thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    // more than 10 slow triggers within the current 1-minute window: use slowTriggerPool
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // run the trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // check the window: in a new minute, clear jobTimeoutCountMap and start counting again
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // measure the trigger cost; if it exceeds 500 ms, count it in jobTimeoutCountMap
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // job-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }
            }
        }
    });
}
```

JobTriggerPoolHelper#addTrigger first picks a thread pool and then runs the trigger asynchronously on that pool.

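The pool is chosen as follows: fastTriggerPool is used by default. A jobTimeoutCountMap records, per job, how many triggers in the current one-minute window took longer than 500 ms (this is what "timeout" means here). Once a job has more than 10 such slow triggers, its later triggers in that window go to slowTriggerPool.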
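The pool-selection rule can be modeled in isolation. The sketch below is a simplified, hypothetical class (TriggerPoolSelector is not part of XxlJob); it keeps only the one-minute window and the two thresholds taken from the code above (a trigger slower than 500 ms counts as a timeout; more than 10 timeouts in the window selects the slow pool) and returns a boolean instead of an actual ThreadPoolExecutor.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the fast/slow pool choice; not XxlJob's real class.
class TriggerPoolSelector {
    static final long SLOW_TRIGGER_MS = 500;   // threshold from addTrigger
    static final int MAX_SLOW_PER_MINUTE = 10; // threshold from addTrigger

    private final ConcurrentMap<Integer, AtomicInteger> slowCountMap = new ConcurrentHashMap<>();
    private volatile long currentMinute;

    TriggerPoolSelector(long nowMillis) {
        this.currentMinute = nowMillis / 60_000;
    }

    // true: the next trigger of this job would go to the slow pool
    boolean useSlowPool(int jobId) {
        AtomicInteger count = slowCountMap.get(jobId);
        return count != null && count.get() > MAX_SLOW_PER_MINUTE;
    }

    // Called after a trigger finishes, with how long it took
    void recordTrigger(int jobId, long costMillis, long nowMillis) {
        long minuteNow = nowMillis / 60_000;
        if (minuteNow != currentMinute) {
            // a new one-minute window: forget all previous counts
            currentMinute = minuteNow;
            slowCountMap.clear();
        }
        if (costMillis > SLOW_TRIGGER_MS) {
            AtomicInteger previous = slowCountMap.putIfAbsent(jobId, new AtomicInteger(1));
            if (previous != null) {
                previous.incrementAndGet();
            }
        }
    }
}
```

For example, a job that records 11 triggers of 600 ms within the same minute switches to the slow pool, and the first recorded trigger in the next minute clears the counters. Passing the current time in explicitly keeps the sketch deterministic; the real code reads System.currentTimeMillis().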

The related git commit history explains the motivation behind this logic clearly.


Following the execution path further, we arrive at XxlJobTrigger#trigger:

```java
// com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load the job from the database by jobId
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    // a manual "trigger once" may pass a parameter; if given, it overrides the one stored in the database
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // failure retry count
    int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
    // load the job group, i.e. the service the job belongs to, such as order, pay or product
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // sharding broadcast: one trigger per registered executor
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        // single trigger
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }
}
```

When triggering manually, you can pass an executor parameter and a specific list of machine addresses.

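XxlJobTrigger#trigger loads the job from the database and applies any manually supplied overrides. There are then two execution modes: a single trigger, and sharding broadcast (one trigger per registered executor). Here we follow the single-trigger path.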
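To make the two modes concrete, here is a hypothetical ShardingPlanner that only computes which (index, total) pairs would be passed to processTrigger, rendered as "index/total" strings. It assumes the routing strategy and the number of registered executors are given as plain arguments instead of being read from XxlJobInfo and XxlJobGroup, and it does not send anything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner mirroring the branch at the end of XxlJobTrigger#trigger.
class ShardingPlanner {
    // Parses "index/total" (for example "1/3"); returns null when absent or malformed
    static int[] parseShardingParam(String param) {
        if (param == null) {
            return null;
        }
        String[] parts = param.split("/");
        if (parts.length != 2 || !isNumeric(parts[0]) || !isNumeric(parts[1])) {
            return null;
        }
        return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    static boolean isNumeric(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // One "index/total" entry per processTrigger-style call
    static List<String> plan(boolean shardingBroadcast, int registeredExecutors, String shardingParam) {
        int[] sharding = parseShardingParam(shardingParam);
        List<String> calls = new ArrayList<>();
        if (shardingBroadcast && registeredExecutors > 0 && sharding == null) {
            // broadcast: every registered executor gets its own index out of the total
            for (int i = 0; i < registeredExecutors; i++) {
                calls.add(i + "/" + registeredExecutors);
            }
        } else {
            // single call; without an explicit parameter the job is shard 0 of 1
            if (sharding == null) {
                sharding = new int[]{0, 1};
            }
            calls.add(sharding[0] + "/" + sharding[1]);
        }
        return calls;
    }
}
```

With SHARDING_BROADCAST and three registered executors, one trigger becomes three calls ("0/3", "1/3", "2/3"). Any other case, including an explicit "index/total" parameter, produces exactly one call.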

```java
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    // ......
    // 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }
    // .....
}

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // get the executor client for this address
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // send the run request
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    // .....
    return runResult;
}
```

In the end, the admin sends a run request, carrying the trigger parameters, to the target machine:

```java
// com.xxl.job.core.biz.client.ExecutorBizClient#run
public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
```

How the executor handles the run request

When the executor starts, it opens a Netty server to handle HTTP requests, with a custom EmbedHttpServerHandler in the pipeline:

```java
public class EmbedServer {

    public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                // .....
                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(port).sync();
                    // ......
                    // wait until stop
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } catch (Exception e) {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
        thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }

    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
            // request parse
            //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
            String requestData = msg.content().toString(CharsetUtil.UTF_8);
            String uri = msg.uri();
            HttpMethod httpMethod = msg.method();
            boolean keepAlive = HttpUtil.isKeepAlive(msg);
            String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

            // invoke
            bizThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // handle the request
                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                    // to json
                    String responseJson = GsonTool.toJson(responseObj);
                    // write response
                    writeResponse(ctx, keepAlive, responseJson);
                }
            });
        }

        private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
            // .....
            // services mapping
            try {
                switch (uri) {
                    // ......
                    case "/run":
                        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                        // run the job
                        return executorBiz.run(triggerParam);
                    // ......
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }
    }
}
```

When EmbedHttpServerHandler receives a request, it again hands the work to a thread pool (bizThreadPool) for asynchronous processing.

In process, the handler switches on the request URI to choose which logic to run.
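The pattern here (leave the Netty I/O thread quickly, do the work on a business pool, route by URI) can be sketched without Netty. UriDispatcher below is hypothetical: the route table is illustrative, the /run handler is a plain function standing in for ExecutorBiz#run, and responses are strings rather than ReturnT serialized to JSON.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical dispatcher: async hand-off to a pool, then a switch on the URI.
class UriDispatcher {
    // daemon threads so an idle pool never keeps the JVM alive
    private final ExecutorService bizPool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "biz-pool");
        t.setDaemon(true);
        return t;
    });
    private final Function<String, String> runHandler;

    UriDispatcher(Function<String, String> runHandler) {
        this.runHandler = runHandler;
    }

    // Like channelRead0: returns at once, the work happens on bizPool
    CompletableFuture<String> onRequest(String uri, String body) {
        return CompletableFuture.supplyAsync(() -> process(uri, body), bizPool);
    }

    // Like process(): pick the logic by URI
    String process(String uri, String body) {
        switch (uri) {
            case "/beat":
                return "SUCCESS";
            case "/run":
                return runHandler.apply(body);
            default:
                return "FAIL: uri-mapping(" + uri + ") not found";
        }
    }

    void shutdown() {
        bizPool.shutdown();
    }
}
```

Because onRequest returns immediately, a slow /run handler only occupies a pool thread, never the thread that reads from the socket.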

```java
// com.xxl.job.core.biz.impl.ExecutorBizImpl#run
public ReturnT<String> run(TriggerParam triggerParam) {
    // look up the job's execution thread by jobId (per the source, one job maps to one thread)
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    // Spring bean mode
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // the handler name may have changed since the last run; if so, the old job thread must be removed
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    }
    // .....

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        // first run, or the handler name differs from the previous one
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push the trigger into the job thread's queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
```

In short:

1. Look up the JobThread for the jobId.
2. Match the glue type and resolve the IJobHandler. If the newly required IJobHandler differs from the old one, the new one wins and the old job thread is destroyed.
3. Push the trigger into that job thread's queue.

From the source, we can conclude that each job corresponds to one thread.

Below is the logic that registers a job thread. At its core, a map maintains the jobId -> JobThread mapping:

```java
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    // create the job thread
    JobThread newJobThread = new JobThread(jobId, handler);
    // start it
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    // register: maintain the jobId -> JobThread mapping
    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);    // putIfAbsent | oh my god, map's put method return the old value!!!
    if (oldJobThread != null) {
        // stop and interrupt the old job thread
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }

    return newJobThread;
}
```
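The replace-and-stop behavior of registJobThread hinges on ConcurrentHashMap#put returning the previous value for the key. A minimal sketch of just that idea, with a hypothetical Worker class standing in for JobThread (no real thread is started, and stopping only sets a flag):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry; Worker stands in for JobThread.
class JobWorkerRegistry {
    static class Worker {
        final int jobId;
        final String handlerName;
        volatile boolean stopped;
        volatile String stopReason;

        Worker(int jobId, String handlerName) {
            this.jobId = jobId;
            this.handlerName = handlerName;
        }

        void toStop(String reason) {
            stopped = true;
            stopReason = reason;
        }
    }

    private final ConcurrentMap<Integer, Worker> repository = new ConcurrentHashMap<>();

    Worker register(int jobId, String handlerName, String removeOldReason) {
        Worker fresh = new Worker(jobId, handlerName);
        // put() returns the worker previously mapped to this jobId, or null
        Worker old = repository.put(jobId, fresh);
        if (old != null) {
            old.toStop(removeOldReason); // the real code also interrupts the old thread
        }
        return fresh;
    }

    Worker load(int jobId) {
        return repository.get(jobId);
    }
}
```

Registering jobId 1 a second time, for example after the handler name changed, marks the first worker as stopped with the given reason and leaves only the new one in the map.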

Next, JobThread extends Thread and is started as soon as it is created. In its run method it blocks on the queue to fetch a trigger, with a timeout of 3 seconds.

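Once it has a trigger and the job has an executor timeout configured (executorTimeout > 0), it wraps the work in a FutureTask and runs it on a new thread so that it can enforce the timeout; otherwise it runs the job directly. Either way, the work ends in handler.execute().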

```java
public class JobThread extends Thread {

    // trigger queue
    private LinkedBlockingQueue<TriggerParam> triggerQueue;

    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // ......
        // enqueue the trigger
        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    @Override
    public void run() {
        // ......
        // execute
        while(!toStop) {
            running = false;
            idleTimes++;

            TriggerParam triggerParam = null;
            try {
                // block on the queue for at most 3 seconds
                triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam!=null) {
                    // ......
                    if (triggerParam.getExecutorTimeout() > 0) {
                        // limit timeout
                        Thread futureThread = null;
                        try {
                            FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {
                                    // init job context
                                    XxlJobContext.setXxlJobContext(xxlJobContext);
                                    // run the job
                                    handler.execute();
                                    return true;
                                }
                            });
                            // run it on a separate thread
                            futureThread = new Thread(futureTask);
                            futureThread.start();
                            // wait for at most executorTimeout seconds
                            Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {
                            // .....
                        }
                    } else {
                        // just execute
                        handler.execute();
                    }
                    // .....
                } else {
                    // .....
                }
            } catch (Throwable e) {
                // .....
            } finally {
                // .....
            }
        }
        // .....
    }
}
```
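The loop above can be reduced to a small, self-contained worker. SimpleJobThread is a hypothetical simplification: jobs are plain Runnables instead of TriggerParam plus IJobHandler, results are strings collected in a queue, and logging, idle detection and callbacks are left out. It keeps the two mechanisms described above: polling with a 3-second timeout, and a FutureTask on a separate thread when a timeout is set.

```java
import java.util.concurrent.*;

// Hypothetical, simplified JobThread: one thread drains one job's queue.
class SimpleJobThread extends Thread {
    private final LinkedBlockingQueue<Runnable> triggerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> results = new LinkedBlockingQueue<>();
    private final long timeoutSeconds; // <= 0 means "no timeout"
    private volatile boolean stopRequested = false;

    SimpleJobThread(long timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
        setDaemon(true);
    }

    void push(Runnable job) { triggerQueue.add(job); }

    void requestStop() { stopRequested = true; }

    // Waits up to the given seconds for the next result; null if none arrived
    String awaitResult(long seconds) {
        try {
            return results.poll(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void run() {
        while (!stopRequested) {
            try {
                // block for at most 3 s so the loop can notice a stop request
                Runnable job = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (job == null) {
                    continue;
                }
                if (timeoutSeconds > 0) {
                    // run on a separate thread and wait with a deadline
                    FutureTask<Boolean> future = new FutureTask<>(() -> {
                        job.run();
                        return true;
                    });
                    Thread runner = new Thread(future);
                    runner.setDaemon(true);
                    runner.start();
                    try {
                        future.get(timeoutSeconds, TimeUnit.SECONDS);
                        results.add("SUCCESS");
                    } catch (TimeoutException e) {
                        results.add("TIMEOUT");
                    } finally {
                        runner.interrupt(); // best effort: the job must react to interruption
                    }
                } else {
                    job.run(); // no timeout configured: run inline
                    results.add("SUCCESS");
                }
            } catch (InterruptedException e) {
                return; // interrupted while waiting: leave the loop
            } catch (ExecutionException | RuntimeException e) {
                results.add("FAIL");
            }
        }
    }
}
```

In this sketch the timeout only stops a job that responds to interruption; a job that ignores its interrupt flag keeps running on the runner thread after TIMEOUT has been reported.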

In Spring mode the IJobHandler is a MethodJobHandler, which invokes the method annotated with @XxlJob. That is the end of the call chain.
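MethodJobHandler essentially pairs a bean with a java.lang.reflect.Method and invokes it. The sketch below reproduces that idea with plain reflection; the annotation DemoJob, the classes ReflectiveJobHandler and DemoJobs, and the scan method are hypothetical stand-ins rather than XxlJob's actual classes, and no Spring container is involved.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for @XxlJob: marks a method as a named job handler
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical stand-in for MethodJobHandler: a bean plus one of its methods
class ReflectiveJobHandler {
    private final Object target;
    private final Method method;

    ReflectiveJobHandler(Object target, Method method) {
        this.target = target;
        this.method = method;
    }

    // End of the call chain: invoke the annotated method on its bean
    void execute() {
        try {
            method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("job method failed: " + method.getName(), e);
        }
    }

    // Startup-time scan: register every annotated method under its handler name
    static Map<String, ReflectiveJobHandler> scan(Object bean) {
        Map<String, ReflectiveJobHandler> handlers = new HashMap<>();
        for (Method m : bean.getClass().getDeclaredMethods()) {
            DemoJob job = m.getAnnotation(DemoJob.class);
            if (job != null) {
                m.setAccessible(true);
                handlers.put(job.value(), new ReflectiveJobHandler(bean, m));
            }
        }
        return handlers;
    }
}

// Example bean with one job method
class DemoJobs {
    int calls = 0;

    @DemoJob("demoJobHandler")
    public void demoJobHandler() {
        calls++;
    }
}
```

Looking a handler up by name and calling execute() corresponds to the executorHandler name carried in TriggerParam being resolved to an IJobHandler on the executor side.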


Scheduled triggering

When the scheduling center starts, the XxlJobAdminConfig bean creates and initializes an XxlJobScheduler once its properties have been set (afterPropertiesSet):

```java
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }

    // ---------------------- XxlJobScheduler ----------------------

    private XxlJobScheduler xxlJobScheduler;

    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        // initialize the scheduler
        xxlJobScheduler.init();
    }

    // ...... (destroy() and the configuration fields are omitted)
}
```

Let's go straight to the scheduled-triggering part.


JobScheduleHelper#start creates and starts two threads, scheduleThread and ringThread.


scheduleThread: periodically scans the database for jobs that are about to run.

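ringThread: jobs found by the scan may not be due yet. They are placed into a time wheel, and ringThread triggers them when their second arrives.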

The schedule thread

Since it runs periodically, let's first look at the interval between scans.


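After each scan, if the scan loop took less than 1 s, the thread sleeps. preReadSuc indicates whether the scan found any jobs due: if it did, the thread sleeps about 1 s, otherwise about 5 s. In both cases System.currentTimeMillis() % 1000 is subtracted from the sleep time, so the thread wakes up exactly on a whole-second boundary. If the scan itself took 1 s or longer, the next scan starts without sleeping.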
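The sleep arithmetic is easy to check in isolation. ScanPacing is a hypothetical helper that reproduces the computation described above, with the scan cost and the current time passed in explicitly; PRE_READ_MS = 5000 as in JobScheduleHelper.

```java
// Hypothetical helper reproducing the schedule thread's sleep computation.
class ScanPacing {
    static final long PRE_READ_MS = 5000;

    // No sleep if the scan took 1 s or more; otherwise sleep about 1 s (jobs found)
    // or about 5 s (nothing found), trimmed so the wake-up lands on a whole second
    static long sleepAfterScan(long costMillis, boolean preReadSuc, long nowMillis) {
        if (costMillis >= 1000) {
            return 0;
        }
        long base = preReadSuc ? 1000 : PRE_READ_MS;
        return base - nowMillis % 1000;
    }
}
```

For a scan that ends at 12,345,250 ms and found jobs, the thread sleeps 750 ms and wakes at 12,346,000 ms, a whole second; had it found nothing, it would sleep 4,750 ms and wake at 12,350,000 ms.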

Why sleep 5 s when no jobs were found?

Because each scan fetches the jobs due within the next 5 s. If nothing was found, the thread can safely sleep 5 s until the next scan point. The code:

```java
public void start() {
    // schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }

            // pre-read count (page size): the maximum number of jobs fetched per scan, 6000 by default
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

            while (!scheduleThreadToStop) {
                long start = System.currentTimeMillis();
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
                boolean preReadSuc = true;
                try {
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
                    // DB row lock: serializes scanning across admin nodes
                    preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
                    preparedStatement.execute();

                    // tx start
                    // 1、pre read
                    long nowTime = System.currentTimeMillis();
                    // fetch the jobs due within the next 5 s (PRE_READ_MS)
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    // ......
                } catch (Exception e) {
                    // ......
                }
            }
        }
    });
}
```

Once the due jobs are fetched, each one is handled according to how far past its trigger time it is.

More than 5 s overdue

```java
for (XxlJobInfo jobInfo: scheduleList) {
    // more than 5 s past the trigger time
    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
        // match the misfire strategy; defaults to DO_NOTHING
        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
        // FIRE_ONCE_NOW: trigger once right away
        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
            // same path as the manual "trigger once" described earlier
            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
        }
        // update the job's next trigger time
        refreshNextValidTime(jobInfo, new Date());
    }
    // ......
}
```

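When a job is more than 5 s overdue, the misfire strategy applies. The default is DO_NOTHING (ignore the missed run); if FIRE_ONCE_NOW is configured, the job is triggered once immediately, through the same path as the manual trigger described at the beginning. Either way, the next trigger time is then refreshed.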


Overdue, but by no more than 5 s

```java
for (XxlJobInfo jobInfo: scheduleList) {
    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
        // ...... (more than 5 s overdue, shown above)
    } else if (nowTime > jobInfo.getTriggerNextTime()) {
        // overdue, but by no more than 5 s
        // 1、trigger
        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
        // update the job's next trigger time
        refreshNextValidTime(jobInfo, new Date());
        // job still enabled (triggerStatus == 1) and its next trigger time is within the next 5 s
        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
            // second-of-minute of the next trigger time
            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
            // push into the time wheel
            pushTimeRing(ringSecond, jobInfo.getId());
            // refresh the next trigger time again
            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
        }
    }
    // ......
}
```

When a job is overdue but by no more than 5 s, it is triggered immediately.

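Then, if the job is still enabled (triggerStatus == 1) and its next trigger time falls within the next 5 s, that next run is also pushed into the time wheel so it is not missed.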

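Not yet due

```java
for (XxlJobInfo jobInfo: scheduleList) {
    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
        // ...... (more than 5 s overdue)
    } else if (nowTime > jobInfo.getTriggerNextTime()) {
        // ...... (overdue by no more than 5 s)
    } else {
        // not yet due
        // 1、make ring second
        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
        // 2、push time ring
        pushTimeRing(ringSecond, jobInfo.getId());
        // 3、fresh next
        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    }
}
```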

The job is not due yet but will run within the next 5 s, so it is pushed into the time wheel.
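The three cases depend only on nowTime and the job's triggerNextTime, so they can be summarized in one function. ScanDecision is hypothetical: it returns which branch a job would take and computes the time-wheel slot with the same (triggerNextTime / 1000) % 60 formula, but it does not trigger anything or refresh the next trigger time.

```java
// Hypothetical summary of the three branches in the schedule thread.
class ScanDecision {
    static final long PRE_READ_MS = 5000;

    enum Branch { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    static Branch classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return Branch.MISFIRE;      // more than 5 s late: apply the misfire strategy
        } else if (nowTime > triggerNextTime) {
            return Branch.TRIGGER_NOW;  // late by at most 5 s: trigger immediately
        } else {
            return Branch.PUSH_TO_RING; // not due yet: wait in the time wheel
        }
    }

    // Time-wheel slot: the second-of-minute of the trigger time
    static int ringSecond(long triggerNextTimeMillis) {
        return (int) ((triggerNextTimeMillis / 1000) % 60);
    }
}
```

With PRE_READ_MS = 5000, a job 6 s late takes the misfire branch, one exactly 5 s late is still triggered immediately, and one due in 3 s goes to the time wheel; a trigger time of 125,000 ms maps to slot 5.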

The time-wheel thread

```java
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

// ring thread
ringThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!ringThreadToStop) {
            try {
                // sleep until the next whole second (1000 - now % 1000 ms)
                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!ringThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }

            try {
                // ids of the jobs due now
                List<Integer> ringItemData = new ArrayList<>();
                // take the jobs for the current second and for the previous second
                int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                // in case processing took too long and a tick was skipped, also check one slot back
                for (int i = 0; i < 2; i++) {
                    // remove() so that ids already handed out are never fetched twice
                    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                    if (tmpData != null) {
                        ringItemData.addAll(tmpData);
                    }
                }

                if (ringItemData.size() > 0) {
                    // trigger each job
                    for (int jobId: ringItemData) {
                        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                    }
                    // clear
                    ringItemData.clear();
                }
            } catch (Exception e) {
                if (!ringThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                }
            }
        }
    }
});
```

XxlJob's time wheel is essentially a ConcurrentHashMap: the key is a second (0 to 59) and the value is the list of job ids due in that second.

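As long as ringThread has not been stopped, it wakes roughly once per second and takes out the jobs for the current second and the previous second. The previous second is included in case processing took long enough that a tick was skipped.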

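Slots are fetched with remove(), which takes the entry out of the map and returns it, so a job id cannot be fetched and triggered twice.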
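Stripped of threading and triggering, the time wheel reduces to a map from second to job ids plus a drain step. SecondsTimeWheel is a hypothetical simplification: drain takes the current second as an argument instead of reading Calendar, and it returns the due ids instead of calling JobTriggerPoolHelper.trigger. push uses ConcurrentHashMap#compute so that adding an id and draining the same slot cannot interleave; that is a choice made for this sketch, not necessarily how pushTimeRing is written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical 60-slot time wheel keyed by second-of-minute.
class SecondsTimeWheel {
    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    void push(int ringSecond, int jobId) {
        // compute() is atomic per key, so a concurrent drain() cannot lose this id
        ringData.compute(ringSecond, (second, ids) -> {
            List<Integer> list = (ids == null) ? new ArrayList<>() : ids;
            list.add(jobId);
            return list;
        });
    }

    // Takes out the current slot and the previous one (in case a tick was skipped).
    // remove() hands each slot out at most once.
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Draining at second 0 also picks up anything left in slot 59, and draining the same second again returns nothing because both slots were removed.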

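I'm Code皮皮虾, and I look forward to learning and improving together with you. If you found this article useful, follow me on Juejin so you don't miss future technical posts.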


