/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.core.LeasePermitHandler;
import io.rsocket.core.PayloadValidationUtils;
import io.rsocket.core.ReassemblyUtils;
import io.rsocket.core.RequesterFrameHandler;
import io.rsocket.core.RequesterLeaseTracker;
import io.rsocket.core.RequesterResponderSupport;
import io.rsocket.core.SendUtils;
import io.rsocket.core.StateUtils;
import io.rsocket.frame.CancelFrameCodec;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/**
 * Requester side of a {@code REQUEST_CHANNEL} interaction.
 *
 * <p>A single instance plays several roles at once:
 *
 * <ul>
 *   <li>a {@link Flux} of inbound payloads, subscribed to by exactly one {@code inboundSubscriber};
 *   <li>a {@link CoreSubscriber} of the outbound {@code payloadsPublisher}, whose elements are
 *       encoded into frames and written to the {@link DuplexConnection};
 *   <li>the {@link Subscription} handed to the inbound subscriber (see {@link #request(long)} and
 *       {@link #cancel()});
 *   <li>a {@link RequesterFrameHandler} whose {@code handle*} callbacks deliver inbound signals
 *       (presumably invoked by the connection's frame dispatcher - confirm against the caller);
 *   <li>a {@link LeasePermitHandler} used when a {@link RequesterLeaseTracker} is configured.
 * </ul>
 *
 * <p>The lifecycle is tracked in the bit-packed {@link #state} word, which is only modified through
 * {@link StateUtils} helpers operating on {@link #STATE}. Those helpers return a value that this
 * class consistently treats as the state <em>before</em> the update.
 */
