# FlowableObserveOn$BaseObserveOnSubscriber.smali
# Abstract base class nested in FlowableObserveOn (see the EnclosingClass /
# InnerClass annotations below). It extends BasicIntQueueSubscription and
# implements FlowableSubscriber and Runnable: the onNext/onError/onComplete
# methods record incoming signals and call trySchedule(), and run() dispatches
# to one of the abstract runBackfused()/runSync()/runAsync() routines.
.class abstract Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;
.super Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
.source "FlowableObserveOn.java"
# interfaces
.implements Lio/reactivex/FlowableSubscriber;
.implements Ljava/lang/Runnable;
# annotations
.annotation system Ldalvik/annotation/EnclosingClass;
value = Lio/reactivex/internal/operators/flowable/FlowableObserveOn;
.end annotation
# accessFlags 0x408 = abstract (0x400) | static (0x8).
.annotation system Ldalvik/annotation/InnerClass;
accessFlags = 0x408
name = "BaseObserveOnSubscriber"
.end annotation
# Generic signature: <T> extends BasicIntQueueSubscription<T>
#                    implements FlowableSubscriber<T>, Runnable
.annotation system Ldalvik/annotation/Signature;
value = {
"<T:",
"Ljava/lang/Object;",
">",
"Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription<",
"TT;>;",
"Lio/reactivex/FlowableSubscriber<",
"TT;>;",
"Ljava/lang/Runnable;"
}
.end annotation
# static fields
.field private static final serialVersionUID:J = -0x725dec0716520049L
# instance fields
# Set to true by cancel(); checkTerminated() clears the queue and returns true
# once it observes this flag.
.field volatile cancelled:Z
# Constructor argument. In checkTerminated(): when true, the terminal signal is
# emitted only if the caller reports the queue as empty; when false, a stored
# error is emitted immediately after clearing the queue.
.field final delayError:Z
# Set to true by onComplete(), by onError(), and by onNext() when the queue
# rejects an item. onNext() ignores items once this is set.
.field volatile done:Z
# Throwable stored by onError(), or the MissingBackpressureException created by
# onNext() when offer() fails; read by checkTerminated().
.field error:Ljava/lang/Throwable;
# Computed in the constructor as prefetch - (prefetch >> 2). Not read in this
# class; presumably a replenish threshold used by subclasses - TODO confirm.
.field final limit:I
# Set to true by requestFusion() when the requested mode contains bit 0x2;
# makes run() call runBackfused().
.field outputFused:Z
# Constructor argument; only stored in this class.
.field final prefetch:I
# Not accessed in this class; presumably maintained by subclasses - verify.
.field produced:J
# Queue that onNext() offers items into and that clear()/isEmpty()/cancel()
# delegate to. Not assigned in this class (presumably set by a subclass).
.field queue:Lio/reactivex/internal/fuseable/SimpleQueue;
.annotation system Ldalvik/annotation/Signature;
value = {
"Lio/reactivex/internal/fuseable/SimpleQueue<",
"TT;>;"
}
.end annotation
.end field
# Counter created in the constructor; request(n) adds n to it through
# BackpressureHelper.add().
.field final requested:Ljava/util/concurrent/atomic/AtomicLong;
# Subscription cancelled by cancel() and by onNext() when the queue is full.
# Not assigned in this class (presumably the upstream subscription - verify).
.field s:Lorg/reactivestreams/Subscription;
# Compared against 2 in onNext() (skip offer(), only schedule) and against 1 in
# run() (select runSync()). Not assigned in this class.
.field sourceMode:I
# Constructor argument. trySchedule() schedules this Runnable on it; cancel()
# and checkTerminated() dispose it.
.field final worker:Lio/reactivex/Scheduler$Worker;
# direct methods
# Static initializer. It performs no work: it returns immediately.
.method static constructor <clinit>()V
    .locals 1

    return-void
