Recycler
首先他是个抽象类,有个抽象方法,创建对象,也就是说在对象池没有对象的时候得能创建对象:
protected abstract T newObject(Handle<T> handle);
他与对象池配合使用,比如ObjectPool
中的RecyclerObjectPool
:
private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
private final Recycler<T> recycler;//回收器
RecyclerObjectPool(final ObjectCreator<T> creator) {
recycler = new Recycler<T>() {
@Override
protected T newObject(Handle<T> handle) {
return creator.newObject(handle);
}
};
}
@Override
public T get() {
return recycler.get();
}
}
可见他将具体如果创建对象交给了ObjectCreator
接口:
public interface ObjectCreator<T> {
T newObject(Handle<T> handle);
}
并且封装了一个对象池的静态方法,只要传入创建器即可:
public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
}
比如我们的PooledHeapByteBuf
的对象池,只要返回相应的对象就好:
private static final ObjectPool<PooledHeapByteBuf> RECYCLER = ObjectPool.newPool(
new ObjectCreator<PooledHeapByteBuf>() {
@Override
public PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
});
一些配置属性
基本都已经注释,忘记了可以来看,后面讲到的时候会提起,当然这些参数在静态代码块里可以通过参数设置的方式修改。
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);//id生成器
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();//获取所属线程的id
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;//每个线程本地变量Stack最大容量,默认4096
private static final int INITIAL_CAPACITY;//Stack初始化容量,默认256
private static final int MAX_SHARED_CAPACITY_FACTOR;//最大共享容量因子,影响WeakOrderQueue的容量,默认2
private static final int MAX_DELAYED_QUEUES_PER_THREAD;//每个线程本地变量WeakHashMap的最大键值对个数,默认CPU核心数x2
private static final int LINK_CAPACITY;//链接中的数组容量,默认16
private static final int RATIO;//回收间隔,默认8
简单例子入手
我们直接从获取和回收开始讲好了,这样比较有针对性。我们以一个简单的例子入手:
public class RecycleTest {
//创建回收器
private static final Recycler<MyBuff> RECYCLER = new Recycler<MyBuff>() {
@Override
protected MyBuff newObject(Handle<MyBuff> handle) {
return new MyBuff(handle);
}
};
private static class MyBuff {
private final Recycler.Handle<MyBuff> handle;
public MyBuff(Recycler.Handle<MyBuff> handle) {
this.handle = handle;
}
public void recycle() {
handle.recycle(this);
}
}
public static void main(String[] args) {
MyBuff myBuff = RECYCLER.get();
myBuff.recycle();
}
}
这个够简单了,创建一个回收器RECYCLER
,一个测试类MyBuff
,然后就是获取和回收。
Recycler初始化
DELAYED_RECYCLED
先看一些重要的静态变量的初始化,DELAYED_RECYCLED
是一个线程本地变量,里面存放的是一个map
,具体类型是WeakHashMap<Stack<?>, WeakOrderQueue>
,map
里面放着键值对,每个回收器回收了其他线程创建的对象时,就会放入对象所对应的Stack
在WeakHashMap
中的WeakOrderQueue
里,这个上一篇讲过,就不多讲了。
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
//默认初始化
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();//弱键回收,键如果只有弱引用,可以被GC回收,然后将整个键值对回收
}
};
threadLocal
另外一个线程本地变量就是放Stack
的,有初始化,也有安全删除的方法:
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
interval, maxDelayedQueuesPerThread);
}
//安全删除Stack键值对
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
Recycler
的构造函数就不多讲了,就是属性赋值。
Stack构造方法
先说下这个构造方法,后面一些参数会有用到,其实就是一些参数的设置。
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int interval, int maxDelayedQueues) {
this.parent = parent;//回收器
threadRef = new WeakReference<Thread>(thread);//所属线程的弱引用
this.maxCapacity = maxCapacity;//最大容量,默认4096
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));//共享容量,也就是其他线程中的WeakOrderQueue中的最大容量的总和 2048
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];//存放对象的数组,默认256大小
this.interval = interval;//回收间隔
handleRecycleCount = interval; // 间隔计数器,第一个会被回收
this.maxDelayedQueues = maxDelayedQueues;//关联的WeakOrderQueue最大个数,默认16
}
处理器回收recycle(this)
我们先讲回收呢,因为获取里面会涉及到回收后的一些知识,不讲回收理解不了的。
@Override
public void recycle(Object object) {
...
Stack<?> stack = this.stack;
...
stack.push(this);//入栈
}
stack.push
这个方法就分两种情况了:
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {//属于栈的线程,直接入栈
pushNow(item);
} else {//不属于栈的线程或者属于栈的但是被回收得到线程,需要后面入栈,先放进WeakOrderQueue
pushLater(item, currentThread);
}
}
pushNow当前线程是Stack的所属线程
如果当前线程是属于Stack的所属线程,就调用这个方法,直接将对象放入elements
的数组中。
这个过程还是比较好理解的,首先判断是否回收过,然后记录回收信息,判断回收的数量有没超过限制,或者是不是丢弃,根据回收间隔。然后看elements
数组是否需要扩容,每次扩容到两倍,但是不超过最大容量默认4096
。最后把对象放入指定索引的位置。
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {//尝试过回收
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;//记录定义的线程ID
int size = this.size;//已有对象数量
if (size >= maxCapacity || dropHandle(item)) {
return;
}
if (size == elements.length) {//要扩容了 每次x2 直到maxCapacity
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;//放入数组中
this.size = size + 1;//个数+1
}
dropHandle
这个就是间隔回收,两次回收之间隔8
个对象。
boolean dropHandle(DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) {//没被回收过
if (handleRecycleCount < interval) {//回收次数小于回收阈值
handleRecycleCount++;//回收次数+1
// Drop the object.
return true;//丢弃
}
handleRecycleCount = 0;//清零
handle.hasBeenRecycled = true;//被回收了
}
return false;
}
这里容易无解,其实应该是除了第一个直接被回收外,后面每9
个回收1
个。图示根据回收过来的序号排序从0
开始,绿色表示能被回收,红色表示被丢弃:
pushLater当前线程不是Stack的所属线程
这种情况下就是另一个线程来回收,看源码吧。
private void pushLater(DefaultHandle<?> item, Thread thread) {
...
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();//每个线程都会有自己的map
WeakOrderQueue queue = delayedRecycled.get(this);//获取对应的WeakOrderQueue
if (queue == null) {//不存在尝试创建一个放入map
if (delayedRecycled.size() >= maxDelayedQueues) {//数量大于阈值 放一个假WeakOrderQueue,丢弃对象
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
if ((queue = newWeakOrderQueue(thread)) == null) {//创建一个队列,如果要分配的容量(16)不够的话就丢弃对象
// drop object
return;
}
delayedRecycled.put(this, queue);//放入map里
} else if (queue == WeakOrderQueue.DUMMY) {//如果是假的,就丢弃
// drop object
return;
}
queue.add(item);//放入WeakOrderQueue
}
static final WeakOrderQueue DUMMY = new WeakOrderQueue();
首先我们会获取线程本地变量WeakHashMap<Stack<?>, WeakOrderQueue>
,然后根据Stack
获取WeakOrderQueue
。
- 如果获取不到,说明还没有这个
Stack
关联的WeakOrderQueue
被创建。尝试创建,但是如果WeakHashMap
键值对数量超过限制了,就放一个假的WeakOrderQueue
,其实就是一个空的队列,DUMMY
。否则的话就尝试创建一个,如果还有分配的容量的话,就创建,并和Stack
一起放入WeakHashMap
中,不行的话就丢弃对象。 - 如果获取的是
DUMMY
的话,说明WeakHashMap
放满了,就丢弃。 - 如果获取到了且不是
DUMMY
就尝试放队列里。
newWeakOrderQueue创建队列
看看他是如果创建队列的。
private WeakOrderQueue newWeakOrderQueue(Thread thread) {
return WeakOrderQueue.newQueue(this, thread);
}
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
// 是否可分配链接
if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
return null;//分配失败
}
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
stack.setHead(queue);//头插法,新的队列插到头部
return queue;
}
Head链接管理者
首先先介绍下Head
类,他管理着里面所有的链接Link
的创建和回收。内部还有一个Link
的连接,其实是单链表的表头,所有的Link
都会被串起来,还有一个容量availableSharedCapacity
,后续的分配和回收都会用到。
Link具体存对象
本身就是原子对象,可以计数,这个在后面放入对象的时候会用到。这个里面其实就是一个数组,用来存对象,默认容量是16
,还有一个next
指向下一个,至于readIndex
就是获取对象的时候用,这个跟netty
自定义的ByteBuf
的读索引类似,表示下一个可获取对象的索引。
基本就是这样的结构:
Head.reserveSpaceForLink为Link申请空间
传进来的参数是stack.availableSharedCapacity
也就是2048
,说明可以申请的容量是跟这个参数相关的,最多2048
个。也就是说**每个Stack
在其他线程中的回收对象最多是2048
个。**每次分配16个
,如果容量小于16
个了,就不分配了,因此可能导致WeakOrderQueue
创建失败,丢弃对象。
static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
for (;;) {
int available = availableSharedCapacity.get();
if (available < LINK_CAPACITY) {//可分配容量小于16 分配失败
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
return true; //分配成功
}
}
}
WeakOrderQueue构造方法
创建一个链接Link
,然后给创建一个Head
,并传入availableSharedCapacity
引用,根据这个availableSharedCapacity
来进行后续Link
的分配和回收的。然后还有个队尾的引用,同时也存在回收间隔,跟Stack
一样,默认是8
。
private WeakOrderQueue(Stack<?> stack, Thread thread) {
super(thread);
tail = new Link();//创建链接,分配LINK_CAPACITY个DefaultHandle类型的数组
head = new Head(stack.availableSharedCapacity);
head.link = tail;
interval = stack.interval;
handleRecycleCount = interval; // Start at interval so the first one will be recycled.
}
stack.setHead(queue) 设置头结点
因为需要跟Stack
有关联,所以会跟Stack
的head
结点形成一个单链表,头插法,而且这里用方法同步,主要是多线程可能同时回收,所以需要同步。
queue.add加入队列
这个也是间隔回收的,从队尾的Link
开始,看是否满了,如果满了就重新创建一个Link
加入链表,然后在elements
对应索引位置放入对象,Link
本身就是AtomicInteger
,可以进行索引的改变。
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;//记录上次回收的线程id
if (handleRecycleCount < interval) {//回收次数小于间隔,就丢弃对象,为了不让队列增长过快
handleRecycleCount++;
return;
}
handleRecycleCount = 0;
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) {//如果超过链接容量限制了
Link link = head.newLink();//创建新的链接,如果创建不成功,就返回null,丢弃对象
if (link == null) {
// Drop it.
return;
}
this.tail = tail = tail.next = link;//加入链表
writeIndex = tail.get();
}
tail.elements[writeIndex] = handle;//放入对象
handle.stack = null;//放进queue里就没有栈了
tail.lazySet(writeIndex + 1);//不需要立即可见,这里都是单线程操作
}
head.newLink创建链接
其实就是前面讲过的申请空间,创建Link
。如果成功就创建一个链接返回,否则就返回null
。
Link newLink() {
return reserveSpaceForLink(availableSharedCapacity) ? new Link() : null;
}
再回忆下这张图:
至此,回收已经讲完了,后面我们将获取。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。