2023-09-13  阅读(213)
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104381851

PoolThreadLocalCache

要讲PoolThreadCache,得先知道PoolThreadLocalCache是什么,其实他就是FastThreadLocal的子类,里面存的类型才是PoolThreadCache

202309132204049541.png

initialValue

initialValue是获取初始化值的,也就是创建PoolThreadCache并返回。这里主要是useCacheForAllThreads 标签,是否是所有线程都可以用缓存,还是只能FastThreadLocalThread用缓存。当然如果有调度参数,可能还要启动调度任务,这个不是重点,暂时不说。

     @Override
            protected synchronized PoolThreadCache initialValue() {
                final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);//获取使用率最少的一个
                final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
    
                final Thread current = Thread.currentThread();
                if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {//使用了useCacheForAllThreads或者是FastThreadLocalThread线程
                    final PoolThreadCache cache = new PoolThreadCache(
                            heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                            DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
    
                    if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {//有间隔的调度任务
                        final EventExecutor executor = ThreadExecutorMap.currentExecutor();
                        if (executor != null) {
                            executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
                                    DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                        }
                    }
                    return cache;
                }
                // No caching so just use 0 as sizes.否则没有缓存
                return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
            }

leastUsedArena

获取线程使用数量最少的PoolArena,这样才能实现线程对内存的负载均衡,否则一个内存被多个线程用,其他的内存没有线程用,那使用效率就低了,还会引起严重的线程竞争问题。

     private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
                if (arenas == null || arenas.length == 0) {
                    return null;
                }
    
                PoolArena<T> minArena = arenas[0];
                for (int i = 1; i < arenas.length; i++) {
                    PoolArena<T> arena = arenas[i];
                    if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                        minArena = arena;
                    }
                }
    
                return minArena;
            }

PoolThreadCache缓存什么

因为缓存的类型就是这个,只是里面放着各种缓存数据。

里面缓存这一堆MemoryRegionCache类型的数组。

202309132204055742.png

MemoryRegionCache

这个才是缓存包装,看看里面有什么:

202309132204062803.png
里面有缓存的类型:

202309132204071304.png
有能缓存的数量size,分别对应三个类型是512,256,64,其实是从这里来的:

202309132204079575.png
还有一个allocations表示缓存已经分配出去了多少个,这个在后面释放的时候会用到。
最后一个就是queue队列了,他是一个MpscArrayQueue多生产者单消费者的队列,也就是说可以多个线程放入数据,只有一个线程可以取数据。里面存的是Entry类型,这个有点让我想起了写操作的时候,也是将数据封装成Entry类型,我们来看看。

Entry

202309132204087466.png
一个回收处理器recyclerHandle,一个块信息,一个句柄,一个nioBuffer(一般都是null)。可见主要还是存哪个块的哪部分内存被缓存了。handle里就描述了很多信息,可以获得块内的偏移地址,子页偏移地址,子页等等。所以我们现在知道了, 缓存就是缓存块内对应的内存信息

PoolThreadCache构造函数

前面说到PoolThreadLocalCacheinitialValue创建了PoolThreadCache,那说明时候调用的呢,在创建缓冲区的方法中尝试从PoolThreadLocalCache获取PoolThreadCache

202309132204097847.png
里面就是FastThreadLocalget方法,前几篇讲过,就不多说了,如果没获取到就会调用initialValue创建一个默认的:

202309132204108128.png

202309132204116249.png
然后我们看构造函数,其实没什么特别的,就是一些缓存数组的初始化,和属性设置,看注释应该就可以懂:

     PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                        int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                        int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
            checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
            this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
            this.heapArena = heapArena;
            this.directArena = directArena;
            if (directArena != null) {//直接缓冲区的缓存
                tinySubPageDirectCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
                smallSubPageDirectCaches = createSubPageCaches(
                        smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
    
                numShiftsNormalDirect = log2(directArena.pageSize);//获取2的次数
                normalDirectCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, directArena);
    
                directArena.numThreadCaches.getAndIncrement();//directArena线程缓存数+1
            } else {//没有就设置默认值
                // No directArea is configured so just null out all caches
                tinySubPageDirectCaches = null;
                smallSubPageDirectCaches = null;
                normalDirectCaches = null;
                numShiftsNormalDirect = -1;
            }
            if (heapArena != null) {//堆内缓冲区
                // Create the caches for the heap allocations
                tinySubPageHeapCaches = createSubPageCaches(
                        tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
                smallSubPageHeapCaches = createSubPageCaches(
                        smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
    
                numShiftsNormalHeap = log2(heapArena.pageSize);
                normalHeapCaches = createNormalCaches(
                        normalCacheSize, maxCachedBufferCapacity, heapArena);
    
                heapArena.numThreadCaches.getAndIncrement();
            } else {
                // No heapArea is configured so just null out all caches
                tinySubPageHeapCaches = null;
                smallSubPageHeapCaches = null;
                normalHeapCaches = null;
                numShiftsNormalHeap = -1;
            }
    
            // Only check if there are caches in use.
            if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                    || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                    && freeSweepAllocationThreshold < 1) {
                throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                        + freeSweepAllocationThreshold + " (expected: > 0)");
            }
        }

