|
|
|
@ -84,6 +84,9 @@ public class Observable<T> {
@@ -84,6 +84,9 @@ public class Observable<T> {
|
|
|
|
|
@Override |
|
|
|
|
public void onComplete() {} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onError(@NonNull Throwable throwable) {} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onNext(T item) {} |
|
|
|
|
}); |
|
|
|
@ -106,6 +109,7 @@ public class Observable<T> {
@@ -106,6 +109,7 @@ public class Observable<T> {
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
executeOnObserverThread(new OnStartRunnable<>(subscription)); |
|
|
|
|
mAction.onSubscribe(new Subscriber<T>() { |
|
|
|
|
@Override |
|
|
|
|
public void onComplete() { |
|
|
|
@ -118,6 +122,17 @@ public class Observable<T> {
@@ -118,6 +122,17 @@ public class Observable<T> {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onError(@NonNull final Throwable throwable) { |
|
|
|
|
if (!mOnCompleteExecuted) { |
|
|
|
|
mOnCompleteExecuted = true; |
|
|
|
|
executeOnObserverThread(new OnErrorRunnable<>(subscription, throwable)); |
|
|
|
|
} else { |
|
|
|
|
Log.e(TAG, "onComplete already called"); |
|
|
|
|
throw new RuntimeException("onComplete already called"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void onNext(final T item) { |
|
|
|
|
if (!mOnCompleteExecuted) { |
|
|
|
@ -174,5 +189,31 @@ public class Observable<T> {
@@ -174,5 +189,31 @@ public class Observable<T> {
|
|
|
|
|
subscription.onNext(item); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static class OnErrorRunnable<T> implements Runnable { |
|
|
|
|
private final Subscription<T> subscription; |
|
|
|
|
private final Throwable throwable; |
|
|
|
|
|
|
|
|
|
public OnErrorRunnable(Subscription<T> subscription, Throwable throwable) { |
|
|
|
|
this.subscription = subscription; |
|
|
|
|
this.throwable = throwable; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
subscription.onError(throwable); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static class OnStartRunnable<T> implements Runnable { |
|
|
|
|
private final Subscription<T> subscription; |
|
|
|
|
|
|
|
|
|
public OnStartRunnable(Subscription<T> subscription) {this.subscription = subscription;} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
subscription.onStart(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|