/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.FireAndForgetResponderSubscriber;
import io.rsocket.core.FrameHandler;
import io.rsocket.core.MetadataPushResponderSubscriber;
import io.rsocket.core.RequestChannelResponderSubscriber;
import io.rsocket.core.RequestResponseResponderSubscriber;
import io.rsocket.core.RequestStreamResponderSubscriber;
import io.rsocket.core.RequesterResponderSupport;
import io.rsocket.core.ResponderLeaseTracker;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.RequestChannelFrameCodec;
import io.rsocket.frame.RequestFireAndForgetFrameCodec;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.RequestResponseFrameCodec;
import io.rsocket.frame.RequestStreamFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RequestInterceptor;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
 * Responder half of a connection: reads inbound frames from a {@link DuplexConnection} and
 * dispatches them either to the wrapped {@link RSocket} request handler (for new requests) or to
 * the {@link FrameHandler} already registered for the frame's stream id (for follow-up frames).
 *
 * <p>Termination happens at most once, guarded by a compare-and-set on {@link #terminationError};
 * see {@link #tryTerminate(Supplier)}.
 */
class RSocketResponder extends RequesterResponderSupport implements RSocket {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);

    /** Shared termination cause used when the connection completes without an error. */
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

    /** User-supplied handler that actually serves the requests. */
    private final RSocket requestHandler;

    /** Lease tracker consulted before each request; {@code null} when no tracker was supplied. */
    @Nullable
    private final ResponderLeaseTracker leaseHandler;

    /** Cause of termination; {@code null} until the first successful {@link #tryTerminate} call. */
    private volatile Throwable terminationError;
    private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR = AtomicReferenceFieldUpdater.newUpdater(RSocketResponder.class, Throwable.class, "terminationError");

    /**
     * Creates the responder and immediately subscribes to the connection's inbound frames and to
     * its close signal.
     *
     * @param connection connection to read frames from and send frames to
     * @param requestHandler handler the decoded requests are delegated to
     * @param payloadDecoder decoder turning request frames into {@link Payload}s
     * @param leaseHandler optional lease tracker; {@code null} disables lease checks
     * @param mtu forwarded to the superclass
     * @param maxFrameLength forwarded to the superclass
     * @param maxInboundPayloadSize forwarded to the superclass
     * @param requestInterceptorFunction forwarded to the superclass
     */
    RSocketResponder(DuplexConnection connection, RSocket requestHandler, PayloadDecoder payloadDecoder, @Nullable ResponderLeaseTracker leaseHandler, int mtu, int maxFrameLength, int maxInboundPayloadSize, Function<RSocket, ? extends RequestInterceptor> requestInterceptorFunction) {
        super(mtu, maxFrameLength, maxInboundPayloadSize, payloadDecoder, connection, null, requestInterceptorFunction);
        this.requestHandler = requestHandler;
        this.leaseHandler = leaseHandler;
        // Errors on the receive stream are deliberately ignored here; termination is driven by the
        // onClose() subscription below.
        // NOTE(review): presumably the connection also signals onClose() when receive() fails - confirm.
        connection.receive().subscribe(this::handleFrame, ignored -> {});
        connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
    }

    /** Terminates with the given connection error as the cause. */
    private void tryTerminateOnConnectionError(Throwable e) {
        this.tryTerminate(() -> e);
    }

    /** Terminates with the shared {@link ClosedChannelException} as the cause. */
    private void tryTerminateOnConnectionClose() {
        this.tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
    }

    /**
     * Records the termination cause and runs {@link #doOnDispose()} if, and only if, this call is
     * the one that moves {@link #terminationError} away from {@code null}.
     *
     * @param errorSupplier produces the cause; only invoked when no cause has been observed yet
     */
    private void tryTerminate(Supplier<Throwable> errorSupplier) {
        if (this.terminationError != null) {
            // Already terminated: skip creating the error altogether.
            return;
        }
        Throwable cause = errorSupplier.get();
        if (TERMINATION_ERROR.compareAndSet(this, null, cause)) {
            this.doOnDispose();
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Mono}. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.requestHandler.fireAndForget(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Mono}. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return this.requestHandler.requestResponse(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Flux}. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        try {
            return this.requestHandler.requestStream(payload);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Flux}. */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        try {
            return this.requestHandler.requestChannel(payloads);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Mono}. */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Terminates this responder with a {@link CancellationException} ("Disposed") as the cause. */
    @Override
    public void dispose() {
        this.tryTerminate(() -> new CancellationException("Disposed"));
    }

    /** @return whether the underlying connection reports itself as disposed */
    @Override
    public boolean isDisposed() {
        return this.getDuplexConnection().isDisposed();
    }

    /** @return the underlying connection's close signal */
    @Override
    public Mono<Void> onClose() {
        return this.getDuplexConnection().onClose();
    }

    /**
     * Releases everything owned by this responder, in order: cancels the registered stream
     * handlers, disposes the connection, the request interceptor (if any), the lease tracker
     * (if any) and finally the request handler.
     */
    final void doOnDispose() {
        this.cleanUpSendingSubscriptions();
        this.getDuplexConnection().dispose();
        RequestInterceptor requestInterceptor = this.getRequestInterceptor();
        if (requestInterceptor != null) {
            requestInterceptor.dispose();
        }
        ResponderLeaseTracker leaseTracker = this.leaseHandler;
        if (leaseTracker != null) {
            leaseTracker.dispose();
        }
        this.requestHandler.dispose();
    }

    /**
     * Cancels every currently registered stream handler.
     *
     * <p>The handlers are copied while holding this object's monitor and cancelled afterwards, so
     * {@code handleCancel()} runs outside the lock and iterates over a private snapshot.
     */
    private void cleanUpSendingSubscriptions() {
        final ArrayList<FrameHandler> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.activeStreams.values());
        }
        for (FrameHandler handler : snapshot) {
            if (handler != null) {
                handler.handleCancel();
            }
        }
    }

    /**
     * Dispatches one inbound frame according to its frame type.
     *
     * <p>Request frames go to the matching {@code handle*} method. Follow-up frames (CANCEL,
     * REQUEST_N, NEXT, COMPLETE, ERROR, NEXT_COMPLETE) are forwarded to the handler registered for
     * the stream id and silently dropped when there is none. SETUP and any unlisted frame type are
     * answered with an ERROR frame on the same stream.
     *
     * <p>Any {@link Throwable} raised while handling is logged, reported to the peer as a
     * {@link ConnectionErrorException} ERROR frame on stream 0, and then terminates this responder.
     *
     * @param frame the raw inbound frame
     */
    final void handleFrame(ByteBuf frame) {
        try {
            int streamId = FrameHeaderCodec.streamId(frame);
            FrameType frameType = FrameHeaderCodec.frameType(frame);
            FrameHandler receiver;
            switch (frameType) {
                case REQUEST_FNF: {
                    this.handleFireAndForget(streamId, frame);
                    break;
                }
                case REQUEST_RESPONSE: {
                    this.handleRequestResponse(streamId, frame);
                    break;
                }
                case REQUEST_STREAM: {
                    long streamInitialRequestN = RequestStreamFrameCodec.initialRequestN(frame);
                    this.handleStream(streamId, frame, streamInitialRequestN);
                    break;
                }
                case REQUEST_CHANNEL: {
                    long channelInitialRequestN = RequestChannelFrameCodec.initialRequestN(frame);
                    this.handleChannel(streamId, frame, channelInitialRequestN, FrameHeaderCodec.hasComplete(frame));
                    break;
                }
                case METADATA_PUSH: {
                    this.handleMetadataPush(this.metadataPush((Payload)super.getPayloadDecoder().apply(frame)));
                    break;
                }
                case CANCEL: {
                    receiver = super.get(streamId);
                    if (receiver != null) {
                        receiver.handleCancel();
                    }
                    break;
                }
                case REQUEST_N: {
                    receiver = super.get(streamId);
                    if (receiver != null) {
                        long n = RequestNFrameCodec.requestN(frame);
                        receiver.handleRequestN(n);
                    }
                    break;
                }
                case PAYLOAD: {
                    // Intentionally a no-op: PAYLOAD frames are not dispatched by this method.
                    break;
                }
                case NEXT: {
                    receiver = super.get(streamId);
                    if (receiver != null) {
                        boolean hasFollows = FrameHeaderCodec.hasFollows(frame);
                        receiver.handleNext(frame, hasFollows, false);
                    }
                    break;
                }
                case COMPLETE: {
                    receiver = super.get(streamId);
                    if (receiver != null) {
                        receiver.handleComplete();
                    }
                    break;
                }
                case ERROR: {
                    receiver = super.get(streamId);
                    if (receiver != null) {
                        receiver.handleError(Exceptions.from(streamId, frame));
                    }
                    break;
                }
                case NEXT_COMPLETE: {
                    receiver = super.get(streamId);
                    if (receiver != null) {
                        receiver.handleNext(frame, false, true);
                    }
                    break;
                }
                case SETUP: {
                    this.getDuplexConnection().sendFrame(streamId, ErrorFrameCodec.encode(super.getAllocator(), streamId, new IllegalStateException("Setup frame received post setup.")));
                    break;
                }
                default: {
                    this.getDuplexConnection().sendFrame(streamId, ErrorFrameCodec.encode(super.getAllocator(), streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType)));
                    break;
                }
            }
        }
        catch (Throwable t) {
            LOGGER.error("Unexpected error during frame handling", t);
            this.getDuplexConnection().sendFrame(0, ErrorFrameCodec.encode(super.getAllocator(), 0, new ConnectionErrorException("Unexpected error during frame handling", t)));
            this.tryTerminateOnConnectionError(t);
        }
    }

    /**
     * Handles a REQUEST_FNF frame.
     *
     * <p>When a lease tracker is present and {@code use()} returns an error, the request is only
     * reported to the interceptor via {@code onReject}; no frame is sent back. Otherwise a frame
     * with the follows flag is handed to a subscriber that is registered under the stream id,
     * while a frame without it is decoded and passed to {@link #fireAndForget(Payload)} right away.
     *
     * @param streamId stream id taken from the frame header
     * @param frame the raw REQUEST_FNF frame
     */
    final void handleFireAndForget(int streamId, ByteBuf frame) {
        Throwable leaseError;
        ResponderLeaseTracker tracker = this.leaseHandler;
        if (tracker == null || (leaseError = tracker.use()) == null) {
            if (FrameHeaderCodec.hasFollows(frame)) {
                RequestInterceptor requestInterceptor = this.getRequestInterceptor();
                if (requestInterceptor != null) {
                    requestInterceptor.onStart(streamId, FrameType.REQUEST_FNF, RequestFireAndForgetFrameCodec.metadata(frame));
                }
                FireAndForgetResponderSubscriber subscriber = new FireAndForgetResponderSubscriber(streamId, frame, this, this);
                this.add(streamId, subscriber);
            } else {
                RequestInterceptor requestInterceptor = this.getRequestInterceptor();
                if (requestInterceptor != null) {
                    requestInterceptor.onStart(streamId, FrameType.REQUEST_FNF, RequestFireAndForgetFrameCodec.metadata(frame));
                    // A per-request subscriber is used only when an interceptor is present.
                    this.fireAndForget((Payload)super.getPayloadDecoder().apply(frame)).subscribe(new FireAndForgetResponderSubscriber(streamId, this));
                } else {
                    // No interceptor: the shared subscriber instance is used.
                    this.fireAndForget((Payload)super.getPayloadDecoder().apply(frame)).subscribe(FireAndForgetResponderSubscriber.INSTANCE);
                }
            }
        } else {
            RequestInterceptor requestTracker = this.getRequestInterceptor();
            if (requestTracker != null) {
                requestTracker.onReject(leaseError, FrameType.REQUEST_FNF, RequestFireAndForgetFrameCodec.metadata(frame));
            }
        }
    }

    /**
     * Handles a REQUEST_RESPONSE frame.
     *
     * <p>On a lease error the interceptor is notified via {@code onReject} and an ERROR frame is
     * sent on the stream. Otherwise a subscriber is registered under the stream id; for a frame
     * without the follows flag the payload is decoded and {@link #requestResponse(Payload)} is
     * subscribed to, provided the registration succeeded.
     *
     * @param streamId stream id taken from the frame header
     * @param frame the raw REQUEST_RESPONSE frame
     */
    final void handleRequestResponse(int streamId, ByteBuf frame) {
        Throwable leaseError;
        ResponderLeaseTracker tracker = this.leaseHandler;
        if (tracker == null || (leaseError = tracker.use()) == null) {
            RequestInterceptor requestInterceptor = this.getRequestInterceptor();
            if (requestInterceptor != null) {
                requestInterceptor.onStart(streamId, FrameType.REQUEST_RESPONSE, RequestResponseFrameCodec.metadata(frame));
            }
            if (FrameHeaderCodec.hasFollows(frame)) {
                RequestResponseResponderSubscriber subscriber = new RequestResponseResponderSubscriber(streamId, frame, this, this);
                this.add(streamId, subscriber);
            } else {
                RequestResponseResponderSubscriber subscriber = new RequestResponseResponderSubscriber(streamId, this);
                if (this.add(streamId, subscriber)) {
                    this.requestResponse((Payload)super.getPayloadDecoder().apply(frame)).subscribe(subscriber);
                }
            }
        } else {
            RequestInterceptor requestInterceptor = this.getRequestInterceptor();
            if (requestInterceptor != null) {
                requestInterceptor.onReject(leaseError, FrameType.REQUEST_RESPONSE, RequestResponseFrameCodec.metadata(frame));
            }
            this.sendLeaseRejection(streamId, leaseError);
        }
    }

    /**
     * Handles a REQUEST_STREAM frame; same flow as {@link #handleRequestResponse(int, ByteBuf)} but
     * using {@link #requestStream(Payload)} and carrying the initial demand.
     *
     * @param streamId stream id taken from the frame header
     * @param frame the raw REQUEST_STREAM frame
     * @param initialRequestN initial request-n value read from the frame
     */
    final void handleStream(int streamId, ByteBuf frame, long initialRequestN) {
        Throwable leaseError;
        ResponderLeaseTracker tracker = this.leaseHandler;
        if (tracker == null || (leaseError = tracker.use()) == null) {
            RequestInterceptor requestInterceptor = this.getRequestInterceptor();
            if (requestInterceptor != null) {
                requestInterceptor.onStart(streamId, FrameType.REQUEST_STREAM, RequestStreamFrameCodec.metadata(frame));
            }
            if (FrameHeaderCodec.hasFollows(frame)) {
                RequestStreamResponderSubscriber subscriber = new RequestStreamResponderSubscriber(streamId, initialRequestN, frame, this, this);
                this.add(streamId, subscriber);
            } else {
                RequestStreamResponderSubscriber subscriber = new RequestStreamResponderSubscriber(streamId, initialRequestN, this);
                if (this.add(streamId, subscriber)) {
                    this.requestStream((Payload)super.getPayloadDecoder().apply(frame)).subscribe(subscriber);
                }
            }
        } else {
            RequestInterceptor requestInterceptor = this.getRequestInterceptor();
            if (requestInterceptor != null) {
                requestInterceptor.onReject(leaseError, FrameType.REQUEST_STREAM, RequestStreamFrameCodec.metadata(frame));
            }
            this.sendLeaseRejection(streamId, leaseError);
        }
    }

    /**
     * Handles a REQUEST_CHANNEL frame.
     *
     * <p>On a lease error the interceptor is notified via {@code onReject} and an ERROR frame is
     * sent on the stream. Otherwise a subscriber is registered under the stream id. For a frame
     * without the follows flag the first payload is decoded up front, and the subscriber is passed
     * to {@link #requestChannel(Publisher)} as the inbound publisher and also subscribed to the
     * resulting flux.
     *
     * @param streamId stream id taken from the frame header
     * @param frame the raw REQUEST_CHANNEL frame
     * @param initialRequestN initial request-n value read from the frame
     * @param complete whether the frame carried the complete flag
     */
    final void handleChannel(int streamId, ByteBuf frame, long initialRequestN, boolean complete) {
        Throwable leaseError;
        ResponderLeaseTracker tracker = this.leaseHandler;
        if (tracker == null || (leaseError = tracker.use()) == null) {
            RequestInterceptor requestInterceptor = this.getRequestInterceptor();
            if (requestInterceptor != null) {
                requestInterceptor.onStart(streamId, FrameType.REQUEST_CHANNEL, RequestChannelFrameCodec.metadata(frame));
            }
            if (FrameHeaderCodec.hasFollows(frame)) {
                RequestChannelResponderSubscriber subscriber = new RequestChannelResponderSubscriber(streamId, initialRequestN, frame, this, this);
                this.add(streamId, subscriber);
            } else {
                Payload firstPayload = (Payload)super.getPayloadDecoder().apply(frame);
                RequestChannelResponderSubscriber subscriber = new RequestChannelResponderSubscriber(streamId, initialRequestN, firstPayload, this);
                if (this.add(streamId, subscriber)) {
                    this.requestChannel(subscriber).subscribe(subscriber);
                    if (complete) {
                        // Signalled only after the handler has been subscribed.
                        subscriber.handleComplete();
                    }
                }
            }
        } else {
            RequestInterceptor requestTracker = this.getRequestInterceptor();
            if (requestTracker != null) {
                requestTracker.onReject(leaseError, FrameType.REQUEST_CHANNEL, RequestChannelFrameCodec.metadata(frame));
            }
            this.sendLeaseRejection(streamId, leaseError);
        }
    }

    /** Sends an ERROR frame carrying {@code leaseError} on the given stream. */
    private void sendLeaseRejection(int streamId, Throwable leaseError) {
        this.getDuplexConnection().sendFrame(streamId, ErrorFrameCodec.encode(this.getAllocator(), streamId, leaseError));
    }

    /** Subscribes the shared metadata-push subscriber to the handler's result. */
    private void handleMetadataPush(Mono<Void> result) {
        result.subscribe(MetadataPushResponderSubscriber.INSTANCE);
    }

    /**
     * Registers a handler through the superclass; when the superclass refuses the registration the
     * handler is cancelled immediately.
     *
     * @return {@code true} if the handler was registered, {@code false} if it was refused and cancelled
     */
    @Override
    public boolean add(int streamId, FrameHandler frameHandler) {
        if (!super.add(streamId, frameHandler)) {
            frameHandler.handleCancel();
            return false;
        }
        return true;
    }
}

