Package rx.internal.util.unsafe
Class MpmcArrayQueue<E>
- java.lang.Object
  - java.util.AbstractCollection<E>
    - java.util.AbstractQueue<E>
      - rx.internal.util.unsafe.ConcurrentCircularArrayQueueL0Pad<E>
        - rx.internal.util.unsafe.ConcurrentCircularArrayQueue<E>
          - rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue<E>
            - rx.internal.util.unsafe.MpmcArrayQueueL1Pad<E>
              - rx.internal.util.unsafe.MpmcArrayQueueProducerField<E>
                - rx.internal.util.unsafe.MpmcArrayQueueL2Pad<E>
                  - rx.internal.util.unsafe.MpmcArrayQueueConsumerField<E>
                    - rx.internal.util.unsafe.MpmcArrayQueue<E>
- Type Parameters:
E - type of the element stored in the Queue
- All Implemented Interfaces:
java.lang.Iterable<E>, java.util.Collection<E>, java.util.Queue<E>, MessagePassingQueue<E>
@SuppressAnimalSniffer public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E>
A Multi-Producer-Multi-Consumer queue based on a ConcurrentCircularArrayQueue. This implies that any and all threads may call the offer/poll/peek methods and correctness is maintained.
This implementation follows patterns documented on the package level for False Sharing protection.
The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original algorithm uses an array of structs, which should offer nice locality properties but is sadly not possible in Java (waiting on Value Types or similar). The alternative explored here uses 2 arrays, one for each field of the struct. There is a further alternative in the experimental project which uses iteration phase markers to achieve the same algorithm and is structurally closer to the original, but sadly does not perform as well as this implementation.
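The two-array idea can be illustrated with a minimal sketch. This is not the RxJava source: it assumes the java.util.concurrent.atomic classes in place of Unsafe-based access, omits all padding, and follows Vyukov's original emptiness test. The class name TwoArrayMpmcSketch and its fields are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration of a Vyukov-style bounded MPMC queue that keeps
// the two "struct fields" (element, sequence) in two parallel arrays.
final class TwoArrayMpmcSketch<E> {
    private final int mask;
    private final AtomicReferenceArray<E> elements; // field 1 of the struct
    private final AtomicLongArray sequences;        // field 2 of the struct
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();

    TwoArrayMpmcSketch(int capacity) {
        // This sketch requires a power of two of at least 2.
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2 and >= 2");
        }
        mask = capacity - 1;
        elements = new AtomicReferenceArray<>(capacity);
        sequences = new AtomicLongArray(capacity);
        for (int i = 0; i < capacity; i++) {
            sequences.set(i, i); // slot i is initially free for producer index i
        }
    }

    boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException("null elements are not allowed");
        }
        while (true) {
            long pIndex = producerIndex.get();
            int slot = (int) (pIndex & mask);
            long delta = sequences.get(slot) - pIndex;
            if (delta == 0) {
                // Slot is free for this index: try to claim it.
                if (producerIndex.compareAndSet(pIndex, pIndex + 1)) {
                    elements.set(slot, e);
                    sequences.set(slot, pIndex + 1); // publish to consumers
                    return true;
                }
            } else if (delta < 0) {
                return false; // slot still holds an unconsumed element: full
            }
            // delta > 0: another producer claimed this index first, retry
        }
    }

    E poll() {
        while (true) {
            long cIndex = consumerIndex.get();
            int slot = (int) (cIndex & mask);
            long delta = sequences.get(slot) - (cIndex + 1);
            if (delta == 0) {
                // Slot is published for this index: try to claim it.
                if (consumerIndex.compareAndSet(cIndex, cIndex + 1)) {
                    E e = elements.get(slot);
                    elements.set(slot, null);
                    sequences.set(slot, cIndex + mask + 1); // free for the next lap
                    return e;
                }
            } else if (delta < 0) {
                return null; // nothing published at this index: treated as empty here
            }
            // delta > 0: another consumer claimed this index first, retry
        }
    }
}
```

In this sketch the sequence array alone decides whether a slot is free, published, or already taken, which is why it has to match the element array in length.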
Tradeoffs to keep in mind:
- Padding for false sharing: counter fields and queue fields are all padded, as is either side of both arrays. We are trading memory to avoid false sharing (active and passive).
- 2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the elements array. This doubles or triples the memory allocated for the buffer.
- Power of 2 capacity: The actual elements buffer (and sequence buffer) is sized to the closest power of 2 greater than or equal to the requested capacity.
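As a rough illustration of the power-of-2 tradeoff, the sketch below rounds a requested capacity up and derives a slot from an ever-growing index with a bit mask. The class and method names are hypothetical and the rounding routine is one common way to do it, not necessarily the one used by this class.

```java
// Hypothetical helpers showing why a power-of-2 capacity is convenient:
// the slot for an index becomes a single AND with (capacity - 1).
final class CapacitySketch {
    // Smallest power of 2 that is >= requested (valid for 1 .. 2^30).
    static int roundToPowerOfTwo(int requested) {
        if (requested <= 0 || requested > (1 << 30)) {
            throw new IllegalArgumentException("requested capacity out of range: " + requested);
        }
        return 1 << (32 - Integer.numberOfLeadingZeros(requested - 1));
    }

    // Maps a monotonically increasing index onto a slot in the circular buffer.
    static int slotFor(long index, int powerOfTwoCapacity) {
        return (int) (index & (powerOfTwoCapacity - 1));
    }
}
```

Under these assumptions a request for 1000 elements would allocate 1024 slots in each array, so the real capacity can be up to almost twice the requested one.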
-
-
Field Summary
Fields
Modifier and Type, Field
(package private) long p30
(package private) long p31
(package private) long p32
(package private) long p33
(package private) long p34
(package private) long p35
(package private) long p36
(package private) long p37
(package private) long p40
(package private) long p41
(package private) long p42
(package private) long p43
(package private) long p44
(package private) long p45
(package private) long p46
-
Fields inherited from class rx.internal.util.unsafe.MpmcArrayQueueL2Pad
p20, p21, p22, p23, p24, p25, p26
-
Fields inherited from class rx.internal.util.unsafe.MpmcArrayQueueL1Pad
p10, p11, p12, p13, p14, p15, p16
-
Fields inherited from class rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue
sequenceBuffer
-
Fields inherited from class rx.internal.util.unsafe.ConcurrentCircularArrayQueue
buffer, BUFFER_PAD, mask, SPARSE_SHIFT
-
-
Constructor Summary
Constructors
MpmcArrayQueue(int capacity)
-
Method Summary
All Methods, Instance Methods, Concrete Methods
Modifier and Type, Method, Description
boolean isEmpty()
This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
boolean offer(E e)
Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
E peek()
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
E poll()
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
int size()
This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value.
-
Methods inherited from class rx.internal.util.unsafe.MpmcArrayQueueConsumerField
casConsumerIndex, lvConsumerIndex
-
Methods inherited from class rx.internal.util.unsafe.MpmcArrayQueueProducerField
casProducerIndex, lvProducerIndex
-
Methods inherited from class rx.internal.util.unsafe.ConcurrentSequencedCircularArrayQueue
calcSequenceOffset, lvSequence, soSequence
-
Methods inherited from class rx.internal.util.unsafe.ConcurrentCircularArrayQueue
calcElementOffset, calcElementOffset, clear, iterator, lpElement, lpElement, lvElement, lvElement, soElement, soElement, spElement, spElement
-
Methods inherited from class java.util.AbstractCollection
contains, containsAll, remove, removeAll, retainAll, toArray, toArray, toString
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
-
-
-
Method Detail
-
offer
public boolean offer(E e)
Description copied from interface: MessagePassingQueue
Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
- Returns:
- true if element was inserted into the queue, false iff full
-
poll
public E poll()
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
Because returning null indicates that the queue is empty, we cannot simply rely on next element visibility for poll and must test the producer index when the next element is not visible.
- Returns:
- a message from the queue if one is available, null iff empty
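The emptiness test described above can be sketched as a pure decision function. This is an illustration under assumptions, not the class's actual code: it assumes a sequence convention in which a slot published for consumer index i carries sequence i + 1, and the names PollDecisionSketch, Outcome and decide are hypothetical.

```java
// Hypothetical model of the decision a poll attempt has to make after
// reading the slot sequence and both indices.
final class PollDecisionSketch {
    enum Outcome { TRY_CLAIM, EMPTY, RETRY }

    static Outcome decide(long slotSequence, long consumerIndex, long producerIndex) {
        long delta = slotSequence - (consumerIndex + 1);
        if (delta == 0) {
            return Outcome.TRY_CLAIM; // element is visible: attempt to claim it
        }
        if (delta < 0 && consumerIndex == producerIndex) {
            return Outcome.EMPTY;     // not visible and no producer is ahead: truly empty
        }
        // Either a producer has claimed the slot but not yet published it,
        // or another consumer already took it: look again.
        return Outcome.RETRY;
    }
}
```

The middle branch is the point of the note: an invisible element only means "empty" when the producer index has not moved past the consumer index, otherwise returning null would wrongly report an empty queue.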
-
peek
public E peek()
Description copied from interface: MessagePassingQueue
Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
- Returns:
- a message from the queue if one is available, null iff empty
-
size
public int size()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
- Specified by:
size in interface java.util.Collection<E>
- Specified by:
size in interface MessagePassingQueue<E>
- Specified by:
size in class java.util.AbstractCollection<E>
- Returns:
- number of messages in the queue, between 0 and queue capacity or Integer.MAX_VALUE if not bounded
-
isEmpty
public boolean isEmpty()
Description copied from interface: MessagePassingQueue
This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
- Specified by:
isEmpty in interface java.util.Collection<E>
- Specified by:
isEmpty in interface MessagePassingQueue<E>
- Overrides:
isEmpty in class java.util.AbstractCollection<E>
- Returns:
- true if empty, false otherwise
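The best-effort nature of size() and isEmpty() can be illustrated with two counters. The sketch below assumes the estimate is derived from a producer index and a consumer index (the inherited lvProducerIndex and lvConsumerIndex suggest such counters exist), but the exact computation shown is an assumption and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of estimating size from two independent counters.
final class SizeEstimateSketch {
    final AtomicLong producerIndex = new AtomicLong();
    final AtomicLong consumerIndex = new AtomicLong();

    int size() {
        // Re-read the consumer index until it is unchanged around the producer
        // read, so the difference is not computed from a torn pair of values.
        long after = consumerIndex.get();
        while (true) {
            long before = after;
            long currentProducerIndex = producerIndex.get();
            after = consumerIndex.get();
            if (before == after) {
                return (int) (currentProducerIndex - after);
            }
        }
    }

    boolean isEmpty() {
        // A snapshot only: either index may move right after the comparison.
        return consumerIndex.get() == producerIndex.get();
    }
}
```

Even with the re-read loop the result may be stale by the time the caller uses it, so it is best treated as a hint rather than a condition to synchronize on.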
-
-