/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameLengthCodec;
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/**
 * Duplex connection backed by a Reactor Netty {@link Connection}.
 *
 * <p>Every outgoing frame is passed through {@link FrameLengthCodec#encode} before being
 * handed to the superclass, and every inbound buffer is mapped through
 * {@link FrameLengthCodec#frame} before being exposed to subscribers.
 */
public final class TcpDuplexConnection
extends BaseDuplexConnection {
    /** Underlying Reactor Netty connection that all calls are delegated to. */
    private final Connection connection;

    /**
     * Creates a new instance bound to the given connection.
     *
     * <p>Two things are wired up here: a listener on the channel's close future that disposes
     * this object when it has not been disposed yet, and a subscription that feeds the
     * inherited {@code sender} into the connection's outbound side.
     *
     * @param connection the Reactor Netty connection to wrap
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    public TcpDuplexConnection(Connection connection) {
        this.connection = Objects.requireNonNull(connection, "connection must not be null");
        // The listener type is inferred from addListener's parameter; no raw-type cast is needed.
        connection.channel().closeFuture().addListener(future -> {
            if (!isDisposed()) {
                dispose();
            }
        });
        // NOTE(review): `sender` is inherited from BaseDuplexConnection (not visible here);
        // presumably it is the publisher of queued outgoing frames - confirm.
        connection.outbound().send(sender).then().subscribe();
    }

    /**
     * Returns the allocator of the underlying channel.
     *
     * @return the channel's {@link ByteBufAllocator}
     */
    @Override
    public ByteBufAllocator alloc() {
        return connection.channel().alloc();
    }

    /**
     * Returns the remote address reported by the underlying channel.
     *
     * @return the channel's remote {@link SocketAddress}
     */
    @Override
    public SocketAddress remoteAddress() {
        return connection.channel().remoteAddress();
    }

    /** Disposes the underlying Reactor Netty connection. */
    @Override
    protected void doOnClose() {
        connection.dispose();
    }

    /**
     * Combines the superclass close signal with the underlying connection's dispose signal.
     *
     * @return a {@link Mono} joining {@code super.onClose()} and {@code connection.onDispose()}
     */
    @Override
    public Mono<Void> onClose() {
        return super.onClose().and(connection.onDispose());
    }

    /**
     * Encodes the given error as an error frame on stream id {@code 0}, length-encodes it,
     * sends it through the connection's outbound side, and then completes the inherited
     * {@code sender}.
     *
     * @param e the error to encode into the outgoing error frame
     */
    @Override
    public void sendErrorAndClose(RSocketErrorException e) {
        final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e);
        // NOTE(review): disposeSubscriber() presumably disposes the connection once the send
        // terminates - confirm against the Reactor Netty documentation.
        connection.outbound()
            .sendObject(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame))
            .subscribe(connection.disposeSubscriber());
        sender.onComplete();
    }

    /**
     * Exposes the inbound buffers of the connection, each mapped through
     * {@link FrameLengthCodec#frame}.
     *
     * @return a {@link Flux} of the mapped inbound buffers
     */
    @Override
    public Flux<ByteBuf> receive() {
        return connection.inbound().receive().map(FrameLengthCodec::frame);
    }

    /**
     * Length-encodes the frame using its current readable byte count and forwards the result
     * to the superclass implementation.
     *
     * @param streamId the stream id passed through unchanged to the superclass
     * @param frame the frame to length-encode and send
     */
    @Override
    public void sendFrame(int streamId, ByteBuf frame) {
        super.sendFrame(streamId, FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame));
    }
}

