/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.codec.ServiceMessageCodec;
import java.lang.reflect.Type;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.channel.AbortedException;

public class RSocketClientChannel
implements ClientChannel {
    private final Mono<RSocket> rsocket;
    private final ServiceMessageCodec messageCodec;

    public RSocketClientChannel(Mono<RSocket> rsocket, ServiceMessageCodec codec) {
        this.rsocket = rsocket;
        this.messageCodec = codec;
    }

    public Mono<ServiceMessage> requestResponse(ServiceMessage message, Type responseType) {
        return this.rsocket.flatMap(rsocket -> rsocket.requestResponse(this.toPayload(message))).map(this::toMessage).map(msg -> ServiceMessageCodec.decodeData((ServiceMessage)msg, (Type)responseType)).onErrorMap(RSocketClientChannel::mapConnectionAborted);
    }

    public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseType) {
        return this.rsocket.flatMapMany(rsocket -> rsocket.requestStream(this.toPayload(message))).map(this::toMessage).map(msg -> ServiceMessageCodec.decodeData((ServiceMessage)msg, (Type)responseType)).onErrorMap(RSocketClientChannel::mapConnectionAborted);
    }

    public Flux<ServiceMessage> requestChannel(Publisher<ServiceMessage> publisher, Type responseType) {
        return this.rsocket.flatMapMany(rsocket -> rsocket.requestChannel((Publisher)Flux.from((Publisher)publisher).map(this::toPayload))).map(this::toMessage).map(msg -> ServiceMessageCodec.decodeData((ServiceMessage)msg, (Type)responseType)).onErrorMap(RSocketClientChannel::mapConnectionAborted);
    }

    private Payload toPayload(ServiceMessage request) {
        return (Payload)this.messageCodec.encodeAndTransform(request, ByteBufPayload::create);
    }

    private ServiceMessage toMessage(Payload payload) {
        try {
            ServiceMessage serviceMessage = this.messageCodec.decode(payload.sliceData().retain(), payload.sliceMetadata().retain());
            return serviceMessage;
        }
        finally {
            payload.release();
        }
    }

    private static Throwable mapConnectionAborted(Throwable t) {
        return AbortedException.isConnectionReset((Throwable)t) || ConnectionClosedException.isConnectionClosed((Throwable)t) ? new ConnectionClosedException(t) : t;
    }
}

