2023-08-02  阅读(319)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/354

本章,我将对Netty中的 Recycler对象池 进行讲解。所谓对象池,顾名思义,就是程序对象(Java对象)的一个缓存池。与内存池类似,对象池的目的也是为了提升 Netty 的并发处理能力,避免频繁创建和销毁对象所带来的性能损耗。

那么,Netty是如何实现对象池的?我们在实践中又该如何运用对象池呢?带着这两个问题,我们来看Netty中Recycler对象池的设计与实现。

一、Recycler简介

Recycler是Netty实现的轻量级对象回收站,借助 Recycler 可以完成对象的获取和回收。

1.1 基本使用

我们通过一个例子直观感受下 Recycler 如何使用。

假设我们有一个User类,需要实现User对象的复用。首先,我们定义一个UserRecycler对象池:

    // UserRecycler.java
    
    public class UserRecycler extends Recycler<User> {
        @Override
        protected User newObject(Handle handle) {
            return new User(handle);
        }
    }

User对象:

    // User.java
    
    public class User {
    
        private Recycler.Handle<User> handle;
        private String name;
    
        public User(Recycler.Handle<User> handle) {
            this.handle = handle;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public void recycle() {
            handle.recycle(this);
        }
    }

测试用例:

    public class RecyclerTest {
    
        private static final UserRecycler userRecycler = new UserRecycler();
    
        public static void main(String[] args) {
    
            // 1.从对象池获取User对象
            User user1 = userRecycler.get();
            user1.setName("hello");
    
            // 2.回收对象
            user1.recycle();
    
            // 3.从对象池获取对象
            User user2 = userRecycler.get();
    
            System.out.println(user2.getName());
            System.out.println(user1 == user2);
        }
    }

打印结果如下:

    hello
    true

1.2 内部结构

Recycler的内部结构,如下图所示:

202308022227435391.png

可以看到,Recycler一共包含四个核心组件: StackWeakOrderQueueLinkDefaultHandle 。各个组件的关系,可以通过下图描述:

202308022227442532.png

Stack

Stack是每个线程私有的,用于存储当前线程回收的对象。Netty 为了避免多线程场景下的锁竞争问题,每个线程都会持有自己的对象池,内部通过FastThreadLocal来实现线程私有化。

FastThreadLocal 可以理解为Netty里的 ThreadLocal,后续我章节会专门对它进行讲解。

来看下Stack的源码:

    // Recycler.Stack.java
    
    private static final class Stack<T> {
           // 所属的Recycler
        final Recycler<T> parent;
    
        // 所属线程的弱引用
        final WeakReference<Thread> threadRef;
    
        // WeakOrderQueue最大个数
        private final int maxDelayedQueues;
    
        // 其它线程回收对象时,最多可以回收当前线程创建的对象个数
        final AtomicInteger availableSharedCapacity;
    
        // 对象池的最大大小,默认4k
        private final int maxCapacity;
    
        // 存储缓存数据的数组
        DefaultHandle<?>[] elements;
    
        // 缓存的DefaultHandle对象个数
        int size;
    
        // WeakOrderQueue链表的三个重要指针
        private WeakOrderQueue cursor, prev;
        private volatile WeakOrderQueue head;
    
        //...
    }

Stack内部有一个WeakOrderQueue链表,链表的每个节点指向一个WeakOrderQueue队列,每个队列里保存着 其它线程所释放的对象

比如,ThreadA 表示当前线程,那么它Stack内部的WeakOrderQueue链表,就存储着 ThreadB、ThreadC 等其它线程释放的对象。

availableSharedCapacity字段的默认值为:new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)) = 16K。它的含义是:其它线程在回收对象时,最多可以回收 ThreadA 创建的对象个数不能超过 availableSharedCapacity。

WeakOrderQueue

WeakOrderQueue 用于存储其它线程回收当前线程所分配的对象,并且在合适的时机,Stack会从其它线程的 WeakOrderQueue 中收割对象。比如,ThreadB从ThreadA回收对象时,会将回收到的对象放到ThreadA的 WeakOrderQueue中。

Link

每个 WeakOrderQueue 中都包含一个 Link 链表,回收对象都会被存在 Link 链表中的节点上,每个 Link 节点默认存储 16 个对象,当每个 Link 节点存储满了会创建新的 Link 节点放入链表尾部。

DefaultHandle

DefaultHandle 实例中保存了实际回收的对象,Stack 和 WeakOrderQueue 都使用 DefaultHandle 存储回收的对象。

二、Recycler原理

对 Recycler 有了初步认识后,我们再来看从 Recycler 获取对象和回收对象的原理。

2.1 获取对象

