/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class SetupHandlingDuplexConnection
extends Flux<ByteBuf>
implements DuplexConnection,
CoreSubscriber<ByteBuf>,
Subscription {
    final DuplexConnection source;
    final MonoSink<Tuple2<ByteBuf, DuplexConnection>> sink;
    Subscription s;
    boolean firstFrameReceived = false;
    CoreSubscriber<? super ByteBuf> actual;
    boolean done;
    Throwable t;

    SetupHandlingDuplexConnection(DuplexConnection source, MonoSink<Tuple2<ByteBuf, DuplexConnection>> sink) {
        this.source = source;
        this.sink = sink;
        source.receive().subscribe(this);
    }

    @Override
    public void dispose() {
        this.source.dispose();
    }

    @Override
    public boolean isDisposed() {
        return this.source.isDisposed();
    }

    @Override
    public Mono<Void> onClose() {
        return this.source.onClose();
    }

    @Override
    public void sendFrame(int streamId, ByteBuf frame) {
        this.source.sendFrame(streamId, frame);
    }

    @Override
    public Flux<ByteBuf> receive() {
        return this;
    }

    @Override
    public SocketAddress remoteAddress() {
        return this.source.remoteAddress();
    }

    @Override
    public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
        if (this.done) {
            Throwable t = this.t;
            if (t == null) {
                Operators.complete(actual);
            } else {
                Operators.error(actual, t);
            }
            return;
        }
        this.actual = actual;
        actual.onSubscribe(this);
    }

    @Override
    public void request(long n) {
        if (n != Long.MAX_VALUE) {
            this.actual.onError(new IllegalArgumentException("Only unbounded request is allowed"));
            return;
        }
        this.s.request(Long.MAX_VALUE);
    }

    @Override
    public void cancel() {
        this.source.dispose();
        this.s.cancel();
    }

    @Override
    public void onSubscribe(Subscription s2) {
        if (Operators.validate(this.s, s2)) {
            this.s = s2;
            s2.request(1L);
        }
    }

    @Override
    public void onNext(ByteBuf frame) {
        if (!this.firstFrameReceived) {
            this.firstFrameReceived = true;
            this.sink.success(Tuples.of(frame, this));
            return;
        }
        this.actual.onNext((ByteBuf)frame);
    }

    @Override
    public void onError(Throwable t) {
        if (this.done) {
            Operators.onErrorDropped(t, Context.empty());
            return;
        }
        this.done = true;
        this.t = t;
        if (!this.firstFrameReceived) {
            this.sink.error(t);
            return;
        }
        CoreSubscriber<? super ByteBuf> actual = this.actual;
        if (actual != null) {
            actual.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        if (!this.firstFrameReceived) {
            this.sink.error(new ClosedChannelException());
            return;
        }
        CoreSubscriber<? super ByteBuf> actual = this.actual;
        if (actual != null) {
            actual.onComplete();
        }
    }

    @Override
    public void sendErrorAndClose(RSocketErrorException e) {
        this.source.sendErrorAndClose(e);
    }

    @Override
    public ByteBufAllocator alloc() {
        return this.source.alloc();
    }
}