.end method
# Constructor.
#   p1 worker     - stored in `worker`
#   p2 delayError - stored in `delayError`
#   p3 prefetch   - stored in `prefetch`; also used to derive `limit`
# Additionally allocates the `requested` AtomicLong.
.method constructor <init>(Lio/reactivex/Scheduler$Worker;ZI)V
    .locals 1
    .param p1, "worker"    # Lio/reactivex/Scheduler$Worker;
    .param p2, "delayError"    # Z
    .param p3, "prefetch"    # I

    .line 94
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    invoke-direct {p0}, Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;-><init>()V

    # Plain copies of the three constructor arguments.
    .line 95
    iput-object p1, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->worker:Lio/reactivex/Scheduler$Worker;

    .line 96
    iput-boolean p2, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->delayError:Z

    .line 97
    iput p3, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->prefetch:I

    # limit = prefetch - (prefetch >> 2)
    .line 99
    shr-int/lit8 v0, p3, 0x2
    sub-int v0, p3, v0
    iput v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->limit:I

    # requested = new AtomicLong()
    .line 98
    new-instance v0, Ljava/util/concurrent/atomic/AtomicLong;
    invoke-direct {v0}, Ljava/util/concurrent/atomic/AtomicLong;-><init>()V
    iput-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->requested:Ljava/util/concurrent/atomic/AtomicLong;

    .line 100
    return-void
.end method
# virtual methods
# Cancels this subscriber: marks it cancelled, cancels `s`, disposes the worker
# and, if the inherited counter was 0 before being incremented, clears the
# queue. Calls made after `cancelled` is already true do nothing.
.method public final cancel()V
    .locals 1

    .line 149
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->cancelled:Z
    # Already cancelled: nothing left to do.
    if-nez v0, :finished

    .line 153
    const/4 v0, 0x1
    iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->cancelled:Z

    .line 154
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->s:Lorg/reactivestreams/Subscription;
    invoke-interface {v0}, Lorg/reactivestreams/Subscription;->cancel()V

    .line 155
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->worker:Lio/reactivex/Scheduler$Worker;
    invoke-virtual {v0}, Lio/reactivex/Scheduler$Worker;->dispose()V

    .line 157
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->getAndIncrement()I
    move-result v0
    # Only clear the queue when the previous counter value was 0 (presumably
    # meaning no drain is in progress - confirm against the subclasses).
    if-nez v0, :finished

    .line 158
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->queue:Lio/reactivex/internal/fuseable/SimpleQueue;
    invoke-interface {v0}, Lio/reactivex/internal/fuseable/SimpleQueue;->clear()V

    .line 160
    :finished
    return-void
.end method
# Decides whether the sequence has terminated and, if so, emits the terminal
# signal to `a`.
#   p1 d     - caller's view of the done flag; when false (and not cancelled)
#              the method returns false without signalling
#   p2 empty - caller's view of whether the queue is empty
#   p3 a     - Subscriber that receives onError()/onComplete()
# Returns true when cancelled or when a terminal signal was emitted (the worker
# is disposed in the latter case); false otherwise.
.method final checkTerminated(ZZLorg/reactivestreams/Subscriber;)Z
.registers 7
.param p1, "d" # Z
.param p2, "empty" # Z
.annotation system Ldalvik/annotation/Signature;
value = {
"(ZZ",
"Lorg/reactivestreams/Subscriber<",
"*>;)Z"
}
.end annotation
.line 187
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
.local p3, "a":Lorg/reactivestreams/Subscriber;, "Lorg/reactivestreams/Subscriber<*>;"
iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->cancelled:Z
# v1 holds the constant `true` returned by every terminating path below.
const/4 v1, 0x1
if-eqz v0, :cond_9
# Cancelled: drop queued items and report termination without signalling `a`.
.line 188
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->clear()V
.line 189
return v1
.line 191
:cond_9
# d == false: not done yet, fall through to `return false`.
if-eqz p1, :cond_3d
.line 192
iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->delayError:Z
if-eqz v0, :cond_22
# delayError == true: terminate only when the caller reports the queue empty,
# then emit the stored error if there is one, otherwise onComplete().
.line 193
if-eqz p2, :cond_3d
.line 194
iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->error:Ljava/lang/Throwable;
.line 195
.local v0, "e":Ljava/lang/Throwable;
if-eqz v0, :cond_19
.line 196
invoke-interface {p3, v0}, Lorg/reactivestreams/Subscriber;->onError(Ljava/lang/Throwable;)V
goto :goto_1c
.line 198
:cond_19
invoke-interface {p3}, Lorg/reactivestreams/Subscriber;->onComplete()V
.line 200
:goto_1c
# Terminal signal delivered: release the worker.
iget-object v2, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->worker:Lio/reactivex/Scheduler$Worker;
invoke-virtual {v2}, Lio/reactivex/Scheduler$Worker;->dispose()V
.line 201
return v1
.line 204
.end local v0 # "e":Ljava/lang/Throwable;
:cond_22
# delayError == false: a stored error is emitted right away, regardless of
# `empty`, after discarding whatever is still queued.
iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->error:Ljava/lang/Throwable;
.line 205
.restart local v0 # "e":Ljava/lang/Throwable;
if-eqz v0, :cond_32
.line 206
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->clear()V
.line 207
invoke-interface {p3, v0}, Lorg/reactivestreams/Subscriber;->onError(Ljava/lang/Throwable;)V
.line 208
iget-object v2, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->worker:Lio/reactivex/Scheduler$Worker;
invoke-virtual {v2}, Lio/reactivex/Scheduler$Worker;->dispose()V
.line 209
return v1
.line 211
:cond_32
# No stored error: complete only once the caller reports the queue empty.
if-eqz p2, :cond_3d
.line 212
invoke-interface {p3}, Lorg/reactivestreams/Subscriber;->onComplete()V
.line 213
iget-object v2, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->worker:Lio/reactivex/Scheduler$Worker;
invoke-virtual {v2}, Lio/reactivex/Scheduler$Worker;->dispose()V
.line 214
return v1
.line 219
.end local v0 # "e":Ljava/lang/Throwable;
:cond_3d
# Not terminated.
const/4 v0, 0x0
return v0
.end method
# Discards all queued items by delegating to queue.clear().
.method public final clear()V
    .locals 1

    .line 233
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->queue:Lio/reactivex/internal/fuseable/SimpleQueue;
    invoke-interface {v0}, Lio/reactivex/internal/fuseable/SimpleQueue;->clear()V

    .line 234
    return-void
