Quartz是一个开源的任务调度框架,负责任务进度管理(就是一个在预先被纳入日程,当时间到达时,负责执行(或者通知)其他软件组件的系统),由OpenSymphony(一个开源组织)开发,这个框架进行了优良地解耦设计,
Quartz主要功能,就是在设定的时间内干什么事情,比如说把redis中的缓存数据与数据库中的数据进行同步、定时发送信息、设置一个周三才开始的活动。Quartz是一个基于Java实现的任务调度框架,应该说叫定时任务调度框架。在Java领域,有很多定时任务框架,这里简单对比一下目前比较流行的三款:
Quartz主要分为三大部分:
下图是Quartz主要的关系图:
文字化的描述就是,Job通过实现类与Trigger进行绑定,一个Job可以绑定多个Trigger(实现了Quartz的异步通知),但是一个Trigger只能绑定一个Job,且Job的实现类与Trigger通过Group和name来标识唯一性,Quartz是使用多线程来处理任务的。
Quartz集群中每个节点是一个独立的Quartz任务应用,它又管理者其他节点。该集群需要分别对每个节点分别启动或停止,独立的Quartz节点并不与另一个节点或是管理节点通信。Quartz应用是通过共有相同数据库表来感知到另一应用。也就是说只有使用持久化JobStore存储Job和Trigger才能完成Quartz集群部署方案。
Quartz的集群部署方案是分布式的,没有负责集中管理的节点,而是利用数据库行锁的方式来实现集群环境下的并发控制。Quartz主要有两个行锁:
锁名 | 解释 |
---|---|
STATE_ACCESS | 状态访问锁 |
Trigger_ACCESS | 触发器访问锁 |
一个scheduler实例在集群模式下首先获取行锁才行。Quartz集群争用触发器行锁,锁被占用只能等待,获取触发器行锁之后,先获取需要等待触发的其他触发器信息。
定时任务的诸多要素,如任务名称、数量、状态、运行频率、运行时间等,是要存储起来的。JobStore,就是用来存储任务和触发器相关的信息的。Quartz 中有两种存储任务的方式,一种在在内存(RAMJobStore),一种是在数据库(JDBCJobStore)。
Quartz 默认的 JobStore 是 RAMJobstore,也就是把任务和触发器信息运行的信息存储在内存中,用到了 HashMap、TreeSet、HashSet 等等数据结构,如果程序崩溃或重启,所有存储在内存中的数据都会丢失。所以我们需要把这些数据持久化到磁盘。
# Default Properties file for use by StdSchedulerFactory # to create a Quartz Scheduler Instance, if a different # properties file is not explicitly specified. # org.quartz.scheduler.instanceName: DefaultQuartzScheduler org.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount: 10 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore # 标识Quartz的持久化存储是RAM
public class XXXJobs implements Job { @Autowired private XXXService xxxService; // 自己写的实现void execute()方法 @Override public void execute(JobExecutionContext context) throws JobExecutionException { log.debug("xxxQueueJob start..."); // 分布式锁,如果获取失败则返回 XXXJob proxy = ApplicationContextUtil.getBean(getClass()); proxy.jobHandler(); log.debug("xxxQueueJob end..."); } @JobToken @RedisLock(expiration = 20L) public void jobHandler() { // 定时任务核心逻辑 } }
Trigger是有状态的:NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED,状态之间转换关系如下图所示:
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "trigger-group") .startNow()//立即生效 .forJob(jobDetail) .withSchedule(SimpleScheduleBuilder .simpleSchedule() .withIntervalInSeconds(2) //每隔3s执行一次 .repeatForever()) // 永久循环 .build();
JobDataMap jobDataMap = jobDetail.getJobDataMap(); String name = jobDetail.getKey().getName(); String group = jobDetail.getKey().getGroup(); String jobName = jobDetail.getJobClass().getName();
public class HelloJob implements Job { @Override public void execute(JobExecutionContext context) throws JobExecutionException { Object tv1 = context.getTrigger().getJobDataMap().get("t1"); Object tv2 = context.getTrigger().getJobDataMap().get("t2"); Object jv1 = context.getJobDetail().getJobDataMap().get("j1"); Object jv2 = context.getJobDetail().getJobDataMap().get("j2"); Object sv = null; try { sv = context.getScheduler().getContext().get("skey"); } catch (SchedulerException e) { e.printStackTrace(); } System.out.println(tv1+":"+tv2); System.out.println(jv1+":"+jv2); System.out.println(sv); System.out.println("hello:"+LocalDateTime.now()); } } ----------------------------------------------------------------------------------------- public class Test { public static void main(String[] args) throws SchedulerException { //创建一个scheduler Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.getContext().put("skey", "svalue"); //创建一个Trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger1", "group1") .usingJobData("t1", "tv1") .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3) .repeatForever()).build(); trigger.getJobDataMap().put("t2", "tv2"); //创建一个job JobDetail job = JobBuilder.newJob(HelloJob.class) .usingJobData("j1", "jv1") .withIdentity("myjob", "mygroup").build(); job.getJobDataMap().put("j2", "jv2"); //注册trigger并启动scheduler scheduler.scheduleJob(job,trigger); scheduler.start(); } }
在 Quartz 中三个核心模块分别是 Job、Trigger、Scheduler,既然 Quartz中存在监听器,相应的,这三者也分别有对应的监听器。监听器的作用便是用于当任务调度中你所关注事件发生时,能够及时获取这一事件的通知。监听器也有作用域,主要分为
三核心监听器:
任务调度中,与任务 Job 相关的事件包括: Job 开始要执行的提示,执行完成的提示,接口如下:
package org.quartz; public interface JobListener { String getName(); //用于获取改JobListener 的名称 // Scheduler 在 JobDetail 将要被执行时调用这个方法 void jobToBeExecuted(JobExecutionContext var1); // cheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决时会调用该方法 void jobExecutionVetoed(JobExecutionContext var1); // Scheduler 在 JobDetail 被执行之后调用这个方法 void jobWasExecuted(JobExecutionContext var1, JobExecutionException var2); }
将JobListener绑定到Scheduler中
//监听所有的Job scheduler.getListenerManager().addJobListener(new SimpleJobListener(), EverythingMatcher.allJobs()); //监听特定的Job scheduler.getListenerManager().addJobListener(new SimpleJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("HelloWorld1_Job", "HelloWorld1_Group"))); //监听同一任务组的Job scheduler.getListenerManager().addJobListener(new SimpleJobListener(), GroupMatcher.jobGroupEquals("HelloWorld2_Group")); //监听两个任务组的Job scheduler.getListenerManager().addJobListener(new SimpleJobListener(), OrMatcher.or(GroupMatcher.jobGroupEquals("HelloWorld1_Group"), GroupMatcher.jobGroupEquals("HelloWorld2_Group")));
任务调度中,与触发器 Trigger 相关的事件包括: 触发器触发、触发器未正常触发、触发器完成等,接口如下:
package org.quartz; import org.quartz.Trigger.CompletedExecutionInstruction; public interface TriggerListener { String getName(); //用于获取触发器的名称 // 当与监听器相关联的Trigger被触发,Job上的**execute()**方法将被执行时,Scheduler就调用该方法 void triggerFired(Trigger var1, JobExecutionContext var2); //在 Trigger 触发后,Job 将要被执行时由 Scheduler 调用这个方法。 // TriggerListener 给了一个选择去否决 Job 的执行。假如这个方法返回 true,这个 Job 将不会为此次 Trigger 触发而得到执行 boolean vetoJobExecution(Trigger var1, JobExecutionContext var2); // Scheduler 调用这个方法是在 Trigger 错过触发时 void triggerMisfired(Trigger var1); // Trigger 被触发并且完成了 Job 的执行时,Scheduler 调用这个方法 void triggerComplete(Trigger var1, JobExecutionContext var2, CompletedExecutionInstruction var3); }
将TriggerListener绑定到Scheduler中:
//监听所有的Trigger scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), EverythingMatcher.allTriggers()); //监听特定的Trigger scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), KeyMatcher.keyEquals(TriggerKey.triggerKey("HelloWord1_Job", "HelloWorld1_Group"))); //监听一组Trigger scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), GroupMatcher.groupEquals("HelloWorld1_Group")); //移除监听器 scheduler.getListenerManager().removeTriggerListener("SimpleTrigger");
SchedulerListener会在Scheduler的生命周期中关键事件发生时被调用。与Scheduler有关的事件包括:增加一个job/trigger,删除一个job/trigger,scheduler发生严重错误,关闭scheduler等。接口如下:
package org.quartz; public interface SchedulerListener { // 用于部署JobDetail时调用 void jobScheduled(Trigger var1); // 用于卸载JobDetail时调用 void jobUnscheduled(TriggerKey var1); // 当一个 Trigger 来到了再也不会触发的状态时调用这个方法 void triggerFinalized(Trigger var1); // Scheduler 调用这个方法是发生在一个 Trigger 或 Trigger 组被暂停时 void triggerPaused(TriggerKey var1); void triggersPaused(String var1); // Scheduler 调用这个方法是发生成一个 Trigger 或 Trigger 组从暂停中恢复时 void triggerResumed(TriggerKey var1); void triggersResumed(String var1); void jobAdded(JobDetail var1); void jobDeleted(JobKey var1); void jobPaused(JobKey var1); void jobsPaused(String var1); void jobResumed(JobKey var1); void jobsResumed(String var1); void schedulerError(String var1, SchedulerException var2); void schedulerInStandbyMode(); // 当Scheduler处于StandBy模式时,调用该方法 void schedulerStarted(); void schedulerStarting(); void schedulerShutdown(); void schedulerShuttingdown(); void schedulingDataCleared(); }
将SchedulerListener绑定到Scheduler中
//创建监听 scheduler.getListenerManager().addSchedulerListener(new SimpleSchedulerListener()); //移除监听 scheduler.getListenerManager().removeSchedulerListener(new SimpleSchedulerListener());
org.springframework.boot spring-boot-starter-quartz
import org.quartz.Job; import org.quartz.JobExecutionContext; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; public class SimpleJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) { // 创建一个事件,下面仅创建一个输出语句作演示 System.out.println(Thread.currentThread().getName() + "--" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())); } }
import com.quartz.demo.schedule.SimpleJob; import org.junit.jupiter.api.Test; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import java.util.Date; import java.util.concurrent.TimeUnit; public class SimpleQuartzTest { /* * 基于时间间隔的定时任务 */ @Test public void simpleTest() throws SchedulerException, InterruptedException { // 1、创建Scheduler(调度器) SchedulerFactory schedulerFactory = new StdSchedulerFactory(); Scheduler scheduler = schedulerFactory.getScheduler(); // 2、创建JobDetail实例,并与SimpleJob类绑定(Job执行内容) JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class) .withIdentity("job1", "group1") .build(); // 3、构建Trigger(触发器),定义执行频率和时长 Trigger trigger = TriggerBuilder.newTrigger() // 指定group和name,这是唯一身份标识 .withIdentity("trigger-1", "trigger-group") .startNow() //立即生效 .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(2) //每隔2s执行一次 .repeatForever()) // 永久执行 .build(); //4、将Job和Trigger交给Scheduler调度 scheduler.scheduleJob(jobDetail, trigger); // 5、启动Scheduler scheduler.start(); // 休眠,决定调度器运行时间,这里设置30s TimeUnit.SECONDS.sleep(30); // 关闭Scheduler scheduler.shutdown(); } }
附
参考:https://blog.csdn.net/mu_wind/article/details/124257719
cron在线生成:https://cron.qqe2.com/