|
|
@ -146,6 +146,7 @@ public class Observable<T> { |
|
|
|
@Nullable private volatile OnSubscribe<T> mOnSubscribe; |
|
|
|
@Nullable private volatile OnSubscribe<T> mOnSubscribe; |
|
|
|
@NonNull private final Observable<T> mObservable; |
|
|
|
@NonNull private final Observable<T> mObservable; |
|
|
|
private boolean mOnCompleteExecuted = false; |
|
|
|
private boolean mOnCompleteExecuted = false; |
|
|
|
|
|
|
|
private boolean mOnError = false; |
|
|
|
|
|
|
|
|
|
|
|
public SubscriberImpl(@NonNull OnSubscribe<T> onSubscribe, @NonNull Observable<T> observable) { |
|
|
|
public SubscriberImpl(@NonNull OnSubscribe<T> onSubscribe, @NonNull Observable<T> observable) { |
|
|
|
mOnSubscribe = onSubscribe; |
|
|
|
mOnSubscribe = onSubscribe; |
|
|
@ -160,10 +161,10 @@ public class Observable<T> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onComplete() { |
|
|
|
public void onComplete() { |
|
|
|
OnSubscribe<T> onSubscribe = mOnSubscribe; |
|
|
|
OnSubscribe<T> onSubscribe = mOnSubscribe; |
|
|
|
if (!mOnCompleteExecuted && onSubscribe != null) { |
|
|
|
if (!mOnCompleteExecuted && onSubscribe != null && !mOnError) { |
|
|
|
mOnCompleteExecuted = true; |
|
|
|
mOnCompleteExecuted = true; |
|
|
|
mObservable.executeOnObserverThread(new OnCompleteRunnable<>(onSubscribe)); |
|
|
|
mObservable.executeOnObserverThread(new OnCompleteRunnable<>(onSubscribe)); |
|
|
|
} else { |
|
|
|
} else if (!mOnError) { |
|
|
|
Log.e(TAG, "onComplete called more than once"); |
|
|
|
Log.e(TAG, "onComplete called more than once"); |
|
|
|
throw new RuntimeException("onComplete called more than once"); |
|
|
|
throw new RuntimeException("onComplete called more than once"); |
|
|
|
} |
|
|
|
} |
|
|
@ -181,7 +182,7 @@ public class Observable<T> { |
|
|
|
public void onError(@NonNull final Throwable throwable) { |
|
|
|
public void onError(@NonNull final Throwable throwable) { |
|
|
|
OnSubscribe<T> onSubscribe = mOnSubscribe; |
|
|
|
OnSubscribe<T> onSubscribe = mOnSubscribe; |
|
|
|
if (onSubscribe != null) { |
|
|
|
if (onSubscribe != null) { |
|
|
|
mOnCompleteExecuted = true; |
|
|
|
mOnError = true; |
|
|
|
mObservable.executeOnObserverThread(new OnErrorRunnable<>(onSubscribe, throwable)); |
|
|
|
mObservable.executeOnObserverThread(new OnErrorRunnable<>(onSubscribe, throwable)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|