1. Overview

This article covers UnpooledByteBufAllocator, the plain ByteBuf allocator that does not use a memory pool.
2. ByteBufAllocatorMetricProvider

io.netty.buffer.ByteBufAllocatorMetricProvider is the provider interface for ByteBufAllocator metrics. It is used to monitor how much heap and direct memory ByteBufs occupy. The code is as follows:

    public interface ByteBufAllocatorMetricProvider {

        // Returns the metric object of this allocator.
        ByteBufAllocatorMetric metric();

    }

ByteBufAllocatorMetricProvider has two implementing classes: UnpooledByteBufAllocator and PooledByteBufAllocator.
3. ByteBufAllocatorMetric

io.netty.buffer.ByteBufAllocatorMetric is the metric interface for ByteBufAllocator. The code is as follows:

    public interface ByteBufAllocatorMetric {

        // Number of bytes of heap memory in use.
        long usedHeapMemory();

        // Number of bytes of direct memory in use.
        long usedDirectMemory();

    }

ByteBufAllocatorMetric has two implementing classes: UnpooledByteBufAllocatorMetric and PooledByteBufAllocatorMetric.
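To make the relationship between the two interfaces concrete, here is a minimal sketch of how a caller might read the metrics. It uses only the JDK: MemoryMetric and MetricSource are hypothetical stand-ins that mirror the shape of ByteBufAllocatorMetric and ByteBufAllocatorMetricProvider, and MetricReport.describe is an invented helper, not a Netty API.

```java
// Hypothetical stand-in mirroring the shape of ByteBufAllocatorMetric.
interface MemoryMetric {
    long usedHeapMemory();
    long usedDirectMemory();
}

// Hypothetical stand-in mirroring the shape of ByteBufAllocatorMetricProvider.
interface MetricSource {
    MemoryMetric metric();
}

final class MetricReport {
    private MetricReport() { }

    // Formats the metrics of any object that happens to expose them.
    static String describe(Object allocator) {
        if (!(allocator instanceof MetricSource)) {
            return "metrics unavailable";
        }
        MemoryMetric m = ((MetricSource) allocator).metric();
        return "heap=" + m.usedHeapMemory() + "B, direct=" + m.usedDirectMemory() + "B";
    }
}
```

With Netty itself, the analogous step is to check whether a ByteBufAllocator is also a ByteBufAllocatorMetricProvider before calling metric(), because the metric() method is declared on the provider interface.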
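3.1 UnpooledByteBufAllocatorMetric

UnpooledByteBufAllocatorMetric is a static nested class of UnpooledByteBufAllocator that implements the ByteBufAllocatorMetric interface. It is the metric implementation for UnpooledByteBufAllocator. The code is as follows:

    private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {

        // Counter for direct memory in use, in bytes.
        final LongCounter directCounter = PlatformDependent.newLongCounter();
        // Counter for heap memory in use, in bytes.
        final LongCounter heapCounter = PlatformDependent.newLongCounter();

        @Override
        public long usedHeapMemory() {
            return heapCounter.value();
        }

        @Override
        public long usedDirectMemory() {
            return directCounter.value();
        }

    }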
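The two counters are updated from whichever thread allocates or frees a buffer, so they have to tolerate concurrent writes. The sketch below models that with java.util.concurrent.atomic.LongAdder from the JDK. Using LongAdder in place of Netty's internal LongCounter is an assumption of this example, and SketchMetric and concurrentHeapAdds are hypothetical names.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical JDK-only model of UnpooledByteBufAllocatorMetric.
final class SketchMetric {
    // LongAdder stands in for Netty's internal LongCounter (an assumption of this sketch).
    final LongAdder directCounter = new LongAdder();
    final LongAdder heapCounter = new LongAdder();

    long usedHeapMemory() {
        return heapCounter.sum();
    }

    long usedDirectMemory() {
        return directCounter.sum();
    }

    // Simulates several threads recording heap allocations concurrently
    // and returns the total once all of them have finished.
    static long concurrentHeapAdds(int threads, int addsPerThread, int bytesPerAdd) {
        SketchMetric metric = new SketchMetric();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < addsPerThread; j++) {
                    metric.heapCounter.add(bytesPerAdd);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return metric.usedHeapMemory();
    }
}
```

Because every update is a plain add, no lock is needed, and a reader only sees a value that may lag slightly behind in-flight updates.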
4. UnpooledByteBufAllocator

io.netty.buffer.UnpooledByteBufAllocator implements the ByteBufAllocatorMetricProvider interface and extends the AbstractByteBufAllocator abstract class. It is the plain ByteBuf allocator that does not use a memory pool.
4.1 Constructors

    // Metric object
    private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();
    // Whether leak detection is disabled
    private final boolean disableLeakDetector;
    // Whether direct buffers are created without a Cleaner
    private final boolean noCleaner;

    public UnpooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, false);
    }

    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
        this(preferDirect, disableLeakDetector, PlatformDependent.useDirectBufferNoCleaner());
    }

    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) {
        super(preferDirect);
        this.disableLeakDetector = disableLeakDetector;
        noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe()
                && PlatformDependent.hasDirectBufferNoCleanerConstructor();
    }

The metric field holds the UnpooledByteBufAllocatorMetric object.
The disableLeakDetector field indicates whether memory leak detection is disabled.
The noCleaner field indicates whether Direct ByteBufs are released without going through io.netty.util.internal.Cleaner.
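Note that tryNoCleaner is only a request: noCleaner ends up true only when the platform also has Unsafe and the no-cleaner constructor. The following sketch isolates that rule. AllocatorFlags is a hypothetical class, and the two platform checks are passed in as plain booleans here instead of being read from PlatformDependent.

```java
// Hypothetical model of how the constructor flags are resolved.
final class AllocatorFlags {
    final boolean disableLeakDetector;
    final boolean noCleaner;

    // hasUnsafe and hasNoCleanerCtor are explicit parameters in this sketch;
    // the real constructor asks PlatformDependent for them.
    AllocatorFlags(boolean disableLeakDetector, boolean tryNoCleaner,
                   boolean hasUnsafe, boolean hasNoCleanerCtor) {
        this.disableLeakDetector = disableLeakDetector;
        // A request alone is not enough: the platform must support it as well.
        this.noCleaner = tryNoCleaner && hasUnsafe && hasNoCleanerCtor;
    }
}
```

Passing the platform checks as parameters is purely a convenience for the example, since it lets each combination be exercised without depending on the running JVM.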
4.2 newHeapBuffer

    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        // Prefer the Unsafe-based implementation when Unsafe is available.
        return PlatformDependent.hasUnsafe() ?
                new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
4.3 newDirectBuffer

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        final ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {
            // With Unsafe: choose between the no-cleaner and the regular Unsafe variant.
            buf = noCleaner ?
                    new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        // Wrap the buffer for leak detection unless it is disabled.
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }
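The branching in newHeapBuffer and newDirectBuffer amounts to a small decision table. As a rough illustration, the sketch below returns the name of the class that would be chosen for each combination of flags. BufferChoice and its methods are hypothetical helpers written for this example; they only mirror the conditions shown in the two methods.

```java
// Hypothetical model: returns the name of the class the allocator would pick.
final class BufferChoice {
    private BufferChoice() { }

    static String heapImpl(boolean hasUnsafe) {
        return hasUnsafe
                ? "InstrumentedUnpooledUnsafeHeapByteBuf"
                : "InstrumentedUnpooledHeapByteBuf";
    }

    static String directImpl(boolean hasUnsafe, boolean noCleaner) {
        if (!hasUnsafe) {
            // Without Unsafe the noCleaner flag is never consulted.
            return "InstrumentedUnpooledDirectByteBuf";
        }
        return noCleaner
                ? "InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf"
                : "InstrumentedUnpooledUnsafeDirectByteBuf";
    }
}
```

In total there are two heap variants and three direct variants, which correspond to the five Instrumented classes.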
4.4 compositeHeapBuffer

    @Override
    public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
        CompositeByteBuf buf = new CompositeByteBuf(this, false, maxNumComponents);
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }

The disableLeakDetector field decides whether the buffer is wrapped for leak detection.
4.5 compositeDirectBuffer

    @Override
    public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
        CompositeByteBuf buf = new CompositeByteBuf(this, true, maxNumComponents);
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }

The disableLeakDetector field decides whether the buffer is wrapped for leak detection.
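The expression `disableLeakDetector ? buf : toLeakAwareBuffer(buf)` is a conditional decorator: the caller gets either the raw buffer or a wrapper around it. Below is a minimal JDK-only sketch of that idea. SketchBuf, PlainBuf, LeakAwareBuf, and LeakWrapping are all hypothetical types. The sketch always wraps when detection is enabled, whereas Netty's toLeakAwareBuffer may, as far as I understand, still return the buffer unwrapped depending on the configured leak detection level.

```java
// Hypothetical minimal buffer abstraction for this sketch.
interface SketchBuf {
    int capacity();
}

final class PlainBuf implements SketchBuf {
    private final int capacity;

    PlainBuf(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public int capacity() {
        return capacity;
    }
}

// Decorator that would carry leak-tracking state in a real implementation.
final class LeakAwareBuf implements SketchBuf {
    private final SketchBuf delegate;

    LeakAwareBuf(SketchBuf delegate) {
        this.delegate = delegate;
    }

    @Override
    public int capacity() {
        // All buffer operations are forwarded to the wrapped buffer.
        return delegate.capacity();
    }
}

final class LeakWrapping {
    private LeakWrapping() { }

    // Mirrors the shape of: disableLeakDetector ? buf : toLeakAwareBuffer(buf)
    static SketchBuf wrapIfEnabled(SketchBuf buf, boolean disableLeakDetector) {
        return disableLeakDetector ? buf : new LeakAwareBuf(buf);
    }
}
```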
4.6 isDirectBufferPooled

    @Override
    public boolean isDirectBufferPooled() {
        // This allocator never pools direct buffers.
        return false;
    }
4.7 Metric-related methods

    @Override
    public ByteBufAllocatorMetric metric() {
        return metric;
    }

    // Record an increase in direct memory usage.
    void incrementDirect(int amount) {
        metric.directCounter.add(amount);
    }

    // Record a decrease in direct memory usage.
    void decrementDirect(int amount) {
        metric.directCounter.add(-amount);
    }

    // Record an increase in heap memory usage.
    void incrementHeap(int amount) {
        metric.heapCounter.add(amount);
    }

    // Record a decrease in heap memory usage.
    void decrementHeap(int amount) {
        metric.heapCounter.add(-amount);
    }
5. Instrumented ByteBuf

Because the ByteBuf classes need to report to the metric, they are enhanced through inheritance.
5.1 InstrumentedUnpooledUnsafeHeapByteBuf

InstrumentedUnpooledUnsafeHeapByteBuf is a static nested class of UnpooledByteBufAllocator that extends UnpooledUnsafeHeapByteBuf. The code is as follows:

    private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {

        InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }

        @Override
        protected byte[] allocateArray(int initialCapacity) {
            byte[] bytes = super.allocateArray(initialCapacity);
            // Add to the heap counter
            ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
            return bytes;
        }

        @Override
        protected void freeArray(byte[] array) {
            int length = array.length;
            super.freeArray(array);
            // Subtract from the heap counter
            ((UnpooledByteBufAllocator) alloc()).decrementHeap(length);
        }

    }

On top of the original behavior, it calls the corresponding metric increment and decrement methods, which records how much heap memory is in use.
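The pattern here is a subclass that overrides the allocate and free hooks and adjusts a counter around the call to super. The sketch below reproduces it with JDK classes only. SketchHeapBuf and InstrumentedSketchHeapBuf are hypothetical, a LongAdder passed to the constructor plays the role that alloc() plays in Netty, and the capacity and release methods are simplified inventions that exist only to trigger the hooks.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical base class exposing the same two hooks as the heap ByteBuf classes.
class SketchHeapBuf {
    private final LongAdder heapCounter; // plays the role of alloc() in this sketch
    private byte[] array;

    SketchHeapBuf(LongAdder heapCounter, int initialCapacity) {
        // Store the counter first so the overridden hook can already use it.
        this.heapCounter = heapCounter;
        this.array = allocateArray(initialCapacity);
    }

    final LongAdder heapCounter() {
        return heapCounter;
    }

    protected byte[] allocateArray(int capacity) {
        return new byte[capacity];
    }

    protected void freeArray(byte[] array) {
        // Nothing to do for heap arrays: the garbage collector reclaims them.
    }

    // Replaces the backing array, copying the bytes that still fit.
    final void capacity(int newCapacity) {
        byte[] old = array;
        byte[] fresh = allocateArray(newCapacity);
        System.arraycopy(old, 0, fresh, 0, Math.min(old.length, newCapacity));
        array = fresh;
        freeArray(old);
    }

    final void release() {
        freeArray(array);
        array = null;
    }
}

// Subclass that adds bookkeeping without changing allocation behavior.
final class InstrumentedSketchHeapBuf extends SketchHeapBuf {
    InstrumentedSketchHeapBuf(LongAdder heapCounter, int initialCapacity) {
        super(heapCounter, initialCapacity);
    }

    @Override
    protected byte[] allocateArray(int capacity) {
        byte[] bytes = super.allocateArray(capacity);
        heapCounter().add(bytes.length);
        return bytes;
    }

    @Override
    protected void freeArray(byte[] array) {
        int length = array.length;
        super.freeArray(array);
        heapCounter().add(-length);
    }
}
```

In this sketch, growing a 16-byte buffer to 64 bytes records +64 for the new array and -16 for the old one, so the counter ends at 64 and returns to 0 after release.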
5.2 InstrumentedUnpooledHeapByteBuf

InstrumentedUnpooledHeapByteBuf is a static nested class of UnpooledByteBufAllocator that extends UnpooledHeapByteBuf. The code is as follows:

    private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {

        InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }

        @Override
        protected byte[] allocateArray(int initialCapacity) {
            byte[] bytes = super.allocateArray(initialCapacity);
            // Add to the heap counter
            ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length);
            return bytes;
        }

        @Override
        protected void freeArray(byte[] array) {
            int length = array.length;
            super.freeArray(array);
            // Subtract from the heap counter
            ((UnpooledByteBufAllocator) alloc()).decrementHeap(length);
        }

    }

On top of the original behavior, it calls the corresponding metric increment and decrement methods, which records how much heap memory is in use.
5.3 InstrumentedUnpooledUnsafeDirectByteBuf

InstrumentedUnpooledUnsafeDirectByteBuf is a static nested class of UnpooledByteBufAllocator that extends UnpooledUnsafeDirectByteBuf. The code is as follows:

    private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {

        InstrumentedUnpooledUnsafeDirectByteBuf(
                UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }

        @Override
        protected ByteBuffer allocateDirect(int initialCapacity) {
            ByteBuffer buffer = super.allocateDirect(initialCapacity);
            // Add to the direct counter
            ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
            return buffer;
        }

        @Override
        protected void freeDirect(ByteBuffer buffer) {
            int capacity = buffer.capacity();
            super.freeDirect(buffer);
            // Subtract from the direct counter
            ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
        }

    }

On top of the original behavior, it calls the corresponding metric increment and decrement methods, which records how much direct memory is in use.
5.4 InstrumentedUnpooledDirectByteBuf

InstrumentedUnpooledDirectByteBuf is a static nested class of UnpooledByteBufAllocator that extends UnpooledDirectByteBuf. The code is as follows:

    private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {

        InstrumentedUnpooledDirectByteBuf(
                UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }

        @Override
        protected ByteBuffer allocateDirect(int initialCapacity) {
            ByteBuffer buffer = super.allocateDirect(initialCapacity);
            // Add to the direct counter
            ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
            return buffer;
        }

        @Override
        protected void freeDirect(ByteBuffer buffer) {
            int capacity = buffer.capacity();
            super.freeDirect(buffer);
            // Subtract from the direct counter
            ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
        }

    }

On top of the original behavior, it calls the corresponding metric increment and decrement methods, which records how much direct memory is in use.
5.5 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf

InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf is a static nested class of UnpooledByteBufAllocator that extends UnpooledUnsafeNoCleanerDirectByteBuf. The code is as follows:

    private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
            extends UnpooledUnsafeNoCleanerDirectByteBuf {

        InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
                UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }

        @Override
        protected ByteBuffer allocateDirect(int initialCapacity) {
            ByteBuffer buffer = super.allocateDirect(initialCapacity);
            // Add to the direct counter
            ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
            return buffer;
        }

        @Override
        ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
            int capacity = oldBuffer.capacity();
            ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity);
            // Adjust the direct counter by the difference between new and old capacity
            ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity);
            return buffer;
        }

        @Override
        protected void freeDirect(ByteBuffer buffer) {
            int capacity = buffer.capacity();
            super.freeDirect(buffer);
            // Subtract from the direct counter
            ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
        }

    }

On top of the original behavior, it calls the corresponding metric increment and decrement methods, which records how much direct memory is in use.
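The reallocateDirect override differs from the other hooks in that it records only the difference between the new and the old capacity, and that difference is negative when the buffer shrinks. A small bookkeeping sketch of this arithmetic follows. DirectAccounting and its method names are hypothetical, and no real direct memory is allocated.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model of the direct-memory bookkeeping; no real memory is allocated.
final class DirectAccounting {
    private final LongAdder directCounter = new LongAdder();

    void onAllocate(int capacity) {
        directCounter.add(capacity);
    }

    // Reallocation replaces the old buffer, so only the difference is recorded.
    void onReallocate(int oldCapacity, int newCapacity) {
        directCounter.add(newCapacity - oldCapacity);
    }

    void onFree(int capacity) {
        directCounter.add(-capacity);
    }

    long usedDirectMemory() {
        return directCounter.sum();
    }
}
```

For example, allocating 16 bytes, growing to 64, shrinking to 32, and then freeing moves the counter through 16, 64, 32, and 0.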
5.5.1 UnpooledUnsafeNoCleanerDirectByteBuf

io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf. The code is as follows:

    class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {

        UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
            super(alloc, initialCapacity, maxCapacity);
        }

        @Override
        protected ByteBuffer allocateDirect(int initialCapacity) {
            // Create a direct ByteBuffer without a Cleaner
            return PlatformDependent.allocateDirectNoCleaner(initialCapacity);
        }

        ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
            // Reallocate the direct ByteBuffer to the new capacity
            return PlatformDependent.reallocateDirectNoCleaner(oldBuffer, initialCapacity);
        }

        @Override
        protected void freeDirect(ByteBuffer buffer) {
            // Free the direct ByteBuffer explicitly
            PlatformDependent.freeDirectNoCleaner(buffer);
        }

        @Override
        public ByteBuf capacity(int newCapacity) {
            checkNewCapacity(newCapacity);

            int oldCapacity = capacity();
            if (newCapacity == oldCapacity) {
                return this;
            }

            ByteBuffer newBuffer = reallocateDirect(buffer, newCapacity);

            // When shrinking, clamp the reader and writer indexes to the new capacity
            if (newCapacity < oldCapacity) {
                if (readerIndex() < newCapacity) {
                    if (writerIndex() > newCapacity) {
                        writerIndex(newCapacity);
                    }
                } else {
                    setIndex(newCapacity, newCapacity);
                }
            }
            setByteBuffer(newBuffer, false);
            return this;
        }

    }
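FROM 《Netty源码分析(一) ByteBuf》 (Netty Source Code Analysis, Part 1: ByteBuf)

The biggest difference from UnpooledUnsafeDirectByteBuf is that UnpooledUnsafeNoCleanerDirectByteBuf creates its DirectByteBuffer at allocation time by invoking the constructor reflectively, so the DirectByteBuffer has no associated Cleaner. (A buffer created through ByteBuffer.allocateDirect gets a Cleaner automatically; the Cleaner is responsible for reclaiming the memory, see the source code for details.) When memory is reclaimed, UnpooledUnsafeDirectByteBuf invokes the Cleaner of the DirectByteBuffer, whereas UnpooledUnsafeNoCleanerDirectByteBuf releases the memory address directly with UNSAFE.freeMemory(address).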
666. Easter Egg

😈 A short filler post. It lays the groundwork for what comes next, you know the drill.