Class Scheduler
- java.lang.Object
  - rx.Scheduler
- Direct Known Subclasses:
CachedThreadScheduler, EventLoopsScheduler, ExecutorScheduler, ImmediateScheduler, ImmediateScheduler, NewThreadScheduler, NewThreadScheduler, SchedulerWhen, TestScheduler, TrampolineScheduler, TrampolineScheduler
public abstract class Scheduler extends java.lang.Object
A Scheduler is an object that schedules units of work. You can find common implementations of this class in Schedulers.
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | Scheduler.Worker | Sequential Scheduler for executing actions on a single thread or event loop.
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) static long | CLOCK_DRIFT_TOLERANCE_NANOS | The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
-
Constructor Summary
Constructors
Constructor | Description
Scheduler() |
-
Method Summary
Modifier and Type | Method | Description
abstract Scheduler.Worker | createWorker() | Retrieves or creates a new Scheduler.Worker that represents serial execution of actions.
long | now() | Gets the current time, in milliseconds, according to this Scheduler.
<S extends Scheduler & Subscription> S | when(Func1<Observable<Observable<Completable>>,Completable> combine) | Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done.
-
-
-
Method Detail
-
createWorker
public abstract Scheduler.Worker createWorker()
Retrieves or creates a new Scheduler.Worker that represents serial execution of actions.
When work is completed it should be unsubscribed using Subscription.unsubscribe().
Work on a Scheduler.Worker is guaranteed to be sequential.
- Returns:
- a Worker representing a serial queue of actions to be executed
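The contract above (actions run one after another, and the worker is released once the work is done) can be illustrated with a minimal sketch. It uses only java.util.concurrent and is not RxJava code: SerialWorkerSketch and its methods are hypothetical stand-ins for Scheduler.Worker, Worker.schedule and Subscription.unsubscribe(), and unlike a real unsubscribe it drains queued actions instead of cancelling them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Scheduler.Worker (not RxJava code):
// a serial queue of actions backed by a single thread.
final class SerialWorkerSketch {
    private final ExecutorService thread = Executors.newSingleThreadExecutor();

    // Plays the role of Worker.schedule(Action0): enqueue one action.
    void schedule(Runnable action) {
        thread.execute(action);
    }

    // Plays the role of Subscription.unsubscribe(): release the thread.
    // Simplification: queued actions are drained first rather than cancelled.
    void unsubscribe() {
        thread.shutdown();
        try {
            thread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Schedules three actions and returns the order in which they ran.
    static List<Integer> demo() {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        SerialWorkerSketch worker = new SerialWorkerSketch();
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            worker.schedule(() -> order.add(id));
        }
        worker.unsubscribe(); // done with the worker, so release it
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

Because a single thread drains the queue in FIFO order, the actions cannot overlap or reorder, which is the property a Worker is documented to guarantee.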
-
now
public long now()
Gets the current time, in milliseconds, according to this Scheduler.
- Returns:
- the scheduler's notion of current absolute time in milliseconds
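The phrase "according to this Scheduler" matters because a scheduler may keep its own notion of time, for example a virtual clock in tests. The sketch below shows the idea in plain Java. ClockSketch, WallClockSketch and VirtualClockSketch are hypothetical names rather than RxJava types, and the wall-clock variant assumes a scheduler that simply follows the system clock.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical abstraction mirroring the idea behind Scheduler.now().
interface ClockSketch {
    long now(); // current time in milliseconds, as this clock sees it
}

// Follows the system wall clock.
final class WallClockSketch implements ClockSketch {
    @Override
    public long now() {
        return System.currentTimeMillis();
    }
}

// Virtual time: starts at 0 and only moves when explicitly advanced.
final class VirtualClockSketch implements ClockSketch {
    private long millis;

    @Override
    public long now() {
        return millis;
    }

    void advanceBy(long amount, TimeUnit unit) {
        millis += unit.toMillis(amount);
    }
}
```

Code that reads time through the scheduler rather than calling System.currentTimeMillis() directly keeps working when the scheduler is swapped for one driven by virtual time.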
-
when
@Experimental public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>,Completable> combine)
Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on this Scheduler. The only parameter is a function that flattens an Observable of Observable of Completables into just one Completable. There must be a chain of operators connecting the returned value to the source Observable, otherwise any work scheduled on the returned Scheduler will not be executed.
When createWorker() is invoked, an Observable of Completables is onNext'd to the combinator to be flattened. If the inner Observable is not immediately subscribed to, any calls to Scheduler.Worker.schedule(rx.functions.Action0) are buffered. Once the Observable is subscribed to, actions are then onNext'd as Completables.
Finally, the actions are scheduled on the parent Scheduler when the innermost Completables are subscribed to.
When the Scheduler.Worker is unsubscribed, the Completable emits an onComplete and triggers any behavior in the flattening operator. The Observable and all Completables given to the flattening function never onError.
Limit the amount of concurrency to two at a time without creating a new fixed-size thread pool:
Scheduler limitSched = Schedulers.computation().when(workers -> {
    // use merge max concurrent to limit the number of concurrent
    // callbacks two at a time
    return Completable.merge(Observable.merge(workers), 2);
});
This is a slightly different way to limit the concurrency, with some interesting benefits and drawbacks compared to the method above. It works by limiting the number of concurrent Scheduler.Workers rather than individual actions. Generally each Observable uses its own Scheduler.Worker, so this essentially limits the number of concurrent subscribes. The danger comes from using operators like Observable.zip(Observable, Observable, rx.functions.Func2), where subscribing to the first Observable could deadlock the subscription to the second.
Scheduler limitSched = Schedulers.computation().when(workers -> {
    // use merge max concurrent to limit the number of concurrent
    // Observables two at a time
    return Completable.merge(Observable.merge(workers, 2));
});
Slowing down the rate to no more than 1 a second. This suffers from the same problem as the one above. I could not find an Observable operator that limits the rate without dropping the values (aka leaky bucket algorithm).
Scheduler slowSched = Schedulers.computation().when(workers -> {
    // use concatenate to make each worker happen one at a time.
    return Completable.concat(workers.map(actions -> {
        // delay the starting of the next worker by 1 second.
        return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
    }));
});
- Type Parameters:
S - a Scheduler and a Subscription
- Parameters:
combine - the function that takes a two-level nested Observable sequence of a Completable and returns the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
- Returns:
- the Scheduler with the customized execution behavior
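The goal of the first example (at most two actions running at once, buffering the rest, with no dedicated fixed-size pool) can also be pictured without RxJava. The following is a sketch of that idea using plain java.util.concurrent, a swapped-in technique and not how when(...) is implemented. LimitedExecutor and LimitedExecutorDemo are hypothetical names introduced only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not RxJava): caps how many tasks run at once on a
// shared Executor by buffering the excess instead of creating a new pool.
final class LimitedExecutor implements Executor {
    private final Executor delegate;
    private final int maxConcurrent;
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private int running; // guarded by "this"

    LimitedExecutor(Executor delegate, int maxConcurrent) {
        this.delegate = delegate;
        this.maxConcurrent = maxConcurrent;
    }

    @Override
    public void execute(Runnable task) {
        synchronized (this) {
            if (running >= maxConcurrent) {
                pending.add(task); // buffer until a slot frees up
                return;
            }
            running++;
        }
        dispatch(task);
    }

    private void dispatch(Runnable task) {
        delegate.execute(() -> {
            try {
                task.run();
            } finally {
                Runnable next;
                synchronized (this) {
                    next = pending.poll();
                    if (next == null) {
                        running--; // nothing queued: release the slot
                    }
                }
                if (next != null) {
                    dispatch(next); // pass the slot to the next queued task
                }
            }
        });
    }
}

final class LimitedExecutorDemo {
    // Runs `tasks` short jobs through a limit of 2 and returns the highest
    // number of jobs observed running at the same time.
    static int maxObservedConcurrency(int tasks) {
        ExecutorService pool = Executors.newCachedThreadPool();
        LimitedExecutor limited = new LimitedExecutor(pool, 2);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            limited.execute(() -> {
                peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Calling LimitedExecutorDemo.maxObservedConcurrency(6) should never report more than 2, even though the underlying cached pool is unbounded. The sketch caps individual tasks, which corresponds to the first example; capping whole workers, as in the second example, would move the limit to the point where a worker is created.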
-
-