// PlatformDependent.java /** * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single * consumer (one thread!). * @return A MPSC queue which may be unbounded. */ publicstatic <T> Queue<T> newMpscQueue(){ return Mpsc.newMpscQueue(); }
// Mpsc.java static <T> Queue<T> newMpscQueue(){ return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE) : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE); }
// BaseMpscLinkedArrayQueue.java /** * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size. * Must be 2 or more. */ publicBaseMpscLinkedArrayQueue(finalint initialCapacity) { // 校验队列容量值,大小必须不小于2 RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
// 通过传入的参数通过Pow2算法获取大于initialCapacity最近的一个2的n次方的值 int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity); // leave lower bit of mask clear long mask = (p2capacity - 1) << 1; // 通过p2capacity计算获得mask值,该值后续将用作扩容的值 // need extra element to point at next array E[] buffer = allocate(p2capacity + 1); // 默认分配一个 p2capacity + 1 大小的数据缓冲区 producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; consumerMask = mask; // 同时用mask作为初始化队列的Limit值,当生产者指针producerIndex超过该Limit值时就需要做扩容处理 soProducerLimit(mask); // we know it's all empty to start with }
1、源码: // BaseMpscLinkedArrayQueue.java @Override publicbooleanoffer(final E e) { if (null == e) // 待添加的元素e不允许为空,否则抛空指针异常 { thrownew NullPointerException(); }
long mask; E[] buffer; long pIndex;
while (true) { long producerLimit = lvProducerLimit(); // 获取当前数据Limit的阈值 pIndex = lvProducerIndex(); // 获取当前生产者指针位置 // lower bit is indicative of resize, if we see it we spin until it's cleared if ((pIndex & 1) == 1) { continue; } // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
// mask/buffer may get changed by resizing -> only use for array access after successful CAS. mask = this.producerMask; buffer = this.producerBuffer; // a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)
// assumption behind this optimization is that queue is almost always empty or near empty if (producerLimit <= pIndex) // 当阈值小于等于生产者指针位置时,则需要扩容,否则直接通过CAS操作对pIndex做加2处理 { // 通过offerSlowPath返回状态值,来查看怎么来处理这个待添加的元素 int result = offerSlowPath(mask, pIndex, producerLimit); switch (result) { case CONTINUE_TO_P_INDEX_CAS: break; case RETRY: // 可能由于并发原因导致CAS失败,那么则再次重新尝试添加元素 continue; case QUEUE_FULL: // 队列已满,直接返回false操作 returnfalse; case QUEUE_RESIZE: // 队列需要扩容操作 resize(mask, buffer, pIndex, e); // 对队列进行直接扩容操作 returntrue; } }
// 能走到这里,则说明当前的生产者指针位置还没有超过阈值,因此直接通过CAS操作做加2处理 if (casProducerIndex(pIndex, pIndex + 2)) { break; } } // INDEX visible before ELEMENT // 获取计算需要添加元素的位置 finallong offset = modifiedCalcElementOffset(pIndex, mask); // 在buffer的offset位置添加e元素 soElement(buffer, offset, e); // release element e returntrue; }
// UnsafeRefArrayAccess.java /** * An ordered store(store + StoreStore barrier) of an element to a given offset * * @param buffer this.buffer * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset} * @param e an orderly kitty */ publicstatic <E> voidsoElement(E[] buffer, long offset, E e) { // 通过Unsafe对象调用native方法,将元素e设置到buffer缓冲区的offset位置 UNSAFE.putOrderedObject(buffer, offset, e); }
1、源码: // BaseMpscLinkedArrayQueue.java /** * We do not inline resize into this method because we do not resize on fill. */ privateintofferSlowPath(long mask, long pIndex, long producerLimit) { // 获取消费者指针 finallong cIndex = lvConsumerIndex(); // 获取当前缓冲区的容量值,getCurrentBufferCapacity方法由子类MpscUnboundedArrayQueue实现,默认返回mask值 long bufferCapacity = getCurrentBufferCapacity(mask);
// 如果消费指针加上容量值如果超过了生产指针,那么则会尝试进行扩容处理 if (cIndex + bufferCapacity > pIndex) { if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) { // retry from top return RETRY; } else { // continue to pIndex CAS return CONTINUE_TO_P_INDEX_CAS; } } // full and cannot grow 子类MpscUnboundedArrayQueue默认返回Integer.MAX_VALUE值,所以不会进入此分支 elseif (availableInQueue(pIndex, cIndex) <= 0) { // offer should return false; return QUEUE_FULL; } // grab index for resize -> set lower bit 尝试扩容队列 elseif (casProducerIndex(pIndex, pIndex + 1)) { // trigger a resize return QUEUE_RESIZE; } else { // failed resize attempt, retry from top return RETRY; } }
// Invalidate racing CASs // We never set the limit beyond the bounds of a buffer // 重新扩容阈值,因为availableInQueue反正都是Integer.MAX_VALUE值,所以自然就取mask值啦 // 因此针对MpscUnboundedArrayQueue来说,扩容的值其实就是mask的值的大小 soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
// make resize visible to the other producers // 设置生产者指针加2处理 soProducerIndex(pIndex + 2);
// INDEX visible before ELEMENT, consistent with consumer expectation
// make resize visible to consumer // 用一个空对象来衔接新老缓冲区,凡是在缓冲区中碰到JUMP对象的话,那么就得琢磨着准备着获取下一个缓冲区的数据元素了 soElement(oldBuffer, offsetInOld, JUMP); }
1、源码: // BaseMpscLinkedArrayQueue.java /** * {@inheritDoc} * <p> * This implementation is correct for single consumer thread use only. */ @SuppressWarnings("unchecked") @Override public E poll() { final E[] buffer = consumerBuffer; // 获取缓冲区的数据 finallong index = consumerIndex; finallong mask = consumerMask;
// 根据消费指针与mask来获取当前需要从哪个位置开始来移除元素 finallong offset = modifiedCalcElementOffset(index, mask); // 从buffer缓冲区的offset位置获取元素内容 Object e = lvElement(buffer, offset);// LoadLoad if (e == null) // 如果元素为null的话 { // 则再探讨看看消费指针是不是和生产指针是不是相同 if (index != lvProducerIndex()) { // poll() == null iff queue is empty, null element is not strong enough indicator, so we must // check the producer index. If the queue is indeed not empty we spin until element is // visible. // 若不相同的话,则先尝试从buffer缓冲区的offset位置获取元素先,若获取元素为null则结束while处理 do { e = lvElement(buffer, offset); } while (e == null); } // 说明消费指针是不是和生产指针是相等的,那么则缓冲区的数据已经被消费完了,直接返回null即可 else { returnnull; } }
// 如果元素为JUMP空对象的话,那么意味着我们就得获取下一缓冲区进行读取数据了 if (e == JUMP) { // final E[] nextBuffer = getNextBuffer(buffer, mask); // return newBufferPoll(nextBuffer, index); }
// UnsafeRefArrayAccess.java /** * A volatile load (load + LoadLoad barrier) of an element from a given offset. * * @param buffer this.buffer * @param offset computed via {@link UnsafeRefArrayAccess#calcElementOffset(long)} * @return the element at the offset */ @SuppressWarnings("unchecked") publicstatic <E> E lvElement(E[] buffer, long offset) { // 通过Unsafe对象调用native方法,获取buffer缓冲区offset位置的元素 return (E) UNSAFE.getObjectVolatile(buffer, offset); }
1、源码: // BaseMpscLinkedArrayQueue.java @Override publicfinalintsize() { // NOTE: because indices are on even numbers we cannot use the size util.
/* * It is possible for a thread to be interrupted or reschedule between the read of the producer and * consumer indices, therefore protection is required to ensure size is within valid range. In the * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer * index BEFORE the producer index. */ long after = lvConsumerIndex(); // 获取消费指针 long size; while (true) // 为了防止在获取大小的时候指针发生变化,那么则死循环自旋方式获取大小数值 { finallong before = after; finallong currentProducerIndex = lvProducerIndex(); // 获取生产者指针 after = lvConsumerIndex(); // 获取消费指针
// 若消费指针前后不一致,那么可以说是由于并发原因导致了指针发生了变化; // 那么则进行下一次循环继续获取最新的指针值再次进行判断 } // Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded // indexed queues. if (size > Integer.MAX_VALUE) { return Integer.MAX_VALUE; } else { return (int) size; } }
1、源码: // BaseMpscLinkedArrayQueue.java @Override publicfinalbooleanisEmpty() { // Order matters! // Loading consumer before producer allows for producer increments after consumer index is read. // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is // nothing we can do to make this an exact method. // 这个就简单了,直接判断消费指针和生产指针是不是相等就知道了 return (this.lvConsumerIndex() == this.lvProducerIndex()); }