217 lines
6.4 KiB
Java
217 lines
6.4 KiB
Java
package rx.subjects;
|
|
|
|
import j0.g;
|
|
import java.util.ArrayList;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import rx.Observable;
|
|
import rx.Producer;
|
|
import rx.Subscriber;
|
|
import rx.Subscription;
|
|
import rx.exceptions.MissingBackpressureException;
|
|
public final class PublishSubject<T> extends Subject<T, T> {
|
|
/** Shared state object; the subject's onNext/onError/onCompleted calls are forwarded to it. */
public final b<T> j;
|
|
|
|
public static final class a<T> extends AtomicLong implements Producer, Subscription, g<T> {
|
|
private static final long serialVersionUID = 6451806817170721536L;
|
|
public final Subscriber<? super T> actual;
|
|
public final b<T> parent;
|
|
public long produced;
|
|
|
|
public a(b<T> bVar, Subscriber<? super T> subscriber) {
|
|
this.parent = bVar;
|
|
this.actual = subscriber;
|
|
}
|
|
|
|
@Override // rx.Subscription
|
|
public boolean isUnsubscribed() {
|
|
return get() == Long.MIN_VALUE;
|
|
}
|
|
|
|
@Override // rx.Producer
|
|
public void j(long j) {
|
|
long j2;
|
|
if (c.q.a.k.a.h0(j)) {
|
|
do {
|
|
j2 = get();
|
|
if (j2 == Long.MIN_VALUE) {
|
|
return;
|
|
}
|
|
} while (!compareAndSet(j2, c.q.a.k.a.d(j2, j)));
|
|
}
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onCompleted() {
|
|
if (get() != Long.MIN_VALUE) {
|
|
this.actual.onCompleted();
|
|
}
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onError(Throwable th) {
|
|
if (get() != Long.MIN_VALUE) {
|
|
this.actual.onError(th);
|
|
}
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onNext(T t) {
|
|
long j = get();
|
|
if (j != Long.MIN_VALUE) {
|
|
long j2 = this.produced;
|
|
if (j != j2) {
|
|
this.produced = j2 + 1;
|
|
this.actual.onNext(t);
|
|
return;
|
|
}
|
|
unsubscribe();
|
|
this.actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests"));
|
|
}
|
|
}
|
|
|
|
@Override // rx.Subscription
|
|
public void unsubscribe() {
|
|
if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
|
|
this.parent.a(this);
|
|
}
|
|
}
|
|
}
|
|
|
|
public static final class b<T> extends AtomicReference<a<T>[]> implements Observable.a<T>, g<T> {
|
|
public static final a[] i = new a[0];
|
|
public static final a[] j = new a[0];
|
|
private static final long serialVersionUID = -7568940796666027140L;
|
|
public Throwable error;
|
|
|
|
public b() {
|
|
lazySet(i);
|
|
}
|
|
|
|
public void a(a<T> aVar) {
|
|
a<T>[] aVarArr;
|
|
a[] aVarArr2;
|
|
do {
|
|
aVarArr = get();
|
|
if (aVarArr != j && aVarArr != i) {
|
|
int length = aVarArr.length;
|
|
int i2 = 0;
|
|
while (true) {
|
|
if (i2 >= length) {
|
|
i2 = -1;
|
|
break;
|
|
} else if (aVarArr[i2] == aVar) {
|
|
break;
|
|
} else {
|
|
i2++;
|
|
}
|
|
}
|
|
if (i2 >= 0) {
|
|
if (length == 1) {
|
|
aVarArr2 = i;
|
|
} else {
|
|
a[] aVarArr3 = new a[(length - 1)];
|
|
System.arraycopy(aVarArr, 0, aVarArr3, 0, i2);
|
|
System.arraycopy(aVarArr, i2 + 1, aVarArr3, i2, (length - i2) - 1);
|
|
aVarArr2 = aVarArr3;
|
|
}
|
|
} else {
|
|
return;
|
|
}
|
|
} else {
|
|
return;
|
|
}
|
|
} while (!compareAndSet(aVarArr, aVarArr2));
|
|
}
|
|
|
|
@Override // rx.functions.Action1
|
|
public void call(Object obj) {
|
|
boolean z2;
|
|
Subscriber subscriber = (Subscriber) obj;
|
|
a<T> aVar = new a<>(this, subscriber);
|
|
subscriber.add(aVar);
|
|
subscriber.setProducer(aVar);
|
|
while (true) {
|
|
a<T>[] aVarArr = get();
|
|
z2 = false;
|
|
if (aVarArr != j) {
|
|
int length = aVarArr.length;
|
|
a[] aVarArr2 = new a[(length + 1)];
|
|
System.arraycopy(aVarArr, 0, aVarArr2, 0, length);
|
|
aVarArr2[length] = aVar;
|
|
if (compareAndSet(aVarArr, aVarArr2)) {
|
|
z2 = true;
|
|
break;
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
if (!z2) {
|
|
Throwable th = this.error;
|
|
if (th != null) {
|
|
subscriber.onError(th);
|
|
} else {
|
|
subscriber.onCompleted();
|
|
}
|
|
} else if (aVar.isUnsubscribed()) {
|
|
a(aVar);
|
|
}
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onCompleted() {
|
|
for (a<T> aVar : getAndSet(j)) {
|
|
aVar.onCompleted();
|
|
}
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onError(Throwable th) {
|
|
this.error = th;
|
|
ArrayList arrayList = null;
|
|
for (a<T> aVar : getAndSet(j)) {
|
|
try {
|
|
aVar.onError(th);
|
|
} catch (Throwable th2) {
|
|
if (arrayList == null) {
|
|
arrayList = new ArrayList(1);
|
|
}
|
|
arrayList.add(th2);
|
|
}
|
|
}
|
|
c.q.a.k.a.Z(arrayList);
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onNext(T t) {
|
|
for (a<T> aVar : get()) {
|
|
aVar.onNext(t);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
 * Creates a subject around the given state object. The state is handed to the
 * {@code Subject} superclass constructor and also kept in the field {@code j}.
 *
 * <p>NOTE(review): the superclass presumably uses it as the on-subscribe callback,
 * since the state type implements {@code Observable.a} - confirm.
 *
 * @param bVar state object that tracks subscribers and relays events
 */
public PublishSubject(b<T> bVar) {
    super(bVar);
    j = bVar;
}
|
|
|
|
public static <T> PublishSubject<T> k0() {
|
|
return new PublishSubject<>(new b());
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onCompleted() {
|
|
this.j.onCompleted();
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onError(Throwable th) {
|
|
this.j.onError(th);
|
|
}
|
|
|
|
@Override // j0.g
|
|
public void onNext(T t) {
|
|
this.j.onNext(t);
|
|
}
|
|
}
|