文章目录
- 1. 概要
- 2. TimingWheel
- 2.1 核心参数
- 2.2 添加任务
- 2.3 推进时间
- 3. TimerTaskList
- 3.1 添加节点
- 3.2 删除节点
- 3.3 刷新链表
- 3.4 队列相关
- 4. 时间轮链表节点-TimerTaskEntry
- 5. TimerTask
- 6. Timer 和 SystemTimer - 设计降级逻辑
- 7. 上层调用
- 8. 小结
1. 概要
时间轮的文章:
- 定时/延时任务-Netty时间轮的使用
- 定时/延时任务-时间轮
- 定时/延时任务-实现一个简单时间轮
- 定时/延时任务-实现一个分层时间轮
- 定时/延时任务-Netty时间轮源码分析
上一篇文章中介绍了 Netty 时间轮的源码分析,这篇文章就接着来看下 Kafka 的源码分析,由于 Kafka 是使用的 Scala 语言,所以可能会有点难分析,不过还是会尽量说清楚的。
2. TimingWheel
2.1 核心参数
Kafka的时间轮实现比较简单,主要核心参数就在 TimingWheel 里面,那么下面就先看下核心参数:
@nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {...
}
首先这里就是 TimingWheel 的类定义,同时构造函数参数列表:
- tickMs: Long:表示时间轮的每个时间刻度(tick)的毫秒数
- wheelSize: Int:表示时间轮的大小(即有多少个时间刻度)
- startMs: Long:表示时间轮的起始时间(以毫秒为单位)
- taskCounter: AtomicInteger:用来计数的原子整数,表示任务的数量
- queue: DelayQueue[TimerTaskList]:延迟队列,用于存储时间轮的任务列表
上面几个就是时间轮构造参数,下面就是时间轮的几个核心参数的构造:
private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
private[this] var currentTime = startMs - (startMs % tickMs)
private[this] var overflowWheel: TimingWheel = null
- 首先是
interval
,这个就是我们之前说的一层时间轮的时间间隔,在分层时间轮下,当前层的时间轮间隔就是当前时间轮的格子数(wheelSize) * 每一格的时间间隔(tickMs )
- buckets 就是当前时间轮中的时间格子数组,从代码中也能看到,其实这里做的就是创建一个
wheelSize
长度的数组,然后分别初始化 - currentTime 就是当前时间,startMs 就是时间轮的启动时间,假设启动时间是 43ms,一个 tick 的时间是 20ms,那么当前时间就是
startMs - (startMs % tickMs)
,结果就是43 - 43 % 20 = 40
,currentTime 就是控制指针跳动的时间 - overflowWheel 就是上层时间轮,上层时间轮的时间间隔
tickMs
就是本层时间轮的 interval,看上面的图就可以看懂。
2.2 添加任务
def add(timerTaskEntry: TimerTaskEntry): Boolean = {// 任务的过期时间val expiration = timerTaskEntry.expirationMs // 任务取消了if (timerTaskEntry.cancelled) {false} else if (expiration < currentTime + tickMs) {// 如果任务过期时间已经小于当前时间格子的时间,就说明要执行了false} else if (expiration < currentTime + interval) {// 如果过期时间 < currentTime + 本层时间轮的时间间隔,就说明// 任务可以添加到本层时间轮中val virtualId = expiration / tickMs// 获取对应下标的时间格子val bucket = buckets((virtualId % wheelSize.toLong).toInt)// 添加任务bucket.add(timerTaskEntry)// 然后设置这个格子的过期时间添加到任务队列里面if (bucket.setExpiration(virtualId * tickMs)) {queue.offer(bucket)}true} else {// 都不满足,那就说明过期时间已经超过本层时间轮的管理范围了,需要// 到上层时间轮去加入任务if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}
}
其实里面的逻辑很简单
- 首先判断下如果任务已经取消了,就直接返回 false,添加失败
- 判断下如果添加的任务的过期时间已经小于
当前时间 + 一格时间时长
,那么表示这个任务已经过期了,需要执行 - 如果添加的任务小于
当前时间 + 本层时间轮的总时间
,那么这个任务还没有执行,并且这个任务可以添加到本层时间轮中 - 否则就是说本层时间轮没办法管理这个任务,需要把这个任务添加到上层时间轮中
如果上层时间轮为空,那么同时也会创建出上层时间轮
private[this] def addOverflowWheel(): Unit = {// 加锁synchronized {if (overflowWheel == null) {// 创建上层时间轮overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}}}
对当前线程加锁,然后创建上层时间轮,注意上层时间轮的启动时间是当前时间 currentTime
,同时上层时间轮的 tickMs
时间间隔是本层时间轮的时间跨度 interval
。注意所有时间轮使用一个延时队列。
2.3 推进时间
def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {// 设置当前时间currentTime = timeMs - (timeMs % tickMs)// 同时也推进上层时间轮if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}
}
设置当前时间,其实所谓的推进时间就是推进当前时间,在上面也说过了,当前时间的计算就是:timeMs - (timeMs % tickMs)
,同时除了推进当前时间轮,还需要推进上层时间轮的时间指针。
3. TimerTaskList
下面就来看下时间轮上面的链表定义,还是一样,我们先看里面的参数定义,因为链表其实参数并不多,所以不需要一个一个拿出来介绍
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = rootprivate[this] val expiration = new AtomicLong(-1L)def setExpiration(expirationMs: Long): Boolean = {expiration.getAndSet(expirationMs) != expirationMs}
上面几个就是参数了,链表肯定要有头尾节点了,不过 kafka 这里是用了一个 root 节点同时作为头尾节点,只有一个节点的时候就指向自己。
同时因为时间轮存放到任务队列里面是以 TimerTaskList 为单位去存储的,为什么会这样呢?前面的文章里面说过,一个链表上面的任务延时等级是一样的,所以没必要以任务节点为单位去存储,这样如果一个链表上面有 100000
个任务,延时队列里面就得放 100000
个节点,我们直到延时队列时间复杂度是 O(logn),节点一旦比较多,消耗的时间就多了。而且这 10000 个节点的过期时间是一样的,所以用一个 TimerTaskList 来代替就行了。所以延时队列的节点就是一个个的 TimerTaskList
。然后再看下下面的方法。
3.1 添加节点
其实添加节点就是双向链表的添加逻辑。
def add(timerTaskEntry: TimerTaskEntry): Unit = {var done = falsewhile (!done) {// 先删除这个任务timerTaskEntry.remove()synchronized {// 加锁timerTaskEntry.synchronized {if (timerTaskEntry.list == null) {// 链表结构 tail -> timerTaskEntry -> root// tail = rootval tail = root.prevtimerTaskEntry.next = roottimerTaskEntry.prev = tailtimerTaskEntry.list = thistail.next = timerTaskEntryroot.prev = timerTaskEntrytaskCounter.incrementAndGet()done = true}}}}
}
在添加任务到链表的时候首先会去删除一下这个任务,确保这个任务没有在先前被添加到时间轮中,然后加锁去添加。添加的时候其实就是形成 tail -> timerTaskEntry -> root 的结构(双向的)
,因为 tail = root,所以就是一个环形的双向链表。
3.2 删除节点
def remove(timerTaskEntry: TimerTaskEntry): Unit = {synchronized {timerTaskEntry.synchronized {if (timerTaskEntry.list eq this) {timerTaskEntry.next.prev = timerTaskEntry.prevtimerTaskEntry.prev.next = timerTaskEntry.nexttimerTaskEntry.next = nulltimerTaskEntry.prev = nulltimerTaskEntry.list = nulltaskCounter.decrementAndGet()}}}
}
删除节点的逻辑也很简单,就是两步:
- timerTaskEntry.next.prev = timerTaskEntry.prev
- timerTaskEntry.prev.next = timerTaskEntry.next
最后再把当前 timerTaskEntry 的属性都置空,然后让任务数量 - 1 就可以了。
3.3 刷新链表
所谓刷新链表,就是把这个链表上面的所有任务都删掉,然后执行传入的函数,这个方法是当链表过期的时候,就把上面的所有任务都删掉,然后一个一个任务执行具体逻辑。
// f 类似 Java8 里面的 function,其实这里就是传入一个 f 函数去处理节点
def flush(f: (TimerTaskEntry)=>Unit): Unit = {synchronized {// 从头结点开始遍历var head = root.nextwhile (head ne root) {// 调用上面的删除节点方法把节点从链表中移除掉remove(head)// 调用函数把任务添加到线程池中等待调度f(head)// 继续下一个节点head = root.next}// 链表都没有任务了,当然过期时间就设置成 -1 了expiration.set(-1L)}
}
其实里面的 f 函数的逻辑就是把这个任务节点丢到线程池中等待线程去调度,也就是具体执行任务。
3.4 队列相关
那既然 TimerTaskList 是要加入延时队列的,肯定要有一个获取延时和比较的方法了
def getDelay(unit: TimeUnit): Long = {unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}def compareTo(d: Delayed): Int = {val other = d.asInstanceOf[TimerTaskList]java.lang.Long.compare(getExpiration, other.getExpiration)
}
- getDelay 就是获取延时,这里其实就是用任务的
过期时间 - 当前时间
,如果小于 0,最后就会返回 0,表示可以马上开始执行了 - compareTo 就是任务队列里面比较两个任务的延时时间
4. 时间轮链表节点-TimerTaskEntry
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {...
}
这里就是链表节点了,两个参数分别是任务和延时时间,因为内容确实不多,所以下面直接给出所有的逻辑。
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {// 所属的链表@volatilevar list: TimerTaskList = null// 链表前后节点var next: TimerTaskEntry = nullvar prev: TimerTaskEntry = null// 设置定时任务if (timerTask != null) timerTask.setTimerTaskEntry(this)// 任务是否取消了,如果任务取消了就会从当前节点中删掉,所以下面就是// 判断下这个任务的所属的链表节点还是不是自己def cancelled: Boolean = {timerTask.getTimerTaskEntry != this}// 把当前节点从链表中删除掉def remove(): Unit = {var currentList = listwhile (currentList != null) {currentList.remove(this)currentList = list}}// 比较两个任务的延时时间override def compare(that: TimerTaskEntry): Int = {java.lang.Long.compare(expirationMs, that.expirationMs)}
}
说实话上面的逻辑确实难看懂,没学过 scala 语言的话,我也只是大概大概翻译下。核心逻辑能看懂就行了。
5. TimerTask
trait TimerTask extends Runnable {...
}
在 Scala 语言中,trait TimerTask extends Runnable 是一个特质(trait)声明,表示该特质继承自 Runnable 接口。
- trait 是 Scala 中的一个特性,类似于 Java 中的接口(interface)。
- 特质可以包含抽象方法和具体方法,也可以有字段和实现。
- 与 Java 接口不同,Scala 特质可以混合(mixin)到类中,实现多重继承的效果。
来看下里面的一些参数:
val delayMs: Long // timestamp in millisecond
private[this] var timerTaskEntry: TimerTaskEntry = null
首先就是任务的延时时间 delayMs
,然后就是这个任务属于哪一个链表节点。再来看下面的几个方法:
// 任务取消
def cancel(): Unit = {synchronized {// 就是把任务从链表节点中移除掉if (timerTaskEntry != null) timerTaskEntry.remove()timerTaskEntry = null}
}// 设置任务到链表节点上
private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {synchronized {// 如果任务所属链表节点不等于要设置的节点,说明这个任务有可能原来在另一条// 链表上,现在要加入当前的链表,所以需要首先把任务节点从原来的链表上移除// 因为任务都不在原来的链表上了,节点肯定也带删掉if (timerTaskEntry != null && timerTaskEntry != entry)// 就把当前节点从链表上移除掉timerTaskEntry.remove()// 然后设置所属节点为传入的节点timerTaskEntry = entry}
}// 获取当前任务所属的链表节点
private[timer] def getTimerTaskEntry: TimerTaskEntry = timerTaskEntry
其实里面的方法并不多,下面简单来说下:
- 任务取消就是把链表节点从链表中移除掉,同时把当前任务所属的链表节点置空,逻辑不复杂,因为任务都要删除了,链表节点肯定不能继续待在链表中的
- 设置任务到新的链表节点,这里面如果发现这个任务原来已经设置过了,现在要设置到一个新的链表上,就需要先把当前节点从链表上移除掉,然后再重新设置新的节点
6. Timer 和 SystemTimer - 设计降级逻辑
Timer 是 Scala 定义的一个接口,包括几种时间轮的方法,下面就来简单看下:
trait Timer {/*** Add a new task to this executor. It will be executed after the task's delay* (beginning from the time of submission)* @param timerTask the task to add*/def add(timerTask: TimerTask): Unit/*** Advance the internal clock, executing any tasks whose expiration has been* reached within the duration of the passed timeout.* @param timeoutMs* @return whether or not any tasks were executed*/def advanceClock(timeoutMs: Long): Boolean/*** Get the number of tasks pending execution* @return the number of tasks*/def size: Int/*** Shutdown the timer service, leaving pending tasks unexecuted*/def shutdown(): Unit
}
可以看到 Timer 接口里面定义的四个方法分别就是:添加、推进时间轮、时间轮任务数、关闭时间轮,那下面就来看下 Timer 的实现类 SystemTimer,SystemTimer 也是时间轮的顶层管理类。
@threadsafe
class SystemTimer(executorName: String,tickMs: Long = 1,wheelSize: Int = 20,startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {...
}
可以看到,SystemTimer 接收几个参数
- executorName:执行任务的线程名称
- tickMs:默认的最底层时间轮的时间间隔
- wheelSize:每一层时间轮的大小
- startMs:启动时间,就是当前时间
下面来看下几个变量:
// 执行任务的线程池
private[this] val taskExecutor = Executors.newFixedThreadPool(1,(runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))// 延时队列
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
// 任务数量
private[this] val taskCounter = new AtomicInteger(0)
// 时间轮
private[this] val timingWheel = new TimingWheel(tickMs = tickMs,wheelSize = wheelSize,startMs = startMs,taskCounter = taskCounter,delayQueue
)// 读写锁
private[this] val readWriteLock = new ReentrantReadWriteLock()
private[this] val readLock = readWriteLock.readLock()
private[this] val writeLock = readWriteLock.writeLock()
下面看下几个方法,首先就是添加任务的方法
def add(timerTask: TimerTask): Unit = {// 加锁readLock.lock()try {// 添加任务,创建一个链表节点,把任务放到链表节点中// 再调用 addTimerTaskEntry 把链表节点添加到链表上addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))} finally {// 解锁readLock.unlock()}}private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// 调用时间轮TimingWheel 的add方法添加if (!timingWheel.add(timerTaskEntry)) {// 添加失败的情况下要么就是过期了,要么就是取消了if (!timerTaskEntry.cancelled)// 如果不是取消了,那么就要执行这个过期任务taskExecutor.submit(timerTaskEntry.timerTask)}}
在添加任务任务的时候,如果任务已经过期了或者任务被取消了,那么就会判断,如果不是任务取消,就会把任务丢到线程池里面去执行。上面就是添加的方法,下面再看下推进时间轮的方法。
def advanceClock(timeoutMs: Long): Boolean = {
// 从延时队列里面获取过期链表,超时时间 timeoutMs
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)// 如果不为空if (bucket != null) {writeLock.lock()try {while (bucket != null) {// 推进时间轮timingWheel.advanceClock(bucket.getExpiration)// 然后执行过期链表下面的所有任务bucket.flush(reinsert)// 继续阻塞bucket = delayQueue.poll()}} finally {// 解锁writeLock.unlock()}true} else {false}
}// 把链表节点重新添加回时间轮上
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) =>
addTimerTaskEntry(timerTaskEntry)private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// 重新添加回时间轮,如果添加失败再执行任务if (!timingWheel.add(timerTaskEntry)) {if (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}}
首先推进时间轮的时候会从延时队列里面获取过期的链表,第一次获取超时时间是 timeoutMs
,这个是上层调用者设置的。这里如果没有获取到任务,就不会往下走推进时间轮,其实这里就是解决了 Netty 时间轮的空轮转问题。
Netty 的时间轮是不断执行的,不管有没有任务过期都会去遍历当前 tick 的链表下面的所有任务,同时推进时间轮,看看有没有任务需要执行,所以如果 Netty 时间轮中有一个很长时间都不会执行的任务,在遍历的时候就做了很多 “无用功”。
Kafka 则是通过延时队列的方式,没有任务就不会去遍历推进时间轮,有了任务才会去处理。所以这也算是一种精确唤醒执行了。
推进时间轮的方法已经说了,当推进时间轮之后,就回去调用 bucket.flush(reinsert)
方法,前面我们说过 flush 会传入一个 f 函数用来执行过期链表上面的任务, 这个函数就是 reinsert
。
那么问题来了,不是说执行过期任务吗,为什么是重新把任务添加回时间轮上?
- 这就不得不说下分层时间轮的降级逻辑了,分层时间轮中上层时间轮的任务只有降级到最底层时间轮才能被执行。
- 比如现在时间轮的层级是 1 -> 2 -> 3,那么 3 号时间轮上面的任务要降级到 1 才能被执行。
- 那么如何才能降级呢?我们直到时间轮是不断被推进的,也就是 currentTime 是不断增大的,所以当链表节点重新添加回时间轮的时候,原本应该添加到 3 号时间轮的节点会添加到 2 号,同理 2 号的节点会添加到 1 号,还是不清楚的可以去看下概要里面的时间轮介绍。
最后刷新链表完成之后,继续阻塞在任务队列里面,不过这里阻塞就没有超时时间了,因为可以避免无意义的唤醒,防止空轮转,直到有任务才醒来。如果说时间轮添加了一个更快执行的任务,那么在添加方法里面就会往 delay 队列添加一个更早过期的节点,这里 SystemTimer 也会被更快唤醒。
7. 上层调用
上面就是时间轮的核心实现了,那么你可能会好奇,时间轮在哪被调用了,其实就是在 DelayedOperation.scala
里面执行。我们看下这个方法里面的 advanceClock
。
def advanceClock(timeoutMs: Long): Unit = {timeoutTimer.advanceClock(timeoutMs)if (estimatedTotalOperations.get - numDelayed > purgeInterval) {estimatedTotalOperations.getAndSet(numDelayed)debug("Begin purging watch lists")val purged = watcherLists.foldLeft(0) {case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum}debug("Purged %d elements from watch lists.".format(purged))}
}
timeoutTimer.advanceClock(timeoutMs)
推进时间轮, 下面的逻辑先就不用细看了。那么这个 advanceClock
方法又是在哪被调用了呢?
private class ExpiredOperationReaper extends ShutdownableThread("ExpirationReaper-%d-%s".format(brokerId, purgatoryName),false) {override def doWork(): Unit = {advanceClock(200L)}}
这个方法会每隔 200ms 推动一次时间轮,从而推动延时任务的执行。
8. 小结
好了,到这里分层时间轮 Kafka 的源码就写好了,下面还会介绍下 RocketMQ 的延时任务源码的逻辑,不过在这之前我会简单说下 SpringBoot 的定时延时任务,毕竟 Java 的框架里面肯定少不了 SpringBoot 的身影。至于 Dubbo,看了下里面的时间轮源码,跟 Netty 的基本一模一样, Netty 在上一篇文章里面也介绍过了,所以后面就不再介绍。
如有错误,欢迎提出!!!