.end method
# Returns whatever queue.isEmpty() reports.
.method public final isEmpty()Z
    .locals 1

    .line 238
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->queue:Lio/reactivex/internal/fuseable/SimpleQueue;
    invoke-interface {v0}, Lio/reactivex/internal/fuseable/SimpleQueue;->isEmpty()Z
    move-result v0

    return v0
.end method
# Completion signal: on the first terminal event, sets `done` and calls
# trySchedule(). Does nothing if `done` is already true.
.method public final onComplete()V
    .locals 1

    .line 133
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->done:Z
    if-eqz v0, :first_terminal

    # Already done: ignore the duplicate completion.
    .line 137
    return-void

    :first_terminal
    .line 134
    const/4 v0, 0x1
    iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->done:Z

    .line 135
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->trySchedule()V

    .line 137
    return-void
.end method
# Error signal.
#   p1 t - the Throwable to deliver
# On the first terminal event, stores `t` in `error`, sets `done` and calls
# trySchedule(). If `done` is already true, `t` is handed to
# RxJavaPlugins.onError() instead.
.method public final onError(Ljava/lang/Throwable;)V
    .locals 1
    .param p1, "t"    # Ljava/lang/Throwable;

    .line 122
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->done:Z
    if-nez v0, :already_done

    # error must be written before done is set.
    .line 126
    iput-object p1, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->error:Ljava/lang/Throwable;

    .line 127
    const/4 v0, 0x1
    iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->done:Z

    .line 128
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->trySchedule()V

    .line 129
    return-void

    # Late error after termination: route it to the global plugin handler.
    .line 123
    :already_done
    invoke-static {p1}, Lio/reactivex/plugins/RxJavaPlugins;->onError(Ljava/lang/Throwable;)V

    .line 124
    return-void
.end method
# Item signal.
#   p1 t - the item
# Ignored once `done` is true. When sourceMode == 2 the item is not queued and
# only trySchedule() is called. Otherwise the item is offered to `queue`; if
# offer() returns false, `s` is cancelled, a MissingBackpressureException is
# stored in `error` and `done` is set. trySchedule() is called in either case.
.method public final onNext(Ljava/lang/Object;)V
    .locals 2
    .annotation system Ldalvik/annotation/Signature;
        value = {
            "(TT;)V"
        }
    .end annotation

    .line 104
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    .local p1, "t":Ljava/lang/Object;, "TT;"
    iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->done:Z
    if-nez v0, :ignore

    .line 107
    iget v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->sourceMode:I
    const/4 v1, 0x2
    if-eq v0, v1, :mode_two

    .line 111
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->queue:Lio/reactivex/internal/fuseable/SimpleQueue;
    invoke-interface {v0, p1}, Lio/reactivex/internal/fuseable/SimpleQueue;->offer(Ljava/lang/Object;)Z
    move-result v0
    if-nez v0, :schedule

    # offer() failed: cancel `s` and record the overflow as the terminal error.
    .line 112
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->s:Lorg/reactivestreams/Subscription;
    invoke-interface {v0}, Lorg/reactivestreams/Subscription;->cancel()V

    .line 114
    new-instance v0, Lio/reactivex/exceptions/MissingBackpressureException;
    const-string v1, "Queue is full?!"
    invoke-direct {v0, v1}, Lio/reactivex/exceptions/MissingBackpressureException;-><init>(Ljava/lang/String;)V
    iput-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->error:Ljava/lang/Throwable;

    .line 115
    const/4 v0, 0x1
    iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->done:Z

    .line 117
    :schedule
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->trySchedule()V

    .line 118
    return-void

    # sourceMode == 2: nothing is enqueued here, just schedule.
    .line 108
    :mode_two
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->trySchedule()V

    .line 109
    :ignore
    return-void