从对象池中获取对象的入口是在Recycler.get()方法,该方法的逻辑非常清晰:

  1. 首先,通过 FastThreadLocal 获取当前线程的私有Stack;
  2. 尝试出栈一个 DefaultHandle 对象,如果结果为空说明 Stack 中没有可用 DefaultHandle 对象,则调用newObject生成一个新对象,并完成handle与对象和Stack的绑定。
    // Recycler.java
    
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
    
        // 获取当前线程的私有Stack
        Stack<T> stack = threadLocal.get();
        // 出栈一个DefaultHandle对象
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            // 创建对象,并保存到 DefaultHandle
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

我们再来看 Stack的出栈操作,跟进下Stack.pop()的源码:

    // Recycler.Stack.java
    
    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {
            // 尝试从其它线程回收的对象中转移一些到自己的elements数组中
            if (!scavenge()) {
                return null;
            }
            size = this.size;
            if (size <= 0) {
                // double check, avoid races
                return null;
            }
        }
        // 从栈顶弹出元素
        size --;
        DefaultHandle ret = elements[size];
        elements[size] = null;
        this.size = size;
    
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        return ret;
    }

整个逻辑也很清晰:

  1. 如果当前线程的私有Stack中有可用的对象,直接将对象出栈;
  2. 如果 elements 数组中没有可用的对象,则调用 scavenge 方法。scavenge 的作用是从其它线程回收的对象中转移一些到 elements 数组当中。

Stack.scavenge()方法非常有意思,它会想办法从 WeakOrderQueue 链表中迁移部分对象。它的设计思想与J.U.C中的ForkJoinPool有些类似,ForkJoinPool采用了“工作窃取”算法,也是从其它线程中“窃取”对象执行。

每个 Stack 都有一个 WeakOrderQueue 链表,链表中的每个 WeakOrderQueue 中保存了其它线程回收的对象:

    // Recycler.Stack.java
    
    private boolean scavenge() {
        // 尝试从 WeakOrderQueue 中转移对象到Stack中
        if (scavengeSome()) {
            return true;
        }
    
        // 如果迁移失败,就会重置cursor指针到head节点
        prev = null;
        cursor = head;
        return false;
    }

继续看scavengeSome方法:

    // Recycler.Stack.java
    
    private boolean scavengeSome() {
        WeakOrderQueue prev;
        // cursor指针指向当前WeakorderQueueu链表的读取位置
        WeakOrderQueue cursor = this.cursor;
        // 如果cursor指针为null, 则是第一次从WeakorderQueueu链表中获取对象
        if (cursor == null) {
            prev = null;
            cursor = head;
            if (cursor == null) {
                return false;
            }
        } else {
            prev = this.prev;
        }
    
        // 不断循环从WeakOrderQueue链表中找到一个可用的对象实例
        boolean success = false;
        do {
             // 尝试迁移WeakOrderQueue中部分对象实例到Stack中
            if (cursor.transfer(this)) {
                success = true;
                break;
            }
            WeakOrderQueue next = cursor.getNext();
            if (cursor.get() == null) {
                // 如果已退出的线程还有数据
                if (cursor.hasFinalData()) {
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }
                // 将已退出的线程从WeakOrderQueue链表中移除
                if (prev != null) {
                    cursor.reclaimAllSpaceAndUnlink();
                    prev.setNext(next);
                }
            } else {
                prev = cursor;
            }
            // 将cursor指针指向下一个WeakOrderQueue
            cursor = next;
        } while (cursor != null && !success);
        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

scavenge 的源码中,首先会从 cursor 指针指向的 WeakOrderQueue 节点回收部分对象到 Stack 的 elements 数组中,如果没有回收到数据就会将 cursor 指针移到下一个 WeakOrderQueue,重复执行以上过程直至回收到对象为止,可以通过下图来理解:

202308022227472953.png

2.2 回收对象

理解了如何从 Recycler 获取对象之后,我们再来看 Recycler 是如何回收对象的,直接定位到对象回收的源码入口 DefaultHandle.recycle()

    // Recycler.DefaultHandle.java
    
    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }
    
        Stack<?> stack = this.stack;
        if (lastRecycledId != recycleId || stack == null) {
            throw new IllegalStateException("recycled already");
        }
        stack.push(this);
    }

在回收对象时,会向 Stack 中 push 对象,push 会分为 当前线程回收其它线程回收 两种情况,分别对应pushNowpushLater两个方法:

    // Recycler.DefaultHandle.java
    
    void push(DefaultHandle<?> item) {
        Thread currentThread = Thread.currentThread();
        if (threadRef.get() == currentThread) {
            // 当前线程回收
            pushNow(item);
        } else {
            // 其它线程回收
            pushLater(item, currentThread);
        }
    }

当前线程回收