final class RequestChannelRequesterFlux extends Flux<Payload>
        implements RequesterFrameHandler,
                LeasePermitHandler,
                CoreSubscriber<Payload>,
                Subscription,
                Scannable {
    /** Message template used when a payload fails {@link PayloadValidationUtils#isValid}. */
    private static final String INVALID_PAYLOAD_ERROR_MESSAGE = "The payload is too big to be send as a single frame with a max frame length %s. Consider enabling fragmentation.";

    final ByteBufAllocator allocator;
    final int mtu;
    final int maxFrameLength;
    final int maxInboundPayloadSize;
    final RequesterResponderSupport requesterResponderSupport;
    final DuplexConnection connection;
    final PayloadDecoder payloadDecoder;
    /** Source of the outbound payloads; subscribed to in {@link #subscribe(CoreSubscriber)}. */
    final Publisher<Payload> payloadsPublisher;
    /** {@code null} when leasing is not enabled. */
    @Nullable
    final RequesterLeaseTracker requesterLeaseTracker;
    /** Optional observer of the request lifecycle; every use is null-guarded. */
    @Nullable
    final RequestInterceptor requestInterceptor;

    /** Bit-packed lifecycle state and requestN; decoded exclusively through {@link StateUtils}. */
    volatile long state;
    static final AtomicLongFieldUpdater<RequestChannelRequesterFlux> STATE = AtomicLongFieldUpdater.newUpdater(RequestChannelRequesterFlux.class, "state");

    /** Assigned in {@link #sendFirstPayload} once a stream id has been obtained. */
    int streamId;

    // ---- outbound (payloadsPublisher -> connection) ----
    boolean isFirstSignal = true;
    /** First outbound payload, parked here while waiting for a lease permit. */
    Payload firstPayload;
    Subscription outboundSubscription;
    boolean outboundDone;
    Throwable outboundError;
    Context cachedContext;

    // ---- inbound (connection -> inboundSubscriber) ----
    CoreSubscriber<? super Payload> inboundSubscriber;
    boolean inboundDone;
    /** Total demand signalled by the inbound subscriber, capped by {@link Operators#addCap}. */
    long requested;
    /** Number of payloads delivered to the inbound subscriber so far. */
    long produced;
    /** Reassembly buffer exposed through {@link #getFrames()} / {@link #setFrames}. */
    CompositeByteBuf frames;

    /**
     * Creates a channel requester; all collaborators are taken from {@code
     * requesterResponderSupport}.
     *
     * @param payloadsPublisher source of the outbound payloads
     * @param requesterResponderSupport provider of the allocator, limits, connection, decoder,
     *     lease tracker and interceptor
     */
    RequestChannelRequesterFlux(Publisher<Payload> payloadsPublisher, RequesterResponderSupport requesterResponderSupport) {
        this.allocator = requesterResponderSupport.getAllocator();
        this.payloadsPublisher = payloadsPublisher;
        this.mtu = requesterResponderSupport.getMtu();
        this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
        this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
        this.requesterResponderSupport = requesterResponderSupport;
        this.connection = requesterResponderSupport.getDuplexConnection();
        this.payloadDecoder = requesterResponderSupport.getPayloadDecoder();
        this.requesterLeaseTracker = requesterResponderSupport.getRequesterLeaseTracker();
        this.requestInterceptor = requesterResponderSupport.getRequestInterceptor();
    }

    /**
     * Accepts the single inbound subscriber and subscribes to the outbound publisher.
     *
     * <p>A second subscription (or a subscription after termination) is rejected with an {@link
     * IllegalStateException}, which is also reported to the interceptor via {@code onReject}.
     *
     * @param actual the inbound subscriber; must not be {@code null}
     */
    @Override
    public void subscribe(CoreSubscriber<? super Payload> actual) {
        Objects.requireNonNull(actual, "subscribe");
        final long previousState = StateUtils.markSubscribed(STATE, this);
        if (StateUtils.isSubscribedOrTerminated(previousState)) {
            final IllegalStateException e = new IllegalStateException("RequestChannelFlux allows only a single Subscriber");
            final RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onReject(e, FrameType.REQUEST_CHANNEL, null);
            }
            Operators.error(actual, e);
            return;
        }
        this.inboundSubscriber = actual;
        this.payloadsPublisher.subscribe(this);
    }

    /**
     * Stores the outbound subscription and only then hands {@code this} to the inbound subscriber
     * as its {@link Subscription}.
     */
    @Override
    public void onSubscribe(Subscription outboundSubscription) {
        if (Operators.validate(this.outboundSubscription, outboundSubscription)) {
            this.outboundSubscription = outboundSubscription;
            this.inboundSubscriber.onSubscribe(this);
        }
    }

    /**
     * Records inbound demand.
     *
     * <p>The very first demand triggers a request for exactly one outbound payload (the one that
     * becomes the initial frame). Later demand is sent as a {@code REQUEST_N} frame, but only once
     * the first frame has been sent and the previously recorded requestN was not already the
     * maximum.
     *
     * @param n additional demand; ignored if rejected by {@link Operators#validate(long)}
     */
    @Override
    public final void request(long n) {
        if (!Operators.validate(n)) {
            return;
        }
        this.requested = Operators.addCap(this.requested, n);
        final long previousState = StateUtils.addRequestN(STATE, this, n, this.requesterLeaseTracker == null);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        if (StateUtils.hasRequested(previousState)) {
            if (StateUtils.isFirstFrameSent(previousState) && !StateUtils.isMaxAllowedRequestN(StateUtils.extractRequestN(previousState))) {
                final int streamId = this.streamId;
                final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
                this.connection.sendFrame(streamId, requestNFrame);
            }
            return;
        }
        // First demand ever: pull the payload that will open the channel.
        this.outboundSubscription.request(1L);
    }

    /**
     * Receives an outbound payload.
     *
     * <p>The first payload either waits for a lease permit (lease tracker configured) or is sent
     * immediately as the initial frame; every later payload goes through {@link
     * #sendFollowingPayload(Payload)}. Payloads arriving after outbound termination are released.
     */
    @Override
    public void onNext(Payload p) {
        if (this.outboundDone) {
            p.release();
            return;
        }
        if (!this.isFirstSignal) {
            this.sendFollowingPayload(p);
            return;
        }
        this.isFirstSignal = false;
        final RequesterLeaseTracker leaseTracker = this.requesterLeaseTracker;
        if (leaseTracker != null) {
            // Park the payload until handlePermit()/handlePermitError() is called back.
            this.firstPayload = p;
            final long previousState = StateUtils.markFirstPayloadReceived(STATE, this);
            if (StateUtils.isTerminated(previousState)) {
                this.firstPayload = null;
                p.release();
                return;
            }
            leaseTracker.issue(this);
        } else {
            final long state = this.state;
            if (StateUtils.isTerminated(state)) {
                p.release();
                return;
            }
            this.sendFirstPayload(p, StateUtils.extractRequestN(state), false);
        }
    }

    /**
     * Lease permit granted: sends the parked first payload.
     *
     * @return {@code false} if the channel was already terminated (nothing is sent), {@code true}
     *     otherwise
     */
    @Override
    public boolean handlePermit() {
        final long previousState = StateUtils.markReadyToSendFirstFrame(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return false;
        }
        final Payload firstPayload = this.firstPayload;
        this.firstPayload = null;
        this.sendFirstPayload(firstPayload, StateUtils.extractRequestN(previousState), StateUtils.isOutboundTerminated(previousState));
        return true;
    }

    /**
     * Validates the first payload, obtains a stream id, sends the initial {@code REQUEST_CHANNEL}
     * frame and then reconciles whatever happened while sending (termination, outbound completion,
     * additional demand).
     *
     * @param firstPayload payload to send; ownership is taken by this method
     * @param initialRequestN requestN value carried by the initial frame
     * @param completed whether the outbound side had already terminated when the send was scheduled
     */
    void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean completed) {
        final int mtu = this.mtu;
        final RequestInterceptor interceptor = this.requestInterceptor;
        try {
            if (!PayloadValidationUtils.isValid(mtu, this.maxFrameLength, firstPayload, true)) {
                final long previousState = StateUtils.markTerminated(STATE, this);
                if (StateUtils.isTerminated(previousState)) {
                    // Already terminated elsewhere: nobody else owns this payload at this point,
                    // so release it here like the other failure paths of this method do.
                    firstPayload.release();
                    return;
                }
                if (!StateUtils.isOutboundTerminated(previousState)) {
                    this.outboundSubscription.cancel();
                }
                final IllegalArgumentException e = new IllegalArgumentException(String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
                if (interceptor != null) {
                    interceptor.onReject(e, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
                }
                firstPayload.release();
                this.inboundDone = true;
                this.inboundSubscriber.onError(e);
                return;
            }
        }
        catch (IllegalReferenceCountException e) {
            // The payload was already released; there is nothing left to release or inspect.
            final long previousState = StateUtils.markTerminated(STATE, this);
            if (StateUtils.isTerminated(previousState)) {
                Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
                return;
            }
            if (!StateUtils.isOutboundTerminated(previousState)) {
                this.outboundSubscription.cancel();
            }
            if (interceptor != null) {
                interceptor.onReject(e, FrameType.REQUEST_CHANNEL, null);
            }
            this.inboundDone = true;
            this.inboundSubscriber.onError(e);
            return;
        }

        final RequesterResponderSupport sm = this.requesterResponderSupport;
        final DuplexConnection connection = this.connection;
        final ByteBufAllocator allocator = this.allocator;
        final int streamId;
        try {
            streamId = sm.addAndGetNextStreamId(this);
            this.streamId = streamId;
        }
        catch (Throwable t) {
            final long previousState = StateUtils.markTerminated(STATE, this);
            if (StateUtils.isTerminated(previousState)) {
                firstPayload.release();
                Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
                return;
            }
            if (!StateUtils.isOutboundTerminated(previousState)) {
                this.outboundSubscription.cancel();
            }
            final Throwable ut = Exceptions.unwrap(t);
            // Read the metadata BEFORE releasing: touching a released payload is a
            // use-after-release.
            if (interceptor != null) {
                interceptor.onReject(ut, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
            }
            firstPayload.release();
            this.inboundDone = true;
            this.inboundSubscriber.onError(ut);
            return;
        }

        if (interceptor != null) {
            interceptor.onStart(streamId, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
        }
        try {
            SendUtils.sendReleasingPayload(streamId, FrameType.REQUEST_CHANNEL, initialRequestN, mtu, firstPayload, connection, allocator, completed);
        }
        catch (Throwable t) {
            final long previousState = StateUtils.markTerminated(STATE, this);
            firstPayload.release();
            if (StateUtils.isTerminated(previousState)) {
                Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
                return;
            }
            sm.remove(streamId, this);
            if (!StateUtils.isOutboundTerminated(previousState)) {
                this.outboundSubscription.cancel();
            }
            if (interceptor != null) {
                interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
            }
            this.inboundDone = true;
            this.inboundSubscriber.onError(t);
            return;
        }

        final long previousState = StateUtils.markFirstFrameSent(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            // The channel was terminated while the first frame was being sent.
            if (this.inboundDone) {
                // The inbound side has already been signalled; nothing more to do.
                return;
            }
            sm.remove(streamId, this);
            final Throwable outboundError = this.outboundError;
            if (outboundError != null) {
                // Terminated by an outbound error: tell the remote side and the inbound subscriber.
                final ByteBuf errorFrame = ErrorFrameCodec.encode(allocator, streamId, outboundError);
                connection.sendFrame(streamId, errorFrame);
                if (interceptor != null) {
                    interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, outboundError);
                }
                this.inboundDone = true;
                this.inboundSubscriber.onError(outboundError);
            } else {
                // No outbound error recorded, so treat it as a cancellation.
                final ByteBuf cancelFrame = CancelFrameCodec.encode(allocator, streamId);
                connection.sendFrame(streamId, cancelFrame);
                if (interceptor != null) {
                    interceptor.onCancel(streamId, FrameType.REQUEST_CHANNEL);
                }
            }
            return;
        }

        if (!completed && StateUtils.isOutboundTerminated(previousState)) {
            // Outbound completed while the first frame was in flight and the initial frame did not
            // carry the completion, so send a separate COMPLETE frame.
            final ByteBuf completeFrame = PayloadFrameCodec.encodeComplete(this.allocator, streamId);
            connection.sendFrame(streamId, completeFrame);
        }
        if (StateUtils.isMaxAllowedRequestN(initialRequestN)) {
            return;
        }
        // Demand may have grown since initialRequestN was captured; send the difference.
        final long requestN = StateUtils.extractRequestN(previousState);
        if (StateUtils.isMaxAllowedRequestN(requestN)) {
            final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
            connection.sendFrame(streamId, requestNFrame);
            return;
        }
        if (requestN > initialRequestN) {
            final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN);
            connection.sendFrame(streamId, requestNFrame);
        }
    }

    /**
     * Validates and sends a non-first outbound payload as a {@code NEXT} frame.
     *
     * <p>On any failure the channel is cancelled via {@link #tryCancel()}; the error is propagated
     * to the inbound subscriber if that cancellation reports {@code true}, and dropped otherwise.
     *
     * @param followingPayload payload to send; ownership is taken by this method
     */
    final void sendFollowingPayload(Payload followingPayload) {
        final int streamId = this.streamId;
        final int mtu = this.mtu;
        try {
            if (!PayloadValidationUtils.isValid(mtu, this.maxFrameLength, followingPayload, true)) {
                followingPayload.release();
                final IllegalArgumentException e = new IllegalArgumentException(String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
                if (!this.tryCancel()) {
                    Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
                    return;
                }
                this.propagateErrorSafely(e);
                return;
            }
        }
        catch (IllegalReferenceCountException e) {
            if (!this.tryCancel()) {
                Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
                return;
            }
            this.propagateErrorSafely(e);
            return;
        }
        try {
            SendUtils.sendReleasingPayload(streamId, FrameType.NEXT, mtu, followingPayload, this.connection, this.allocator, true);
        }
        catch (Throwable e) {
            if (!this.tryCancel()) {
                Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
                return;
            }
            this.propagateErrorSafely(e);
        }
    }

    /**
     * Delivers {@code t} to the inbound subscriber unless the inbound side is already done, in
     * which case the error is dropped.
     *
     * <p>{@code inboundDone} is checked once without the monitor and again while holding it; the
     * same monitor guards {@link #handlePayload(Payload)}.
     */
    void propagateErrorSafely(Throwable t) {
        if (this.inboundDone) {
            Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
            return;
        }
        synchronized (this) {
            if (this.inboundDone) {
                Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
                return;
            }
            final RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, t);
            }
            this.inboundDone = true;
            this.inboundSubscriber.onError(t);
        }
    }

    /**
     * Cancels the channel on behalf of the inbound subscriber; the interceptor is notified only
     * when {@link #tryCancel()} reports {@code true}.
     */
    @Override
    public final void cancel() {
        if (!this.tryCancel()) {
            return;
        }
        final RequestInterceptor interceptor = this.requestInterceptor;
        if (interceptor != null) {
            interceptor.onCancel(this.streamId, FrameType.REQUEST_CHANNEL);
        }
    }

    /**
     * Terminates the channel and cleans up whatever stage it had reached.
     *
     * @return {@code true} only if the first frame had been sent, in which case the stream is
     *     removed and a {@code CANCEL} frame is sent; {@code false} if the channel was already
     *     terminated or the stream had not been opened yet
     */
    boolean tryCancel() {
        final long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return false;
        }
        if (!StateUtils.isOutboundTerminated(previousState)) {
            this.outboundSubscription.cancel();
        }
        if (!StateUtils.isReadyToSendFirstFrame(previousState) && StateUtils.isFirstPayloadReceived(previousState)) {
            // The first payload is still parked: release it; no frame has been sent yet.
            final Payload firstPayload = this.firstPayload;
            this.firstPayload = null;
            firstPayload.release();
            return false;
        }
        ReassemblyUtils.synchronizedRelease(this, previousState);
        final boolean firstFrameSent = StateUtils.isFirstFrameSent(previousState);
        if (firstFrameSent) {
            final int streamId = this.streamId;
            this.requesterResponderSupport.remove(streamId, this);
            final ByteBuf cancelFrame = CancelFrameCodec.encode(this.allocator, streamId);
            this.connection.sendFrame(streamId, cancelFrame);
        }
        return firstFrameSent;
    }

    /**
     * Outbound publisher failed.
     *
     * <p>Depending on how far the channel got, the error is propagated only locally (nothing sent
     * yet, or still holding the parked first payload) or is additionally sent to the remote side
     * as an {@code ERROR} frame (first frame already sent).
     */
    @Override
    public void onError(Throwable t) {
        if (this.outboundDone) {
            Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
            return;
        }
        // Recorded before the state change; sendFirstPayload reads it after finding the state
        // terminated.
        this.outboundError = t;
        this.outboundDone = true;
        final long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState) || StateUtils.isOutboundTerminated(previousState)) {
            Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
            return;
        }
        if (this.isFirstSignal) {
            // No payload was ever received: nothing to release, nothing on the wire.
            this.inboundDone = true;
            this.inboundSubscriber.onError(t);
            return;
        }
        if (!StateUtils.isReadyToSendFirstFrame(previousState)) {
            // First payload is parked (not yet ready to send): release it and fail locally.
            final Payload firstPayload = this.firstPayload;
            this.firstPayload = null;
            firstPayload.release();
            this.inboundDone = true;
            this.inboundSubscriber.onError(t);
            return;
        }
        ReassemblyUtils.synchronizedRelease(this, previousState);
        if (StateUtils.isFirstFrameSent(previousState)) {
            final int streamId = this.streamId;
            this.requesterResponderSupport.remove(streamId, this);
            final ByteBuf errorFrame = ErrorFrameCodec.encode(this.allocator, streamId, t);
            this.connection.sendFrame(streamId, errorFrame);
            if (!StateUtils.isInboundTerminated(previousState)) {
                // Same monitor as handlePayload, so the error is not interleaved with an onNext.
                synchronized (this) {
                    final RequestInterceptor interceptor = this.requestInterceptor;
                    if (interceptor != null) {
                        interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
                    }
                    this.inboundDone = true;
                    this.inboundSubscriber.onError(t);
                }
            } else {
                Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
            }
        }
    }

    /**
     * Outbound publisher completed.
     *
     * <p>If no payload was ever received the inbound subscriber is failed with a {@link
     * CancellationException} ("Empty Source"). If the first frame was already sent a {@code
     * COMPLETE} frame is written, and the stream is removed when the inbound side had terminated
     * too.
     */
    @Override
    public void onComplete() {
        if (this.outboundDone) {
            return;
        }
        this.outboundDone = true;
        final long previousState = StateUtils.markOutboundTerminated(STATE, this, true);
        if (StateUtils.isTerminated(previousState) || StateUtils.isOutboundTerminated(previousState)) {
            return;
        }
        if (!StateUtils.isFirstFrameSent(previousState)) {
            if (!StateUtils.isFirstPayloadReceived(previousState)) {
                this.inboundSubscriber.onError(new CancellationException("Empty Source"));
            }
            // Otherwise the first frame is still pending; sendFirstPayload handles the completion.
            return;
        }
        final int streamId = this.streamId;
        final ByteBuf completeFrame = PayloadFrameCodec.encodeComplete(this.allocator, streamId);
        this.connection.sendFrame(streamId, completeFrame);
        if (StateUtils.isInboundTerminated(previousState)) {
            this.requesterResponderSupport.remove(streamId, this);
            final RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, null);
            }
        }
    }

    /**
     * Inbound side completed: completes the inbound subscriber and, if the outbound side had
     * terminated as well, removes the stream and notifies the interceptor.
     */
    @Override
    public final void handleComplete() {
        if (this.inboundDone) {
            return;
        }
        this.inboundDone = true;
        final long previousState = StateUtils.markInboundTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        if (StateUtils.isOutboundTerminated(previousState)) {
            this.requesterResponderSupport.remove(this.streamId, this);
            final RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, null);
            }
        }
        this.inboundSubscriber.onComplete();
    }

    /**
     * Lease permit was refused: cancels the outbound subscription, releases the parked first
     * payload and fails the inbound subscriber with {@code cause}.
     */
    @Override
    public final void handlePermitError(Throwable cause) {
        this.inboundDone = true;
        final long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState) || StateUtils.isInboundTerminated(previousState)) {
            Operators.onErrorDropped(cause, this.inboundSubscriber.currentContext());
            return;
        }
        if (!StateUtils.isOutboundTerminated(previousState)) {
            this.outboundSubscription.cancel();
        }
        final Payload p = this.firstPayload;
        // Drop the reference so a released payload is not retained, as tryCancel()/onError() do.
        this.firstPayload = null;
        final RequestInterceptor interceptor = this.requestInterceptor;
        if (interceptor != null) {
            interceptor.onReject(cause, FrameType.REQUEST_CHANNEL, p.metadata());
        }
        p.release();
        this.inboundSubscriber.onError(cause);
    }

    /**
     * Inbound error: terminates the channel, cancels the outbound subscription if still active,
     * releases reassembly state, removes the stream and fails the inbound subscriber.
     */
    @Override
    public final void handleError(Throwable cause) {
        if (this.inboundDone) {
            Operators.onErrorDropped(cause, this.inboundSubscriber.currentContext());
            return;
        }
        this.inboundDone = true;
        final long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState) || StateUtils.isInboundTerminated(previousState)) {
            Operators.onErrorDropped(cause, this.inboundSubscriber.currentContext());
            return;
        }
        if (!StateUtils.isOutboundTerminated(previousState)) {
            this.outboundSubscription.cancel();
        }
        ReassemblyUtils.release(this, previousState);
        final int streamId = this.streamId;
        this.requesterResponderSupport.remove(streamId, this);
        final RequestInterceptor interceptor = this.requestInterceptor;
        if (interceptor != null) {
            interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, cause);
        }
        this.inboundSubscriber.onError(cause);
    }

    /**
     * Delivers an inbound payload to the inbound subscriber.
     *
     * <p>If the payload exceeds the demand recorded in {@link #requested}, it is released, the
     * channel is cancelled and the subscriber is failed with an overflow error.
     */
    @Override
    public final void handlePayload(Payload value) {
        synchronized (this) {
            if (this.inboundDone) {
                value.release();
                return;
            }
            final long produced = this.produced;
            if (this.requested == produced) {
                value.release();
                if (!this.tryCancel()) {
                    return;
                }
                final IllegalStateException cause = Exceptions.failWithOverflow("The number of messages received exceeds the number requested");
                final RequestInterceptor interceptor = this.requestInterceptor;
                if (interceptor != null) {
                    interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, cause);
                }
                // Mark the inbound side done like every other terminal inbound signal does.
                this.inboundDone = true;
                this.inboundSubscriber.onError(cause);
                return;
            }
            this.produced = produced + 1L;
            this.inboundSubscriber.onNext(value);
        }
    }

    /** Forwards {@code n} of demand to the outbound publisher. */
    @Override
    public void handleRequestN(long n) {
        this.outboundSubscription.request(n);
    }

    /**
     * Handles a cancel of the outbound direction: cancels the outbound subscription and, if the
     * inbound side had already terminated, removes the stream and notifies the interceptor.
     */
    @Override
    public void handleCancel() {
        if (this.outboundDone) {
            return;
        }
        final long previousState = StateUtils.markOutboundTerminated(STATE, this, false);
        if (StateUtils.isTerminated(previousState) || StateUtils.isOutboundTerminated(previousState)) {
            return;
        }
        final boolean inboundTerminated = StateUtils.isInboundTerminated(previousState);
        if (inboundTerminated) {
            this.requesterResponderSupport.remove(this.streamId, this);
        }
        this.outboundSubscription.cancel();
        if (inboundTerminated) {
            final RequestInterceptor interceptor = this.requestInterceptor;
            if (interceptor != null) {
                interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, null);
            }
        }
    }

    /** Delegates inbound frame handling (including reassembly) to {@link ReassemblyUtils}. */
    @Override
    public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload) {
        ReassemblyUtils.handleNextSupport(STATE, this, this, this.inboundSubscriber, this.payloadDecoder, this.allocator, this.maxInboundPayloadSize, frame, hasFollows, isLastPayload);
    }

    /**
     * Context seen by the outbound publisher: the inbound subscriber's context with {@code
     * SendUtils.DISCARD_CONTEXT} merged in, computed once and cached. Before subscription an empty
     * context is returned.
     */
    @Override
    @NonNull
    public Context currentContext() {
        final long state = this.state;
        if (!StateUtils.isSubscribedOrTerminated(state)) {
            return Context.empty();
        }
        Context cachedContext = this.cachedContext;
        if (cachedContext == null) {
            cachedContext = this.inboundSubscriber.currentContext().putAll((ContextView)SendUtils.DISCARD_CONTEXT);
            this.cachedContext = cachedContext;
        }
        return cachedContext;
    }

    @Override
    public CompositeByteBuf getFrames() {
        return this.frames;
    }

    @Override
    public void setFrames(CompositeByteBuf byteBuf) {
        this.frames = byteBuf;
    }

    /**
     * Exposes {@code TERMINATED} and {@code REQUESTED_FROM_DOWNSTREAM}; every other attribute
     * yields {@code null}.
     */
    @Override
    @Nullable
    public Object scanUnsafe(Scannable.Attr key) {
        final long state = this.state;
        if (key == Scannable.Attr.TERMINATED) {
            return StateUtils.isTerminated(state);
        }
        if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
            // The state word also carries lifecycle flags; report only the requestN portion.
            final long requestN = StateUtils.extractRequestN(state);
            return requestN;
        }
        return null;
    }

    @Override
    @NonNull
    public String stepName() {
        return "source(RequestChannelFlux)";
    }
}

