ParallelJoin$JoinSubscriptionDelayError.smali
.class final Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;
.super Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;
.source "ParallelJoin.java"
# annotations
.annotation system Ldalvik/annotation/EnclosingClass;
value = Lio/reactivex/internal/operators/parallel/ParallelJoin;
.end annotation
.annotation system Ldalvik/annotation/InnerClass;
accessFlags = 0x18
name = "JoinSubscriptionDelayError"
.end annotation
.annotation system Ldalvik/annotation/Signature;
value = {
"<T:",
"Ljava/lang/Object;",
">",
"Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase<",
"TT;>;"
}
.end annotation
# static fields
.field private static final serialVersionUID:J = -0x4fa158f1d44428dbL
# direct methods
.method static constructor <clinit>()V
.registers 1
return-void
.end method
.method constructor <init>(Lorg/reactivestreams/Subscriber;II)V
.registers 4
.param p2, "n" # I
.param p3, "prefetch" # I
.annotation system Ldalvik/annotation/Signature;
value = {
"(",
"Lorg/reactivestreams/Subscriber<",
"-TT;>;II)V"
}
.end annotation
.line 325
.local p0, "this":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError<TT;>;"
.local p1, "actual":Lorg/reactivestreams/Subscriber;, "Lorg/reactivestreams/Subscriber<-TT;>;"
invoke-direct {p0, p1, p2, p3}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionBase;-><init>(Lorg/reactivestreams/Subscriber;II)V
.line 326
return-void
.end method
# virtual methods
.method drain()V
.registers 2
.line 384
.local p0, "this":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError<TT;>;"
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->getAndIncrement()I
move-result v0
if-eqz v0, :cond_7
.line 385
return-void
.line 388
:cond_7
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->drainLoop()V
.line 389
return-void
.end method
.method drainLoop()V
.registers 20
.line 392
.local p0, "this":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError<TT;>;"
move-object/from16 v0, p0
const/4 v1, 0x1
.line 394
.local v1, "missed":I
iget-object v2, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->subscribers:[Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;
.line 395
.local v2, "s":[Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;, "[Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<TT;>;"
array-length v3, v2
.line 396
.local v3, "n":I
iget-object v4, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->actual:Lorg/reactivestreams/Subscriber;
.line 400
.local v4, "a":Lorg/reactivestreams/Subscriber;, "Lorg/reactivestreams/Subscriber<-TT;>;"
:goto_8
iget-object v5, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->requested:Ljava/util/concurrent/atomic/AtomicLong;
invoke-virtual {v5}, Ljava/util/concurrent/atomic/AtomicLong;->get()J
move-result-wide v5
.line 401
.local v5, "r":J
const-wide/16 v7, 0x0
.line 404
.local v7, "e":J
:goto_10
const/4 v10, 0x1
cmp-long v11, v7, v5
if-eqz v11, :cond_6d
.line 405
iget-boolean v11, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->cancelled:Z
if-eqz v11, :cond_1d
.line 406
invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->cleanup()V
.line 407
return-void
.line 410
:cond_1d
iget-object v11, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->done:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v11}, Ljava/util/concurrent/atomic/AtomicInteger;->get()I
move-result v11
if-nez v11, :cond_27
const/4 v11, 0x1
goto :goto_28
:cond_27
const/4 v11, 0x0
.line 412
.local v11, "d":Z
:goto_28
const/4 v12, 0x1
.line 414
.local v12, "empty":Z
const/4 v13, 0x0
.local v13, "i":I
:goto_2a
if-ge v13, v3, :cond_4d
.line 415
aget-object v14, v2, v13
.line 417
.local v14, "inner":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<TT;>;"
iget-object v15, v14, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
.line 418
.local v15, "q":Lio/reactivex/internal/fuseable/SimplePlainQueue;, "Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;"
if-eqz v15, :cond_4a
.line 419
invoke-interface {v15}, Lio/reactivex/internal/fuseable/SimplePlainQueue;->poll()Ljava/lang/Object;
move-result-object v9
.line 421
.local v9, "v":Ljava/lang/Object;, "TT;"
if-eqz v9, :cond_4a
.line 422
const/4 v12, 0x0
.line 423
invoke-interface {v4, v9}, Lorg/reactivestreams/Subscriber;->onNext(Ljava/lang/Object;)V
.line 424
invoke-virtual {v14}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->requestOne()V
.line 425
const-wide/16 v16, 0x1
add-long v16, v7, v16
move-wide/from16 v7, v16
cmp-long v18, v16, v5
if-nez v18, :cond_4a
.line 426
goto :goto_6d
.line 414
.end local v9 # "v":Ljava/lang/Object;, "TT;"
.end local v14 # "inner":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<TT;>;"
.end local v15 # "q":Lio/reactivex/internal/fuseable/SimplePlainQueue;, "Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;"
:cond_4a
add-int/lit8 v13, v13, 0x1
goto :goto_2a
.line 432
.end local v13 # "i":I
:cond_4d
if-eqz v11, :cond_69
if-eqz v12, :cond_69
.line 433
iget-object v9, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
invoke-virtual {v9}, Lio/reactivex/internal/util/AtomicThrowable;->get()Ljava/lang/Object;
move-result-object v9
check-cast v9, Ljava/lang/Throwable;
.line 434
.local v9, "ex":Ljava/lang/Throwable;
if-eqz v9, :cond_65
.line 435
iget-object v10, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
invoke-virtual {v10}, Lio/reactivex/internal/util/AtomicThrowable;->terminate()Ljava/lang/Throwable;
move-result-object v10
invoke-interface {v4, v10}, Lorg/reactivestreams/Subscriber;->onError(Ljava/lang/Throwable;)V
goto :goto_68
.line 437
:cond_65
invoke-interface {v4}, Lorg/reactivestreams/Subscriber;->onComplete()V
.line 439
:goto_68
return-void
.line 442
.end local v9 # "ex":Ljava/lang/Throwable;
:cond_69
if-eqz v12, :cond_6c
.line 443
goto :goto_6d
.line 445
.end local v11 # "d":Z
.end local v12 # "empty":Z
:cond_6c
goto :goto_10
.line 447
:cond_6d
:goto_6d
cmp-long v9, v7, v5
if-nez v9, :cond_b5
.line 448
iget-boolean v9, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->cancelled:Z
if-eqz v9, :cond_79
.line 449
invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->cleanup()V
.line 450
return-void
.line 453
:cond_79
iget-object v9, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->done:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v9}, Ljava/util/concurrent/atomic/AtomicInteger;->get()I
move-result v9
if-nez v9, :cond_83
const/4 v9, 0x1
goto :goto_84
:cond_83
const/4 v9, 0x0
.line 455
.local v9, "d":Z
:goto_84
const/4 v10, 0x1
.line 457
.local v10, "empty":Z
const/4 v11, 0x0
.local v11, "i":I
:goto_86
if-ge v11, v3, :cond_99
.line 458
aget-object v12, v2, v11
.line 460
.local v12, "inner":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<TT;>;"
iget-object v13, v12, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->queue:Lio/reactivex/internal/fuseable/SimplePlainQueue;
.line 461
.local v13, "q":Lio/reactivex/internal/fuseable/SimpleQueue;, "Lio/reactivex/internal/fuseable/SimpleQueue<TT;>;"
if-eqz v13, :cond_96
invoke-interface {v13}, Lio/reactivex/internal/fuseable/SimpleQueue;->isEmpty()Z
move-result v14
if-nez v14, :cond_96
.line 462
const/4 v10, 0x0
.line 463
goto :goto_99
.line 457
.end local v12 # "inner":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<TT;>;"
.end local v13 # "q":Lio/reactivex/internal/fuseable/SimpleQueue;, "Lio/reactivex/internal/fuseable/SimpleQueue<TT;>;"
:cond_96
add-int/lit8 v11, v11, 0x1
goto :goto_86
.line 467
.end local v11 # "i":I
:cond_99
:goto_99
if-eqz v9, :cond_b5
if-eqz v10, :cond_b5
.line 468
iget-object v11, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
invoke-virtual {v11}, Lio/reactivex/internal/util/AtomicThrowable;->get()Ljava/lang/Object;
move-result-object v11
check-cast v11, Ljava/lang/Throwable;
.line 469
.local v11, "ex":Ljava/lang/Throwable;
if-eqz v11, :cond_b1
.line 470
iget-object v12, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
invoke-virtual {v12}, Lio/reactivex/internal/util/AtomicThrowable;->terminate()Ljava/lang/Throwable;
move-result-object v12
invoke-interface {v4, v12}, Lorg/reactivestreams/Subscriber;->onError(Ljava/lang/Throwable;)V
goto :goto_b4
.line 472
:cond_b1
invoke-interface {v4}, Lorg/reactivestreams/Subscriber;->onComplete()V
.line 474
:goto_b4
return-void
.line 478
.end local v9 # "d":Z
.end local v10 # "empty":Z
.end local v11 # "ex":Ljava/lang/Throwable;
:cond_b5
const-wide/16 v9, 0x0
cmp-long v11, v7, v9
if-eqz v11, :cond_ca
const-wide v9, 0x7fffffffffffffffL
cmp-long v11, v5, v9
if-eqz v11, :cond_ca
.line 479
iget-object v9, v0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->requested:Ljava/util/concurrent/atomic/AtomicLong;
neg-long v10, v7
invoke-virtual {v9, v10, v11}, Ljava/util/concurrent/atomic/AtomicLong;->addAndGet(J)J
.line 482
:cond_ca
invoke-virtual/range {p0 .. p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->get()I
move-result v9
.line 483
.local v9, "w":I
if-ne v9, v1, :cond_d9
.line 484
neg-int v10, v1
invoke-virtual {v0, v10}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->addAndGet(I)I
move-result v1
.line 485
if-nez v1, :cond_da
.line 486
nop
.line 492
.end local v5 # "r":J
.end local v7 # "e":J
.end local v9 # "w":I
return-void
.line 489
.restart local v5 # "r":J
.restart local v7 # "e":J
.restart local v9 # "w":I
:cond_d9
move v1, v9
.line 491
.end local v5 # "r":J
.end local v7 # "e":J
.end local v9 # "w":I
:cond_da
goto/16 :goto_8
.end method
.method onComplete()V
.registers 2
.line 378
.local p0, "this":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError<TT;>;"
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->done:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicInteger;->decrementAndGet()I
.line 379
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->drain()V
.line 380
return-void
.end method
.method onError(Ljava/lang/Throwable;)V
.registers 3
.param p1, "e" # Ljava/lang/Throwable;
.line 371
.local p0, "this":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError<TT;>;"
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
invoke-virtual {v0, p1}, Lio/reactivex/internal/util/AtomicThrowable;->addThrowable(Ljava/lang/Throwable;)Z
.line 372
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->done:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicInteger;->decrementAndGet()I
.line 373
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->drain()V
.line 374
return-void
.end method
.method onNext(Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;Ljava/lang/Object;)V
.registers 9
.annotation system Ldalvik/annotation/Signature;
value = {
"(",
"Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<",
"TT;>;TT;)V"
}
.end annotation
.line 330
.local p0, "this":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError<TT;>;"
.local p1, "inner":Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;, "Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber<TT;>;"
.local p2, "value":Ljava/lang/Object;, "TT;"
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->get()I
move-result v0
const-string v1, "Queue full?!"
if-nez v0, :cond_62
const/4 v0, 0x0
const/4 v2, 0x1
invoke-virtual {p0, v0, v2}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->compareAndSet(II)Z
move-result v0
if-eqz v0, :cond_62
.line 331
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->requested:Ljava/util/concurrent/atomic/AtomicLong;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicLong;->get()J
move-result-wide v2
const-wide/16 v4, 0x0
cmp-long v0, v2, v4
if-eqz v0, :cond_3b
.line 332
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->actual:Lorg/reactivestreams/Subscriber;
invoke-interface {v0, p2}, Lorg/reactivestreams/Subscriber;->onNext(Ljava/lang/Object;)V
.line 333
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->requested:Ljava/util/concurrent/atomic/AtomicLong;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicLong;->get()J
move-result-wide v0
const-wide v2, 0x7fffffffffffffffL
cmp-long v4, v0, v2
if-eqz v4, :cond_35
.line 334
iget-object v0, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->requested:Ljava/util/concurrent/atomic/AtomicLong;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicLong;->decrementAndGet()J
.line 336
:cond_35
const-wide/16 v0, 0x1
invoke-virtual {p1, v0, v1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->request(J)V
goto :goto_5b
.line 338
:cond_3b
invoke-virtual {p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->getQueue()Lio/reactivex/internal/fuseable/SimplePlainQueue;
move-result-object v0
.line 340
.local v0, "q":Lio/reactivex/internal/fuseable/SimplePlainQueue;, "Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;"
invoke-interface {v0, p2}, Lio/reactivex/internal/fuseable/SimplePlainQueue;->offer(Ljava/lang/Object;)Z
move-result v2
if-nez v2, :cond_5b
.line 341
invoke-virtual {p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->cancel()Z
.line 342
iget-object v2, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
new-instance v3, Lio/reactivex/exceptions/MissingBackpressureException;
invoke-direct {v3, v1}, Lio/reactivex/exceptions/MissingBackpressureException;-><init>(Ljava/lang/String;)V
invoke-virtual {v2, v3}, Lio/reactivex/internal/util/AtomicThrowable;->addThrowable(Ljava/lang/Throwable;)Z
.line 343
iget-object v1, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->done:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v1}, Ljava/util/concurrent/atomic/AtomicInteger;->decrementAndGet()I
.line 344
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->drainLoop()V
.line 345
return-void
.line 348
.end local v0 # "q":Lio/reactivex/internal/fuseable/SimplePlainQueue;, "Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;"
:cond_5b
:goto_5b
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->decrementAndGet()I
move-result v0
if-nez v0, :cond_88
.line 349
return-void
.line 352
:cond_62
invoke-virtual {p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->getQueue()Lio/reactivex/internal/fuseable/SimplePlainQueue;
move-result-object v0
.line 354
.restart local v0 # "q":Lio/reactivex/internal/fuseable/SimplePlainQueue;, "Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;"
invoke-interface {v0, p2}, Lio/reactivex/internal/fuseable/SimplePlainQueue;->offer(Ljava/lang/Object;)Z
move-result v2
if-nez v2, :cond_81
.line 355
invoke-virtual {p1}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinInnerSubscriber;->cancel()Z
move-result v2
if-eqz v2, :cond_81
.line 356
iget-object v2, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->errors:Lio/reactivex/internal/util/AtomicThrowable;
new-instance v3, Lio/reactivex/exceptions/MissingBackpressureException;
invoke-direct {v3, v1}, Lio/reactivex/exceptions/MissingBackpressureException;-><init>(Ljava/lang/String;)V
invoke-virtual {v2, v3}, Lio/reactivex/internal/util/AtomicThrowable;->addThrowable(Ljava/lang/Throwable;)Z
.line 357
iget-object v1, p0, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->done:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v1}, Ljava/util/concurrent/atomic/AtomicInteger;->decrementAndGet()I
.line 361
:cond_81
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->getAndIncrement()I
move-result v1
if-eqz v1, :cond_88
.line 362
return-void
.line 366
.end local v0 # "q":Lio/reactivex/internal/fuseable/SimplePlainQueue;, "Lio/reactivex/internal/fuseable/SimplePlainQueue<TT;>;"
:cond_88
invoke-virtual {p0}, Lio/reactivex/internal/operators/parallel/ParallelJoin$JoinSubscriptionDelayError;->drainLoop()V
.line 367
return-void
.end method