本文要点
- 1.Quartz相关重要概念
- 2.如何实现分布式调度
- 3.核心源码的实现
1.Quartz相关重要概念
1.1 scheduler:: 任务调度器
如图所示:
Scheduler 有三种:RemoteMBeanScheduler, RemoteScheduler 和 StdScheduler。
- RemoteMBeanScheduler:
- RemoteScheduler:
- StdScheduler:
1.2 trigger: 触发器,用于定义任务调度时间规则
Trigger 主要有三种:SimpleTrigger, CronTrigger,CalendarIntervalTrigger
SimpleTrigger:支持简单的频率调用的Trigger
CronTrigger:支持cron表达式的Trigger
CalendarIntervalTrigger:支持基于日历周期的Trigger
1.3 job:: 任务,即被调度的任务
主要有两种类型的 job:无状态的(stateless)和有状态的(stateful)。
对于同一个 trigger 来说,有状态(添加@DisallowConcurrentExecution注解
)的 job 不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。一个 job 可以被多个 trigger 关联,但是一个 trigger 只能关联一个 job。
1.4 misfire::错过的,指本来应该被执行但实际没有被执行的任务调度
-
系统因为某些原因被重启。在系统关闭到重新启动之间的一段时间里,可能有些任务会被 misfire;
-
Trigger 被暂停(suspend)的一段时间里,有些任务可能会被 misfire;
-
线程池中所有线程都被占用,导致任务无法被触发执行,造成 misfire;
-
有状态任务在下次触发时间到达时,上次执行还没有结束;
2.Quartz如何实现分布式调度
3.核心源码的实现
Quartz是基于DB锁来实现的分布式调度。
首先要明确Trigger有几个状态: WAITING(等待)、ACQUIRED(获取)、EXECUTING(执行)、COMPLETE(完成)、BLOCKED(阻塞)、ERROR(报错)、PAUSED(暂停)、PAUSED_BLOCKED(暂停阻塞)、DELETED(删除)
。
具体看下org.quartz.core.QuartzSchedulerThread
核心代码的具体实现
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
/**
* 默认没加锁
*
* 1.查询 TRIGGERS 状态是WAITING 的trigger
*
* 2.更新 TRIGGERS WAITING---->ACQUIRED
*
* 3.插入FIRED_TRIGGERS表
*/
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime()