2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104443352

Recycler

首先他是个抽象类,有个抽象方法,创建对象,也就是说在对象池没有对象的时候得能创建对象:

    protected abstract T newObject(Handle<T> handle);

他与对象池配合使用,比如ObjectPool中的RecyclerObjectPool

    private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
            private final Recycler<T> recycler;//回收器
    
            RecyclerObjectPool(final ObjectCreator<T> creator) {
                 recycler = new Recycler<T>() {
                    @Override
                    protected T newObject(Handle<T> handle) {
                        return creator.newObject(handle);
                    }
                };
            }
    
            @Override
            public T get() {
                return recycler.get();
            }
        }

可见他将具体如果创建对象交给了ObjectCreator接口:

        public interface ObjectCreator<T> {
    
            T newObject(Handle<T> handle);
        }

并且封装了一个对象池的静态方法,只要传入创建器即可:

        public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
            return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
        }

比如我们的PooledHeapByteBuf的对象池,只要返回相应的对象就好:

     private static final ObjectPool<PooledHeapByteBuf> RECYCLER = ObjectPool.newPool(
                new ObjectCreator<PooledHeapByteBuf>() {
            @Override
            public PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
                return new PooledHeapByteBuf(handle, 0);
            }
        });

一些配置属性

基本都已经注释,忘记了可以来看,后面讲到的时候会提起,当然这些参数在静态代码块里可以通过参数设置的方式修改。

      private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);//id生成器
        private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();//获取所属线程的id
        private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
        private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;//每个线程本地变量Stack最大容量,默认4096
        private static final int INITIAL_CAPACITY;//Stack初始化容量,默认256
        private static final int MAX_SHARED_CAPACITY_FACTOR;//最大共享容量因子,影响WeakOrderQueue的容量,默认2
        private static final int MAX_DELAYED_QUEUES_PER_THREAD;//每个线程本地变量WeakHashMap的最大键值对个数,默认CPU核心数x2
        private static final int LINK_CAPACITY;//链接中的数组容量,默认16
        private static final int RATIO;//回收间隔,默认8

简单例子入手

我们直接从获取和回收开始讲好了,这样比较有针对性。我们以一个简单的例子入手:

    public class RecycleTest {
    //创建回收器
        private static final Recycler<MyBuff> RECYCLER = new Recycler<MyBuff>() {
            @Override
            protected MyBuff newObject(Handle<MyBuff> handle) {
                return new MyBuff(handle);
            }
        };
    
        private static class MyBuff {
            private final Recycler.Handle<MyBuff> handle;
    
            public MyBuff(Recycler.Handle<MyBuff> handle) {
                this.handle = handle;
            }
    
            public void recycle() {
                handle.recycle(this);
            }
        }
    
    
        public static void main(String[] args)  {
            MyBuff myBuff = RECYCLER.get();
            myBuff.recycle();
        }
    }

这个够简单了,创建一个回收器RECYCLER,一个测试类MyBuff ,然后就是获取和回收。

Recycler初始化

DELAYED_RECYCLED

先看一些重要的静态变量的初始化,DELAYED_RECYCLED 是一个线程本地变量,里面存放的是一个map,具体类型是WeakHashMap<Stack<?>, WeakOrderQueue>map里面放着键值对,每个回收器回收了其他线程创建的对象时,就会放入对象所对应的StackWeakHashMap中的WeakOrderQueue里,这个上一篇讲过,就不多讲了。

        private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
                new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
            //默认初始化
            @Override
            protected Map<Stack<?>, WeakOrderQueue> initialValue() {
                return new WeakHashMap<Stack<?>, WeakOrderQueue>();//弱键回收,键如果只有弱引用,可以被GC回收,然后将整个键值对回收
            }
        };

threadLocal

另外一个线程本地变量就是放Stack的,有初始化,也有安全删除的方法:

      private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
            @Override
            protected Stack<T> initialValue() {
                return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                        interval, maxDelayedQueuesPerThread);
            }
            //安全删除Stack键值对
            @Override
            protected void onRemoval(Stack<T> value) {
                // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
                if (value.threadRef.get() == Thread.currentThread()) {
                   if (DELAYED_RECYCLED.isSet()) {
                       DELAYED_RECYCLED.get().remove(value);
                   }
                }
            }
        };

Recycler的构造函数就不多讲了,就是属性赋值。

202309132205183761.png

Stack构造方法

先说下这个构造方法,后面一些参数会有用到,其实就是一些参数的设置。

            Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
                  int interval, int maxDelayedQueues) {
                this.parent = parent;//回收器
                threadRef = new WeakReference<Thread>(thread);//所属线程的弱引用
                this.maxCapacity = maxCapacity;//最大容量,默认4096
                availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));//共享容量,也就是其他线程中的WeakOrderQueue中的最大容量的总和 2048
                elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];//存放对象的数组,默认256大小
                this.interval = interval;//回收间隔
                handleRecycleCount = interval; // 间隔计数器,第一个会被回收
                this.maxDelayedQueues = maxDelayedQueues;//关联的WeakOrderQueue最大个数,默认16
            }

处理器回收recycle(this)

我们先讲回收呢,因为获取里面会涉及到回收后的一些知识,不讲回收理解不了的。

      @Override
            public void recycle(Object object) {
    			...
                Stack<?> stack = this.stack;
    			...
                stack.push(this);//入栈
            }

stack.push

这个方法就分两种情况了:

    void push(DefaultHandle<?> item) {
                Thread currentThread = Thread.currentThread();
                if (threadRef.get() == currentThread) {//属于栈的线程,直接入栈
                    pushNow(item);
                } else {//不属于栈的线程或者属于栈的但是被回收得到线程,需要后面入栈,先放进WeakOrderQueue
                    pushLater(item, currentThread);
                }
            }

pushNow当前线程是Stack的所属线程

如果当前线程是属于Stack的所属线程,就调用这个方法,直接将对象放入elements的数组中。

这个过程还是比较好理解的,首先判断是否回收过,然后记录回收信息,判断回收的数量有没超过限制,或者是不是丢弃,根据回收间隔。然后看elements数组是否需要扩容,每次扩容到两倍,但是不超过最大容量默认4096。最后把对象放入指定索引的位置。

           private void pushNow(DefaultHandle<?> item) {
                if ((item.recycleId | item.lastRecycledId) != 0) {//尝试过回收
                    throw new IllegalStateException("recycled already");
                }
                item.recycleId = item.lastRecycledId = OWN_THREAD_ID;//记录定义的线程ID
    
                int size = this.size;//已有对象数量
                if (size >= maxCapacity || dropHandle(item)) {
                    return;
                }
                if (size == elements.length) {//要扩容了 每次x2 直到maxCapacity
                    elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
                }
    
                elements[size] = item;//放入数组中
                this.size = size + 1;//个数+1
            }
dropHandle

这个就是间隔回收,两次回收之间隔8个对象。

      boolean dropHandle(DefaultHandle<?> handle) {
                if (!handle.hasBeenRecycled) {//没被回收过
                    if (handleRecycleCount < interval) {//回收次数小于回收阈值
                        handleRecycleCount++;//回收次数+1
                        // Drop the object.
                        return true;//丢弃
                    }
                    handleRecycleCount = 0;//清零
                    handle.hasBeenRecycled = true;//被回收了
                }
                return false;
            }

这里容易无解,其实应该是除了第一个直接被回收外,后面每9个回收1个。图示根据回收过来的序号排序从0开始,绿色表示能被回收,红色表示被丢弃:

202309132205193652.png

pushLater当前线程不是Stack的所属线程

202309132205200183.png
这种情况下就是另一个线程来回收,看源码吧。

      private void pushLater(DefaultHandle<?> item, Thread thread) {
    			...
                Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();//每个线程都会有自己的map
                WeakOrderQueue queue = delayedRecycled.get(this);//获取对应的WeakOrderQueue
                if (queue == null) {//不存在尝试创建一个放入map
                    if (delayedRecycled.size() >= maxDelayedQueues) {//数量大于阈值 放一个假WeakOrderQueue,丢弃对象
                        delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                        return;
                    }
    
                    if ((queue = newWeakOrderQueue(thread)) == null) {//创建一个队列,如果要分配的容量(16)不够的话就丢弃对象
                        // drop object
                        return;
                    }
                    delayedRecycled.put(this, queue);//放入map里
                } else if (queue == WeakOrderQueue.DUMMY) {//如果是假的,就丢弃
                    // drop object
                    return;
                }
    
                queue.add(item);//放入WeakOrderQueue
            }
            
    		static final WeakOrderQueue DUMMY = new WeakOrderQueue();

首先我们会获取线程本地变量WeakHashMap<Stack<?>, WeakOrderQueue>,然后根据Stack获取WeakOrderQueue

  • 如果获取不到,说明还没有这个Stack关联的WeakOrderQueue被创建。尝试创建,但是如果WeakHashMap键值对数量超过限制了,就放一个假的WeakOrderQueue,其实就是一个空的队列,DUMMY。否则的话就尝试创建一个,如果还有分配的容量的话,就创建,并和Stack一起放入WeakHashMap中,不行的话就丢弃对象。
  • 如果获取的是DUMMY 的话,说明WeakHashMap放满了,就丢弃。
  • 如果获取到了且不是DUMMY就尝试放队列里。
newWeakOrderQueue创建队列

看看他是如果创建队列的。

            private WeakOrderQueue newWeakOrderQueue(Thread thread) {
                return WeakOrderQueue.newQueue(this, thread);
            }
    		static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
                //  是否可分配链接
                if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
                    return null;//分配失败
                }
                final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
    
                stack.setHead(queue);//头插法,新的队列插到头部
    
                return queue;
            }
Head链接管理者

首先先介绍下Head类,他管理着里面所有的链接Link的创建和回收。内部还有一个Link的连接,其实是单链表的表头,所有的Link都会被串起来,还有一个容量availableSharedCapacity,后续的分配和回收都会用到。

202309132205205244.png

Link具体存对象

本身就是原子对象,可以计数,这个在后面放入对象的时候会用到。这个里面其实就是一个数组,用来存对象,默认容量是16,还有一个next指向下一个,至于readIndex就是获取对象的时候用,这个跟netty自定义的ByteBuf的读索引类似,表示下一个可获取对象的索引。

202309132205214965.png
基本就是这样的结构:

202309132205222946.png

Head.reserveSpaceForLink为Link申请空间

传进来的参数是stack.availableSharedCapacity也就是2048,说明可以申请的容量是跟这个参数相关的,最多2048个。也就是说**每个Stack在其他线程中的回收对象最多是2048个。**每次分配16,如果容量小于16个了,就不分配了,因此可能导致WeakOrderQueue创建失败,丢弃对象。

                static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
                    for (;;) {
                        int available = availableSharedCapacity.get();
                        if (available < LINK_CAPACITY) {//可分配容量小于16 分配失败
                            return false;
                        }
                        if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
                            return true; //分配成功
                        }
                    }
                }
WeakOrderQueue构造方法

创建一个链接Link,然后给创建一个Head,并传入availableSharedCapacity引用,根据这个availableSharedCapacity来进行后续Link的分配和回收的。然后还有个队尾的引用,同时也存在回收间隔,跟Stack一样,默认是8

      private WeakOrderQueue(Stack<?> stack, Thread thread) {
                super(thread);
                tail = new Link();//创建链接,分配LINK_CAPACITY个DefaultHandle类型的数组
    
                head = new Head(stack.availableSharedCapacity);
                head.link = tail;
                interval = stack.interval;
                handleRecycleCount = interval; // Start at interval so the first one will be recycled.
            }
stack.setHead(queue) 设置头结点

因为需要跟Stack有关联,所以会跟Stackhead结点形成一个单链表,头插法,而且这里用方法同步,主要是多线程可能同时回收,所以需要同步。

202309132205235067.png

queue.add加入队列

这个也是间隔回收的,从队尾的Link 开始,看是否满了,如果满了就重新创建一个Link加入链表,然后在elements对应索引位置放入对象,Link本身就是AtomicInteger,可以进行索引的改变。

     void add(DefaultHandle<?> handle) {
                handle.lastRecycledId = id;//记录上次回收的线程id
    
                if (handleRecycleCount < interval) {//回收次数小于间隔,就丢弃对象,为了不让队列增长过快
                    handleRecycleCount++;
    
                    return;
                }
                handleRecycleCount = 0;
    
                Link tail = this.tail;
                int writeIndex;
                if ((writeIndex = tail.get()) == LINK_CAPACITY) {//如果超过链接容量限制了
                    Link link = head.newLink();//创建新的链接,如果创建不成功,就返回null,丢弃对象
                    if (link == null) {
                        // Drop it.
                        return;
                    }
                    
                    this.tail = tail = tail.next = link;//加入链表
    
                    writeIndex = tail.get();
                }
                tail.elements[writeIndex] = handle;//放入对象
                handle.stack = null;//放进queue里就没有栈了
    
                tail.lazySet(writeIndex + 1);//不需要立即可见,这里都是单线程操作
            }
head.newLink创建链接

其实就是前面讲过的申请空间,创建Link。如果成功就创建一个链接返回,否则就返回null

    			Link newLink() {
                    return reserveSpaceForLink(availableSharedCapacity) ? new Link() : null;
                }

再回忆下这张图:

202309132205242138.png

至此,回收已经讲完了,后面我们将获取。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文