createSubPageCaches

这个就是创建子页类型的缓存数组,tiny类型默认是32SubPageMemoryRegionCache,每个内部可以缓存512Entrysmall类型默认是4SubPageMemoryRegionCache,每个内部可以缓存256Entry。数组个数就是PoolArena中的,内存大小也是一一对应的。

      private static <T> MemoryRegionCache<T>[] createSubPageCaches(
                int cacheSize, int numCaches, SizeClass sizeClass) {
            if (cacheSize > 0 && numCaches > 0) {
                @SuppressWarnings("unchecked")
                MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
                for (int i = 0; i < cache.length; i++) {
                    // TODO: maybe use cacheSize / cache.length
                    cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
                }
                return cache;
            } else {
                return null;
            }

createNormalCaches

normal类型进行了控制,最多只能存maxCachedBufferCapacity 默认是32K的大小,也就是说可以存8K,16K,32K3个档次,所以数组个数是3

     private static <T> MemoryRegionCache<T>[] createNormalCaches(
                int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
            if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
                int max = Math.min(area.chunkSize, maxCachedBufferCapacity);//默认最多缓存maxCachedBufferCapacity=32K 不然太大了
                int arraySize = Math.max(1, log2(max / area.pageSize) + 1);//默认是3
    
                @SuppressWarnings("unchecked")
                MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
                for (int i = 0; i < cache.length; i++) {
                    cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
                }
                return cache;
            } else {
                return null;
            }
        }

PoolThreadCache分配内存allocate

前面讲了PoolThreadCache创建好了,解析来看看怎么用。在PoolArenaallocate方法理有3处:
首先是tinysmall类型的。

2023091322041249110.png
然后是normal类型的:

2023091322041362411.png
我们进去看看,其实内部逻辑都差不多:

2023091322041500812.png

cacheForTiny

其中cacheForTiny,cacheForSmall,cacheForNormal也类似,我就拿cacheForTiny来说,其实就是获取对应类型的索引,看数组中是否存在该缓存,如果存在,就返回,否则就是null

        private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
            int idx = PoolArena.tinyIdx(normCapacity);
            if (area.isDirect()) {
                return cache(tinySubPageDirectCaches, idx);
            }
            return cache(tinySubPageHeapCaches, idx);
        }
    
    
        private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
            if (cache == null || idx > cache.length - 1) {
                return null;
            }
            return cache[idx];
        }

allocate

然后就是尝试让缓存来分配,如果有分配过,无论成功失败,都会使得allocations增加,如果分配的数量超过阈值后,就会清0,并且对缓存进行清除trim,估计是避免长时间缓存着又没用到,等于说是内存泄露了,trim后面讲,里面涉及东西比较多:

       @SuppressWarnings({ "unchecked", "rawtypes" })
        private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
            if (cache == null) {
                // no cache found so just return false here
                return false;
            }
            boolean allocated = cache.allocate(buf, reqCapacity);
            if (++ allocations >= freeSweepAllocationThreshold) {//已分配次数是否大于清除次数阈值
                allocations = 0;//分配次数清0
                trim();
            }
            return allocated;
        }

MemoryRegionCache的allocate

关键就是看缓存怎么分配啦,其实就是从队列queue里取出Entry实体,然后进行initBuf初始化。

      public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
                Entry<T> entry = queue.poll();//取出实体
                if (entry == null) {
                    return false;
                }
                initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
                entry.recycle();//回收实体
    
                // allocations is not thread-safe which is fine as this is only called from the same thread all time.
                ++ allocations;//已分配出去的+1
                return true;
            }

initBuf

这个初始化是关键,其实他是子类实现的,我们看看,一种是SubPageMemoryRegionCache的,对应子页tinysmall类型:

2023091322041582713.png
一种是NormalMemoryRegionCache的,对应normal类型:

2023091322041665314.png

这个跟我们正常内存分配流程最后的一样,子页的是调用PoolChunkinitBufWithSubpagenormal类型是initBuf

2023091322041762615.png
当然最开始的时候队列里面没有实体,就返回false啦,然后走正常的内存分配流程。

PoolThreadCache尝试缓存内存

这个是在缓冲区调用了release的时候会尝试缓存:

2023091322041934116.png
如果能回收缓冲区的话,最终调用deallocate

2023091322042029017.png

PooledByteBuf的deallocate

其实就是释放内存资源,属性重新设置回默认,自己也回收到对象池里。

        @Override
        protected final void deallocate() {
            if (handle >= 0) {
                final long handle = this.handle;
                this.handle = -1;
                memory = null;
                chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
                tmpNioBuf = null;
                chunk = null;
                recycle();//放进池里面
            }
        }

后面就会用到PoolArenafree方法啦,里面也比较复杂,所以下一篇再讲吧。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文
  • 点赞