/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.examples.services;

import io.scalecube.services.annotations.Inject;
import io.scalecube.services.examples.services.Service1;
import io.scalecube.services.examples.services.Service2;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.locks.LockSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * Example {@link Service1} implementation that emits timestamped strings at a
 * caller-supplied pace, optionally after two calls to an injected
 * {@link Service2}.
 *
 * <p>Emission is demand-driven: items are only produced while the downstream
 * subscriber has outstanding demand and has not cancelled.
 */
public class Service1Impl implements Service1 {
    /** How long the emit loop parks between polls when nothing is due, in nanoseconds. */
    private static final long SLEEP_PERIOD_NS = 10000L;

    // Populated through @Inject, therefore intentionally neither final nor
    // assigned in a constructor.
    @Inject
    private Service2 remoteService;

    /**
     * Returns a stream of timestamped responses spaced more than {@code interval}
     * milliseconds apart.
     *
     * @param interval minimum gap between two emitted items, in milliseconds
     * @return a flux subscribed on the parallel scheduler and logged under the
     *     {@code "manyDelay   |"} category
     */
    @Override
    public Flux<String> manyDelay(long interval) {
        return this.delayedResponses(interval).log("manyDelay   |");
    }

    /**
     * Invokes {@code remoteService.oneDelay(interval)} twice in sequence and then
     * switches to the same paced stream that {@link #manyDelay(long)} produces.
     *
     * <p>The result of the first call is discarded by {@code then(...)}; the
     * result of the second call is only used as the trigger for the stream.
     *
     * @param interval forwarded to both remote calls and used as the minimum gap,
     *     in milliseconds, between emitted items
     * @return a flux logged under the {@code "rcManyDelay |"} category
     */
    @Override
    public Flux<String> remoteCallThenManyDelay(long interval) {
        return this.remoteService
            .oneDelay(interval)
            .publishOn(Schedulers.parallel())
            .log("remoteCall  |")
            .then(
                this.remoteService
                    .oneDelay(interval)
                    .publishOn(Schedulers.parallel())
                    .log("remoteCall2  |"))
            .flatMapMany(i -> this.delayedResponses(interval).log("manyInner   |"))
            .log("rcManyDelay |");
    }

    /**
     * Builds the demand-driven emitter shared by both public methods.
     *
     * <p>The explicit {@code <String>} type witness is needed because
     * {@code create} is followed by further chained calls, so the element type
     * cannot be inferred from the method's return type.
     */
    private Flux<String> delayedResponses(long interval) {
        return Flux.<String>create(sink -> sink.onRequest(r -> this.onRequest(sink, interval)))
            .subscribeOn(Schedulers.parallel());
    }

    /**
     * Emits responses into {@code sink} until it is cancelled or downstream
     * demand is exhausted.
     *
     * <p>This method blocks the calling thread: it polls the wall clock and
     * parks for {@link #SLEEP_PERIOD_NS} whenever no item is due yet.
     *
     * @param sink destination for the generated items
     * @param interval minimum gap between two emitted items, in milliseconds
     */
    private void onRequest(FluxSink<String> sink, long interval) {
        // Start the clock at entry so the first item is also delayed by interval.
        long lastPublished = System.currentTimeMillis();
        while (!sink.isCancelled() && sink.requestedFromDownstream() > 0L) {
            long now = System.currentTimeMillis();
            // Demand is re-read immediately before emitting, as in the original loop.
            if (sink.requestedFromDownstream() > 0L && now - lastPublished > interval) {
                lastPublished = now;
                sink.next(this.toResponse(now));
            } else {
                // Nothing due yet: back off briefly instead of spinning.
                LockSupport.parkNanos(SLEEP_PERIOD_NS);
            }
        }
    }

    /**
     * Formats a response containing the current thread's name and {@code now}
     * rendered as a {@link LocalDateTime} in the system default time zone.
     *
     * @param now timestamp in epoch milliseconds
     * @return a string of the form {@code "|<thread>| response: <local date-time>"}
     */
    private String toResponse(long now) {
        String currentThread = Thread.currentThread().getName();
        LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneId.systemDefault());
        return "|" + currentThread + "| response: " + time;
    }
}

