/*
 * Decompiled with CFR 0.152.
 */
package net.rocketpowered.sdk.network;

import io.rsocket.Closeable;
import io.rsocket.core.RSocketConnector;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.transport.netty.client.TcpClientTransport;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import net.rocketpowered.common.GatewayVersion;
import net.rocketpowered.common.network.Connection;
import net.rocketpowered.common.network.protocol.handshake.HandshakeMessage;
import net.rocketpowered.common.network.protocol.handshake.HandshakeProtocol;
import net.rocketpowered.common.network.protocol.v1.login.LoginProtocol;
import net.rocketpowered.common.network.rsocket.RSocketBootstrapper;
import net.rocketpowered.sdk.exception.GatewayUnavailableException;
import net.rocketpowered.sdk.util.ServiceRecord;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.netty.tcp.TcpClient;
import reactor.retry.Repeat;

public class ConnectionManager {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
    private final Sinks.Many<Connection> connectionFeed = Sinks.many().replay().latest();
    private final Supplier<String> serviceName;
    private final Function<Connection, Mono<Void>> loginHandler;
    @Nullable
    private Connection connection;

    public ConnectionManager(Supplier<String> serviceName, Function<Connection, Mono<Void>> loginHandler) {
        this.serviceName = serviceName;
        this.loginHandler = loginHandler;
    }

    public void init() {
        this.connect().doOnNext(this::connected).flatMap(Closeable::onClose).onErrorMap(ConnectException.class, GatewayUnavailableException::new).onErrorMap(ConnectionErrorException.class, GatewayUnavailableException::new).onErrorMap(ClosedChannelException.class, GatewayUnavailableException::new).doOnError(this::connectionError).onErrorResume(GatewayUnavailableException.class, __ -> Mono.empty()).repeatWhen(Repeat.times(Long.MAX_VALUE).exponentialBackoff(Duration.ofSeconds(10L), Duration.ofMinutes(2L))).subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    private void connectionError(Throwable error) {
        if (error instanceof GatewayUnavailableException) {
            logger.info("Gateway unavailable: {}", (Object)error.getMessage());
        } else {
            logger.error("Connection error", error);
        }
        this.connection = null;
    }

    private void connected(Connection connection) {
        logger.info("Connected to Rocket");
        this.connectionFeed.tryEmitNext(connection);
        this.connection = connection;
    }

    private Mono<Connection> connect() {
        return Mono.fromSupplier(this.serviceName).map(ServiceRecord::resolve).flatMap(Mono::justOrEmpty).doOnNext(serviceRecord -> logger.info("Connecting to Rocket @ {}:{}", (Object)serviceRecord.host(), (Object)serviceRecord.port())).flatMap(serviceRecord -> RSocketBootstrapper.connect(RSocketConnector::create, TcpClientTransport.create((TcpClient)TcpClient.create().remoteAddress(serviceRecord::toInetSocketAddress)))).doOnNext(connection -> connection.loadConnectionHandler(() -> HandshakeProtocol.INSTANCE)).delayUntil(connection -> connection.send(new HandshakeMessage(GatewayVersion.V1))).doOnNext(connection -> connection.loadConnectionHandler(() -> LoginProtocol.INSTANCE)).delayUntil(this.loginHandler);
    }

    public Flux<Connection> getConnectionFeed() {
        return this.connectionFeed.asFlux();
    }

    public Optional<Connection> getConnection() {
        return Optional.ofNullable(this.connection);
    }
}

