|
|
@ -21,12 +21,12 @@ public class Observable<T> { |
|
|
|
|
|
|
|
|
|
|
|
private static final String TAG = Observable.class.getSimpleName(); |
|
|
|
private static final String TAG = Observable.class.getSimpleName(); |
|
|
|
|
|
|
|
|
|
|
|
@NonNull private Action<T> mAction; |
|
|
|
@NonNull private final Action<T> mAction; |
|
|
|
@Nullable private Executor mSubscriber; |
|
|
|
@Nullable private Executor mSubscriber; |
|
|
|
@Nullable private Executor mObserver; |
|
|
|
@Nullable private Executor mObserver; |
|
|
|
@NonNull private final Executor mDefault; |
|
|
|
@NonNull private final Executor mDefault; |
|
|
|
|
|
|
|
|
|
|
|
public Observable(@NonNull Action<T> action) { |
|
|
|
private Observable(@NonNull Action<T> action) { |
|
|
|
mAction = action; |
|
|
|
mAction = action; |
|
|
|
Looper looper = Looper.myLooper(); |
|
|
|
Looper looper = Looper.myLooper(); |
|
|
|
Preconditions.checkNonNull(looper); |
|
|
|
Preconditions.checkNonNull(looper); |
|
|
@ -111,7 +111,7 @@ public class Observable<T> { |
|
|
|
public void onComplete() { |
|
|
|
public void onComplete() { |
|
|
|
if (!mOnCompleteExecuted) { |
|
|
|
if (!mOnCompleteExecuted) { |
|
|
|
mOnCompleteExecuted = true; |
|
|
|
mOnCompleteExecuted = true; |
|
|
|
executeOnObserverThread(new OnCompleteRunnable(subscription)); |
|
|
|
executeOnObserverThread(new OnCompleteRunnable<>(subscription)); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
Log.e(TAG, "onComplete called more than once"); |
|
|
|
Log.e(TAG, "onComplete called more than once"); |
|
|
|
throw new RuntimeException("onComplete called more than once"); |
|
|
|
throw new RuntimeException("onComplete called more than once"); |
|
|
@ -121,7 +121,7 @@ public class Observable<T> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onNext(final T item) { |
|
|
|
public void onNext(final T item) { |
|
|
|
if (!mOnCompleteExecuted) { |
|
|
|
if (!mOnCompleteExecuted) { |
|
|
|
executeOnObserverThread(new OnNextRunnable(subscription, item)); |
|
|
|
executeOnObserverThread(new OnNextRunnable<>(subscription, item)); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
Log.e(TAG, "onComplete has been already called, onNext should not be called"); |
|
|
|
Log.e(TAG, "onComplete has been already called, onNext should not be called"); |
|
|
|
throw new RuntimeException("onNext should not be called after onComplete has been called"); |
|
|
|
throw new RuntimeException("onNext should not be called after onComplete has been called"); |
|
|
@ -149,10 +149,10 @@ public class Observable<T> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static class OnCompleteRunnable implements Runnable { |
|
|
|
private static class OnCompleteRunnable<T> implements Runnable { |
|
|
|
private final Subscription subscription; |
|
|
|
private final Subscription<T> subscription; |
|
|
|
|
|
|
|
|
|
|
|
public OnCompleteRunnable(Subscription subscription) {this.subscription = subscription;} |
|
|
|
public OnCompleteRunnable(Subscription<T> subscription) {this.subscription = subscription;} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
public void run() { |
|
|
@ -160,7 +160,7 @@ public class Observable<T> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private class OnNextRunnable implements Runnable { |
|
|
|
private static class OnNextRunnable<T> implements Runnable { |
|
|
|
private final Subscription<T> subscription; |
|
|
|
private final Subscription<T> subscription; |
|
|
|
private final T item; |
|
|
|
private final T item; |
|
|
|
|
|
|
|
|
|
|
|