/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.loadbalance;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.FrameType;
import io.rsocket.loadbalance.FluxDeferredResolution;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.loadbalance.MonoDeferredResolution;
import io.rsocket.loadbalance.RSocketPool;
import io.rsocket.loadbalance.ResolvingOperator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

final class PooledRSocket
extends ResolvingOperator<RSocket>
implements CoreSubscriber<RSocket>,
RSocket {
    final RSocketPool parent;
    final Mono<RSocket> rSocketSource;
    final LoadbalanceTarget loadbalanceTarget;
    final Sinks.Empty<Void> onCloseSink;
    volatile Subscription s;
    static final AtomicReferenceFieldUpdater<PooledRSocket, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(PooledRSocket.class, Subscription.class, "s");

    PooledRSocket(RSocketPool parent, Mono<RSocket> rSocketSource, LoadbalanceTarget loadbalanceTarget) {
        this.parent = parent;
        this.rSocketSource = rSocketSource;
        this.loadbalanceTarget = loadbalanceTarget;
        this.onCloseSink = Sinks.unsafe().empty();
    }

    @Override
    public void onSubscribe(Subscription s2) {
        if (Operators.setOnce(S, this, s2)) {
            s2.request(Long.MAX_VALUE);
        }
    }

    @Override
    public void onComplete() {
        Subscription s2 = this.s;
        RSocket value = (RSocket)this.value;
        if (s2 == Operators.cancelledSubscription() || !S.compareAndSet(this, s2, null)) {
            this.doFinally();
            return;
        }
        if (value == null) {
            this.terminate(new IllegalStateException("Source completed empty"));
        } else {
            this.complete(value);
        }
    }

    @Override
    public void onError(Throwable t) {
        Subscription s2 = this.s;
        if (s2 == Operators.cancelledSubscription() || S.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) {
            this.doFinally();
            Operators.onErrorDropped(t, Context.empty());
            return;
        }
        this.doFinally();
        this.doCleanup(t);
    }

    @Override
    public void onNext(RSocket value) {
        if (this.s == Operators.cancelledSubscription()) {
            this.doOnValueExpired(value);
            return;
        }
        this.value = value;
        this.doFinally();
    }

    @Override
    protected void doSubscribe() {
        this.rSocketSource.subscribe(this);
    }

    @Override
    protected void doOnValueResolved(RSocket value) {
        value.onClose().subscribe(null, this::doCleanup, () -> this.doCleanup(ON_DISPOSE));
    }

    void doCleanup(Throwable t) {
        PooledRSocket[] newSockets;
        PooledRSocket[] sockets;
        if (this.isDisposed()) {
            return;
        }
        this.terminate(t);
        RSocketPool parent = this.parent;
        do {
            sockets = parent.activeSockets;
            int activeSocketsCount = sockets.length;
            int index = -1;
            for (int i = 0; i < activeSocketsCount; ++i) {
                if (sockets[i] != this) continue;
                index = i;
                break;
            }
            if (index == -1) break;
            if (activeSocketsCount == 1) {
                newSockets = RSocketPool.EMPTY;
                continue;
            }
            int lastIndex = activeSocketsCount - 1;
            newSockets = new PooledRSocket[lastIndex];
            if (index != 0) {
                System.arraycopy(sockets, 0, newSockets, 0, index);
            }
            if (index == lastIndex) continue;
            System.arraycopy(sockets, index + 1, newSockets, index, lastIndex - index);
        } while (!RSocketPool.ACTIVE_SOCKETS.compareAndSet(parent, sockets, newSockets));
        if (t == ON_DISPOSE) {
            this.onCloseSink.tryEmitEmpty();
        } else {
            this.onCloseSink.tryEmitError(t);
        }
    }

    @Override
    protected void doOnValueExpired(RSocket value) {
        value.dispose();
    }

    @Override
    protected void doOnDispose() {
        Operators.terminate(S, this);
        RSocket value = (RSocket)this.value;
        if (value != null) {
            value.onClose().subscribe(null, this.onCloseSink::tryEmitError, this.onCloseSink::tryEmitEmpty);
        } else {
            this.onCloseSink.tryEmitEmpty();
        }
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return new MonoInner<Void>(this, payload, FrameType.REQUEST_FNF);
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return new MonoInner<Payload>(this, payload, FrameType.REQUEST_RESPONSE);
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return new FluxInner<Payload>(this, payload, FrameType.REQUEST_STREAM);
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return new FluxInner<Publisher<Payload>>(this, payloads, FrameType.REQUEST_CHANNEL);
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
        return new MonoInner<Void>(this, payload, FrameType.METADATA_PUSH);
    }

    LoadbalanceTarget target() {
        return this.loadbalanceTarget;
    }

    @Override
    public Mono<Void> onClose() {
        return this.onCloseSink.asMono();
    }

    @Override
    public double availability() {
        RSocket socket = (RSocket)this.valueIfResolved();
        return socket != null ? socket.availability() : 0.0;
    }

    static final class FluxInner<INPUT>
    extends FluxDeferredResolution<INPUT, RSocket> {
        FluxInner(PooledRSocket parent, INPUT fluxOrPayload, FrameType requestType) {
            super(parent, fluxOrPayload, requestType);
        }

        @Override
        public void accept(RSocket rSocket, Throwable t) {
            if (this.isTerminated()) {
                return;
            }
            if (t != null) {
                if (this.requestType == FrameType.REQUEST_STREAM) {
                    ReferenceCountUtil.safeRelease((Object)this.fluxOrPayload);
                }
                this.onError(t);
                return;
            }
            if (rSocket != null) {
                Flux<Payload> source;
                switch (this.requestType) {
                    case REQUEST_STREAM: {
                        source = rSocket.requestStream((Payload)this.fluxOrPayload);
                        break;
                    }
                    case REQUEST_CHANNEL: {
                        source = rSocket.requestChannel((Flux)this.fluxOrPayload);
                        break;
                    }
                    default: {
                        Operators.error(this.actual, new IllegalStateException("Should never happen"));
                        return;
                    }
                }
                source.subscribe(this);
            } else {
                this.parent.observe(this);
            }
        }
    }

    static final class MonoInner<RESULT>
    extends MonoDeferredResolution<RESULT, RSocket> {
        MonoInner(PooledRSocket parent, Payload payload, FrameType requestType) {
            super(parent, payload, requestType);
        }

        @Override
        public void accept(RSocket rSocket, Throwable t) {
            if (this.isTerminated()) {
                return;
            }
            if (t != null) {
                ReferenceCountUtil.safeRelease((Object)this.payload);
                this.onError(t);
                return;
            }
            if (rSocket != null) {
                Mono<Object> source;
                switch (this.requestType) {
                    case REQUEST_FNF: {
                        source = rSocket.fireAndForget(this.payload);
                        break;
                    }
                    case REQUEST_RESPONSE: {
                        source = rSocket.requestResponse(this.payload);
                        break;
                    }
                    case METADATA_PUSH: {
                        source = rSocket.metadataPush(this.payload);
                        break;
                    }
                    default: {
                        Operators.error(this.actual, new IllegalStateException("Should never happen"));
                        return;
                    }
                }
                source.subscribe(this);
            } else {
                this.parent.observe(this);
            }
        }
    }
}

