2023-08-16  阅读(326)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

时间轮,是一种高效的、批量管理定时任务的调度模型 。我在《透彻理解Java网络编程》《透彻理解Kafka》两个专栏中,分别介绍过Netty和Kafka中的时间轮算法实现。

在Dubbo中,对时间轮的应用主要体现在如下两个方面:

  • 失败重试: 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等;
  • 周期性定时任务: 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制。

本章,我将对Apache Dubbo中的时间轮算法进行讲解。在Dubbo中,实现时间轮算法的思路和Netty几乎是完全一样的:时间轮是一种环形结构,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务,指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。通过时间轮算法,可以将定时任务的存取操作以及取消操作的时间复杂度降为 O(1),非常适合海量定时任务的调度管理。

202308162140226751.png

一、核心接口

关于时间轮算法的介绍我就不赘述了,本章我主要讲解Dubbo中的时间轮算法实现。Dubbo 的时间轮实现位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包中:

202308162140243452.png

一共四个包含四个核心类:

  • Timer:时间轮调度器,该接口提供了两个核心方法:创建任务 newTimeout() 、停止所有未执行任务 stop()
  • TimerTask:时间轮任务,所有的定时任务都要继承该接口;
  • Timeout:与 TimerTask 对象是一对一的关系,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象。通过 Timeout 对象,不仅可以查看定时任务的状态,还可以取消定时任务;
  • HashedWheelTimer:Timer接口的时间轮算法实现类。

202308162140249223.png

1.1 Timer

Timer 接口定义了定时器的基本行为,如下所示:

    public interface Timer {
    
        /**
         * 在指定的delay时间后,调度一个定时任务.
         */
        Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
    
        /**
         * 停止所有尚未执行的任务
         */
        Set<Timeout> stop();
    
        /**
         * 判断当前Timer是否已经停止
         */
        boolean isStop();
    }

上述方法的核心是 newTimeout() :提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,类似于向线程池提交任务返回一个Future对象。

1.2 TimerTask

TimerTask代表了一个定时任务,该接口非常简单,只定义了一个 run() 方法:

    public interface TimerTask {
    
        /**
         * 在指定延时后,执行任务
         */
        void run(Timeout timeout) throws Exception;
    }

1.3 Timeout

Timeout 对象与 TimerTask 对象一一对应,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。通过 Timeout 对象,我们不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。Timeout 接口中的方法如下:

    public interface Timeout {
    
        /**
         * 返回关联的Timer
         */
        Timer timer();
    
        /**
         * 返回关联的TimerTask
         */
        TimerTask task();
    
        /**
         * 判断关联的TimerTask是否已过期
         */
        boolean isExpired();
    
        /**
         * 判断关联的TimerTask是否已取消
         */
        boolean isCancelled();
    
        /**
         * 取消关联的TimerTask
         */
        boolean cancel();
    }

二、算法实现

HashedWheelTimer是Timer接口的时间轮算法实现,我们通过一个示例来看看该如何使用HashedWheelTimer:

    public class HashedWheelTimerTest {
    
        public static void main(String[] args) {
            Timer timer = new HashedWheelTimer();
    
            // 创建一个任务timeout1,10秒后执行
            Timeout timeout1 = timer.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) {
                    System.out.println("timeout1: " + new Date());
                }
            }, 10, TimeUnit.SECONDS);
    
            // 取消任务timeout1
            if (!timeout1.isExpired()) {
                timeout1.cancel();
            }
    
            // 创建一个任务,1秒后执行
            timer.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws InterruptedException {
                    System.out.println("timeout2: " + new Date());
                    Thread.sleep(5000);
                }
            }, 1, TimeUnit.SECONDS);
    
            // 创建一个任务,3秒后执行
            timer.newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) {
                    System.out.println("timeout3: " + new Date());
                }
            }, 3, TimeUnit.SECONDS);
        }
    }

HashedWheelTimer的内部结构如下图:

202308162140259254.png

2.1 HashedWheelTimeout

HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类,它扮演了两个角色:

  1. 时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器;
  2. 定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于查看和控制定时任务。

