/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.scheduler;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Arrays;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.ReactorThreadFactory;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.SchedulerState;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/*
 * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
 */
final class BoundedElasticThreadPerTaskScheduler
implements Scheduler,
SchedulerState.DisposeAwaiter<BoundedServices>,
Scannable {
    static final Logger LOGGER = Loggers.getLogger(BoundedElasticThreadPerTaskScheduler.class);
    final int maxThreads;
    final int maxTasksQueuedPerThread;
    final ThreadFactory factory;
    volatile SchedulerState<BoundedServices> state;
    static final AtomicReferenceFieldUpdater<BoundedElasticThreadPerTaskScheduler, SchedulerState> STATE = AtomicReferenceFieldUpdater.newUpdater(BoundedElasticThreadPerTaskScheduler.class, SchedulerState.class, "state");
    private static final SchedulerState<BoundedServices> INIT = SchedulerState.init(BoundedServices.SHUTDOWN);

    BoundedElasticThreadPerTaskScheduler(int maxThreads, int maxTasksQueuedPerThread, ThreadFactory threadFactory) {
        if (maxThreads <= 0) {
            throw new IllegalArgumentException("maxThreads must be strictly positive, was " + maxThreads);
        }
        if (maxTasksQueuedPerThread <= 0) {
            throw new IllegalArgumentException("maxTasksQueuedPerThread must be strictly positive, was " + maxTasksQueuedPerThread);
        }
        this.maxThreads = maxThreads;
        this.maxTasksQueuedPerThread = maxTasksQueuedPerThread;
        this.factory = threadFactory;
        STATE.lazySet(this, INIT);
    }

    @Override
    public boolean isDisposed() {
        SchedulerState<BoundedServices> current = this.state;
        return current != INIT && current.currentResource == BoundedServices.SHUTDOWN;
    }

    @Override
    public void init() {
        SchedulerState<BoundedServices> b;
        do {
            SchedulerState<BoundedServices> a;
            if ((a = this.state) == INIT) continue;
            if (a.currentResource == BoundedServices.SHUTDOWN) {
                throw new IllegalStateException("Initializing a disposed scheduler is not permitted");
            }
            return;
        } while (STATE.compareAndSet(this, INIT, b = SchedulerState.init(new BoundedServices(this))));
    }

    @Override
    @Deprecated
    public void start() {
        throw new UnsupportedOperationException("Use init method instead");
    }

    @Override
    public boolean await(BoundedServices boundedServices, long timeout, TimeUnit timeUnit) throws InterruptedException {
        if (!boundedServices.sharedDelayedTasksScheduler.awaitTermination(timeout, timeUnit)) {
            return false;
        }
        for (SequentialThreadPerTaskExecutor bs : boundedServices.activeExecutorsState.array) {
            if (bs.await(timeout, timeUnit)) continue;
            return false;
        }
        return true;
    }

    @Override
    public void dispose() {
        SchedulerState<BoundedServices> previous = this.state;
        if (previous.currentResource == BoundedServices.SHUTDOWN) {
            if (previous.initialResource != null) {
                ((BoundedServices)previous.initialResource).sharedDelayedTasksScheduler.shutdownNow();
                for (SequentialThreadPerTaskExecutor bs : ((BoundedServices)previous.initialResource).activeExecutorsState.array) {
                    bs.shutdown(true);
                }
            }
            return;
        }
        SequentialThreadPerTaskExecutor[] toAwait = ((BoundedServices)previous.currentResource).dispose();
        SchedulerState<BoundedServices> shutDown = SchedulerState.transition((BoundedServices)previous.currentResource, BoundedServices.SHUTDOWN, this);
        STATE.compareAndSet(this, previous, shutDown);
        assert (shutDown.initialResource != null);
        ((BoundedServices)shutDown.initialResource).sharedDelayedTasksScheduler.shutdownNow();
        for (SequentialThreadPerTaskExecutor bs : toAwait) {
            bs.shutdown(true);
        }
    }

    @Override
    public Mono<Void> disposeGracefully() {
        return Mono.defer(() -> {
            SchedulerState<BoundedServices> previous = this.state;
            if (previous.currentResource == BoundedServices.SHUTDOWN) {
                return previous.onDispose;
            }
            SequentialThreadPerTaskExecutor[] toAwait = ((BoundedServices)previous.currentResource).dispose();
            SchedulerState<BoundedServices> shutDown = SchedulerState.transition((BoundedServices)previous.currentResource, BoundedServices.SHUTDOWN, this);
            STATE.compareAndSet(this, previous, shutDown);
            assert (shutDown.initialResource != null);
            ((BoundedServices)shutDown.initialResource).sharedDelayedTasksScheduler.shutdown();
            for (SequentialThreadPerTaskExecutor bs : toAwait) {
                bs.shutdown(false);
            }
            return shutDown.onDispose;
        });
    }

    @Override
    public Disposable schedule(Runnable task) {
        SequentialThreadPerTaskExecutor picked = ((BoundedServices)this.state.currentResource).pickOrAllocate();
        try {
            return picked.schedule(task, null);
        }
        catch (RejectedExecutionException ex) {
            picked.dispose();
            throw ex;
        }
    }

    @Override
    public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
        SequentialThreadPerTaskExecutor picked = ((BoundedServices)this.state.currentResource).pickOrAllocate();
        try {
            return picked.schedule(task, delay, unit, null);
        }
        catch (RejectedExecutionException ex) {
            picked.dispose();
            throw ex;
        }
    }

    @Override
    public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
        SequentialThreadPerTaskExecutor picked = ((BoundedServices)this.state.currentResource).pickOrAllocate();
        try {
            return picked.schedulePeriodically(task, initialDelay, period, unit, null);
        }
        catch (RejectedExecutionException ex) {
            picked.dispose();
            throw ex;
        }
    }

    public String toString() {
        StringBuilder ts = new StringBuilder("boundedElastic").append('(');
        if (this.factory instanceof ReactorThreadFactory) {
            ts.append('\"').append(((ReactorThreadFactory)this.factory).get()).append("\",");
        }
        ts.append("maxThreads=").append(this.maxThreads).append(",maxTasksQueuedPerThread=").append(this.maxTasksQueuedPerThread == Integer.MAX_VALUE ? "unbounded" : Integer.valueOf(this.maxTasksQueuedPerThread));
        return ts.toString();
    }

    int estimateSize() {
        return ((BoundedServices)this.state.currentResource).activeExecutorsState.array.length;
    }

    int estimateRemainingTaskCapacity() {
        if (this.maxTasksQueuedPerThread == Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        SequentialThreadPerTaskExecutor[] busyArray = ((BoundedServices)this.state.currentResource).activeExecutorsState.array;
        long numberOfTotalAvailableSlots = 0L;
        for (SequentialThreadPerTaskExecutor state : busyArray) {
            numberOfTotalAvailableSlots += (long)state.numberOfAvailableSlots();
        }
        return (int)Math.min(numberOfTotalAvailableSlots += (long)(this.maxThreads - busyArray.length) * (long)this.maxTasksQueuedPerThread, Integer.MAX_VALUE);
    }

    @Override
    public Object scanUnsafe(Scannable.Attr key) {
        if (key == Scannable.Attr.TERMINATED || key == Scannable.Attr.CANCELLED) {
            return this.isDisposed();
        }
        if (key == Scannable.Attr.BUFFERED) {
            return this.estimateSize();
        }
        if (key == Scannable.Attr.CAPACITY) {
            return this.maxThreads;
        }
        if (key == Scannable.Attr.NAME) {
            return this.toString();
        }
        return null;
    }

    @Override
    public Stream<? extends Scannable> inners() {
        BoundedServices services = (BoundedServices)this.state.currentResource;
        return Stream.of(services.activeExecutorsState.array).filter(obj -> obj != null && obj != BoundedServices.CREATING);
    }

    @Override
    public Scheduler.Worker createWorker() {
        return new SingleThreadExecutorWorker(((BoundedServices)this.state.currentResource).pickOrAllocate());
    }

    /*
     * Multiple versions of this class in jar - see https://www.benf.org/other/cfr/multi-version-jar.html
     */
    static final class BoundedServices {
        static final ActiveExecutorsState INITIAL = new ActiveExecutorsState(new SequentialThreadPerTaskExecutor[0], false);
        static final ActiveExecutorsState ALL_SHUTDOWN = new ActiveExecutorsState(new SequentialThreadPerTaskExecutor[0], true);
        static final ScheduledExecutorService DELAYED_TASKS_SCHEDULER_SHUTDOWN = Executors.newSingleThreadScheduledExecutor();
        static final BoundedServices SHUTDOWN;
        static final BoundedServices SHUTTING_DOWN;
        static final SequentialThreadPerTaskExecutor CREATING;
        static final AtomicLong DELAYED_TASKS_SCHEDULER_COUNTER;
        static final ThreadFactory DELAYED_TASKS_SCHEDULER_FACTORY;
        final BoundedElasticThreadPerTaskScheduler parent;
        final ScheduledExecutorService sharedDelayedTasksScheduler;
        final ThreadFactory factory;
        final int maxTasksQueuedPerThread;
        volatile ActiveExecutorsState activeExecutorsState;
        static final AtomicReferenceFieldUpdater<BoundedServices, ActiveExecutorsState> ACTIVE_EXECUTORS_STATE;

        private BoundedServices() {
            this.parent = null;
            this.maxTasksQueuedPerThread = 0;
            this.factory = null;
            this.activeExecutorsState = ALL_SHUTDOWN;
            this.sharedDelayedTasksScheduler = DELAYED_TASKS_SCHEDULER_SHUTDOWN;
        }

        BoundedServices(BoundedElasticThreadPerTaskScheduler parent) {
            this.parent = parent;
            this.maxTasksQueuedPerThread = parent.maxTasksQueuedPerThread;
            this.factory = parent.factory;
            this.sharedDelayedTasksScheduler = new ScheduledThreadPoolExecutor(1, DELAYED_TASKS_SCHEDULER_FACTORY);
            ACTIVE_EXECUTORS_STATE.lazySet(this, INITIAL);
        }

        void remove(SequentialThreadPerTaskExecutor sequentialThreadPerTaskExecutor) {
            ActiveExecutorsState replacement;
            ActiveExecutorsState current;
            do {
                block4: {
                    int len;
                    SequentialThreadPerTaskExecutor[] arr;
                    block3: {
                        current = this.activeExecutorsState;
                        arr = this.activeExecutorsState.array;
                        len = arr.length;
                        if (len == 0 || current.shutdown) {
                            return;
                        }
                        replacement = null;
                        if (len != 1) break block3;
                        if (arr[0] != sequentialThreadPerTaskExecutor) break block4;
                        replacement = INITIAL;
                        break block4;
                    }
                    for (int i = 0; i < len; ++i) {
                        SequentialThreadPerTaskExecutor state = arr[i];
                        if (state != sequentialThreadPerTaskExecutor) continue;
                        replacement = new ActiveExecutorsState(new SequentialThreadPerTaskExecutor[len - 1], false);
                        System.arraycopy(arr, 0, replacement.array, 0, i);
                        System.arraycopy(arr, i + 1, replacement.array, i, len - i - 1);
                        break;
                    }
                }
                if (replacement != null) continue;
                return;
            } while (!ACTIVE_EXECUTORS_STATE.compareAndSet(this, current, replacement));
        }

        SequentialThreadPerTaskExecutor pickOrAllocate() {
            SequentialThreadPerTaskExecutor choice;
            while (true) {
                ActiveExecutorsState activeState;
                if ((activeState = this.activeExecutorsState) == ALL_SHUTDOWN || activeState.shutdown) {
                    return CREATING;
                }
                SequentialThreadPerTaskExecutor[] arr = activeState.array;
                int len = arr.length;
                if (len < this.parent.maxThreads) {
                    SequentialThreadPerTaskExecutor newExecutor = new SequentialThreadPerTaskExecutor(this, true);
                    SequentialThreadPerTaskExecutor[] replacement = new SequentialThreadPerTaskExecutor[len + 1];
                    System.arraycopy(arr, 0, replacement, 0, len);
                    replacement[len] = newExecutor;
                    if (!ACTIVE_EXECUTORS_STATE.compareAndSet(this, activeState, new ActiveExecutorsState(replacement, false))) continue;
                    return newExecutor;
                }
                choice = arr[0];
                int leastBusy = Integer.MAX_VALUE;
                for (int i = 0; i < len; ++i) {
                    SequentialThreadPerTaskExecutor state = arr[i];
                    int busy = state.refCnt();
                    if (busy >= leastBusy) continue;
                    leastBusy = busy;
                    choice = state;
                }
                if (choice.retain()) break;
            }
            return choice;
        }

        public SequentialThreadPerTaskExecutor[] dispose() {
            ActiveExecutorsState current;
            do {
                current = this.activeExecutorsState;
                if (!current.shutdown) continue;
                return current.array;
            } while (!ACTIVE_EXECUTORS_STATE.compareAndSet(this, current, new ActiveExecutorsState(current.array, true)));
            return Arrays.copyOf(current.array, current.array.length);
        }

        static {
            DELAYED_TASKS_SCHEDULER_SHUTDOWN.shutdownNow();
            SHUTDOWN = new BoundedServices();
            SHUTTING_DOWN = new BoundedServices();
            SHUTDOWN.dispose();
            SHUTTING_DOWN.dispose();
            CREATING = new SequentialThreadPerTaskExecutor(SHUTDOWN, false){

                @Override
                public String toString() {
                    return "CREATING SingleThreadExecutor";
                }
            };
            BoundedServices.CREATING.wipAndRefCnt = -1L;
            DELAYED_TASKS_SCHEDULER_COUNTER = new AtomicLong();
            DELAYED_TASKS_SCHEDULER_FACTORY = r -> {
                Thread t = new Thread(r, "loomBoundedElastic-delayed-tasks-scheduler-" + DELAYED_TASKS_SCHEDULER_COUNTER.incrementAndGet());
                t.setDaemon(true);
                return t;
            };
            ACTIVE_EXECUTORS_STATE = AtomicReferenceFieldUpdater.newUpdater(BoundedServices.class, ActiveExecutorsState.class, "activeExecutorsState");
        }

        static final class ActiveExecutorsState {
            final SequentialThreadPerTaskExecutor[] array;
            final boolean shutdown;

            public ActiveExecutorsState(SequentialThreadPerTaskExecutor[] array, boolean shutdown) {
                this.array = array;
                this.shutdown = shutdown;
            }
        }
    }

    static class SequentialThreadPerTaskExecutor
    extends CountDownLatch
    implements Disposable,
    Scannable {
        final BoundedServices parent;
        final int queueCapacity;
        final Queue<SchedulerTask> tasksQueue;
        final ScheduledExecutorService scheduledTasksExecutor;
        volatile long size;
        static final AtomicLongFieldUpdater<SequentialThreadPerTaskExecutor> SIZE = AtomicLongFieldUpdater.newUpdater(SequentialThreadPerTaskExecutor.class, "size");
        volatile long wipAndRefCnt;
        static final VarHandle WIP_AND_REF_CNT;
        final ThreadFactory factory;
        SchedulerTask activeTask;
        static final long WIP_MASK = Integer.MAX_VALUE;
        static final long REF_CNT_MASK = 4611686016279904256L;
        static final long SHUTDOWN_FLAG = Long.MIN_VALUE;
        static final long SHUTDOWN_NOW_FLAG = 0x4000000000000000L;

        SequentialThreadPerTaskExecutor(BoundedServices parent, boolean markPicked) {
            super(1);
            this.parent = parent;
            this.tasksQueue = Queues.unboundedMultiproducer().get();
            this.queueCapacity = parent.maxTasksQueuedPerThread;
            this.factory = parent.factory;
            this.scheduledTasksExecutor = parent.sharedDelayedTasksScheduler;
            if (markPicked) {
                WIP_AND_REF_CNT.set(this, 0x80000000L);
            }
        }

        void incrementTasksCount() {
            long nextSize;
            long size;
            do {
                if (SequentialThreadPerTaskExecutor.canNotAcceptTasks(size = this.size)) {
                    throw Exceptions.failWithRejected();
                }
                nextSize = size + 1L;
                if (this.queueCapacity == Integer.MAX_VALUE || nextSize <= (long)this.queueCapacity) continue;
                throw Exceptions.failWithRejected("Task capacity of bounded elastic scheduler reached while scheduling a new tasks (" + nextSize + "/" + this.queueCapacity + ")");
            } while (!SIZE.compareAndSet(this, size, nextSize));
        }

        void decrementTasksCount() {
            long actualState = SIZE.decrementAndGet(this);
            if (SequentialThreadPerTaskExecutor.canNotAcceptTasks(actualState) && SequentialThreadPerTaskExecutor.tasksCount(actualState) == 0L) {
                this.trySchedule();
            }
        }

        void stopAcceptingTasks() {
            long size;
            do {
                if (!SequentialThreadPerTaskExecutor.canNotAcceptTasks(size = this.size)) continue;
                return;
            } while (!SIZE.weakCompareAndSet(this, size, size | Long.MIN_VALUE));
        }

        long numberOfEnqueuedTasks() {
            return SequentialThreadPerTaskExecutor.tasksCount(this.size);
        }

        int numberOfAvailableSlots() {
            if (this.queueCapacity == Integer.MAX_VALUE) {
                return Integer.MAX_VALUE;
            }
            return this.queueCapacity - (int)this.numberOfEnqueuedTasks();
        }

        boolean retain() {
            long previousState = SequentialThreadPerTaskExecutor.retain(this);
            return !SequentialThreadPerTaskExecutor.isShutdown(previousState);
        }

        void release() {
            long previousState = SequentialThreadPerTaskExecutor.release(this);
            if (SequentialThreadPerTaskExecutor.isShutdown(previousState)) {
                return;
            }
            if (SequentialThreadPerTaskExecutor.refCnt(previousState) == 1) {
                this.stopAcceptingTasks();
                this.parent.remove(this);
                if (!SequentialThreadPerTaskExecutor.hasWork(previousState)) {
                    this.clearAllTask();
                    this.countDown();
                }
            }
        }

        int refCnt() {
            return SequentialThreadPerTaskExecutor.refCnt(this.wipAndRefCnt);
        }

        void shutdown(boolean now) {
            long previousState = SequentialThreadPerTaskExecutor.markShutdown(this, now);
            if (SequentialThreadPerTaskExecutor.isShutdown(previousState)) {
                return;
            }
            this.stopAcceptingTasks();
            if (SequentialThreadPerTaskExecutor.hasWork(previousState)) {
                return;
            }
            this.drain();
        }

        @Override
        public void dispose() {
            this.release();
        }

        Disposable schedule(Runnable task, @Nullable Disposable.Composite disposables) {
            this.incrementTasksCount();
            Runnable decoratedTask = Schedulers.onSchedule(task);
            boolean isDirect = disposables == null;
            SchedulerTask disposable = new SchedulerTask(this, decoratedTask, -1L, TimeUnit.NANOSECONDS, disposables);
            if (!isDirect && !disposables.add(disposable)) {
                throw Exceptions.failWithRejected();
            }
            this.tasksQueue.offer(disposable);
            this.trySchedule();
            return disposable;
        }

        Disposable schedule(Runnable task, long delay, TimeUnit unit, @Nullable Disposable.Composite disposables) {
            Objects.requireNonNull(unit, "TimeUnit should be non-null");
            this.incrementTasksCount();
            Runnable decoratedTask = Schedulers.onSchedule(task);
            boolean isDirect = disposables == null;
            SchedulerTask disposable = new SchedulerTask(this, decoratedTask, -1L, unit, disposables);
            if (!isDirect && !disposables.add(disposable)) {
                throw Exceptions.failWithRejected();
            }
            if (delay <= 0L) {
                this.tasksQueue.offer(disposable);
                this.trySchedule();
            } else {
                disposable.schedule(delay, unit);
            }
            return disposable;
        }

        Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit, @Nullable Disposable.Composite disposables) {
            Objects.requireNonNull(unit, "TimeUnit should be non-null");
            this.incrementTasksCount();
            Runnable decoratedTask = Schedulers.onSchedule(task);
            boolean isDirect = disposables == null;
            SchedulerTask disposable = new SchedulerTask(this, decoratedTask, period < 0L ? 0L : period, unit, disposables);
            if (!isDirect && !disposables.add(disposable)) {
                throw Exceptions.failWithRejected();
            }
            if (period <= 0L) {
                if (initialDelay <= 0L) {
                    this.tasksQueue.offer(disposable);
                    this.trySchedule();
                } else {
                    disposable.schedule(initialDelay, unit);
                }
            } else {
                disposable.scheduleAtFixedRate(initialDelay, period, unit);
            }
            return disposable;
        }

        void trySchedule() {
            long previousState = SequentialThreadPerTaskExecutor.addWork(this);
            if (SequentialThreadPerTaskExecutor.hasWork(previousState) || SequentialThreadPerTaskExecutor.isShutdownNow(previousState)) {
                return;
            }
            this.drain();
        }

        void drain() {
            Queue<SchedulerTask> q = this.tasksQueue;
            long state = this.wipAndRefCnt;
            while (true) {
                if (SequentialThreadPerTaskExecutor.isShutdownNow(this.wipAndRefCnt)) {
                    this.clearAllTask();
                    this.countDown();
                    return;
                }
                SchedulerTask task = q.poll();
                if (task != null) {
                    this.activeTask = task;
                    if (!task.start()) continue;
                    return;
                }
                state = SequentialThreadPerTaskExecutor.markWorkDone(this, state);
                if (SequentialThreadPerTaskExecutor.isShutdown(state) && this.numberOfEnqueuedTasks() == 0L) {
                    this.countDown();
                    return;
                }
                if (!SequentialThreadPerTaskExecutor.hasWork(state)) break;
            }
        }

        void clearAllTask() {
            SchedulerTask d;
            SchedulerTask activeTask = this.activeTask;
            if (activeTask != null) {
                activeTask.dispose();
            }
            Queue<SchedulerTask> q = this.tasksQueue;
            while ((d = q.poll()) != null) {
                d.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return SequentialThreadPerTaskExecutor.isShutdown(this.wipAndRefCnt) && this.numberOfEnqueuedTasks() == 0L;
        }

        boolean isShutdown() {
            return SequentialThreadPerTaskExecutor.isShutdown(this.wipAndRefCnt);
        }

        @Override
        public Object scanUnsafe(Scannable.Attr key) {
            if (Scannable.Attr.TERMINATED == key) {
                return this.isDisposed();
            }
            if (Scannable.Attr.BUFFERED == key) {
                return this.numberOfEnqueuedTasks();
            }
            if (Scannable.Attr.CAPACITY == key) {
                return this.queueCapacity;
            }
            return null;
        }

        @Override
        public String toString() {
            return "SingleThreadExecutor@" + System.identityHashCode(this) + "{ backing=" + SequentialThreadPerTaskExecutor.refCnt(this.wipAndRefCnt) + '}';
        }

        static boolean canNotAcceptTasks(long state) {
            return (state & Long.MIN_VALUE) == Long.MIN_VALUE;
        }

        static long tasksCount(long state) {
            return state & Long.MAX_VALUE;
        }

        static int refCnt(long state) {
            return (int)((state & 0x3FFFFFFF80000000L) >> 31);
        }

        static long retain(SequentialThreadPerTaskExecutor instance) {
            long nextState;
            long state;
            do {
                if (!SequentialThreadPerTaskExecutor.isShutdown(state = instance.wipAndRefCnt)) continue;
                return state;
            } while (!WIP_AND_REF_CNT.weakCompareAndSet(instance, state, nextState = SequentialThreadPerTaskExecutor.incrementRefCnt(state)));
            return state;
        }

        static long incrementRefCnt(long state) {
            long rawRefCnt = state & 0x3FFFFFFF80000000L;
            return rawRefCnt == 4611686016279904256L ? state : (rawRefCnt >> 31) + 1L << 31 | state & 0xC00000007FFFFFFFL;
        }

        static long release(SequentialThreadPerTaskExecutor instance) {
            long nextState;
            long state;
            do {
                if (SequentialThreadPerTaskExecutor.isShutdown(state = instance.wipAndRefCnt)) {
                    return state;
                }
                long refCnt = (state & 0x3FFFFFFF80000000L) >> 31;
                long nextRefCnt = refCnt - 1L;
                if (nextRefCnt == 0L) {
                    nextState = SequentialThreadPerTaskExecutor.incrementWork(state & 0xC00000007FFFFFFFL | 0x4000000000000000L | Long.MIN_VALUE);
                    continue;
                }
                long l = nextState = refCnt == 0L ? state : nextRefCnt << 31 | state & 0xC00000007FFFFFFFL;
            } while (!WIP_AND_REF_CNT.weakCompareAndSetPlain(instance, state, nextState));
            return state;
        }

        static long markShutdown(SequentialThreadPerTaskExecutor instance, boolean now) {
            long state;
            do {
                if (!SequentialThreadPerTaskExecutor.isShutdownNow(state = instance.wipAndRefCnt) && (now || !SequentialThreadPerTaskExecutor.isShutdown(state))) continue;
                return state;
            } while (!WIP_AND_REF_CNT.weakCompareAndSetPlain(instance, state, state | Long.MIN_VALUE | (now ? 0x4000000000000000L : 0L)));
            return state;
        }

        static boolean isShutdown(long state) {
            return (state & Long.MIN_VALUE) == Long.MIN_VALUE;
        }

        static boolean isShutdownNow(long state) {
            return (state & 0x4000000000000000L) == 0x4000000000000000L;
        }

        static boolean hasWork(long state) {
            return (state & Integer.MAX_VALUE) > 0L;
        }

        static long markWorkDone(SequentialThreadPerTaskExecutor instance, long expectedState) {
            long nextState;
            long currentState;
            do {
                if (expectedState == (currentState = instance.wipAndRefCnt)) continue;
                return currentState;
            } while (!WIP_AND_REF_CNT.weakCompareAndSetPlain(instance, currentState, nextState = currentState & Integer.MIN_VALUE));
            return nextState;
        }

        static long addWork(SequentialThreadPerTaskExecutor instance) {
            long nextState;
            long state;
            while (!WIP_AND_REF_CNT.weakCompareAndSetPlain(instance, state = instance.wipAndRefCnt, nextState = SequentialThreadPerTaskExecutor.incrementWork(state))) {
            }
            return state;
        }

        static long incrementWork(long state) {
            return ((state & Integer.MAX_VALUE) == Integer.MAX_VALUE ? state & Integer.MIN_VALUE : state) + 1L;
        }

        static {
            try {
                WIP_AND_REF_CNT = MethodHandles.lookup().findVarHandle(SequentialThreadPerTaskExecutor.class, "wipAndRefCnt", Long.TYPE);
            }
            catch (IllegalAccessException | NoSuchFieldException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class SingleThreadExecutorWorker
    implements Scheduler.Worker,
    Disposable,
    Scannable {
        final Disposable.Composite disposables;
        final SequentialThreadPerTaskExecutor executor;

        SingleThreadExecutorWorker(SequentialThreadPerTaskExecutor executor) {
            this.executor = executor;
            this.disposables = Disposables.composite();
        }

        @Override
        public void dispose() {
            this.disposables.dispose();
            this.executor.release();
        }

        @Override
        public boolean isDisposed() {
            return this.disposables.isDisposed();
        }

        @Override
        public Disposable schedule(Runnable task) {
            return this.executor.schedule(task, this.disposables);
        }

        @Override
        public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
            return this.executor.schedule(task, delay, unit, this.disposables);
        }

        @Override
        public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
            return this.executor.schedulePeriodically(task, initialDelay, period, unit, this.disposables);
        }

        @Override
        public Object scanUnsafe(Scannable.Attr key) {
            if (key == Scannable.Attr.BUFFERED) {
                return this.disposables.size();
            }
            if (key == Scannable.Attr.TERMINATED || key == Scannable.Attr.CANCELLED) {
                return this.isDisposed();
            }
            if (key == Scannable.Attr.NAME) {
                return "SingleThreadExecutorWorker";
            }
            return this.executor.scanUnsafe(key);
        }
    }

    static final class SchedulerTask
    extends AtomicInteger
    implements Disposable,
    Callable<Void>,
    Runnable {
        static final int INITIAL_STATE = 0;
        static final int SCHEDULED_STATE = 1;
        static final int STARTING_STATE = 2;
        static final int RUNNING_STATE = 4;
        static final int COMPLETED_STATE = 8;
        static final int DISPOSED_FLAG = Integer.MIN_VALUE;
        static final int HAS_FUTURE_FLAG = 0x8000000;
        final long fixedRatePeriod;
        final TimeUnit timeUnit;
        final SequentialThreadPerTaskExecutor holder;
        final Runnable task;
        @Nullable
        final Disposable.Composite tracker;
        Thread carrier;
        Future<?> scheduledFuture;

        SchedulerTask(SequentialThreadPerTaskExecutor holder, Runnable task, long fixedRatePeriod, TimeUnit timeUnit, @Nullable Disposable.Composite tracker) {
            this.fixedRatePeriod = fixedRatePeriod;
            this.timeUnit = timeUnit;
            this.holder = holder;
            this.task = task;
            this.tracker = tracker;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            int previousState = SchedulerTask.markRunning(this);
            if (SchedulerTask.isDisposed(previousState)) {
                this.holder.drain();
                return;
            }
            try {
                this.task.run();
            }
            catch (Throwable ex) {
                boolean handled = false;
                try {
                    Schedulers.handleError(ex);
                    handled = true;
                }
                finally {
                    if (!handled) {
                        if (this.isPeriodic()) {
                            this.holder.decrementTasksCount();
                        }
                        if (this.tracker == null) {
                            this.holder.release();
                        } else {
                            this.tracker.remove(this);
                        }
                        this.holder.drain();
                    }
                }
            }
            if (this.isPeriodic()) {
                boolean isInstant = this.fixedRatePeriod == 0L;
                previousState = isInstant ? SchedulerTask.markInitial(this) : SchedulerTask.markRescheduled(this);
                boolean isDisposed = SchedulerTask.isDisposed(previousState);
                boolean isShutdown = this.holder.isShutdown();
                if (isInstant && !isDisposed && !isShutdown) {
                    this.holder.tasksQueue.offer(this);
                }
                if (isDisposed || isShutdown) {
                    this.holder.decrementTasksCount();
                    if (this.tracker == null) {
                        this.holder.release();
                    }
                }
                this.holder.drain();
                return;
            }
            SchedulerTask.markCompleted(this);
            if (this.tracker == null) {
                this.holder.release();
            } else {
                this.tracker.remove(this);
            }
            this.holder.drain();
        }

        @Override
        public Void call() {
            if (this.isDisposed()) {
                return null;
            }
            SequentialThreadPerTaskExecutor parent = this.holder;
            parent.tasksQueue.offer(this);
            parent.trySchedule();
            return null;
        }

        @Override
        public void dispose() {
            boolean isDirect;
            int previousState = SchedulerTask.markDisposed(this);
            if (SchedulerTask.isDisposed(previousState) || SchedulerTask.isCompleted(previousState)) {
                return;
            }
            boolean bl = isDirect = this.tracker == null;
            if (!isDirect) {
                this.tracker.remove(this);
            }
            if (SchedulerTask.isScheduled(previousState)) {
                this.scheduledFuture.cancel(true);
                this.holder.decrementTasksCount();
                if (isDirect) {
                    this.holder.release();
                }
                return;
            }
            if (SchedulerTask.isRunning(previousState)) {
                if (SchedulerTask.hasFuture(previousState)) {
                    this.scheduledFuture.cancel(true);
                }
                this.carrier.interrupt();
                return;
            }
            if (SchedulerTask.isInitialState(previousState)) {
                this.holder.decrementTasksCount();
            } else if (this.isPeriodic()) {
                if (SchedulerTask.hasFuture(previousState)) {
                    this.scheduledFuture.cancel(true);
                }
                this.holder.decrementTasksCount();
            }
            if (isDirect) {
                this.holder.release();
            }
        }

        @Override
        public boolean isDisposed() {
            int state = this.get();
            return SchedulerTask.isDisposed(state) || SchedulerTask.isCompleted(state);
        }

        boolean start() {
            Thread carrier;
            int previousState = SchedulerTask.markStarting(this);
            if (SchedulerTask.isDisposed(previousState)) {
                return false;
            }
            this.carrier = carrier = this.holder.factory.newThread(this);
            if (!this.isPeriodic()) {
                this.holder.decrementTasksCount();
            }
            carrier.start();
            return true;
        }

        boolean isPeriodic() {
            return this.fixedRatePeriod >= 0L;
        }

        void schedule(long delay, TimeUnit unit) {
            ScheduledFuture<Void> future = this.holder.scheduledTasksExecutor.schedule(this, delay, unit);
            this.scheduledFuture = future;
            int previousState = SchedulerTask.markScheduled(this);
            if (SchedulerTask.isDisposed(previousState)) {
                future.cancel(true);
            }
        }

        void scheduleAtFixedRate(long initialDelay, long delay, TimeUnit unit) {
            ScheduledFuture<?> future = this.holder.scheduledTasksExecutor.scheduleAtFixedRate(this::call, initialDelay, delay, unit);
            this.scheduledFuture = future;
            int previousState = SchedulerTask.markScheduled(this);
            if (SchedulerTask.isDisposed(previousState)) {
                future.cancel(true);
            }
        }

        static boolean isInitialState(int state) {
            return state == 0;
        }

        static boolean isStarting(int state) {
            return (state & 2) == 2;
        }

        static boolean isRunning(int state) {
            return (state & 4) == 4;
        }

        static boolean isCompleted(int state) {
            return (state & 8) == 8;
        }

        static boolean isScheduled(int state) {
            return (state & 1) == 1;
        }

        static boolean isDisposed(int state) {
            return (state & Integer.MIN_VALUE) == Integer.MIN_VALUE;
        }

        static boolean hasFuture(int state) {
            return (state & 0x8000000) == 0x8000000;
        }

        static int markInitial(SchedulerTask disposable) {
            int state;
            do {
                if (!SchedulerTask.isDisposed(state = disposable.get())) continue;
                return state;
            } while (!disposable.weakCompareAndSetPlain(state, 0));
            return state;
        }

        static int markStarting(SchedulerTask disposable) {
            int state;
            do {
                if (!SchedulerTask.isDisposed(state = disposable.get())) continue;
                return state;
            } while (!disposable.weakCompareAndSetPlain(state, state & 0x8000000 | 2));
            return state;
        }

        static int markRunning(SchedulerTask disposable) {
            int state;
            do {
                if (!SchedulerTask.isDisposed(state = disposable.get())) continue;
                return state;
            } while (!disposable.weakCompareAndSetPlain(state, state & 0x8000000 | 4));
            return state;
        }

        static int markScheduled(SchedulerTask disposable) {
            int state;
            do {
                if (!SchedulerTask.isDisposed(state = disposable.get())) continue;
                return state;
            } while (!disposable.weakCompareAndSetRelease(state, !SchedulerTask.isInitialState(state) ? 0x8000000 : 0x8000001));
            return state;
        }

        static int markRescheduled(SchedulerTask disposable) {
            int state;
            do {
                if (!SchedulerTask.isDisposed(state = disposable.get())) continue;
                return state;
            } while (!disposable.weakCompareAndSetRelease(state, 0x8000001));
            return state;
        }

        static int markDisposed(SchedulerTask disposable) {
            int state;
            do {
                if (!SchedulerTask.isDisposed(state = disposable.get()) && !SchedulerTask.isCompleted(state)) continue;
                return state;
            } while (!disposable.weakCompareAndSetAcquire(state, state | Integer.MIN_VALUE));
            return state;
        }

        static void markCompleted(SchedulerTask disposable) {
            int state = disposable.get();
            if (SchedulerTask.isDisposed(state)) {
                return;
            }
            disposable.weakCompareAndSetPlain(state, 8);
        }

        @Override
        public String toString() {
            return (this.isPeriodic() ? (this.fixedRatePeriod == 0L ? "InstantPeriodic" : "Periodic") : "") + "SchedulerTask(" + this.hashCode() + "){carrier=" + this.carrier + ", scheduledFuture=" + this.scheduledFuture + ", state= " + Integer.toBinaryString(this.get()) + '}';
        }
    }
}