.end method
# Request signal.
#   p1/p2 n - requested amount (long, occupies two registers)
# If SubscriptionHelper.validate(n) returns true, adds n to `requested` via
# BackpressureHelper.add() and calls trySchedule(); otherwise does nothing.
.method public final request(J)V
    .locals 1
    .param p1, "n"    # J

    .line 141
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    invoke-static {p1, p2}, Lio/reactivex/internal/subscriptions/SubscriptionHelper;->validate(J)Z
    move-result v0
    if-nez v0, :valid

    # Validation rejected the amount.
    .line 145
    return-void

    :valid
    .line 142
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->requested:Ljava/util/concurrent/atomic/AtomicLong;
    # The value returned by add() is intentionally unused.
    invoke-static {v0, p1, p2}, Lio/reactivex/internal/util/BackpressureHelper;->add(Ljava/util/concurrent/atomic/AtomicLong;J)J

    .line 143
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->trySchedule()V

    .line 145
    return-void
.end method
# Fusion negotiation.
#   p1 requestedMode - bit mask of requested fusion modes
# If bit 0x2 is set (presumably the ASYNC fusion mode - confirm against
# QueueFuseable), sets `outputFused` and returns 2; otherwise returns 0.
.method public final requestFusion(I)I
    .locals 1
    .param p1, "requestedMode"    # I

    .line 224
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    and-int/lit8 v0, p1, 0x2
    if-nez v0, :mode_two_requested

    # Bit 0x2 not requested: decline.
    .line 228
    const/4 v0, 0x0
    return v0

    .line 225
    :mode_two_requested
    const/4 v0, 0x1
    iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->outputFused:Z

    .line 226
    const/4 v0, 0x2
    return v0
.end method
# Runnable entry point; picks one drain routine:
#   outputFused == true -> runBackfused()
#   sourceMode == 1     -> runSync()
#   otherwise           -> runAsync()
.method public final run()V
    .locals 2

    .line 171
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->outputFused:Z
    if-nez v0, :backfused

    .line 173
    iget v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->sourceMode:I
    const/4 v1, 0x1
    if-eq v0, v1, :sync

    .line 176
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->runAsync()V

    .line 178
    return-void

    .line 174
    :sync
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->runSync()V

    .line 178
    return-void

    .line 172
    :backfused
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->runBackfused()V

    .line 178
    return-void
.end method
# Abstract drain routine; run() calls it when outputFused is false and
# sourceMode is not 1.
.method abstract runAsync()V
.end method
# Abstract drain routine; run() calls it when outputFused is true.
.method abstract runBackfused()V
.end method
# Abstract drain routine; run() calls it when outputFused is false and
# sourceMode == 1.
.method abstract runSync()V
.end method
# Increments the inherited counter and, only when its previous value was 0,
# schedules this Runnable on `worker`. The Disposable returned by schedule()
# is not kept.
.method final trySchedule()V
    .locals 1

    .line 163
    .local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;, "Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber<TT;>;"
    invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->getAndIncrement()I
    move-result v0
    # Previous value was non-zero: do not schedule again.
    if-nez v0, :skip

    .line 166
    iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableObserveOn$BaseObserveOnSubscriber;->worker:Lio/reactivex/Scheduler$Worker;
    invoke-virtual {v0, p0}, Lio/reactivex/Scheduler$Worker;->schedule(Ljava/lang/Runnable;)Lio/reactivex/disposables/Disposable;

    .line 167
    :skip
    return-void
.end method