HashedWheelTimeout 中的核心字段和方法说明如下:

    // HashedWheelTimeout.java
    
    private static final class HashedWheelTimeout implements Timeout {
    
        // 定时任务状态:新建
        private static final int ST_INIT = 0;
        // 定时任务状态:已取消
        private static final int ST_CANCELLED = 1;
        // 定时任务状态:已过期
        private static final int ST_EXPIRED = 2;
    
        // 定时任务当前所处状态
        private volatile int state = ST_INIT;
    
        // 原子更新器,用于更新当前定时任务的状态
        private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
    
        // 所属时间轮
        private final HashedWheelTimer timer;
    
        // 实际被调度的任务
        private final TimerTask task;
    
        // 定时任务执行的时间,在创建HashedWheelTimeout时指定
        // 计算公式:currentTime + delay(任务延迟时间) - startTime(HashedWheelTimer的启动时间),单位为纳秒。
        private final long deadline;
    
        // 当前任务剩余的时钟周期数
        // 当任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长时,就出现了套圈的情况,需要该字段值表示剩余的时钟周期。
        long remainingRounds;
    
        // 当前定时任务在链表中的前驱节点
        HashedWheelTimeout next;
    
        // 后继节点
        HashedWheelTimeout prev;
    
        // 时间轮中的Bucket
        HashedWheelBucket bucket;
    
        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }
    
        @Override
        public boolean cancel() {
            // 设置任务状态为ST_CANCELLED
            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }
            // 将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁
            timer.cancelledTimeouts.add(this);
            return true;
        }
    
        void remove() {
            // 获取所在的时间轮Bucket
            HashedWheelBucket bucket = this.bucket;
            if (bucket != null) {
                // 从时间轮删除该任务
                bucket.remove(this);
            } else {
                // 待执行的任务数减去1
                timer.pendingTimeouts.decrementAndGet();
            }
        }
    
        public void expire() {
            // 设置任务状态为ST_EXPIRED
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }
    
            try {
                // 立即同步执行当前任务
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }
    }

2.2 HashedWheelBucket

HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。

HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表了。

    // HashedWheelBucket.java
    
    private static final class HashedWheelBucket {
        // 链表头节点
        private HashedWheelTimeout head;
        // 链表尾节点
        private HashedWheelTimeout tail;
    
        /**
         * 新增 HashedWheelTimeout 到链表尾节点
         */
        void addTimeout(HashedWheelTimeout timeout) {
            assert timeout.bucket == null;
            timeout.bucket = this;
            // 链表尾插法
            if (head == null) {
                head = tail = timeout;
            } else {
                tail.next = timeout;
                timeout.prev = tail;
                tail = timeout;
            }
        }
        /**
         * 遍历链表中所有HashedWheelTimeout节点:
         * 1.如果任务到期,则remove()取出,然后调用expire()方法执行;
         * 2.如果任务已取消,则remove()取出后丢弃;
         * 3.如果任务未到期,则将remainingRounds字段值减去1
         */
        void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;
    
            // 遍历链接表所有节点
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                // 任务到期
                if (timeout.remainingRounds <= 0) {
                    // 取出任务
                    next = remove(timeout);
                    // 任务已到期
                    if (timeout.deadline <= deadline) {
                        // 立即执行
                        timeout.expire();
                    } else {
                        throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } 
                // 任务已取消
                else if (timeout.isCancelled()) {
                    // 直接抛弃任务
                    next = remove(timeout);
                } 
                // 任务未到期
                else {
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }
    
        /**
         * 从双向链表中移除指定的 HashedWheelTimeout 节点
         */
        public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
            // 双向链表操作,从链表移除节点
            HashedWheelTimeout next = timeout.next;
            if (timeout.prev != null) {
                timeout.prev.next = next;
            }
            if (timeout.next != null) {
                timeout.next.prev = timeout.prev;
            }
    
            if (timeout == head) {
                if (timeout == tail) {
                    tail = null;
                    head = null;
                } else {
                    head = next;
                }
            } else if (timeout == tail) {
                tail = timeout.prev;
            }
            timeout.prev = null;
            timeout.next = null;
            timeout.bucket = null;
            // 待执行的任务数减去1
            timeout.timer.pendingTimeouts.decrementAndGet();
            return next;
        }
    
        /**
         * 移除双向链表中所有已过期或已取消的任务,并添加到set集合中
         */
        void clearTimeouts(Set<Timeout> set) {
            for (;;) {
                HashedWheelTimeout timeout = pollTimeout();
                if (timeout == null) {
                    return;
                }
                if (timeout.isExpired() || timeout.isCancelled()) {
                    continue;
                }
                set.add(timeout);
            }
        }
    
        /**
         * 移除双向链表的头节点任务,并返回该任务
         */    
        private HashedWheelTimeout pollTimeout() {
            HashedWheelTimeout head = this.head;
            if (head == null) {
                return null;
            }
            HashedWheelTimeout next = head.next;
            if (next == null) {
                tail = this.head = null;
            } else {
                this.head = next;
                next.prev = null;
            }
            head.next = null;
            head.prev = null;
            head.bucket = null;
            return head;
        }
    }

