微服务Sun-job

Sun-job执行器解析

初始化


SunJobExecutor(sunjob执行器,由config调用启动)—>启动线程TriggerCallbackThread、JobLogFileCleanThread—>启动netty服务端EmbedServer—>配置netty服务端处理器EmbedHttpServerHandler

接收到调度请求


EmbedHttpServerHandler服务端处理器在接收到客户端的请求—>启动ExecutorBizImpl的处理方法—>ExecutorBizImpl启动JobThread线程调用执行器进行执行

流程图


Sun-job调度器解析

页面手动执行任务


JobInfoController控制器接收请求—>调用JobTriggerPoolHelper的线程池—>判断是该任务是进入快线程池还是慢线程池(如果一分钟内,该任务执行请求花费超过500毫秒的次数达到10次,则该任务放入慢线程池)—>线程池处理任务,获取任务的执行器信息,判断分片信息,分别调用执行器—>组装请求参数,根据路由策略,获取到执行器的IP地址—>每个执行器IP绑定一个ExecutorBizClient客户端对象,发送http请求到相应的执行器,执行器服务端接收到请求,执行相应的任务—>执行完毕后,更新相应的日志信息

调度器轮询任务


主要由两个线程来控制需要调度的任务

调度查询线程

  1. 锁住xxl-job-lock表,确保同一时间只有一个调度器在轮询任务;
  2. 获取当前时间+延迟时间内的任务;
  3. 遍历这些任务
    1. 下次执行时间+延迟时间<现在:称为misfire,要么忽略,要么执行一次。根据当前时间更新下次执行时间;
    2. 下次执行时间<现在:执行任务,根据当前时间更新下次执行时间,如果下次执行时间在当前周期内,放入时间轮中;
    3. 其他情况:放入时间轮中,根据下次执行时间更新下次执行时间
  4. 该次轮询超过1秒钟,立即开始下次轮询;
  5. 该次轮询未超过1秒钟,判断该周期内是否有任务,如果没有任务,等待到下一个周期再轮询。如果有任务,等待到下一个整秒。

时间轮线程

1.每整秒轮询一次
2.查看map中当前秒和上一秒是否有任务
3.如果有任务,执行这些任务,并清空

流程图

Sun-job执行器开发流程

第一步:添加依赖


在POM.xml文件中,添加依赖

注意:版本号需要与sun-job-admin的版本号保持一致,否则无法调用

配置application-dev.yml文件
### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔,为空则关闭自动注册;
sun.job.admin.addresses=
### 执行器通讯TOKEN [选填]:非空时启用;需要与SUN-JOB-ADMIN中保持一致
sun.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册;会根据执行器名称进行自动注册
sun.job.executor.appname=sun-job-executor
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
sun.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
sun.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
sun.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
sun.job.executor.logpath=/data/applogs/sun-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能; 
sun.job.executor.logretentiondays=30

第二步:开发执行器


开发SunJobConfig.java文件,读取上面的配置,并注册执行器
package com.sunline.pd.engine.config;

import com.sunline.job.core.executor.SunJobExecutor;
import com.sunline.jraf.context.SpringContext;
import com.sunline.pd.engine.job.PdAppointmentJobHandler;
import com.sunline.pd.engine.job.PdCalculationJobHandler;
import com.sunline.pd.engine.job.PdMappingJobHandler;
import com.sunline.pd.engine.job.PdTransferJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * sun-job config
 */
@Configuration
public class SunJobConfig {
    private static final Logger logger = LoggerFactory.getLogger(SunJobConfig.class);
    @Value("${sun.job.admin.addresses}")
    private String adminAddresses;
    @Value("${sun.job.accessToken}")
    private String accessToken;
    @Value("${sun.job.executor.appname}")
    private String appname;
    @Value("${sun.job.executor.ip}")
    private String ip;
    @Value("${sun.job.executor.port}")
    private int port;
    @Value("${sun.job.executor.logpath}")
    private String logPath;
    @Value("${sun.job.executor.logretentiondays}")
    private int logRetentionDays;

    /**
     * init
     */
    @Bean
    public void initXxlJobExecutor() throws Exception {
        //执行器        
        TestJobHandler testJobHandler = SpringContext.getBean(TestJobHandler.class);
        // 注册执行器        
        SunJobExecutor.registJobHandler("testJobHandler", testJobHandler);
        logger.info(">>>>>>>>>>> sun-job config init.");
        SunJobExecutor sunJobExecutor = new SunJobExecutor();
        sunJobExecutor.setAdminAddresses(adminAddresses);
        sunJobExecutor.setAppname(appname);
        sunJobExecutor.setIp(ip);
        sunJobExecutor.setPort(port);
        sunJobExecutor.setAccessToken(accessToken);
        sunJobExecutor.setLogPath(logPath);
        sunJobExecutor.setLogRetentionDays(logRetentionDays);
        sunJobExecutor.start();
    }
}
开发执行器
package com.sunline.pd.engine.job;

import com.sunline.job.core.biz.model.ExecutorParam;
import com.sunline.job.core.biz.model.ReturnT;
import com.sunline.job.core.handler.IJobHandler;
import com.sunline.job.core.log.SunJobLogger;
import com.sunline.jraf.util.BeanUtil;
import com.sunline.pm.api.modules.pd.calculation.CalculationParam;
import com.sunline.pm.api.utils.StringUtil;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * sun-job执行器(TEST)
 */
@Component
public class TestJobHandler extends IJobHandler {
    @Override
    public ReturnT<String> execute(ExecutorParam executorParam) {
        SunJobLogger.log("TEST,Hello world!");
        //flowParam为任务链入参,jobParam为任务入参        
        SunJobLogger.log("Input String is " + executorParam.getFlowParam() + "." + executorParam.getJobParam());
        return SUCCESS;
    }
}

第三步:启动服务


启动的日志中,可以看到执行器的注册情况
2021-04-25 09:43:55.920  INFO 16224 --- [           main] c.s.j.c.e.SunJobExecutor                 : >>>>>>>>>>> sun-job register jobhandler success, name:pdCalculationJobHandler, jobHandler:com.sunline.pd.engine.job.PdCalculationJobHandler@4e502162
2021-04-25 09:43:55.921  INFO 16224 --- [           main] c.s.p.e.c.SunJobConfig                   : >>>>>>>>>>> sun-job config init.
2021-04-25 09:43:56.008  INFO 16224 --- [      Thread-45] c.s.j.c.s.EmbedServer                    : >>>>>>>>>>> sun-job remoting server start success, nettype = class com.sunline.job.core.server.EmbedServer, port = 9997
2021-04-25 09:43:56.269  INFO 16224 --- [           main] pertySourcedRequestMappingHandlerMapping : Mapped URL path [/v2/api-docs] onto method [public org.springframework.http.ResponseEntity springfox.documentation.swagger2.web.Swagger2Controller.getDocumentation(java.lang.String,javax.servlet.http.HttpServletRequest)]
2021-04-25 09:43:56.294 DEBUG 16224 --- [rRegistryThread] c.s.j.c.t.ExecutorRegistryThread         : >>>>>>>>>>> sun-job registry success, registryParam:RegistryParam{registryGroup='EXECUTOR', registryKey='sun-job-executor-pd', registryValue='http://172.17.128.1:9997/'}, registryResult:ReturnT [code=200, msg=null, content=null]