-
create an Observable from scratch by calling observer methods programmatically
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); }
-
do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) { return create(new OnSubscribeDefer<T>(observableFactory)); } public final class OnSubscribeDefer<T> implements OnSubscribe<T> { @Override public void call(final Subscriber<? super T> s) { Observable<? extends T> o; try { o = observableFactory.call(); } catch (Throwable t) { Exceptions.throwOrReport(t, s); return; } o.unsafeSubscribe(Subscribers.wrap(s)); } }
-
create Observables that have very precise and limited behavior
public static <T> Observable<T> empty() { return EmptyObservableHolder.instance(); } public void call(Subscriber<? super Object> child) { child.onCompleted(); } public static <T> Observable<T> error(Throwable exception) { return create(new OnSubscribeThrow<T>(exception)); } public static <T> Observable<T> never() { return NeverObservableHolder.instance(); }
EmptyObservableHolder为枚举类型,关键代码:
call方法中调用child.onCompleted();NeverObservableHolder为枚举类型,关键代码:call方法中没有调用child任何代码error关键代码:call方法中调用observer.onError(exception); -
convert some other object or data structure into an Observable
public static <T> Observable<T> from(Iterable<? extends T> iterable) { return create(new OnSubscribeFromIterable<T>(iterable)); return create(new OnSubscribeFromArray<T>(array)); } public static <T> Observable<T> from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]); } return create(new OnSubscribeFromArray<T>(array)); } public static <T> Observable<T> from(Future<? extends T> future) { return (Observable<T>)create(OnSubscribeToObservableFuture .toObservableFuture(future)); } public static <T> Observable<T> from(Future<? extends T> future, long timeout, TimeUnit unit) { return (Observable<T>)create(OnSubscribeToObservableFuture .toObservableFuture(future, timeout, unit)); }
-
###Interval create an Observable that emits a sequence of integers spaced by a particular time interval
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { return create(new OnSubscribeTimerPeriodically(initialDelay, period, unit, scheduler)); }
-
###Just convert an object or a set of objects into an Observable that emits that or those objects
public static <T> Observable<T> just(final T value) { return ScalarSynchronousObservable.create(value); } public static <T> Observable<T> just(T t1, T t2) { return from((T[])new Object[] { t1, t2 }); }
-
###Range create an Observable that emits a range of sequential integers
public static Observable<Integer> range(int start, int count) { if (count < 0) { throw new IllegalArgumentException("Count can not be negative"); } if (count == 0) { return Observable.empty(); } if (start > Integer.MAX_VALUE - count + 1) { throw new IllegalArgumentException( "start + count can not exceed Integer.MAX_VALUE"); } if(count == 1) { return Observable.just(start); } return Observable.create(new OnSubscribeRange(start, start + (count - 1))); } public static Observable<Integer> range(int start, int count, Scheduler scheduler) { return range(start, count).subscribeOn(scheduler); }
-
###Repeat create an Observable that emits a particular item or sequence of items repeatedly
public final Observable<T> repeat() { return OnSubscribeRedo.<T>repeat(this); } public final Observable<T> repeat(Scheduler scheduler) { return OnSubscribeRedo.<T>repeat(this, scheduler); } public final Observable<T> repeat(final long count) { return OnSubscribeRedo.<T>repeat(this, count); } public final Observable<T> repeat(final long count, Scheduler scheduler) { return OnSubscribeRedo.<T>repeat(this, count, scheduler); }
-
###Timer create an Observable that emits a single item after a given delay
public static Observable<Long> timer(long delay, TimeUnit unit) { return timer(delay, unit, Schedulers.computation()); } public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) { return create(new OnSubscribeTimerOnce(delay, unit, scheduler)); }
-
###Start
Java中没有实现