# 一.概念例析快速入门

组件 Job、JobDetail、Trigger、Calendar、Scheduler

1.+版本 jar 包中,JobDetail 是个类,直接通过构造方法与 Job 类关联。SimpleTrigger 和 CornTrigger 是类;

在 2.+jar 包中,JobDetail 是个接口,SimpleTrigger 和 CornTrigger 是接口

# 1.Job

job 接口只有一个 execute(JobExcutionContext jec)方法,JobExcutionContext 提供了调度的上下文信息,比如,jobdetail 和 trigger,还可以添加业务信息

# 2.JobDetail

job 任务实例,有一个重要属性 jobDataMap,存放 job 运行的具体信息,创建实例的方法

JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, group(taskId)).build();
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put(DdlConstants.DDL_MUTABLE_GRAPH, ddlMutableGraphContext);
jobDataMap.put(DdlConstants.DDL_NODE, ddlMutableGraphContext.getNodeMap().get(t.getOutNodeKey()));
jobDataMap.put(DdlConstants.TASK_RECORD_DO, t);
1
2
3
4
5

# 3.Trigger

触发器,时间触发规则,主要有 2 个 SimpleTrigger 和 CronTrigger

  • SimpleTrigger
    • 只触发一次
    • 固定时间间隔
  • CronTrigger
    • 通过 cron 表达式触发
SimpleTrigger simpleTrigger = TriggerBuilder
    .newTrigger()
    .withIdentity("trigger1")
    .forJob("job1", "jgroup1")//在这里绑定
    .withSchedule(SimpleScheduleBuilder.repeatSecondlyForTotalCount(10, 2))
    .startNow()
    .build();
1
2
3
4
5
6
7

# 4.Calendar

就是 java.util.Calendar 日期类,在 scheduler 中使用如下

//第一步:创建节假日类
 // ②四一愚人节
Calendar foolDay = new GregorianCalendar();    //这里的Calendar是 java.util.Calendar。根据当前时间所在的默认时区创建一个“日子”
foolDay.add(Calendar.MONTH, 4);
foolDay.add(Calendar.DATE, 1);
// ③国庆节
Calendar nationalDay = new GregorianCalendar();
nationalDay.add(Calendar.MONTH, 10);
nationalDay.add(Calendar.DATE, 1);

//第二步:创建AnnualCalendar,它的作用是排除排除每一年中指定的一天或多天
AnnualCalendar holidays = new AnnualCalendar();
//设置排除日期有两种方法
// 第一种:排除的日期,如果设置为false则为包含(included)
holidays.setDayExcluded(foolDay, true);
holidays.setDayExcluded(nationalDay, true);
//第二种,创建一个数组。
ArrayList<Calendar> calendars = new ArrayList<Calendar>();
calendars.add(foolDay);
calendars.add(nationalDay);
holidays.setDaysExcluded(calendars);

//第三步:将holidays添加进我们的触发器
simpleTrigger.setCalendarName("holidays");

//第四步:设置好然后需要在我们的scheduler中注册
scheduler.addCalendar("holidays",holidays, false,false);,注意这里的第一个参数为calendarName,需要和触发器中添加的Calendar名字像对应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 5.Scheduler

Scheduler 是任务记录表,存储了 job 和 trigger 信息,job 和 trigger 都有组和名称,组和名称标识唯一性,一个 job 可以对应多个 trigger,一个 trigger 只能对应一个 job

SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//将jobDetail和Trigger注册到一个scheduler里,建立起两者的关联关系
scheduler.scheduleJob(jobDetail,Trigger);
scheduler.start();//开始任务调度
1
2
3
4
5

# 二.CronTrigger 个性化

# 1.表达式

位置 时间域名 允许值 允许的特殊字符
1 0-59 ,-*/
2 分支 0-59 ,-*?/
3 小时 0-23 ,-*/
4 日期 1-31 ,-*/LWC
5 月份 1-12 或 JAN-DEC ,-*/
6 星期 1-7 或 SUN-SAT ,-*?/LC#
7 年(可选) 1970-2099 ,-*/

