publicHashedWheelTimer( ThreadFactory threadFactory, // 用来创建worker线程 long tickDuration, // tick的时长,也就是指针多久转一格 TimeUnit unit, // tickDuration的时间单位 int ticksPerWheel, // 一圈有几格 boolean leakDetection // 是否开启内存泄露检测 ){
// 一些参数校验 if (threadFactory == null) { thrownew NullPointerException("threadFactory"); } if (unit == null) { thrownew NullPointerException("unit"); } if (tickDuration <= 0) { thrownew IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { thrownew IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); }
privatestatic HashedWheelBucket[] createWheel(int ticksPerWheel) { // 一些参数校验 if (ticksPerWheel <= 0) { thrownew IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { thrownew IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); }
// 初始化ticksPerWheel的值为不小于ticksPerWheel的最小2的n次方 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); // 初始化wheel数组 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; }
normalizeTicksPerWheel()的代码:
// 初始化ticksPerWheel的值为不小于ticksPerWheel的最小2的n次方 privatestaticintnormalizeTicksPerWheel(int ticksPerWheel){ int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; }
privateintnormalizeTicksPerWheel(int ticksPerWheel){ // 这里参考java8 hashmap的算法,使推算的过程固定 int n = ticksPerWheel - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; // 这里1073741824 = 2^30,防止溢出 return (n < 0) ? 1 : (n >= 1073741824) ? 1073741824 : n + 1; }
HashedWheelTimer源码之启动、停止与添加任务
start()启动时间轮的方法:
// 启动时间轮。这个方法其实不需要显示的主动调用,因为在添加定时任务(newTimeout()方法)的时候会自动调用此方法。 // 这个是合理的设计,因为如果时间轮里根本没有定时任务,启动时间轮也是空耗资源 publicvoidstart(){ // 判断当前时间轮的状态,如果是初始化,则启动worker线程,启动整个时间轮;如果已经启动则略过;如果是已经停止,则报错 // 这里是一个Lock Free的设计。因为可能有多个线程调用启动方法,这里使用AtomicIntegerFieldUpdater原子的更新时间轮的状态 switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: thrownew IllegalStateException("cannot be started once stopped"); default: thrownew Error("Invalid WorkerState"); }
// 等待worker线程初始化时间轮的启动时间 while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
public Set<Timeout> stop(){ // worker线程不能停止时间轮,也就是加入的定时任务,不能调用这个方法。 // 不然会有恶意的定时任务调用这个方法而造成大量定时任务失效 if (Thread.currentThread() == workerThread) { thrownew IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } // 尝试CAS替换当前状态为“停止:2”。如果失败,则当前时间轮的状态只能是“初始化:0”或者“停止:2”。直接将当前状态设置为“停止:2“ if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit){ // 参数校验 if (task == null) { thrownew NullPointerException("task"); } if (unit == null) { thrownew NullPointerException("unit"); } // 如果时间轮没有启动,则启动 start();
// Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. // 计算任务的deadline long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // 这里定时任务不是直接加到对应的格子中,而是先加入到一个队列里,然后等到下一个tick的时候,会从队列里取出最多100000个任务加入到指定的格子中 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
// 基础的链表移除node操作 publicvoidremove(HashedWheelTimeout timeout){ HashedWheelTimeout next = timeout.next; // remove timeout that was either processed or cancelled by updating the linked-list if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; }
if (timeout == head) { // if timeout is also the tail we need to adjust the entry too if (timeout == tail) { tail = null; head = null; } else { head = next; } } elseif (timeout == tail) { // if the timeout is the tail modify the tail to be the prev node. tail = timeout.prev; } // null out prev, next and bucket to allow for GC. timeout.prev = null; timeout.next = null; timeout.bucket = null; }
/** * Clear this bucket and return all not expired / cancelled {@link Timeout}s. */ publicvoidclearTimeouts(Set<Timeout> set){ for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } }
// 链表的poll操作 private HashedWheelTimeout pollTimeout(){ HashedWheelTimeout head = this.head; if (head == null) { returnnull; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; }
// null out prev and next to allow for GC. head.next = null; head.prev = null; head.bucket = null; return head; } }
HashedWheelTimer源码之Worker
Worker是时间轮的核心线程类。tick的转动,过期任务的处理都是在这个线程中处理的。
privatefinalclassWorkerimplementsRunnable{ privatefinal Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// 将取消的任务取出,并从格子中移除 privatevoidprocessCancelledTasks(){ for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } }
/** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ //sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长 privatelongwaitForNextTick(){ //下次tick的时间点, 用于计算需要sleep的时间 long deadline = tickDuration * (tick + 1);
// Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) { // 这里是因为windows平台的定时调度最小单位为10ms,如果不是10ms的倍数,可能会引起sleep时间不准确 sleepTimeMs = sleepTimeMs / 10 * 10; }