/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

public class InMemoryResumableFramesStore
extends Flux<ByteBuf>
implements ResumableFramesStore,
Subscription {
    private FramesSubscriber framesSubscriber;
    private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class);
    final Sinks.Empty<Void> disposed = Sinks.empty();
    final Queue<ByteBuf> cachedFrames;
    final String side;
    final String session;
    final int cacheLimit;
    volatile long impliedPosition;
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> IMPLIED_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "impliedPosition");
    volatile long firstAvailableFramePosition;
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> FIRST_AVAILABLE_FRAME_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "firstAvailableFramePosition");
    long remoteImpliedPosition;
    int cacheSize;
    Throwable terminal;
    CoreSubscriber<? super ByteBuf> actual;
    CoreSubscriber<? super ByteBuf> pendingActual;
    volatile long state;
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> STATE = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");
    static final long FINALIZED_FLAG = Long.MIN_VALUE;
    static final long DISPOSED_FLAG = 0x4000000000000000L;
    static final long TERMINATED_FLAG = 0x2000000000000000L;
    static final long CONNECTED_FLAG = 0x1000000000000000L;
    static final long PENDING_CONNECTION_FLAG = 0x800000000000000L;
    static final long REMOTE_IMPLIED_POSITION_CHANGED_FLAG = 0x400000000000000L;
    static final long HAS_FRAME_FLAG = 0x200000000000000L;
    static final long MAX_WORK_IN_PROGRESS = 0xFFFFFFFFL;

    public InMemoryResumableFramesStore(String side, ByteBuf session, int cacheSizeBytes) {
        this.side = side;
        this.session = session.toString(CharsetUtil.UTF_8);
        this.cacheLimit = cacheSizeBytes;
        this.cachedFrames = new ArrayDeque<ByteBuf>();
    }

    @Override
    public Mono<Void> saveFrames(Flux<ByteBuf> frames) {
        return frames.transform(Operators.lift((__, actual) -> {
            this.framesSubscriber = new FramesSubscriber((CoreSubscriber<? super Void>)actual, this);
            return this.framesSubscriber;
        })).then();
    }

    @Override
    public void releaseFrames(long remoteImpliedPos) {
        long lastReceivedRemoteImpliedPosition = this.remoteImpliedPosition;
        if (lastReceivedRemoteImpliedPosition > remoteImpliedPos) {
            throw new IllegalStateException("Given Remote Implied Position is behind the last received Remote Implied Position");
        }
        this.remoteImpliedPosition = remoteImpliedPos;
        long previousState = InMemoryResumableFramesStore.markRemoteImpliedPositionChanged(this);
        if (InMemoryResumableFramesStore.isFinalized(previousState) || InMemoryResumableFramesStore.isWorkInProgress(previousState)) {
            return;
        }
        this.drain(previousState + 1L | 0x400000000000000L);
    }

    void drain(long expectedState) {
        Fuseable.QueueSubscription<ByteBuf> qs = this.framesSubscriber.qs;
        Queue<ByteBuf> cachedFrames = this.cachedFrames;
        do {
            if (InMemoryResumableFramesStore.hasRemoteImpliedPositionChanged(expectedState)) {
                expectedState = this.handlePendingRemoteImpliedPositionChanges(expectedState, cachedFrames);
            }
            if (InMemoryResumableFramesStore.hasPendingConnection(expectedState)) {
                expectedState = this.handlePendingConnection(expectedState, cachedFrames);
            }
            if (InMemoryResumableFramesStore.isConnected(expectedState)) {
                if (InMemoryResumableFramesStore.isTerminated(expectedState)) {
                    this.handleTerminal(this.terminal);
                } else if (this.isDisposed()) {
                    this.handleTerminal(new CancellationException("Disposed"));
                } else if (InMemoryResumableFramesStore.hasFrames(expectedState)) {
                    this.handlePendingFrames(qs);
                }
            }
            if (InMemoryResumableFramesStore.isDisposed(expectedState) || InMemoryResumableFramesStore.isTerminated(expectedState)) {
                InMemoryResumableFramesStore.clearAndFinalize(this);
                return;
            }
            if (!InMemoryResumableFramesStore.isFinalized(expectedState = InMemoryResumableFramesStore.markWorkDone(this, expectedState))) continue;
            return;
        } while (InMemoryResumableFramesStore.isWorkInProgress(expectedState));
    }

    long handlePendingRemoteImpliedPositionChanges(long expectedState, Queue<ByteBuf> cachedFrames) {
        long remoteImpliedPosition = this.remoteImpliedPosition;
        long firstAvailableFramePosition = this.firstAvailableFramePosition;
        long toDropFromCache = Math.max(0L, remoteImpliedPosition - firstAvailableFramePosition);
        if (toDropFromCache > 0L) {
            int droppedFromCache = InMemoryResumableFramesStore.dropFramesFromCache(toDropFromCache, cachedFrames);
            if (toDropFromCache > (long)droppedFromCache) {
                this.terminal = new IllegalStateException(String.format("Local and remote state disagreement: need to remove additional %d bytes, but cache is empty", toDropFromCache));
                expectedState = InMemoryResumableFramesStore.markTerminated(this) | 0x2000000000000000L;
            }
            if (toDropFromCache < (long)droppedFromCache) {
                this.terminal = new IllegalStateException("Local and remote state disagreement: local and remote frame sizes are not equal");
                expectedState = InMemoryResumableFramesStore.markTerminated(this) | 0x2000000000000000L;
            }
            FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + (long)droppedFromCache);
            if (this.cacheLimit != Integer.MAX_VALUE) {
                this.cacheSize -= droppedFromCache;
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[{}]|Session[{}]. Removed frames from cache to position[{}]. CacheSize[{}]", new Object[]{this.side, this.session, this.remoteImpliedPosition, this.cacheSize});
                }
            }
        }
        return expectedState;
    }

    void handlePendingFrames(Fuseable.QueueSubscription<ByteBuf> qs) {
        do {
            ByteBuf frame;
            boolean empty;
            boolean bl = empty = (frame = (ByteBuf)qs.poll()) == null;
            if (empty) break;
            this.handleFrame(frame);
        } while (InMemoryResumableFramesStore.isConnected(this.state));
    }

    long handlePendingConnection(long expectedState, Queue<ByteBuf> cachedFrames) {
        CoreSubscriber<? super ByteBuf> lastActual = null;
        while (true) {
            CoreSubscriber<? super ByteBuf> nextActual;
            if ((nextActual = this.pendingActual) != lastActual) {
                for (ByteBuf frame : cachedFrames) {
                    nextActual.onNext((ByteBuf)frame.retainedSlice());
                }
            }
            if (InMemoryResumableFramesStore.isConnected(expectedState = InMemoryResumableFramesStore.markConnected(this, expectedState))) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[{}]|Session[{}]. Connected at Position[{}] and ImpliedPosition[{}]", new Object[]{this.side, this.session, this.firstAvailableFramePosition, this.impliedPosition});
                }
                this.actual = nextActual;
                break;
            }
            if (!InMemoryResumableFramesStore.hasPendingConnection(expectedState)) break;
            lastActual = nextActual;
        }
        return expectedState;
    }

    static int dropFramesFromCache(long toRemoveBytes, Queue<ByteBuf> cache) {
        int removedBytes = 0;
        while (toRemoveBytes > (long)removedBytes && cache.size() > 0) {
            ByteBuf cachedFrame = cache.poll();
            int frameSize = cachedFrame.readableBytes();
            cachedFrame.release();
            removedBytes += frameSize;
        }
        return removedBytes;
    }

    @Override
    public Flux<ByteBuf> resumeStream() {
        return this;
    }

    @Override
    public long framePosition() {
        return this.firstAvailableFramePosition;
    }

    @Override
    public long frameImpliedPosition() {
        return this.impliedPosition & Long.MAX_VALUE;
    }

    @Override
    public boolean resumableFrameReceived(ByteBuf frame) {
        long impliedPosition;
        int frameSize = frame.readableBytes();
        do {
            if ((impliedPosition = this.impliedPosition) >= 0L) continue;
            return false;
        } while (!IMPLIED_POSITION.compareAndSet(this, impliedPosition, impliedPosition + (long)frameSize));
        return true;
    }

    void pauseImplied() {
        long impliedPosition;
        while (!IMPLIED_POSITION.compareAndSet(this, impliedPosition = this.impliedPosition, impliedPosition | Long.MIN_VALUE)) {
        }
        logger.debug("Side[{}]|Session[{}]. Paused at position[{}]", new Object[]{this.side, this.session, impliedPosition});
    }

    void resumeImplied() {
        long restoredImpliedPosition;
        long impliedPosition;
        while (!IMPLIED_POSITION.compareAndSet(this, impliedPosition = this.impliedPosition, restoredImpliedPosition = impliedPosition & Long.MAX_VALUE)) {
        }
        logger.debug("Side[{}]|Session[{}]. Resumed at position[{}]", new Object[]{this.side, this.session, restoredImpliedPosition});
    }

    @Override
    public Mono<Void> onClose() {
        return this.disposed.asMono();
    }

    @Override
    public void dispose() {
        long previousState = InMemoryResumableFramesStore.markDisposed(this);
        if (InMemoryResumableFramesStore.isFinalized(previousState) || InMemoryResumableFramesStore.isDisposed(previousState) || InMemoryResumableFramesStore.isWorkInProgress(previousState)) {
            return;
        }
        this.drain(previousState | 0x4000000000000000L);
    }

    void clearCache() {
        ByteBuf frame;
        Queue<ByteBuf> frames = this.cachedFrames;
        this.cacheSize = 0;
        while ((frame = frames.poll()) != null) {
            frame.release();
        }
    }

    @Override
    public boolean isDisposed() {
        return InMemoryResumableFramesStore.isDisposed(this.state);
    }

    void handleFrame(ByteBuf frame) {
        boolean isResumable = ResumableDuplexConnection.isResumableFrame(frame);
        if (isResumable) {
            this.handleResumableFrame(frame);
            return;
        }
        this.handleConnectionFrame(frame);
    }

    void handleTerminal(@Nullable Throwable t) {
        if (t != null) {
            this.actual.onError(t);
        } else {
            this.actual.onComplete();
        }
    }

    void handleConnectionFrame(ByteBuf frame) {
        this.actual.onNext((ByteBuf)frame);
    }

    void handleResumableFrame(ByteBuf frame) {
        boolean canBeStore;
        Queue<ByteBuf> frames = this.cachedFrames;
        int incomingFrameSize = frame.readableBytes();
        int cacheLimit = this.cacheLimit;
        int cacheSize = this.cacheSize;
        if (cacheLimit != Integer.MAX_VALUE) {
            long availableSize = cacheLimit - cacheSize;
            if (availableSize < (long)incomingFrameSize) {
                long firstAvailableFramePosition = this.firstAvailableFramePosition;
                long toRemoveBytes = (long)incomingFrameSize - availableSize;
                int removedBytes = InMemoryResumableFramesStore.dropFramesFromCache(toRemoveBytes, frames);
                cacheSize -= removedBytes;
                boolean bl = canBeStore = (long)removedBytes >= toRemoveBytes;
                if (canBeStore) {
                    FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + (long)removedBytes);
                } else {
                    this.cacheSize = cacheSize;
                    FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + (long)removedBytes + (long)incomingFrameSize);
                }
            } else {
                canBeStore = true;
            }
        } else {
            canBeStore = true;
        }
        if (canBeStore) {
            frames.offer(frame);
            if (cacheLimit != Integer.MAX_VALUE) {
                this.cacheSize = cacheSize + incomingFrameSize;
            }
        }
        this.actual.onNext((ByteBuf)(canBeStore ? frame.retainedSlice() : frame));
    }

    @Override
    public void request(long n) {
    }

    @Override
    public void cancel() {
        this.pauseImplied();
        InMemoryResumableFramesStore.markDisconnected(this);
        if (logger.isDebugEnabled()) {
            logger.debug("Side[{}]|Session[{}]. Disconnected at Position[{}] and ImpliedPosition[{}]", new Object[]{this.side, this.session, this.firstAvailableFramePosition, this.frameImpliedPosition()});
        }
    }

    @Override
    public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
        this.resumeImplied();
        actual.onSubscribe(this);
        this.pendingActual = actual;
        long previousState = InMemoryResumableFramesStore.markPendingConnection(this);
        if (InMemoryResumableFramesStore.isDisposed(previousState)) {
            actual.onError(new CancellationException("Disposed"));
            return;
        }
        if (InMemoryResumableFramesStore.isTerminated(previousState)) {
            actual.onError(new CancellationException("Disposed"));
            return;
        }
        if (InMemoryResumableFramesStore.isWorkInProgress(previousState)) {
            return;
        }
        this.drain(previousState + 1L | 0x800000000000000L);
    }

    static long markFrameAdded(InMemoryResumableFramesStore store) {
        long nextState;
        long state;
        do {
            if (InMemoryResumableFramesStore.isFinalized(state = store.state)) {
                return state;
            }
            nextState = state;
            if (!InMemoryResumableFramesStore.isConnected(state) && !InMemoryResumableFramesStore.hasPendingConnection(state) && !InMemoryResumableFramesStore.isWorkInProgress(state)) continue;
            long l = nextState = (state & 0xFFFFFFFFL) == 0xFFFFFFFFL ? nextState : nextState + 1L;
        } while (!STATE.compareAndSet(store, state, nextState | 0x200000000000000L));
        return state;
    }

    static long markPendingConnection(InMemoryResumableFramesStore store) {
        long nextState;
        long state;
        do {
            if (InMemoryResumableFramesStore.isFinalized(state = store.state) || InMemoryResumableFramesStore.isDisposed(state) || InMemoryResumableFramesStore.isTerminated(state)) {
                return state;
            }
            if (!InMemoryResumableFramesStore.isConnected(state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, (nextState = (state & 0xFFFFFFFFL) == 0xFFFFFFFFL ? state : state + 1L) | 0x800000000000000L));
        return state;
    }

    static long markRemoteImpliedPositionChanged(InMemoryResumableFramesStore store) {
        long nextState;
        long state;
        do {
            if (!InMemoryResumableFramesStore.isFinalized(state = store.state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, (nextState = (state & 0xFFFFFFFFL) == 0xFFFFFFFFL ? state : state + 1L) | 0x400000000000000L));
        return state;
    }

    static long markDisconnected(InMemoryResumableFramesStore store) {
        long state;
        do {
            if (!InMemoryResumableFramesStore.isFinalized(state = store.state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, state & 0xEFFFFFFFFFFFFFFFL & 0xF7FFFFFFFFFFFFFFL));
        return state;
    }

    static long markWorkDone(InMemoryResumableFramesStore store, long expectedState) {
        long nextState;
        long state;
        do {
            if (expectedState != (state = store.state)) {
                return state;
            }
            if (!InMemoryResumableFramesStore.isFinalized(state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, nextState = state & 0xFFFFFFFF00000000L & 0xFBFFFFFFFFFFFFFFL));
        return nextState;
    }

    static long markConnected(InMemoryResumableFramesStore store, long expectedState) {
        long nextState;
        long state;
        do {
            if ((state = store.state) != expectedState) {
                return state;
            }
            if (!InMemoryResumableFramesStore.isFinalized(state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, nextState = state ^ 0x800000000000000L | 0x1000000000000000L));
        return nextState;
    }

    static long markTerminated(InMemoryResumableFramesStore store) {
        long nextState;
        long state;
        do {
            if (!InMemoryResumableFramesStore.isFinalized(state = store.state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, (nextState = (state & 0xFFFFFFFFL) == 0xFFFFFFFFL ? state : state + 1L) | 0x2000000000000000L));
        return state;
    }

    static long markDisposed(InMemoryResumableFramesStore store) {
        long nextState;
        long state;
        do {
            if (!InMemoryResumableFramesStore.isFinalized(state = store.state)) continue;
            return state;
        } while (!STATE.compareAndSet(store, state, (nextState = (state & 0xFFFFFFFFL) == 0xFFFFFFFFL ? state : state + 1L) | 0x4000000000000000L));
        return state;
    }

    static void clearAndFinalize(InMemoryResumableFramesStore store) {
        long state;
        Fuseable.QueueSubscription<ByteBuf> qs = store.framesSubscriber.qs;
        do {
            state = store.state;
            qs.clear();
            store.clearCache();
            if (!InMemoryResumableFramesStore.isFinalized(state)) continue;
            return;
        } while (!STATE.compareAndSet(store, state, state | Long.MIN_VALUE));
        store.disposed.tryEmitEmpty();
        store.framesSubscriber.onComplete();
    }

    static boolean isConnected(long state) {
        return (state & 0x1000000000000000L) == 0x1000000000000000L;
    }

    static boolean hasRemoteImpliedPositionChanged(long state) {
        return (state & 0x400000000000000L) == 0x400000000000000L;
    }

    static boolean hasPendingConnection(long state) {
        return (state & 0x800000000000000L) == 0x800000000000000L;
    }

    static boolean hasFrames(long state) {
        return (state & 0x200000000000000L) == 0x200000000000000L;
    }

    static boolean isTerminated(long state) {
        return (state & 0x2000000000000000L) == 0x2000000000000000L;
    }

    static boolean isDisposed(long state) {
        return (state & 0x4000000000000000L) == 0x4000000000000000L;
    }

    static boolean isFinalized(long state) {
        return (state & Long.MIN_VALUE) == Long.MIN_VALUE;
    }

    static boolean isWorkInProgress(long state) {
        return (state & 0xFFFFFFFFL) > 0L;
    }

    static class FramesSubscriber
    implements CoreSubscriber<ByteBuf>,
    Fuseable.QueueSubscription<Void> {
        final CoreSubscriber<? super Void> actual;
        final InMemoryResumableFramesStore parent;
        Fuseable.QueueSubscription<ByteBuf> qs;
        boolean done;

        FramesSubscriber(CoreSubscriber<? super Void> actual, InMemoryResumableFramesStore parent) {
            this.actual = actual;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Subscription s2) {
            if (Operators.validate(this.qs, s2)) {
                Fuseable.QueueSubscription qs;
                this.qs = qs = (Fuseable.QueueSubscription)s2;
                int m3 = qs.requestFusion(3);
                if (m3 != 2) {
                    s2.cancel();
                    this.actual.onSubscribe(this);
                    this.actual.onError(new IllegalStateException("Source has to be ASYNC fuseable"));
                    return;
                }
                this.actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(ByteBuf byteBuf) {
            InMemoryResumableFramesStore parent = this.parent;
            long previousState = InMemoryResumableFramesStore.markFrameAdded(parent);
            if (InMemoryResumableFramesStore.isFinalized(previousState)) {
                this.qs.clear();
                return;
            }
            if (InMemoryResumableFramesStore.isWorkInProgress(previousState) || !InMemoryResumableFramesStore.isConnected(previousState) && !InMemoryResumableFramesStore.hasPendingConnection(previousState)) {
                return;
            }
            parent.drain(previousState + 1L);
        }

        @Override
        public void onError(Throwable t) {
            if (this.done) {
                Operators.onErrorDropped(t, this.actual.currentContext());
                return;
            }
            InMemoryResumableFramesStore parent = this.parent;
            parent.terminal = t;
            this.done = true;
            long previousState = InMemoryResumableFramesStore.markTerminated(parent);
            if (InMemoryResumableFramesStore.isFinalized(previousState)) {
                Operators.onErrorDropped(t, this.actual.currentContext());
                return;
            }
            if (InMemoryResumableFramesStore.isWorkInProgress(previousState)) {
                return;
            }
            parent.drain(previousState | 0x2000000000000000L);
        }

        @Override
        public void onComplete() {
            if (this.done) {
                return;
            }
            InMemoryResumableFramesStore parent = this.parent;
            this.done = true;
            long previousState = InMemoryResumableFramesStore.markTerminated(parent);
            if (InMemoryResumableFramesStore.isFinalized(previousState)) {
                return;
            }
            if (InMemoryResumableFramesStore.isWorkInProgress(previousState)) {
                return;
            }
            parent.drain(previousState | 0x2000000000000000L);
        }

        @Override
        public void cancel() {
            if (this.done) {
                return;
            }
            this.done = true;
            long previousState = InMemoryResumableFramesStore.markTerminated(this.parent);
            if (InMemoryResumableFramesStore.isFinalized(previousState)) {
                return;
            }
            if (InMemoryResumableFramesStore.isWorkInProgress(previousState)) {
                return;
            }
            this.parent.drain(previousState | 0x2000000000000000L);
        }

        @Override
        public void request(long n) {
        }

        @Override
        public int requestFusion(int requestedMode) {
            return 0;
        }

        @Override
        public Void poll() {
            return null;
        }

        @Override
        public int size() {
            return 0;
        }

        @Override
        public boolean isEmpty() {
            return false;
        }

        @Override
        public void clear() {
        }
    }
}

