1. 概述
在 《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(五)PoolArena》 一文中,我们看到 PoolArena 在分配( #allocate(...)
)和释放( #free(...)
)内存的过程中,无可避免会出现 synchronized
的身影。虽然锁的粒度不是很大,但是如果一个 PoolArena 如果被多个线程引用,带来的线程锁的同步和竞争。并且,如果在锁竞争的过程中,申请 Direct ByteBuffer ,那么带来的线程等待就可能是几百毫秒的时间。
那么该如何解决呢?如下图红框所示:
FROM 《jemalloc源码解析-内存管理》
给每个线程引入其独有的 tcache 线程缓存。
- 在释放已分配的内存块时,不放回到 Chunk 中,而是缓存到 tcache 中。
- 在分配内存块时,优先从 tcache 获取。无法获取到,再从 Chunk 中分配。
通过这样的方式,尽可能的避免多线程的同步和竞争。
2. PoolThreadCache
io.netty.buffer.PoolThreadCache
,Netty 对 Jemalloc tcache 的实现类,内存分配的线程缓存。
2.1 构造方法
/** |
- 虽然代码比较多,主要分为 Heap 和 Direct 两种内存。
- Direct 相关
directArena
属性,对应的 Heap PoolArena 对象。tinySubPageDirectCaches
属性,Direct 类型的 tiny Subpage 内存块缓存数组。- 默认情况下,数组大小为 512 。
- 在【第 15 行】的代码,调用
#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)
方法,创建 MemoryRegionCache 数组。详细解析,见 「2.2 createSubPageCaches」 。
smallSubPageDirectCaches
属性,Direct 类型的 small Subpage 内存块缓存数组。- 默认情况下,数组大小为 256 。
- 在【第 17 行】的代码,调用
#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)
方法,创建 MemoryRegionCache 数组。详细解析,见 「2.2 createSubPageCaches」 。
normalDirectCaches
属性,Direct 类型的 normal Page 内存块缓存数组。- 默认情况下,数组大小为 64 。
- 在【第 22 行】的代码,调用
#createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area)
方法,创建 MemoryRegionCache 数组。详细解析,见 「2.3 createNormalCaches」 。 numShiftsNormalDirect
属性,用于计算请求分配的 normal 类型的内存块,在normalDirectCaches
数组中的位置。- 默认情况下,数值为 13 。
- 在【第 20 行】的代码,调用
#log2(int pageSize)
方法,log2(pageSize) = log2(8192) = 13
。
- 在【第 25 行】的代码,增加
directArena
的线程引用计数。通过这样的方式,我们能够知道,一个 PoolArena 对象,被多少线程所引用。
- Heap 相关,和【Direct 相关】基本类似。
allocations
属性,分配次数计数器。每次分配时,该计数器 + 1 。freeSweepAllocationThreshold
属性,当allocations
到达该阀值时,调用#free()
方法,释放缓存。同时,会重置allocations
计数器为 0 。
2.2 createSubPageCaches
#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)
方法,创建 Subpage 内存块缓存数组。代码如下:
// tiny 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_TINY_CACHE_SIZE = 512 , numCaches = PoolArena.numTinySubpagePools = 512 >>> 4 = 32 |
- 创建的 Subpage 内存块缓存数组,实际和
PoolArena.tinySubpagePools
和PoolArena.smallSubpagePools
数组大小保持一致。从而实现,相同大小的内存,能对应相同的数组下标。sizeClass
=tiny
时, 默认cacheSize
=PooledByteBufAllocator.DEFAULT_TINY_CACHE_SIZE = 512
,numCaches
=PoolArena.numTinySubpagePools = 512 >>> 4 = 32
。sizeClass
=small
时,默认cacheSize
=PooledByteBufAllocator.DEFAULT_SMALL_CACHE_SIZE = 256
,numCaches
=pageSize - 9 = 13 - 9 = 4
。
- 创建的数组,每个元素的类型为 SubPageMemoryRegionCache 。详细解析,见 「3.X.1 SubPageMemoryRegionCache」 。
2.3 createNormalCaches
#createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass)
方法,创建 Normal Page 内存块缓存数组。代码如下:
// normal 类型,默认 cacheSize = PooledByteBufAllocator.DEFAULT_NORMAL_CACHE_SIZE = 64 , maxCachedBufferCapacity = PoolArena.DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024 = 32KB |
maxCachedBufferCapacity
属性,缓存的 Normal 内存块的最大容量,避免过大的 Normal 内存块被缓存,占用过多通过。默认情况下,maxCachedBufferCapacity = PoolArena.DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024 = 32KB
。也就说,在<1>
处,arraySize
的计算数组大小的结果为 3 。刚好是cache[0] = 8KB
、cache[1] = 16KB
、cache[2] = 32KB
。那么,如果申请的 Normal 内存块大小为64KB
,超过了数组大小,所以无法被缓存。😈 是不是和原先自己认为的maxCachedBufferCapacity
实现最大容量的想法,有点不同。- 创建的数组,每个元素的类型为 SubPageMemoryRegionCache 。详细解析,见 「3.X.2 NormalMemoryRegionCache」 。
2.4 cache
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { |
三个方法,分别获取内存容量对应所在的 MemoryRegionCache 对象。通过调用
#cache(MemoryRegionCache<T>[] cache, int idx)
方法,代码如下:private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
// 不在范围内,说明不缓存该容量的内存块
if (cache == null || idx > cache.length - 1) {
return null;
}
// 获得 MemoryRegionCache 对象
return cache[idx];
}
当然,考虑到使用便利,封装了 #cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass)
方法,支持获取对应内存类型的 MemoryRegionCache 对象。代码如下:
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) { |
2.5 add
#add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass)
方法,添加内存块到 PoolThreadCache 的指定 MemoryRegionCache 的队列中,进行缓存。并且,返回是否添加成功。代码如下:
/** |
- 代码比较简单,胖友自己看注释。
- 在
PoolArea#free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache)
中,调用该方法。所以,可以结合 《精尽 Netty 源码解析 —— Buffer 之 Jemalloc(五)PoolArena》 的 「2.6 free」 一起看看罗。
2.6 allocate
/** |
三个方法,从缓存中分别获取不同容量大小的内存块,初始化到 PooledByteBuf 对象中。通过调用
#allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity)
方法,代码如下:1: private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
2: if (cache == null) {
3: // no cache found so just return false here
4: return false;
5: }
6: // 分配内存块,并初始化到 MemoryRegionCache 中
7: boolean allocated = cache.allocate(buf, reqCapacity);
8: // 到达阀值,整理缓存
9: if (++ allocations >= freeSweepAllocationThreshold) {
10: allocations = 0;
11: trim();
12: }
13: // 返回是否分配成功
14: return allocated;
15: }- 第 7 行:调用
MemoryRegionCache#allocate(buf, reqCapacity)
方法,从缓存中分配内存块,并初始化到 MemoryRegionCache 中。 - 第 8 至 12 行:增加
allocations
计数。若到达阀值(freeSweepAllocationThreshold
),重置计数,并调用#trim()
方法,整理缓存。详细解析,见 「2.7 trim」 。 - 第 14 行:返回是否分配成功。如果从缓存中分配失败,后续就从 PoolArena 中获取内存块。
- 第 7 行:调用
2.7 free
#trim()
方法,整理缓存,释放使用频度较少的内存块缓存。代码如下:
private static int free(MemoryRegionCache<?> cache) { |
- 会调用所有 MemoryRegionCache 的
#trim()
方法,整理每个内存块缓存。详细解析,见 「3.6 trim」 。
2.8 finalize
#finalize()
方法,对象销毁时,清空缓存等等。代码如下:
/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner. |
- 代码比较简单,胖友自己看。主要是
<1>
、<2>
、<3.1>/<3.2>
三个点。
3. MemoryRegionCache
MemoryRegionCache ,是 PoolThreadCache 的内部静态类,内存块缓存。在其内部,有一个队列,存储缓存的内存块。如下图所示:MemoryRegionCache
3.1 构造方法
private abstract static class MemoryRegionCache<T> { |
sizeClass
属性,内存类型。queue
属性,队列,里面存储内存块。每个元素为 Entry 对象,对应一个内存块。代码如下:static final class Entry<T> {
/**
* Recycler 处理器,用于回收 Entry 对象
*/
final Handle<Entry<?>> recyclerHandle;
/**
* PoolChunk 对象
*/
PoolChunk<T> chunk;
/**
* 内存块在 {@link #chunk} 的位置
*/
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
// 置空
chunk = null;
handle = -1;
// 回收 Entry 对象
recyclerHandle.recycle(this);
}
}- 通过
chunk
和handle
属性,可以唯一确认一个内存块。 recyclerHandle
属性,用于回收 Entry 对象,用于#recycle()
方法中。
- 通过
size
属性,队列大小。allocations
属性,分配次数计数器。- 在
<1>
处理,我们可以看到创建的queue
属性,类型为 MPSC( Multiple Producer Single Consumer ) 队列,即多个生产者单一消费者。为什么使用 MPSC 队列呢?- 多个生产者,指的是多个线程,移除( 释放 )内存块出队列。
- 单个消费者,指的是单个线程,添加( 缓存 )内存块到队列。
3.2 newEntry
#newEntry(PoolChunk<?> chunk, long handle)
方法,创建 Entry 对象。代码如下:
"rawtypes") ( |
3.3 add
#add(PoolChunk<T> chunk, long handle)
方法,添加( 缓存 )内存块到队列,并返回是否添加成功。代码如下:
/** |
3.4 allocate
#allocate(PooledByteBuf<T> buf, int reqCapacity)
方法,从队列中获取缓存的内存块,初始化到 PooledByteBuf 对象中,并返回是否分配成功。代码如下:
/** |
- 代码比较简单,胖友自己看注释。
在
<1>
处,调用#initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity)
抽象方法,初始化内存块到 PooledByteBuf 对象中。代码如下:/**
* Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);- 该抽象方法需要子类 SubPageMemoryRegionCache 和 NormalMemoryRegionCache 来实现。并且,这也是 MemoryRegionCache 的唯一的抽象方法。
3.5 free
#free(...)
方法,清除队列。代码如下:
/** |
- 代码比较简单,胖友自己看注释。
<1>
处, 释放缓存的内存块回 Chunk 中。代码如下:private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
// 回收 Entry 对象
// recycle now so PoolChunk can be GC'ed.
entry.recycle();
// 释放缓存的内存块回 Chunk 中
chunk.arena.freeChunk(chunk, handle, sizeClass);
}
3.6 trim
这块当时没太看懂,后来读了 《自顶向下深入分析Netty(十)–PoolThreadCache》 文章后,看懂了 #trim()
方法。引用如下:
在分配过程还有一个
trim()
方法,当分配操作达到一定阈值(Netty默认8192)时,没有被分配出去的缓存空间都要被释放,以防止内存泄漏,核心代码如下:
// 内部类MemoryRegionCache |
也就是说,期望一个 MemoryRegionCache 频繁进行回收-分配,这样
allocations
>size
,将不会释放队列中的任何一个节点表示的内存空间;但如果长时间没有分配,则应该释放这一部分空间,防止内存占据过多。Tiny请求缓存512 个节点,由此可知当使用率超过
512 / 8192 = 6.25%
时就不会释放节点。
3.X1 SubPageMemoryRegionCache
SubPageMemoryRegionCache ,是 PoolThreadCache 的内部静态类,继承 MemoryRegionCache 抽象类,Subpage MemoryRegionCache 实现类。代码如下:
/** |
3.X2 NormalMemoryRegionCache
NormalMemoryRegionCache ,是 PoolThreadCache 的内部静态类,继承 MemoryRegionCache 抽象类,Page MemoryRegionCache 实现类。代码如下:
/** |
666. 彩蛋
嘿嘿,比想象中简单蛮多的一篇文章。
推荐阅读文章:
- Hypercube 《自顶向下深入分析Netty(十)–PoolThreadCache》