2.3 HashedWheelTimer

HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减1。

核心字段

我们先来看下HashedWheelTimer的核心字段:

    // HashedWheelTimer.java
    
    public class HashedWheelTimer implements Timer {
        //...
    
        // 时间轮状态的原子修改器
        private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
    
        // 后台工作线程
        private final Worker worker = new Worker();        // Runnable对象
        private final Thread workerThread;                // 工作线程
    
        // 时间轮当前所处的状态:init、started、shutdown
        private volatile int workerState;
        private static final int WORKER_STATE_INIT = 0;
        private static final int WORKER_STATE_STARTED = 1;
        private static final int WORKER_STATE_SHUTDOWN = 2;
    
        // 时间指针每次加1所代表的实际时间间隔,单位纳秒
        private final long tickDuration;
    
        // 时间轮的环形队列,每个元素是一个槽,当指定时间轮槽数为n时,实际上会取大于且最靠近n的2的幂次方值
        private final HashedWheelBucket[] wheel;
    
        // 掩码,mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽
        private final int mask;
    
        // 暂存外部提交到时间轮中的定时任务
        private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
    
        // 暂存已取消的定时任务
        private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
    
        // 当前时间轮中剩余的定时任务总数
        private final AtomicLong pendingTimeouts = new AtomicLong(0);
    
        // 最大定时任务数
        private final long maxPendingTimeouts;
    
        // 当前时间轮的启动时间,提交到该时间轮的定时任务的deadline字段值以该时间戳为起点进行计算
        private volatile long startTime;
    }

构造函数

然后来看时间轮的构造:

    // HashedWheelTimer.java
    
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, 
                            int ticksPerWheel, long maxPendingTimeouts) {
    
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
    
        // 初始化时间轮
        wheel = createWheel(ticksPerWheel);
        // 掩码,用于后续计算每个任务的槽位
        mask = wheel.length - 1;
    
        // 间隔时长,纳秒
        this.tickDuration = unit.toNanos(tickDuration);
    
        // 防止溢出
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
        }
        // 工作线程
        workerThread = threadFactory.newThread(worker);
    
        // 最大能容纳的定数任务数
        this.maxPendingTimeouts = maxPendingTimeouts;
    
        // 时间轮对象数加1
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

createWheel方法用于创建HashedWheelTimer对象内部的时间槽:

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }
        // 时间槽数量调整为2的幂次
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        // 创建槽数组
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

提交任务

HashedWheelTimer对外提供了一个 newTimeout() 接口用于提交定时任务,在定时任务进入到 timeouts 队列之前会先调用 start()方法启动时间轮,内部会完成以下关键步骤:

  1. 确定时间轮的 startTime 字段;
  2. 启动 workerThread 线程,开始执行 worker 任务。
    // HashedWheelTimer.java
    
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
    
        // 任务数加1
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // 判断任务数是否超过限制的最大值
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                                                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                                                 + "timeouts (" + maxPendingTimeouts + ")");
        }
    
        // 启动工作线程
        start();
    
        // 计算任务的deadline:当前时间 + delay时间 - 时间轮创建时间
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    
        // 防止deadline溢出
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 将任务封装成HashedWheelTimeout
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        // 加入队列
        timeouts.add(timeout);
        return timeout;
    }

我们来看start方法,它的内部会启动工作线程,并且主线程会等待工作线程完成startTime的设置:

    // HashedWheelTimer.java
    
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    
    public void start() {
        // 判断当前时间轮的状态
        switch (WORKER_STATE_UPDATER.get(this)) {
            // INIT初始化
            case WORKER_STATE_INIT:
                // 更新时间轮状态
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    // 启动工作线程
                    workerThread.start();
                }
                break;
            // STARTED已启动
            case WORKER_STATE_STARTED:
                break;
            // SHUTDOWN已关闭
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }
    
        while (startTime == 0) {
            try {
                // 主线程在这里等待,直到工作线程启动
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
            }
        }
    }

工作线程

