Package rx.internal.operators
Class OperatorMerge.MergeSubscriber<T>
- java.lang.Object
  - rx.Subscriber<Observable<? extends T>>
    - rx.internal.operators.OperatorMerge.MergeSubscriber<T>
- Type Parameters:
T - the value type
- All Implemented Interfaces:
Observer<Observable<? extends T>>, Subscription
- Enclosing class:
OperatorMerge<T>
static final class OperatorMerge.MergeSubscriber<T> extends Subscriber<Observable<? extends T>>
The subscriber that observes Observables.
-
-
Field Summary
Fields
Modifier and Type Field - Description
(package private) Subscriber<? super T> child
(package private) boolean delayErrors
(package private) boolean done
(package private) boolean emitting - Guarded by this.
(package private) static OperatorMerge.InnerSubscriber<?>[] EMPTY - An empty array to avoid creating new empty arrays in removeInner.
(package private) java.util.concurrent.ConcurrentLinkedQueue<java.lang.Throwable> errors - Due to the emission loop, we need to store errors somewhere if !delayErrors.
(package private) java.lang.Object innerGuard
(package private) OperatorMerge.InnerSubscriber<?>[] innerSubscribers - Copy-on-write array, guarded by innerGuard.
(package private) long lastId - Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
(package private) int lastIndex - What was its index in the innerSubscribers array? Accessed if emitting == true.
(package private) int maxConcurrent
(package private) boolean missed - Guarded by this.
(package private) NotificationLite<T> nl
(package private) OperatorMerge.MergeProducer<T> producer
(package private) java.util.Queue<java.lang.Object> queue
(package private) int scalarEmissionCount
(package private) int scalarEmissionLimit
(package private) CompositeSubscription subscriptions - Tracks the active subscriptions to sources.
(package private) long uniqueId - Used to generate unique InnerSubscriber IDs.
-
Constructor Summary
Constructors
MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent)
-
Method Summary
Methods
Modifier and Type Method - Description
(package private) void addInner(OperatorMerge.InnerSubscriber<T> inner)
(package private) boolean checkTerminate() - Checks whether the operator has reached a terminal state: either the child has unsubscribed, or an error was reported and errors are not delayed.
(package private) void emit()
(package private) void emitEmpty()
(package private) void emitLoop() - The standard emission loop serializing events and requests.
protected void emitScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value, long r)
protected void emitScalar(T value, long r)
(package private) CompositeSubscription getOrCreateComposite()
(package private) java.util.Queue<java.lang.Throwable> getOrCreateErrorQueue()
void onCompleted() - Notifies the Observer that the Observable has finished sending push-based notifications.
void onError(java.lang.Throwable e) - Notifies the Observer that the Observable has experienced an error condition.
void onNext(Observable<? extends T> t) - Provides the Observer with a new item to observe.
protected void queueScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value)
protected void queueScalar(T value)
(package private) void removeInner(OperatorMerge.InnerSubscriber<T> inner)
private void reportError()
void requestMore(long n)
(package private) void tryEmit(OperatorMerge.InnerSubscriber<T> subscriber, T value) - Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
(package private) void tryEmit(T value) - Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
-
Methods inherited from class rx.Subscriber
add, isUnsubscribed, onStart, request, setProducer, unsubscribe
-
Field Detail
-
child
final Subscriber<? super T> child
-
delayErrors
final boolean delayErrors
-
maxConcurrent
final int maxConcurrent
-
producer
OperatorMerge.MergeProducer<T> producer
-
queue
volatile java.util.Queue<java.lang.Object> queue
-
subscriptions
volatile CompositeSubscription subscriptions
Tracks the active subscriptions to sources.
-
errors
volatile java.util.concurrent.ConcurrentLinkedQueue<java.lang.Throwable> errors
Due to the emission loop, we need to store errors somewhere if !delayErrors.
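The errors field is volatile and is paired with getOrCreateErrorQueue(), which suggests the queue is allocated lazily on the first error. The following is a minimal sketch of that idea using double-checked locking; the class name LazyErrorQueueSketch and the hasErrors() helper are hypothetical and not part of the documented API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of a lazily created error queue; not the RxJava source. */
final class LazyErrorQueueSketch {
    /** Stays null until the first error arrives, so the common no-error path allocates nothing. */
    private volatile ConcurrentLinkedQueue<Throwable> errors;

    Queue<Throwable> getOrCreateErrorQueue() {
        ConcurrentLinkedQueue<Throwable> q = errors;
        if (q == null) {
            synchronized (this) {
                q = errors;              // re-check under the lock
                if (q == null) {
                    q = new ConcurrentLinkedQueue<>();
                    errors = q;          // volatile write publishes the queue
                }
            }
        }
        return q;
    }

    /** Hypothetical helper: true once at least one error has been stored. */
    boolean hasErrors() {
        Queue<Throwable> q = errors;
        return q != null && !q.isEmpty();
    }

    public static void main(String[] args) {
        LazyErrorQueueSketch s = new LazyErrorQueueSketch();
        s.getOrCreateErrorQueue().offer(new IllegalStateException("inner source failed"));
        System.out.println(s.hasErrors());
    }
}
```

A concurrent queue lets several inner sources report errors at the same time, while the emission loop reads them later from whichever thread happens to be emitting.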
-
nl
final NotificationLite<T> nl
-
done
volatile boolean done
-
emitting
boolean emitting
Guarded by this.
-
missed
boolean missed
Guarded by this.
-
innerGuard
final java.lang.Object innerGuard
-
innerSubscribers
volatile OperatorMerge.InnerSubscriber<?>[] innerSubscribers
Copy-on-write array, guarded by innerGuard.
-
uniqueId
long uniqueId
Used to generate unique InnerSubscriber IDs. Modified from onNext only.
-
lastId
long lastId
Which was the last InnerSubscriber that emitted? Accessed if emitting == true.
-
lastIndex
int lastIndex
What was its index in the innerSubscribers array? Accessed if emitting == true.
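Remembering both the ID and the array index of the last emitter allows a scan over the inner subscribers to resume where it left off, even if the copy-on-write array has shrunk or shifted in the meantime. The sketch below shows one way such a lookup could work on a plain array of IDs; the class ResumeIndexSketch and the method resumeIndex are hypothetical, and whether the operator does exactly this is an assumption.

```java
/** Hypothetical helper sketching how a round-robin scan might resume; not the RxJava source. */
final class ResumeIndexSketch {
    /**
     * Finds the position at which to resume scanning ids.
     * Returns -1 for an empty array. If lastId is no longer present,
     * falls back to the (clamped) starting position.
     */
    static int resumeIndex(long[] ids, long lastId, int lastIndex) {
        int n = ids.length;
        if (n == 0) {
            return -1;
        }
        // Fast case: the remembered slot still holds the remembered id.
        if (lastIndex >= 0 && lastIndex < n && ids[lastIndex] == lastId) {
            return lastIndex;
        }
        // The array changed: search circularly, starting from the old index if it is still valid.
        int j = (lastIndex < 0 || lastIndex >= n) ? 0 : lastIndex;
        for (int i = 0; i < n; i++) {
            if (ids[j] == lastId) {
                return j;
            }
            j++;
            if (j == n) {
                j = 0;
            }
        }
        return j; // after n steps j is back at the starting position
    }

    public static void main(String[] args) {
        // The subscriber with id 12 moved from index 2 to index 1 after id 10 was removed.
        System.out.println(resumeIndex(new long[] {11, 12}, 12L, 2));
    }
}
```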
-
EMPTY
static final OperatorMerge.InnerSubscriber<?>[] EMPTY
An empty array to avoid creating new empty arrays in removeInner.
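The combination of a volatile copy-on-write array, a dedicated guard object, and a shared EMPTY constant can be sketched as follows. This is a minimal illustration, assuming plain Object elements in place of InnerSubscriber instances; the class CowArraySketch and its method names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical stand-in for copy-on-write bookkeeping; not the RxJava source. */
final class CowArraySketch {
    /** Shared empty array, so removing the last element allocates nothing. */
    static final Object[] EMPTY = new Object[0];

    private final Object guard = new Object();
    /** Readers grab this reference without locking; writers replace it under guard. */
    private volatile Object[] items = EMPTY;

    void add(Object item) {
        synchronized (guard) {
            Object[] a = items;
            Object[] b = Arrays.copyOf(a, a.length + 1);
            b[a.length] = item;
            items = b;
        }
    }

    void remove(Object item) {
        synchronized (guard) {
            Object[] a = items;
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i] == item) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;            // not present, nothing to do
            }
            if (a.length == 1) {
                items = EMPTY;     // reuse the shared constant
                return;
            }
            Object[] b = new Object[a.length - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            items = b;
        }
    }

    /** Returns the current array; callers must treat it as read-only. */
    Object[] snapshot() {
        return items;
    }

    public static void main(String[] args) {
        CowArraySketch s = new CowArraySketch();
        Object x = new Object();
        s.add(x);
        s.remove(x);
        System.out.println(s.snapshot() == EMPTY);
    }
}
```

Because every mutation installs a fresh array, a loop that is iterating over an earlier snapshot never sees the array change underneath it.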
-
scalarEmissionLimit
final int scalarEmissionLimit
-
scalarEmissionCount
int scalarEmissionCount
-
-
Constructor Detail
-
MergeSubscriber
public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent)
-
-
Method Detail
-
getOrCreateErrorQueue
java.util.Queue<java.lang.Throwable> getOrCreateErrorQueue()
-
getOrCreateComposite
CompositeSubscription getOrCreateComposite()
-
onNext
public void onNext(Observable<? extends T> t)
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onCompleted() or Observer.onError(java.lang.Throwable).
- Parameters:
t - the item emitted by the Observable
-
emitEmpty
void emitEmpty()
-
reportError
private void reportError()
-
onError
public void onError(java.lang.Throwable e)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onCompleted().
- Parameters:
e - the exception encountered by the Observable
-
onCompleted
public void onCompleted()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
-
addInner
void addInner(OperatorMerge.InnerSubscriber<T> inner)
-
removeInner
void removeInner(OperatorMerge.InnerSubscriber<T> inner)
-
tryEmit
void tryEmit(OperatorMerge.InnerSubscriber<T> subscriber, T value)
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
subscriber - the inner subscriber the value came from
value - the value to emit
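The "emit directly if nobody else is emitting" idea can be sketched as below. This is a simplified illustration under stated assumptions: Integer values stand in for T, a List stands in for the child, demand is a plain counter rather than the producer, and the queued path is not drained here. The class FastPathSketch and its fields are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch of a direct-emission fast path; not the RxJava source. */
final class FastPathSketch {
    final List<Integer> child = new ArrayList<>();   // stands in for the child subscriber
    final Queue<Integer> queue = new ArrayDeque<>(); // slow path; only touched under the lock here
    long requested;                                  // outstanding child demand, guarded by this
    boolean emitting;                                // guarded by this

    /** Returns true if the value went straight to the child, false if it was queued. */
    boolean tryEmit(int value) {
        boolean fastPath = false;
        synchronized (this) {
            if (!emitting && requested > 0) {
                emitting = true;     // claim the right to emit
                fastPath = true;
            } else {
                queue.offer(value);  // someone else is emitting, or there is no demand
            }
        }
        if (fastPath) {
            child.add(value);        // emit outside the lock
            synchronized (this) {
                requested--;
                emitting = false;    // simplified: a full version would drain the queue before releasing
            }
        }
        return fastPath;
    }

    public static void main(String[] args) {
        FastPathSketch s = new FastPathSketch();
        s.requested = 1;
        System.out.println(s.tryEmit(1)); // demand available, nobody emitting
        System.out.println(s.tryEmit(2)); // demand exhausted, so the value is queued
    }
}
```

The fast path avoids a queue round trip in the uncontended case, which is the common one when a single source emits at a time.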
-
queueScalar
protected void queueScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value)
-
emitScalar
protected void emitScalar(OperatorMerge.InnerSubscriber<T> subscriber, T value, long r)
-
requestMore
public void requestMore(long n)
-
tryEmit
void tryEmit(T value)
Tries to emit the value directly to the child if no concurrent emission is happening at the moment.
Since the scalar-value queue optimization applies to both the main source and the inner subscribers, we handle things in a shared manner.
- Parameters:
value - the value to emit
-
queueScalar
protected void queueScalar(T value)
-
emitScalar
protected void emitScalar(T value, long r)
-
emit
void emit()
-
emitLoop
void emitLoop()
The standard emission loop serializing events and requests.
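An emission loop of this kind is commonly built from two flags guarded by the same lock, matching the emitting and missed fields documented above: the first caller becomes the emitter, and later callers only record that more work arrived. The sketch below illustrates that pattern with Integer values and a List in place of the child; the class EmitLoopSketch and the onValue method are hypothetical, and error handling and unsubscription are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of an emitting/missed drain loop; not the RxJava source. */
final class EmitLoopSketch {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong requested = new AtomicLong();
    final List<Integer> delivered = new ArrayList<>(); // stands in for the child; only the emitter writes
    boolean emitting; // guarded by this
    boolean missed;   // guarded by this

    void onValue(int v) {
        queue.offer(v);
        emit();
    }

    void requestMore(long n) {
        requested.addAndGet(n);
        emit();
    }

    void emit() {
        synchronized (this) {
            if (emitting) {
                missed = true; // tell the current emitter to take another pass
                return;
            }
            emitting = true;
        }
        emitLoop();
    }

    void emitLoop() {
        for (;;) {
            // Deliver as many queued values as the current demand allows.
            while (requested.get() > 0) {
                Integer v = queue.poll();
                if (v == null) {
                    break;
                }
                delivered.add(v);
                requested.decrementAndGet();
            }
            synchronized (this) {
                if (!missed) {
                    emitting = false; // nothing arrived meanwhile, so stop emitting
                    return;
                }
                missed = false;       // something arrived, loop again
            }
        }
    }

    public static void main(String[] args) {
        EmitLoopSketch s = new EmitLoopSketch();
        s.onValue(1);
        s.onValue(2);
        s.requestMore(1);
        System.out.println(s.delivered);
    }
}
```

Because only one thread is ever inside emitLoop(), the child sees values and requests applied in a serialized order without holding a lock during delivery.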
-
checkTerminate
boolean checkTerminate()
Checks whether the operator has reached a terminal state: either the child has unsubscribed, or an error was reported and errors are not delayed.
- Returns:
true if the child unsubscribed, or if errors are available and merge doesn't delay errors.
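The return condition described above reduces to a small boolean expression. The sketch below isolates just that decision as a pure function, assuming the error queue may still be null when no error has occurred; the class TerminateCheckSketch and the method shouldTerminate are hypothetical, and any side effects of the real method (such as reporting the error) are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of the terminal-state decision only; not the RxJava source. */
final class TerminateCheckSketch {
    static boolean shouldTerminate(boolean childUnsubscribed,
                                   boolean delayErrors,
                                   Queue<Throwable> errors) {
        if (childUnsubscribed) {
            return true; // nobody is listening anymore
        }
        // With delayErrors set, stored errors do not stop the merge early.
        return !delayErrors && errors != null && !errors.isEmpty();
    }

    public static void main(String[] args) {
        Queue<Throwable> errors = new ArrayDeque<>();
        errors.offer(new RuntimeException("inner source failed"));
        System.out.println(shouldTerminate(false, false, errors));
        System.out.println(shouldTerminate(false, true, errors));
    }
}
```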
-
-