关于定时任务,相关的技术文档,技术点也是非常的多,比如:ScheduledExecutorService,基于注解的@Scheduled,还有更多基础的实现,针对于单体服务这些都是可以正常使用的,配置简单,操作便捷,但是针对大型项目在分布式的大环境下,这些定时技术是不支持分布式的,会导致任务重复执行,也可以加锁等技术实现,这篇文章主要介绍的是分布式的定时任务之一:quartz。
实际执行任务调度的控制器,在spring中通过SchedulerFactoryBean封装起来
任务的触发,触发器有SimpleTrigger,CronTrigger,DateIntervalTrigger和NthIncludedDayTrigger四种类型,其中SimpleTrigger:能够周期性的设置任务触发;CronTrigger:使用cron表达式的方式实现任务触发,实现更多样,使用场景也是最多的。
业务需要执行的任务操作,实现Job接口,或者继承QuartzJobBean,实现/重写里面的方法。
现代开发中springboot已经普及,并且基于springboot整合Quartz也是非常的便利。
org.springframework.boot spring-boot-starter-quartz mysql mysql-connector-java 8.0.28 org.springframework.boot spring-boot-starter-jdbc
里面依赖了quartz的starter,以及mysql数据库,引入数据库的依赖目的是在于,后续quartz的job,trigger等,以及集群相关配置用到的锁,来控制任务集群模式下的重复执行等信息都是需要保存到数据库的,默认是放在内存中,内存中不利于管理,使用quartz的意义就不明显了。
这里就将quartz相关的所有配置都用yml来配置
server: port: 8888 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: mysql0707 url: jdbc:mysql://127.0.0.1:3306/quartz_config?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false # 定时配置 quartz: # 相关属性配置 properties: org: quartz: # 数据源 dataSource: globalJobDataSource: # URL必须大写 URL: jdbc:mysql://127.0.0.1:3306/quartz_config?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false driver: com.mysql.cj.jdbc.Driver maxConnections: 5 username: root password: ************ # 必须指定数据源类型 provider: hikaricp scheduler: instanceName: globalScheduler # 实例id instanceId: AUTO type: com.alibaba.druid.pool.DruidDataSource jobStore: # 数据源 dataSource: globalJobDataSource # JobStoreTX将用于独立环境,提交和回滚都将由这个类处理 class: org.quartz.impl.jdbcjobstore.JobStoreTX # 驱动配置 driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate # 表前缀 tablePrefix: QRTZ_ # 失效阈值(只有配置了这个时间,超时策略根据这个时间才有效) misfireThreshold: 100 # 集群配置 isClustered: true # 线程池配置 threadPool: class: org.quartz.simpl.SimpleThreadPool # 线程数 threadCount: 10 # 优先级 threadPriority: 5
这里面有quartz的数据源,线程池,集群和misfire相关配置,简单配置,更多的配置可以到官网查看。
这个实体类的作用就是后续实现对任务的开启,关闭,删除,暂停等操作需要用到的实体。
@Data public class JobInfo { /** * 任务名称 */ private String jobName; /** * 任务组 */ private String jobGroup; /** * 触发器名称 */ private String triggerName; /** * 触发器组 */ private String triggerGroup; /** * cron表达式 */ private String cron; /** * 类名 */ private String className; /** * 状态 */ private String status; /** * 下一次执行时间 */ private String nextTime; /** * 上一次执行时间 */ private String prevTime; /** * 配置信息(data) */ private String config; }
/** * @author Y * @DisallowConcurrentExecution:这个注解的作用就是同一个任务必须在上一次执行完毕之后,再按照corn时间执行,不会并行执行 * @PersistJobDataAfterExecution:这个注解的作用就是下一个任务用到上一个任务的修改数据(定时任务里面的jobData数据流转) * @description 任务1 这两个注解作用是 * @date 2023/6/28 */ @DisallowConcurrentExecution @PersistJobDataAfterExecution public class JobOne extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) { System.out.println("TimeEventJob正在执行..." + LocalDateTime.now()); // 执行10秒 try { Thread.sleep(9000); System.out.println("TimeEventJob执行完毕..." + LocalDateTime.now()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
这个类就是继承的QuartzJobBean,当然也可以实现Job接口,这个类就是任务需要具体执行的业务操作类,类上面添加了两个注解,这两个注解的目的就是让同一个任务必须在上一个任务执行完毕之后再按照触发后续执行,以及定时任务里面的JobDataMap,能够在任务中流转以及修改更新;不添加注解的情况下,JobDataMap里面的数据不能在任务之间流转,以及任务的触发不会参照上一任务是否执行完毕。
@Configuration public class JobHandler { @Resource private Scheduler scheduler; /** * 添加任务 */ @SuppressWarnings("unchecked") public void addJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException { Objects.requireNonNull(jobInfo, "任务信息不能为空"); // 生成job key JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup()); // 当前任务不存在才进行添加 if (!scheduler.checkExists(jobKey)) { ClassjobClass = (Class )Class.forName(jobInfo.getClassName()); // 任务明细 JobDetail jobDetail = JobBuilder .newJob(jobClass) .withIdentity(jobKey) .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup()) .withDescription(jobInfo.getJobName()) .build(); // 配置信息 jobDetail.getJobDataMap().put("config", jobInfo.getConfig()); // 定义触发器 TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getTriggerName(), jobInfo.getTriggerGroup()); // 设置任务的错过机制 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity(triggerKey) .withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCron()).withMisfireHandlingInstructionDoNothing()) .build(); scheduler.scheduleJob(jobDetail, trigger); } else { throw new SchedulerException(jobInfo.getJobName() + "任务已存在,无需重复添加"); } } /** * 任务暂停 */ public void pauseJob(String jobGroup, String jobName) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); if (scheduler.checkExists(jobKey)) { scheduler.pauseJob(jobKey); } } /** * 继续任务 */ public void continueJob(String jobGroup, String jobName) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); if (scheduler.checkExists(jobKey)) { scheduler.resumeJob(jobKey); } } /** * 删除任务 */ public boolean deleteJob(String jobGroup, String jobName) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); if (scheduler.checkExists(jobKey)) { // 这里还需要先删除trigger相关 //TriggerKey triggerKey = TriggerKey.triggerKey(jobInfo.getTriggerName(), jobInfo.getTriggerGroup()); //scheduler.getTrigger() //scheduler.rescheduleJob() return scheduler.deleteJob(jobKey); } return false; } /** * 获取任务信息 */ public JobInfo getJobInfo(String jobGroup, String jobName) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName, jobGroup); if (!scheduler.checkExists(jobKey)) { return null; } List extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey); if (Objects.isNull(triggers)) { throw new SchedulerException("未获取到触发器信息"); } TriggerKey triggerKey = triggers.get(0).getKey(); Trigger.TriggerState triggerState = scheduler.getTriggerState(triggerKey); JobDetail jobDetail = scheduler.getJobDetail(jobKey); JobInfo jobInfo = new JobInfo(); jobInfo.setJobName(jobGroup); jobInfo.setJobGroup(jobName); jobInfo.setTriggerName(triggerKey.getName()); jobInfo.setTriggerGroup(triggerKey.getGroup()); jobInfo.setClassName(jobDetail.getJobClass().getName()); jobInfo.setStatus(triggerState.toString()); if (Objects.nonNull(jobDetail.getJobDataMap())) { jobInfo.setConfig(JSONObject.toJSONString(jobDetail.getJobDataMap())); } CronTrigger theTrigger = (CronTrigger) triggers.get(0); jobInfo.setCron(theTrigger.getCronExpression()); return jobInfo; } }
/** * @author Yang * @description 任务操作 * @date 2023/6/28 */ @RestController @RequestMapping("/job") public class QuartzController { @Resource private JobHandler jobHandler; @Resource private Scheduler scheduler; /** * 查询所有的任务 */ @RequestMapping("/all") public Listlist() throws SchedulerException { List jobInfos = new ArrayList<>(); List triggerGroupNames = scheduler.getTriggerGroupNames(); for (String triggerGroupName : triggerGroupNames) { Set triggerKeySet = scheduler .getTriggerKeys(GroupMatcher.triggerGroupEquals(triggerGroupName)); for (TriggerKey triggerKey : triggerKeySet) { Trigger trigger = scheduler.getTrigger(triggerKey); JobKey jobKey = trigger.getJobKey(); JobInfo jobInfo = jobHandler.getJobInfo(jobKey.getGroup(), jobKey.getName()); jobInfos.add(jobInfo); } } return jobInfos; } /** * 添加任务 */ @PostMapping("/add") public JobInfo addJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ClassNotFoundException { jobHandler.addJob(jobInfo); return jobInfo; } /** * 暂停任务 */ @RequestMapping("/pause") public void pauseJob(@RequestParam("jobGroup") String jobGroup, @RequestParam("jobName") String jobName) throws SchedulerException { jobHandler.pauseJob(jobGroup, jobName); } /** * 继续任务 */ @RequestMapping("/continue") public void continueJob(@RequestParam("jobGroup") String jobGroup, @RequestParam("jobName") String jobName) throws SchedulerException { jobHandler.continueJob(jobGroup, jobName); } /** * 删除任务 */ @RequestMapping("/delete") public boolean deleteJob(@RequestParam("jobGroup") String jobGroup, @RequestParam("jobName") String jobName) throws SchedulerException { return jobHandler.deleteJob(jobGroup, jobName); } }
当添加了任务之后,重启服务的时候,会自动开启之前添加的任务,如果需要停机关闭任务,还需要其他的操作关闭任务。