HashedWheelTimer的构造过程比较简单,主要就是创建了一个工作线程workerThread,并传入一个Woker对象,Woker本质是一个Runnabdle任务类,它的run方法由workerThread工作线程执行,会一直循环执行以下逻辑:

  1. 时间轮指针转动,工作线程等待当前tick结束;
  2. 清理已取消的定时任务,这些定时任务在用户取消时,会记录到cancelledTimeouts队列中,每次指针转动时,都会清理该队列;
  3. 将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中;
  4. 根据指针指向的当前时间槽,处理该槽位的双向链表中的所有定时任务。
    // Worker.java
    
    private final class Worker implements Runnable {
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
    
        // 时间轮指针,一个步长为1的单调递增计数器
        private long tick;
    
        @Override
        public void run() {
            // 初始化时间轮的启动时间
            startTime = System.nanoTime();
            if (startTime == 0) {
                startTime = 1;
            }
    
            // 通知主线程放行,startTimeInitialized是一个CountdownLatch对象
            startTimeInitialized.countDown();
    
            do {
                // 1.等待tick结束
                final long deadline = waitForNextTick();
                if (deadline > 0) {    // 大于0说明没有被中断
                    // 计算时间槽位
                    int idx = (int) (tick & mask);
                    // 2.清理已取消的任务
                    processCancelledTasks();
                    // 3.将timeouts中的任务转移到各自对应的槽位
                    transferTimeoutsToBuckets();
                    // 4.处理当前槽位的双向链表中的所有定时任务
                    HashedWheelBucket bucket = wheel[idx];
                    bucket.expireTimeouts(deadline);
                    // 5.指向下一个时间槽
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    
            // 执行到这里,说明时间轮已经关闭
    
            // 遍历所有时间槽
            for (HashedWheelBucket bucket : wheel) {
                // 移除该槽的双向链表中的所有已过期或已取消的任务,把它们添加到unprocessedTimeouts集合中
                bucket.clearTimeouts(unprocessedTimeouts);
            }
    
            // 将timeouts队列中任务全部取出
            for (; ; ) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                // 如果任务还没取消
                if (!timeout.isCancelled()) {
                    // 添加任务到队列unprocessedTimeouts
                    unprocessedTimeouts.add(timeout);
                }
            }
            // 清理 cancelledTimeouts 队列中用户主动取消的定时任务
            processCancelledTasks();
        }
    
        private void transferTimeoutsToBuckets() {
            // 每次最多转移100000个任务
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    continue;
                }
    
                // 计算任务的剩余轮数
                long calculated = timeout.deadline / tickDuration;
                timeout.remainingRounds = (calculated - tick) / wheel.length;
    
                // 计算任务所属的槽位
                final long ticks = Math.max(calculated, tick);
                int stopIndex = (int) (ticks & mask);
    
                // 向指定槽位添加任务
                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }
    
        private void processCancelledTasks() {
            // 遍历已取消的任务
            for (; ; ) {
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    break;
                }
                try {
                    // 移除任务
                    timeout.remove();
                } catch (Throwable t) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("An exception was thrown while process a cancellation task", t);
                    }
                }
            }
        }
    
        /**
         * 等待当前tick的时间间隔结束,并返回结束时的当前时间
         */   
        private long waitForNextTick() {
            // 计算当前时间槽的deadline
            long deadline = tickDuration * (tick + 1);
    
            for (; ; ) {
                // 计算线程需要等待多久(sleepTimeMs),才会到达deadline
                final long currentTime = System.nanoTime() - startTime;
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
    
                // 已到达deadline
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        // 返回当前时间
                        return currentTime;
                    }
                }
                // Windows平台特殊处理
                if (isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                }
    
                try {
                    // Sleep一段时间,等待deadline到来
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }
    
        Set<Timeout> unprocessedTimeouts() {
            return Collections.unmodifiableSet(unprocessedTimeouts);
        }
    }

从上述算法实现可以看出,Dubbo中的时间轮算法实现,基本就是copy了Netty的代码。

三、总结

本章,我再次对时间轮算法的思想及其实现进行了讲解。事实上在Dubbo中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成时,再向时间轮提交一次当前任务,这样就会在下个周期执行该任务。这样的话,即使当前任务执行出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。

当然,Netty和Dubbo实现的时间轮算法都有一定的局限,特别是不能适应 海量定时任务,且任务的开始时间跨度非常长的场景,比如有的是 1 分钟之后执行,有的是 1 小时之后执行,有的是 1 年之后执行 ,这种情况下就需要对时间轮算法进行优化,Kafka采用的解决方案就是 多层时间轮+DelayQueue结构时间槽 ,具体我不再赘述了,读者可以自己去看下Kafka的实现,或者参考我的专栏《透彻理解Java网络编程》中的内容。

阅读全文
  • 点赞