# 2.特殊字符

  • 星号(*) 表示每一时刻
  • 问号(?) 表示无意义的值 只能用在日期和星期
  • 减号(-) 表示范围 5-10 表示 5 到 10
  • 逗号(,) 表示并列的有效值
  • 斜杠(x/y) 表示等步长序列 x 为起始值,y 为增量步长值
  • L 表示 last 日期和星期字段
  • W 表示离给定日期最近的工作日
  • LW 表示最后一个工作日
  • 井号(#) 表示第几个星期几 6#3 表示第三个星期五
  • C 表示只在日期和星期中使用,关联的日期 5C 5 号之后的第一天

# 3.实例

cron 表达式 含义
0 0 12 * * ? 每天 12 点整触发一次
0 15 10 ? * * 每天 10 点 15 分触发一次

# 4.添加方式

//使用cornTrigger规则  每天10点42分
Trigger trigger=TriggerBuilder.newTrigger().withIdentity("simpleTrigger", "triggerGroup")
    .withSchedule(CronScheduleBuilder.cronSchedule("0 42 10 * * ? *"))
    .startNow().build();
1
2
3
4

# 三.存储与持久化操作

# 1.内存存储 RAMJobStore

存储在内存,速度快,缺点重启会丢失

# 2.持久性 JobStore

将 JobDetail 等信息持久化我们的数据库

表名 描述
QRTZ_CALENDARS 以 Blob 类型存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS 存储 Cron Trigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中)
QRTZ_LOCKS 存储程序的非观锁的信息(假如使用了悲观锁)
QRTZ_JOB_DETAILS 存储每一个已配置的 Job 的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的 JobListener 的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数
QRTZ_BLOG_TRIGGERS 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)
QRTZ_TRIGGER_LISTENERS 存储已配置的 TriggerListener 的信息
QRTZ_TRIGGERS 存储已配置的 Trigger 的信息

# 3.测试

public class pickNewsJob implements Job {

    @Override
    public void execute(JobExecutionContext jec) throws JobExecutionException {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        System.out.println("在"+sdf.format(new Date())+"扒取新闻");
    }