当前线程回收对象的逻辑非常简单,就是直接向 Stack 的 elements 数组中添加数据,对象会被存放在栈顶指针指向的位置。如果超过了 Stack 的最大容量,那么对象会被直接丢弃,这里使用了dropHandle方法控制对象的回收速率,每 8 个对象会有一个被回收到 Stack 中:

    // Recycler.DefaultHandle.java
    
    private void pushNow(DefaultHandle<?> item) {
        // 防止被多次回收
        if (item.recycleId != 0 || !item.compareAndSetLastRecycledId(0, OWN_THREAD_ID)) {
            throw new IllegalStateException("recycled already");
        }
        item.recycleId = OWN_THREAD_ID;
    
        int size = this.size;
        // 1. 超出最大容量 2. 控制回收速率
        if (size >= maxCapacity || dropHandle(item)) {
            return;
        }
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
        }
    
        elements[size] = item;
        this.size = size + 1;
    }

其它线程回收

其它线程回收时,首先通过 FastThreadLocal 取出当前对象的DELAYED_RECYCLED缓存:DELAYED_RECYCLED 存放着当前线程帮助其它线程回收对象的映射关系。

举个例子,假如 item 是 ThreadA 分配的对象,当前线程是 ThreadB,此时 ThreadB 帮助 ThreadA 回收 item,那么 DELAYED_RECYCLED 放入的 key 是 StackA,然后从 delayedRecycled 中取出 StackA 对应的 WeakOrderQueue,如果 WeakOrderQueue 不存在,那么为 StackA 新创建一个 WeakOrderQueue,并将其加入DELAYED_RECYCLED缓存。

WeakOrderQueue.allocate() 会检查帮助 StackA 回收的对象总数是否超过 2K 个,如果没有超过 2K,会将 StackA 的 head 指针指向新创建的 WeakOrderQueue,否则不再为 StackA 回收对象。

当然 ThreadB 不会只帮助 ThreadA 回收对象,它可以帮助其它多个线程回收,所以 DELAYED_RECYCLED 使用的 Map 结构,为了防止 DELAYED_RECYCLED 内存膨胀,Netty 也采取了保护措施,从 delayedRecycled.size() >= maxDelayedQueues 可以看出,每个线程最多帮助 2 倍 CPU 核数的线程回收线程,如果超过了该阈值,假设当前对象绑定的为 StackX,那么将在 Map 中为 StackX 放入一种特殊的 WeakOrderQueue.DUMMY,表示当前线程无法帮助 StackX 回收对象。

    // Recycler.DefaultHandle.java
    
    private void pushLater(DefaultHandle<?> item, Thread thread) {
        if (maxDelayedQueues == 0) {
            return;
        }
        // 当前线程帮助其它线程回收对象的缓存映射
        Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
        // 取出 Stack 对应的 WeakOrderQueue
        WeakOrderQueue queue = delayedRecycled.get(this);
        if (queue == null) {
            // 最多帮助2 * CPU核数的线程回收线程
            if (delayedRecycled.size() >= maxDelayedQueues) {
                // 存一个Dummy节点,表示当前线程无法再帮助该Stack回收对象
                delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                return;
            }
            // 新建WeakOrderQueue
            if ((queue = newWeakOrderQueue(thread)) == null) {
                return;
            }
            delayedRecycled.put(this, queue);
        } else if (queue == WeakOrderQueue.DUMMY) {
            return;
        }
        // 添加对象到WeakOrderQueue的Link链表中
        queue.add(item);
    }

至此,Recycler如何获取和回收对象的底层原理就全部分析完了,Recycler回收对象时向WeakOrderQueue中存放对象,从Recycler获取对象时,WeakOrderQueue中的对象会作为Stack的储备,而且有效地解决了跨线程回收的问题。

三、总结

Netty中大量运用了Recycler。 例如,我们在使用 PooledDirectByteBuf 时,并不是每次都去创建新的对象,而是从对象池中获取预先分配好的对象实例,不再使用 PooledDirectByteBuf 时,会被回收归还到对象池中。

最后,对Recycler对象池进行一个总结:

  • 对象池有两个重要的组成部分:Stack 和 WeakOrderQueue;
  • Recycler 获取对象时,优先从 Stack 中查找,如果 Stack 没有可用对象,会尝试从 WeakOrderQueue 迁移部分对象到 Stack 中;
  • Recycler 回收对象时,分为当前线程对象回收和其它线程对象回收两种情况,当前线程回收直接向 Stack 中添加对象,其它线程回收向 WeakOrderQueue 中的 Link 添加对象;
  • 对象回收时会控制回收速率,每8个对象会回收一个,其他的全部丢弃。
阅读全文
  • 点赞