/*
 * Decompiled with CFR 0.152.
 */
package net.rocketpowered.common.network.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketErrorException;
import io.rsocket.exceptions.CustomRSocketException;
import io.rsocket.util.ByteBufPayload;
import java.util.Objects;
import java.util.function.Supplier;
import net.rocketpowered.common.ErrorCode;
import net.rocketpowered.common.RocketException;
import net.rocketpowered.common.network.Connection;
import net.rocketpowered.common.network.ConnectionHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class RSocketConnection
implements Connection {
    private static final Logger logger = LoggerFactory.getLogger(RSocketConnection.class);
    private final RSocket outboundHandler;
    private ConnectionHandler connectionHandler;

    public RSocketConnection(RSocket outboundHandler) {
        this.outboundHandler = outboundHandler;
    }

    Supplier<RSocket> inboundRSocket() {
        return () -> new InboundRSocket();
    }

    @Override
    public ConnectionHandler getConnectionHandler() {
        return this.connectionHandler;
    }

    @Override
    public void loadConnectionHandler(ConnectionHandler connectionHandler) {
        if (this.connectionHandler != null) {
            this.connectionHandler.unload(this);
        }
        this.connectionHandler = connectionHandler;
        this.connectionHandler.load(this);
    }

    @Override
    public Publisher<?> send(Publisher<?> dataPublisher) {
        return Flux.from(dataPublisher).transformDeferred(this::processOutbound).transformDeferred(this.outboundHandler::requestChannel).transformDeferred(this::processInbound);
    }

    @Override
    public Mono<Void> onClose() {
        return this.outboundHandler.onClose();
    }

    @Override
    public void dispose() {
        this.outboundHandler.dispose();
    }

    private Publisher<?> processInbound(Flux<Payload> payloads) {
        if (this.connectionHandler == null) {
            throw new IllegalStateException("connectionHandler cannot be null");
        }
        return payloads.map(Payload::sliceData).map(this.connectionHandler.getProtocol()::decodeAndRelease).onErrorMap(CustomRSocketException.class, RocketException::fromRSocketException);
    }

    private Flux<Payload> processOutbound(Flux<?> dataPublisher) {
        if (this.connectionHandler == null) {
            throw new IllegalStateException("connectionHandler cannot be null");
        }
        return dataPublisher.map(this.connectionHandler.getProtocol()::encode).doOnError(RocketException.class, error -> {
            if (error.getError() == ErrorCode.INTERNAL_SERVER_EXCEPTION) {
                logger.error("Internal server error", (Throwable)error);
            }
        }).onErrorMap(RocketException.class, RocketException::toRSocketException).onErrorMap(error -> !(error instanceof RSocketErrorException), error -> {
            logger.error("Unhandled outgoing exception", error);
            return new RocketException(ErrorCode.INTERNAL_SERVER_EXCEPTION, error.getMessage());
        }).map(ByteBufPayload::create);
    }

    private class InboundRSocket
    implements RSocket {
        private InboundRSocket() {
        }

        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return Flux.from(payloads).transformDeferred(RSocketConnection.this::processInbound).flatMap(payload -> RSocketConnection.this.connectionHandler.handlePayload(RSocketConnection.this, payload)).transformDeferred(RSocketConnection.this::processOutbound);
        }
    }
}

