# FlowableCreate$BufferAsyncEmitter.smali
# Package-private final class BufferAsyncEmitter<T>, disassembled from FlowableCreate.java.
# It extends FlowableCreate$BaseEmitter<T> and adds a queue plus a drain loop
# (see the fields and the drain() method below).
.class final Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;
.super Lio/reactivex/internal/operators/flowable/FlowableCreate$BaseEmitter;
.source "FlowableCreate.java"
# annotations
# The class is declared inside FlowableCreate.
.annotation system Ldalvik/annotation/EnclosingClass;
value = Lio/reactivex/internal/operators/flowable/FlowableCreate;
.end annotation
# Inner-class record: simple name "BufferAsyncEmitter", flags 0x18 = static (0x8) | final (0x10).
.annotation system Ldalvik/annotation/InnerClass;
accessFlags = 0x18
name = "BufferAsyncEmitter"
.end annotation
# Generic signature: <T extends Object> extends FlowableCreate$BaseEmitter<T>.
.annotation system Ldalvik/annotation/Signature;
value = {
"<T:",
"Ljava/lang/Object;",
">",
"Lio/reactivex/internal/operators/flowable/FlowableCreate$BaseEmitter<",
"TT;>;"
}
.end annotation
# static fields
# Serialization version id; a compile-time constant stored directly in the field declaration.
.field private static final serialVersionUID:J = 0x21aef8f9f7f1cbc3L
# instance fields
# Terminal flag. Set to true by onComplete() and tryOnError(); read by onNext(),
# tryOnError() and drain(). Declared volatile.
.field volatile done:Z
# Throwable recorded by tryOnError() before it sets `done`; drain() reads it only after
# it has observed done == true with an empty queue. Stays null on normal completion.
.field error:Ljava/lang/Throwable;
# Queue of pending items (SpscLinkedArrayQueue<T>): onNext() offers into it, drain() polls
# from it, and the cancellation paths clear it.
.field final queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
.annotation system Ldalvik/annotation/Signature;
value = {
"Lio/reactivex/internal/queue/SpscLinkedArrayQueue<",
"TT;>;"
}
.end annotation
.end field
# Work-in-progress counter. drain() and onUnsubscribed() only proceed when their
# getAndIncrement() call observes 0.
.field final wip:Ljava/util/concurrent/atomic/AtomicInteger;
# direct methods
# Static initializer. It contains no instructions besides the return: the only static
# field (serialVersionUID) carries its value in the field declaration itself.
.method static constructor <clinit>()V
.registers 1
return-void
.end method
# Constructor BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint).
#   p1 - downstream subscriber, handed unchanged to the BaseEmitter constructor.
#   p2 - capacity hint, handed unchanged to the SpscLinkedArrayQueue constructor.
# Initializes the two final instance fields `queue` and `wip`.
.method constructor <init>(Lorg/reactivestreams/Subscriber;I)V
.registers 4
.param p2, "capacityHint" # I
.annotation system Ldalvik/annotation/Signature;
value = {
"(",
"Lorg/reactivestreams/Subscriber<",
"-TT;>;I)V"
}
.end annotation
.line 450
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
.local p1, "actual":Lorg/reactivestreams/Subscriber;, "Lorg/reactivestreams/Subscriber<-TT;>;"
# super(actual)
invoke-direct {p0, p1}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BaseEmitter;-><init>(Lorg/reactivestreams/Subscriber;)V
.line 451
# this.queue = new SpscLinkedArrayQueue(capacityHint)
new-instance v0, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
invoke-direct {v0, p2}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;-><init>(I)V
iput-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
.line 452
# this.wip = new AtomicInteger()  (no-argument constructor; v0 is reused)
new-instance v0, Ljava/util/concurrent/atomic/AtomicInteger;
invoke-direct {v0}, Ljava/util/concurrent/atomic/AtomicInteger;-><init>()V
iput-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->wip:Ljava/util/concurrent/atomic/AtomicInteger;
.line 453
return-void
.end method
# virtual methods
# drain(): moves queued items to the downstream subscriber, limited by the value
# returned from this.get(), and delivers the terminal signal once `done` is set and the
# queue is empty.
#
# Entry is gated by `wip`: only the caller whose getAndIncrement() observes 0 runs the
# loop; any other caller has merely bumped the counter and returns. The running loop
# subtracts the count it has accounted for (`missed`) and repeats while the result is
# non-zero.
#
# Register use: v0 = missed, v1 = a (subscriber), v2 = q (queue), v3/v4 = r (long),
# v5/v6 = e (long, items emitted in this pass), v7..v11 = temporaries.
#
# NOTE(review): error(Throwable), complete(), isCancelled() and get() are invoked on this
# class but are not defined in this file; presumably they are inherited from BaseEmitter
# or its superclass. Confirm there before relying on their behavior.
.method drain()V
.registers 13
.line 504
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
# if (wip.getAndIncrement() != 0) return;
iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->wip:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicInteger;->getAndIncrement()I
move-result v0
if-eqz v0, :cond_9
.line 505
return-void
.line 508
:cond_9
# missed = 1: the single increment this call just performed
const/4 v0, 0x1
.line 509
.local v0, "missed":I
# Cache the subscriber and the queue in registers for the duration of the loop.
iget-object v1, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->actual:Lorg/reactivestreams/Subscriber;
.line 510
.local v1, "a":Lorg/reactivestreams/Subscriber;, "Lorg/reactivestreams/Subscriber<-TT;>;"
iget-object v2, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
.line 513
.local v2, "q":Lio/reactivex/internal/queue/SpscLinkedArrayQueue;, "Lio/reactivex/internal/queue/SpscLinkedArrayQueue<TT;>;"
# ---- outer loop: one pass per batch of missed drain() calls ----
:goto_e
# r = this.get(): the emission limit for this pass
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->get()J
move-result-wide v3
.line 514
.local v3, "r":J
# e = 0: nothing emitted yet in this pass
const-wide/16 v5, 0x0
.line 516
.local v5, "e":J
# ---- inner loop: while (e != r) ----
:goto_14
cmp-long v7, v5, v3
if-eqz v7, :cond_47
.line 517
# Cancelled: drop everything queued and leave. `wip` is not decremented on this path,
# so later drain() calls return at the first check.
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->isCancelled()Z
move-result v7
if-eqz v7, :cond_22
.line 518
invoke-virtual {v2}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->clear()V
.line 519
return-void
.line 522
:cond_22
# d = done is read BEFORE polling the queue; the order of these two reads is fixed by
# the instruction sequence below.
iget-boolean v7, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->done:Z
.line 524
.local v7, "d":Z
# o = q.poll()
invoke-virtual {v2}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->poll()Ljava/lang/Object;
move-result-object v8
.line 526
.local v8, "o":Ljava/lang/Object;, "TT;"
# empty = (o == null)
if-nez v8, :cond_2c
const/4 v9, 0x1
goto :goto_2d
:cond_2c
const/4 v9, 0x0
.line 528
.local v9, "empty":Z
:goto_2d
# if (d && empty): terminal state reached, deliver error or completion and return
if-eqz v7, :cond_3d
if-eqz v9, :cond_3d
.line 529
iget-object v10, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->error:Ljava/lang/Throwable;
.line 530
.local v10, "ex":Ljava/lang/Throwable;
if-eqz v10, :cond_39
.line 531
# A Throwable was recorded: this.error(ex); the boolean result is discarded.
invoke-virtual {p0, v10}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->error(Ljava/lang/Throwable;)Z
goto :goto_3c
.line 533
:cond_39
# No Throwable recorded: this.complete()
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->complete()V
.line 535
:goto_3c
return-void
.line 538
.end local v10 # "ex":Ljava/lang/Throwable;
:cond_3d
# Queue empty but not done: leave the inner loop (Java `break`)
if-eqz v9, :cond_40
.line 539
goto :goto_47
.line 542
:cond_40
# a.onNext(o)
invoke-interface {v1, v8}, Lorg/reactivestreams/Subscriber;->onNext(Ljava/lang/Object;)V
.line 544
# e++
const-wide/16 v10, 0x1
add-long/2addr v5, v10
.line 545
.end local v7 # "d":Z
.end local v8 # "o":Ljava/lang/Object;, "TT;"
.end local v9 # "empty":Z
goto :goto_14
.line 547
# ---- after the inner loop ----
:cond_47
:goto_47
# if (e == r): the limit was reached (this includes r == 0). Re-check cancellation and
# the terminal state without polling, so no item is removed from the queue here.
cmp-long v7, v5, v3
if-nez v7, :cond_6b
.line 548
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->isCancelled()Z
move-result v7
if-eqz v7, :cond_55
.line 549
invoke-virtual {v2}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->clear()V
.line 550
return-void
.line 553
:cond_55
# d = done (read before the emptiness check, as in the inner loop)
iget-boolean v7, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->done:Z
.line 555
.restart local v7 # "d":Z
# empty = q.isEmpty()
invoke-virtual {v2}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->isEmpty()Z
move-result v8
.line 557
.local v8, "empty":Z
# if (d && empty): deliver the terminal signal and return
if-eqz v7, :cond_6b
if-eqz v8, :cond_6b
.line 558
iget-object v9, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->error:Ljava/lang/Throwable;
.line 559
.local v9, "ex":Ljava/lang/Throwable;
if-eqz v9, :cond_67
.line 560
invoke-virtual {p0, v9}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->error(Ljava/lang/Throwable;)Z
goto :goto_6a
.line 562
:cond_67
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->complete()V
.line 564
:goto_6a
return-void
.line 568
.end local v7 # "d":Z
.end local v8 # "empty":Z
.end local v9 # "ex":Ljava/lang/Throwable;
:cond_6b
# if (e != 0) BackpressureHelper.produced(this, e); `this` is passed as the AtomicLong
# argument and the returned long is discarded.
const-wide/16 v7, 0x0
cmp-long v9, v5, v7
if-eqz v9, :cond_74
.line 569
invoke-static {p0, v5, v6}, Lio/reactivex/internal/util/BackpressureHelper;->produced(Ljava/util/concurrent/atomic/AtomicLong;J)J
.line 572
:cond_74
# missed = wip.addAndGet(-missed): subtract the calls accounted for so far; a non-zero
# result means more drain() calls arrived while this pass was running.
iget-object v7, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->wip:Ljava/util/concurrent/atomic/AtomicInteger;
neg-int v8, v0
invoke-virtual {v7, v8}, Ljava/util/concurrent/atomic/AtomicInteger;->addAndGet(I)I
move-result v0
.line 573
# if (missed == 0): nothing arrived in the meantime, drain is finished
if-nez v0, :cond_7f
.line 574
nop
.line 577
.end local v3 # "r":J
.end local v5 # "e":J
return-void
.line 576
:cond_7f
# Otherwise run another pass of the outer loop.
goto :goto_e
.end method
# onComplete(): sets the volatile `done` flag and calls drain(). No error is stored, so
# drain() takes its complete() branch once the queue is empty (unless `error` was
# already set by tryOnError()).
.method public onComplete()V
.registers 2
.line 487
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
# this.done = true
const/4 v0, 0x1
iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->done:Z
.line 488
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->drain()V
.line 489
return-void
.end method
# onNext(T t): enqueues one item and calls drain().
#   - Returns without doing anything when `done` is already set or isCancelled() is true.
#   - A null item is not enqueued; a NullPointerException is passed to onError() instead.
# NOTE(review): onError(Throwable) is not defined in this file; presumably it is
# inherited from BaseEmitter. Confirm there.
.method public onNext(Ljava/lang/Object;)V
.registers 4
.annotation system Ldalvik/annotation/Signature;
value = {
"(TT;)V"
}
.end annotation
.line 457
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
.local p1, "t":Ljava/lang/Object;, "TT;"
# if (done || isCancelled()) return;
iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->done:Z
if-nez v0, :cond_21
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->isCancelled()Z
move-result v0
if-eqz v0, :cond_b
goto :goto_21
.line 461
:cond_b
# if (t == null) { onError(new NullPointerException(...)); return; }
if-nez p1, :cond_18
.line 462
new-instance v0, Ljava/lang/NullPointerException;
const-string v1, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."
invoke-direct {v0, v1}, Ljava/lang/NullPointerException;-><init>(Ljava/lang/String;)V
invoke-virtual {p0, v0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->onError(Ljava/lang/Throwable;)V
.line 463
return-void
.line 465
:cond_18
# queue.offer(t); the boolean result of offer() is not inspected.
iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
invoke-virtual {v0, p1}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->offer(Ljava/lang/Object;)Z
.line 466
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->drain()V
.line 467
return-void
.line 458
# Shared exit for the "already done" and "cancelled" cases.
:cond_21
:goto_21
return-void
.end method
# onRequested(): simply calls drain().
# NOTE(review): no caller is visible in this file; presumably BaseEmitter invokes this
# hook when downstream requests more items. Confirm in BaseEmitter.
.method onRequested()V
.registers 1
.line 493
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->drain()V
.line 494
return-void
.end method
# onUnsubscribed(): clears the queue, but only when no drain loop currently owns `wip`.
# It uses the same wip.getAndIncrement() gate as drain(). If the counter was non-zero the
# queue is left untouched here; the increment makes a running drain() do another pass,
# where its isCancelled() checks can clear the queue.
# `wip` is never decremented in this method.
# NOTE(review): no caller is visible in this file; presumably BaseEmitter invokes this
# hook on cancellation. Confirm in BaseEmitter.
.method onUnsubscribed()V
.registers 2
.line 498
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
# if (wip.getAndIncrement() == 0) queue.clear();
iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->wip:Ljava/util/concurrent/atomic/AtomicInteger;
invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicInteger;->getAndIncrement()I
move-result v0
if-nez v0, :cond_d
.line 499
iget-object v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
invoke-virtual {v0}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->clear()V
.line 501
:cond_d
return-void
.end method
# tryOnError(Throwable e): records a terminal error and calls drain().
#   p1 - the error to record; a null argument is replaced by a NullPointerException.
#   Returns true when the error was recorded, false when `done` was already set or
#   isCancelled() is true (in which case nothing is stored).
# The error is not delivered here; drain() reads it once the queue is empty.
.method public tryOnError(Ljava/lang/Throwable;)Z
.registers 4
.param p1, "e" # Ljava/lang/Throwable;
.line 471
.local p0, "this":Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;, "Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter<TT;>;"
# if (done || isCancelled()) return false;
iget-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->done:Z
if-nez v0, :cond_1e
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->isCancelled()Z
move-result v0
if-eqz v0, :cond_b
goto :goto_1e
.line 475
:cond_b
# if (e == null) e = new NullPointerException(...);
if-nez p1, :cond_15
.line 476
new-instance v0, Ljava/lang/NullPointerException;
const-string v1, "onError called with null. Null values are generally not allowed in 2.x operators and sources."
invoke-direct {v0, v1}, Ljava/lang/NullPointerException;-><init>(Ljava/lang/String;)V
move-object p1, v0
.line 479
:cond_15
# Store the error first, then set the volatile `done` flag; drain() reads them in the
# opposite order (done, then error).
iput-object p1, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->error:Ljava/lang/Throwable;
.line 480
const/4 v0, 0x1
iput-boolean v0, p0, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->done:Z
.line 481
invoke-virtual {p0}, Lio/reactivex/internal/operators/flowable/FlowableCreate$BufferAsyncEmitter;->drain()V
.line 482
# v0 still holds 1 (true)
return v0
.line 472
# Shared exit for the "already done" and "cancelled" cases: return false.
:cond_1e
:goto_1e
const/4 v0, 0x0
return v0
.end method