    public static void main(String args[]) throws SchedulerException {
        JobDetail jobDetail = JobBuilder.newJob(pickNewsJob.class)
                .withIdentity("job1", "jgroup1").build();
        SimpleTrigger simpleTrigger = TriggerBuilder
                .newTrigger()
                .withIdentity("trigger1")
                .withSchedule(SimpleScheduleBuilder.repeatSecondlyForTotalCount(10, 2))
                .startNow()
                .build();

        //创建scheduler
        SchedulerFactory schedulerFactory = new StdSchedulerFactory();
        Scheduler scheduler = schedulerFactory.getScheduler();
        scheduler.scheduleJob(jobDetail, simpleTrigger);
        scheduler.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 四.JobListener 配置类

  1. 自定义监听器接口实现类
  2. 向 scheduler 中注册监听器实现类

监听器类型主要分为三种,和 Quartz 三个核心类相对应:JobListener,TriggerListener,SchedulerListener

# 1.JobListener

我们的 jobListener 实现类必须实现其以下方法:

方法 说明
getName() getName() 方法返回一个字符串用以说明 JobListener 的名称。对于注册为全局的监听器,getName() 主要用于记录日志,对于由特定 Job 引用的 JobListener,注册在 JobDetail 上的监听器名称必须匹配从监听器上 getName() 方法的返回值。
jobToBeExecuted() Scheduler 在 JobDetail 将要被执行时调用这个方法。
jobExecutionVetoed() Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个方法。
jobWasExecuted() Scheduler 在 JobDetail 被执行之后调用这个方法。

# 2.注册

@Slf4j
@Component
public class ListenerConfig {

    @Autowired
    private Scheduler scheduler;
    @Autowired
    private DdlJobListener ddlJobListener;
    @Autowired
    private DdlSchedulerListener ddlSchedulerListener;

    @PostConstruct
    public void init() {
        try {
            scheduler.getListenerManager().addJobListener(ddlJobListener, GroupMatcher.groupStartsWith(GROUP_PREFIX));
            scheduler.getListenerManager().addSchedulerListener(ddlSchedulerListener);
        } catch (SchedulerException e) {
            log.error("", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 五.Listener 配置

# 1.TriggerListener 配置

public static void main(String args[]) throws SchedulerException {
    final JobDetail pickNewsJob = JobBuilder.newJob(PickNewsJob.class)
            .withIdentity("job1", "jgroup1").build();
    JobDetail getHottestJob = JobBuilder.newJob(GetHottestJob.class)
            .withIdentity("job2", "jgroup2").build();

    SimpleTrigger pickNewsTrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("trigger1","tgroup1")
            .withSchedule(SimpleScheduleBuilder.repeatSecondlyForTotalCount(2, 1)).startNow()
            .build();
    SimpleTrigger getHottestTrigger = TriggerBuilder
            .newTrigger()
            .withIdentity("trigger2","tgroup2")
            .withSchedule(SimpleScheduleBuilder.repeatSecondlyForTotalCount(2, 2)).startNow()
            .build();

    Scheduler scheduler = new StdSchedulerFactory().getScheduler();
    JobListener myJobListener = new MyJobListener();
    KeyMatcher<JobKey> keyMatcher = KeyMatcher.keyEquals(pickNewsJob.getKey());
    scheduler.getListenerManager().addJobListener(myJobListener, keyMatcher);
    /********下面是新加部分***********/
    TriggerListener myTriggerListener = new MyTriggerListener();
    KeyMatcher<TriggerKey> tkeyMatcher = KeyMatcher.keyEquals(pickNewsTrigger.getKey());
    scheduler.getListenerManager().addTriggerListener(myTriggerListener, tkeyMatcher);

    scheduler.scheduleJob(pickNewsJob, pickNewsTrigger);
    scheduler.scheduleJob(getHottestJob,getHottestTrigger);
    scheduler.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 2.schedulerListener 配置

SchedulerListener mySchedulerListener = new MySchedulerListener();
scheduler.getListenerManager().addSchedulerListener(mySchedulerListener);
1
2

# 七.开发配置

# 1.添加任务

public void run(Integer taskId, String outputNodeKey) {
        TaskDO taskDO = mapper.selectById(taskId);
        Optional.ofNullable(taskDO).orElseThrow(() -> new DdlException(ErrorCodeEnum.NOT_EXIST_TASK));
        DagConfDO one = dagConfService.getOne(new LambdaQueryWrapper<DagConfDO>().eq(DagConfDO::getTaskId, taskId)
                .orderByDesc(DagConfDO::getVersion).last("limit 1"));
        Class<? extends Job> jobClass = DdlJob.class;
        Date date = new Date();
        String jobkey = JOB_KEY_MARK + SPLIT_STR + ExecTypeEnum.MANUAL.name() + SPLIT_STR + taskId +
                SPLIT_STR + taskDO.getLastDagConfVersion() + SPLIT_STR + outputNodeKey + SPLIT_STR + date.getTime();
        String triggerKey = TRIGGER_KEY_MARK + SPLIT_STR + ExecTypeEnum.MANUAL.name() + SPLIT_STR + taskId +
                SPLIT_STR + taskDO.getLastDagConfVersion() + SPLIT_STR + outputNodeKey + SPLIT_STR + date.getTime();
        // 构建定时任务信息
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobkey, group(taskId)).build();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey, group(taskId)).startNow().build();
        String conf = one.getConf();
        ObjectMapper mapper = new ObjectMapper();
        DagConf dagConf = null;
        try {
            dagConf = mapper.readValue(conf, DagConf.class);
        } catch (JsonProcessingException e) {
            log.error("", e);
        }
        DdlMutableGraphContext ddlMutableGraphContext = new DdlMutableGraphContext(dagConf);
        TaskRecordDO taskRecordDO = new TaskRecordDO();
        taskRecordDO.setTaskId(taskId);
        taskRecordDO.setExecStatus(ExecStatusEnum.PENDING.name().toLowerCase());
        taskRecordDO.setExecType(ExecTypeEnum.MANUAL.name().toLowerCase());
        taskRecordDO.setPlanRunTime(new Date());
        taskRecordDO.setDagConfVersion(taskDO.getLastDagConfVersion());
        taskRecordDO.setOutNodeKey(outputNodeKey);
        taskRecordDO.setOutNodeName(ddlMutableGraphContext.getNodeMap().get(outputNodeKey).getName());

        jobDataMap.put(DdlConstants.DDL_MUTABLE_GRAPH, ddlMutableGraphContext);
        jobDataMap.put(DdlConstants.DDL_NODE, ddlMutableGraphContext.getNodeMap().get(outputNodeKey));
        jobDataMap.put(DdlConstants.TASK_RECORD_DO, taskRecordDO);
        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            log.error("", e);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# 2.监听器

@Slf4j
@Component
public class ListenerConfig {

    @Autowired
    private Scheduler scheduler;
    @Autowired
    private DdlJobListener ddlJobListener;
    @Autowired
    private DdlSchedulerListener ddlSchedulerListener;

    @PostConstruct
    public void init() {
        try {
            scheduler.getListenerManager().addJobListener(ddlJobListener, GroupMatcher.groupStartsWith(GROUP_PREFIX));
            scheduler.getListenerManager().addSchedulerListener(ddlSchedulerListener);
        } catch (SchedulerException e) {
            log.error("", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.deepexi.datasense.ddl.operator.schedule;

import com.deepexi.datasense.ddl.operator.constants.DdlConstants;
import com.deepexi.datasense.ddl.operator.model.entity.TaskRecordDO;
import com.deepexi.datasense.ddl.operator.service.TaskRecordService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DdlSchedulerListener implements SchedulerListener {

    @Autowired
    private TaskRecordService taskRecordService;


    @Override
    public void jobScheduled(Trigger trigger) {
        String jobName = trigger.getJobKey().getName();
        log.info("jobScheduled:" + jobName + " has been scheduled");
    }

    @Override
    public void jobUnscheduled(TriggerKey triggerKey) {
        log.info("jobUnscheduled:" + triggerKey + " is being unscheduled");
    }

    @Override
    public void triggerFinalized(Trigger trigger) {
        log.info("triggerFinalized:" + "Trigger is finished for " + trigger.getJobKey().getName());
    }

    @Override
    public void triggerPaused(TriggerKey triggerKey) {
        log.info("triggerPaused:" + triggerKey + " is being paused");
    }

    @Override
    public void triggersPaused(String triggerGroup) {
        log.info("triggersPaused:" + "trigger group " + triggerGroup + " is being paused");
    }

    @Override
    public void triggerResumed(TriggerKey triggerKey) {
        log.info("triggerResumed:" + triggerKey + " is being resumed");
    }

    @Override
    public void triggersResumed(String triggerGroup) {
        log.info("triggersResumed:" + "trigger group " + triggerGroup + " is being resumed");
    }

    /**
     * 任务加入到调度器中,就新建一条任务运行记录,状态为"待执行"
     * 任务完成后要把这条记录状态改成完成
     * 所以需要把任务运行记录存入到jobDataMap中,待任务完成的时候,好从jobDataMap中取出taskRecordDO
     * 改状态。
     * 所以需要有更新jobDataMap的操作。目前quartz是job-store-type=RAM模式,是支持更新jobDataMap操作
     * 假如将来job-store-type=jdbc,那么要检测是否支持更新jobDataMap操作
     *
     * @param jobDetail
     */
    @Override
    public void jobAdded(JobDetail jobDetail) {
        JobKey key = jobDetail.getKey();
        log.info("jobAdded:{} is added", key);
        if (key.toString().startsWith(DdlConstants.GROUP_PREFIX)) {
            JobDataMap jobDataMap = jobDetail.getJobDataMap();
            TaskRecordDO taskRecordDO = (TaskRecordDO) (jobDataMap.get(DdlConstants.TASK_RECORD_DO));
            taskRecordService.save(taskRecordDO);
            jobDataMap.put(DdlConstants.TASK_RECORD_DO, taskRecordDO);
        }
    }

    @Override
    public void jobDeleted(JobKey jobKey) {
        log.info("jobDeleted:" + jobKey + " is deleted");
    }

    @Override
    public void jobPaused(JobKey jobKey) {
        log.info("jobPaused:" + jobKey + " is paused");
    }

    @Override
    public void jobsPaused(String jobGroup) {
        log.info("jobsPaused:" + "job group " + jobGroup + " is paused");
    }

    @Override
    public void jobResumed(JobKey jobKey) {
        log.info("jobResumed:" + jobKey + " is resumed");
    }

    @Override
    public void jobsResumed(String jobGroup) {
        log.info("jobsResumed:" + "job group " + jobGroup + " is resumed");
    }

    @Override
    public void schedulerError(String msg, SchedulerException cause) {
        log.error(msg, cause.getUnderlyingException());
    }

    @Override
    public void schedulerInStandbyMode() {
        log.info("schedulerInStandbyMode:" + "scheduler is in standby mode");
    }

    @Override
    public void schedulerStarted() {
        log.info("schedulerStarted:" + "scheduler has been started");
    }

    @Override
    public void schedulerStarting() {
        log.info("schedulerStarting:" + "scheduler is being started");
    }

    @Override
    public void schedulerShutdown() {
        log.info("schedulerShutdown:" + "scheduler has been shutdown");
    }

    @Override
    public void schedulerShuttingdown() {
        log.info("schedulerShuttingdown:" + "scheduler is being shutdown");
    }

    @Override
    public void schedulingDataCleared() {
        log.info("schedulingDataCleared:" + "scheduler has cleared all data");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.deepexi.datasense.ddl.operator.schedule;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.deepexi.datasense.ddl.operator.constants.DdlConstants;
import com.deepexi.datasense.ddl.operator.enums.ExecStatusEnum;
import com.deepexi.datasense.ddl.operator.enums.ExecTypeEnum;
import com.deepexi.datasense.ddl.operator.model.entity.TaskDO;
import com.deepexi.datasense.ddl.operator.model.entity.TaskRecordDO;
import com.deepexi.datasense.ddl.operator.service.TaskRecordService;
import com.deepexi.datasense.ddl.operator.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Objects;

@Slf4j
@Component
public class DdlJobListener implements JobListener {

    @Autowired
    private TaskService taskService;
    @Autowired
    private TaskRecordService taskRecordService;


    @Override
    public String getName() {
        String name = getClass().getSimpleName();
        log.info("getName:" + " listener name is:" + name);
        return name;
    }

    /**
     * 任务开始执行时,有几件事要做
     * 1.改任务状态为执行中
     * 2.改任务记录状态为执行中
     *
     * @param context
     */
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        log.info("jobToBeExecuted:" + jobName + " is going to be executed");

        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        TaskRecordDO taskRecordDO = (TaskRecordDO) (jobDataMap.get(DdlConstants.TASK_RECORD_DO));
        Integer taskId = taskRecordDO.getTaskId();
        LambdaUpdateWrapper<TaskDO> updateWrapper = new LambdaUpdateWrapper<TaskDO>().eq(TaskDO::getId, taskId)
                .set(TaskDO::getLastRunTime, new Date())
                .set(TaskDO::getExecStatus, ExecStatusEnum.RUNNING.name().toLowerCase());

        LambdaUpdateWrapper<TaskRecordDO> updateTaskRecordWrapper = new LambdaUpdateWrapper<TaskRecordDO>()
                .eq(TaskRecordDO::getId, taskRecordDO.getId())
                .set(TaskRecordDO::getStartRunTime, new Date())
                .set(TaskRecordDO::getExecStatus, ExecStatusEnum.RUNNING.name().toLowerCase());
        taskRecordService.update(updateTaskRecordWrapper);

        taskService.update(new TaskDO(), updateWrapper);
    }

    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        log.info("jobExecutionVetoed:" + jobName + " was vetoed and not executed");
    }

    /**
     * 任务执行完成了有几件事情需要做
     * 1、更新任务状态为已完成
     * 2、更新任务记录状态为已完成
     * 3、创建下次需要执行的任务记录(假如任务类型为周期),其状态为待执行
     *
     * @param context
     * @param jobException
     */
    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        String jobName = context.getJobDetail().getKey().getName();
        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();

        TaskRecordDO taskRecordDO = (TaskRecordDO) (jobDataMap.get(DdlConstants.TASK_RECORD_DO));

        log.info("jobWasExecuted:" + jobName + " was executed");

        String execStatus = ExecStatusEnum.SUCCEED.name().toLowerCase();
        String stdoutLog = null;
        String stderrLog = null;
        String error = null;
        Object o = jobDataMap.get(DdlConstants.TASK_STATUS);
        if (Objects.nonNull(o)) {
            execStatus = String.valueOf(o);
            stderrLog = String.valueOf(jobDataMap.get(DdlConstants.EXEC_LOG));
            error = String.valueOf(jobDataMap.get(DdlConstants.EXEC_ERR));
        } else {
            stdoutLog = String.valueOf(jobDataMap.get(DdlConstants.EXEC_LOG));
        }

        //更新task
        Integer taskId = taskRecordDO.getTaskId();
        LambdaUpdateWrapper<TaskDO> updateWrapper = new LambdaUpdateWrapper<TaskDO>().eq(TaskDO::getId, taskId)
                .set(TaskDO::getLastRunTime, new Date())
                .set(TaskDO::getExecStatus, execStatus);
        //.set(TaskDO::getLastDagConfVersion, taskRecordDO.getDagConfVersion())
        taskService.update(new TaskDO(), updateWrapper);

        //更新TaskRecordDO
        LambdaUpdateWrapper<TaskRecordDO> updateTaskRecorWrapper = new LambdaUpdateWrapper<TaskRecordDO>()
                .eq(TaskRecordDO::getId, taskRecordDO.getId())
                .set(TaskRecordDO::getEndRunTime, new Date())
                .set(TaskRecordDO::getStdoutLog, stdoutLog)
                .set(TaskRecordDO::getStderrLog, stderrLog)
                .set(TaskRecordDO::getError, error)
                .set(TaskRecordDO::getExecStatus, execStatus);
        taskRecordService.update(updateTaskRecorWrapper);

        String execType = taskRecordDO.getExecType();
        if (ExecTypeEnum.CYCLE.name().equalsIgnoreCase(execType)) {
            //插入下一次执行记录
            Date nextFireTime = context.getTrigger().getNextFireTime();
            taskRecordDO.setPlanRunTime(nextFireTime);
            taskRecordDO.setId(null);
            taskRecordDO.setTimes(taskRecordDO.getTimes() + 1);
            taskRecordDO.setExecStatus(ExecStatusEnum.PENDING.name().toLowerCase());
            taskRecordService.save(taskRecordDO);
            jobDataMap.put(DdlConstants.TASK_RECORD_DO, taskRecordDO);
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

# 3.执行器

@Slf4j
public class DdlJob extends QuartzJobBean {

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) {
        JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        DdlMutableGraphContext ddlMutableGraphContext = (DdlMutableGraphContext) (jobDataMap.get(DdlConstants.DDL_MUTABLE_GRAPH));
        StringBuilder sb = ddlMutableGraphContext.getSb();
        try {
            DdlNode ddlNode = (DdlNode) (jobDataMap.get(DdlConstants.DDL_NODE));
            new DdlExecution(ddlMutableGraphContext, ddlNode).execution();
        } catch (Exception e) {
            jobDataMap.put(DdlConstants.TASK_STATUS, ExecStatusEnum.FAILED.name().toLowerCase());
            sb.append(e);
            jobDataMap.put(DdlConstants.EXEC_ERR, e.toString());
            log.error("", e);
            throw new OperatorException("任务执行失败!", e);
        } finally {
            jobDataMap.put(DdlConstants.EXEC_LOG, sb.toString());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
上次更新: 10/29/2024, 10:27:50 AM