@ -39,7 +39,7 @@ public class Observable<T> {
@@ -39,7 +39,7 @@ public class Observable<T> {
* must not be null .
*
* @param action the Action to perform
* @param < T > the type that will be emitted to the subscriber
* @param < T > the type that will be emitted to the onSubscribe
* @return a valid non - null Observable .
* /
@NonNull
@ -49,7 +49,7 @@ public class Observable<T> {
@@ -49,7 +49,7 @@ public class Observable<T> {
}
/ * *
* Tells the Observable what Executor that the subscriber
* Tells the Observable what Executor that the onSubscribe
* work should run on .
*
* @param subscribeExecutor the Executor to run the work on .
@ -61,7 +61,7 @@ public class Observable<T> {
@@ -61,7 +61,7 @@ public class Observable<T> {
}
/ * *
* Tells the Observable what Executor the subscriber should observe
* Tells the Observable what Executor the onSubscribe should observe
* the work on .
*
* @param observerExecutor the Executor to run to callback on .
@ -80,7 +80,7 @@ public class Observable<T> {
@@ -80,7 +80,7 @@ public class Observable<T> {
executeOnSubscriberThread ( new Runnable ( ) {
@Override
public void run ( ) {
mAction . onSubscribe ( new On Subscribe< T > ( null ) {
mAction . onSubscribe ( new Subscriber < T > ( ) {
@Override
public void unsubscribe ( ) { }
@ -88,7 +88,7 @@ public class Observable<T> {
@@ -88,7 +88,7 @@ public class Observable<T> {
public void onComplete ( ) { }
@Override
public void s tart( ) { }
public void onS tart( ) { }
@Override
public void onError ( @NonNull Throwable throwable ) { }
@ -102,73 +102,27 @@ public class Observable<T> {
@@ -102,73 +102,27 @@ public class Observable<T> {
/ * *
* Immediately subscribes to the Observable and starts
* sending events from the Observable to the { @link Subscriber } .
* sending events from the Observable to the { @link On Subscribe} .
*
* @param subscriber the class that wishes to receive onNext and
* onComplete callbacks from the Observable .
* @param onSubscribe the class that wishes to receive onNext and
* onComplete callbacks from the Observable .
* /
public Subscription subscribe ( @NonNull Subscriber < T > subscriber ) {
public Subscription subscribe ( @NonNull On Subscribe< T > onSubscribe ) {
Preconditions . checkNonNull ( subscriber ) ;
Preconditions . checkNonNull ( onSubscribe ) ;
final On Subscribe< T > onSubscribe = new OnSubscribe < T > ( subscriber ) {
final Subscriber < T > subscriber = new SubscriberImpl < > ( onSubscribe , this ) ;
@Override
public void unsubscribe ( ) {
setSubscriber ( null ) ;
}
private boolean mOnCompleteExecuted = false ;
@Override
public void onComplete ( ) {
Subscriber < T > subscription = getSubscriber ( ) ;
if ( ! mOnCompleteExecuted & & subscription ! = null ) {
mOnCompleteExecuted = true ;
executeOnObserverThread ( new OnCompleteRunnable < > ( subscription ) ) ;
} else {
Log . e ( TAG , "onComplete called more than once" ) ;
throw new RuntimeException ( "onComplete called more than once" ) ;
}
}
@Override
public void start ( ) {
Subscriber < T > subscription = getSubscriber ( ) ;
executeOnObserverThread ( new OnStartRunnable < > ( subscription ) ) ;
}
@Override
public void onError ( @NonNull final Throwable throwable ) {
Subscriber < T > subscription = getSubscriber ( ) ;
if ( ! mOnCompleteExecuted & & subscription ! = null ) {
mOnCompleteExecuted = true ;
executeOnObserverThread ( new OnErrorRunnable < > ( subscription , throwable ) ) ;
} else {
Log . e ( TAG , "onComplete already called" ) ;
throw new RuntimeException ( "onComplete already called" ) ;
}
}
subscriber . onStart ( ) ;
@Override
public void onNext ( final T item ) {
Subscriber < T > subscription = getSubscriber ( ) ;
if ( ! mOnCompleteExecuted & & subscription ! = null ) {
executeOnObserverThread ( new OnNextRunnable < > ( subscription , item ) ) ;
} else {
Log . e ( TAG , "onComplete has been already called, onNext should not be called" ) ;
throw new RuntimeException ( "onNext should not be called after onComplete has been called" ) ;
}
}
} ;
executeOnSubscriberThread ( new Runnable ( ) {
@Override
public void run ( ) {
mAction . onSubscribe ( onSubscribe ) ;
mAction . onSubscribe ( subscriber ) ;
}
} ) ;
return onSubscribe ;
return subscriber ;
}
private void executeOnObserverThread ( @NonNull Runnable runnable ) {
@ -187,55 +141,111 @@ public class Observable<T> {
@@ -187,55 +141,111 @@ public class Observable<T> {
}
}
private static class SubscriberImpl < T > implements Subscriber < T > {
@Nullable private OnSubscribe < T > mOnSubscribe ;
@NonNull private Observable < T > mObservable ;
private boolean mOnCompleteExecuted = false ;
public SubscriberImpl ( @NonNull OnSubscribe < T > onSubscribe , @NonNull Observable < T > observable ) {
mOnSubscribe = onSubscribe ;
mObservable = observable ;
}
@Override
public void unsubscribe ( ) {
mOnSubscribe = null ;
}
@Override
public void onComplete ( ) {
if ( ! mOnCompleteExecuted & & mOnSubscribe ! = null ) {
mOnCompleteExecuted = true ;
mObservable . executeOnObserverThread ( new OnCompleteRunnable < > ( mOnSubscribe ) ) ;
} else {
Log . e ( TAG , "onComplete called more than once" ) ;
throw new RuntimeException ( "onComplete called more than once" ) ;
}
}
@Override
public void onStart ( ) {
if ( mOnSubscribe ! = null ) {
mObservable . executeOnObserverThread ( new OnStartRunnable < > ( mOnSubscribe ) ) ;
}
}
@Override
public void onError ( @NonNull final Throwable throwable ) {
if ( ! mOnCompleteExecuted & & mOnSubscribe ! = null ) {
mOnCompleteExecuted = true ;
mObservable . executeOnObserverThread ( new OnErrorRunnable < > ( mOnSubscribe , throwable ) ) ;
} else {
Log . e ( TAG , "onComplete already called" ) ;
throw new RuntimeException ( "onComplete already called" ) ;
}
}
@Override
public void onNext ( final T item ) {
if ( ! mOnCompleteExecuted & & mOnSubscribe ! = null ) {
mObservable . executeOnObserverThread ( new OnNextRunnable < > ( mOnSubscribe , item ) ) ;
} else {
Log . e ( TAG , "onComplete has been already called, onNext should not be called" ) ;
throw new RuntimeException ( "onNext should not be called after onComplete has been called" ) ;
}
}
}
private static class OnCompleteRunnable < T > implements Runnable {
private final Subscriber < T > subscriber ;
private final On Subscribe< T > onSubscribe ;
public OnCompleteRunnable ( Subscriber < T > subscriber ) { this . subscriber = subscriber ; }
public OnCompleteRunnable ( On Subscribe< T > onSubscribe ) { this . onSubscribe = onSubscribe ; }
@Override
public void run ( ) {
subscriber . onComplete ( ) ;
onSubscribe . onComplete ( ) ;
}
}
private static class OnNextRunnable < T > implements Runnable {
private final Subscriber < T > subscriber ;
private final On Subscribe< T > onSubscribe ;
private final T item ;
public OnNextRunnable ( Subscriber < T > subscriber , T item ) {
this . subscriber = subscriber ;
public OnNextRunnable ( On Subscribe< T > onSubscribe , T item ) {
this . onSubscribe = onSubscribe ;
this . item = item ;
}
@Override
public void run ( ) {
subscriber . onNext ( item ) ;
onSubscribe . onNext ( item ) ;
}
}
private static class OnErrorRunnable < T > implements Runnable {
private final Subscriber < T > subscriber ;
private final On Subscribe< T > onSubscribe ;
private final Throwable throwable ;
public OnErrorRunnable ( Subscriber < T > subscriber , Throwable throwable ) {
this . subscriber = subscriber ;
public OnErrorRunnable ( On Subscribe< T > onSubscribe , Throwable throwable ) {
this . onSubscribe = onSubscribe ;
this . throwable = throwable ;
}
@Override
public void run ( ) {
subscriber . onError ( throwable ) ;
onSubscribe . onError ( throwable ) ;
}
}
private static class OnStartRunnable < T > implements Runnable {
private final Subscriber < T > subscriber ;
private final On Subscribe< T > onSubscribe ;
public OnStartRunnable ( Subscriber < T > subscriber ) { this . subscriber = subscriber ; }
public OnStartRunnable ( On Subscribe< T > onSubscribe ) { this . onSubscribe = onSubscribe ; }
@Override
public void run ( ) {
subscriber . onStart ( ) ;
onSubscribe . onStart ( ) ;
}
}
}