/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.agent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class WorkerManager {
    private static final Logger log = LoggerFactory.getLogger(WorkerManager.class);
    private final Platform platform;
    private final String nodeName;
    private final Scheduler scheduler;
    private final Time time;
    private final Map<Long, Worker> workers;
    private final ScheduledExecutorService stateChangeExecutor;
    private final ExecutorService workerCleanupExecutor;
    private final ScheduledExecutorService shutdownExecutor;
    private final ShutdownManager shutdownManager = new ShutdownManager();

    WorkerManager(Platform platform, Scheduler scheduler) {
        this.platform = platform;
        this.nodeName = platform.curNode().name();
        this.scheduler = scheduler;
        this.time = scheduler.time();
        this.workers = new HashMap<Long, Worker>();
        this.stateChangeExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory((String)"WorkerManagerStateThread", (boolean)false));
        this.workerCleanupExecutor = Executors.newCachedThreadPool(ThreadUtils.createThreadFactory((String)"WorkerCleanupThread%d", (boolean)false));
        this.shutdownExecutor = Executors.newScheduledThreadPool(0, ThreadUtils.createThreadFactory((String)"WorkerManagerShutdownThread%d", (boolean)false));
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public KafkaFuture<String> createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
        try (ShutdownManager.Reference ref = this.shutdownManager.takeReference();){
            Worker worker = this.stateChangeExecutor.submit(new CreateWorker(workerId, taskId, spec, this.time.milliseconds())).get();
            if (worker.doneFuture != null) {
                log.info("{}: Ignoring request to create worker {}, because there is already a worker with that id.", (Object)this.nodeName, (Object)workerId);
                KafkaFutureImpl kafkaFutureImpl = worker.doneFuture;
                return kafkaFutureImpl;
            }
            worker.doneFuture = new KafkaFutureImpl();
            if (worker.spec.endMs() <= this.time.milliseconds()) {
                log.info("{}: Will not run worker {} as it has expired.", (Object)this.nodeName, (Object)worker);
                this.stateChangeExecutor.submit(new HandleWorkerHalting(worker, "worker expired", true));
                KafkaFutureImpl kafkaFutureImpl = worker.doneFuture;
                return kafkaFutureImpl;
            }
            KafkaFutureImpl haltFuture = new KafkaFutureImpl();
            haltFuture.thenApply(errorString -> {
                if (errorString == null) {
                    errorString = "";
                }
                if (errorString.isEmpty()) {
                    log.info("{}: Worker {} is halting.", (Object)this.nodeName, (Object)worker);
                } else {
                    log.info("{}: Worker {} is halting with error {}", new Object[]{this.nodeName, worker, errorString});
                }
                this.stateChangeExecutor.submit(new HandleWorkerHalting(worker, (String)errorString, false));
                return null;
            });
            try {
                worker.taskWorker.start(this.platform, worker.status, (KafkaFutureImpl<String>)haltFuture);
            }
            catch (Exception e) {
                log.info("{}: Worker {} start() exception", new Object[]{this.nodeName, worker, e});
                this.stateChangeExecutor.submit(new HandleWorkerHalting(worker, "worker.start() exception: " + Utils.stackTrace((Throwable)e), true));
            }
            this.stateChangeExecutor.submit(new FinishCreatingWorker(worker));
            KafkaFutureImpl kafkaFutureImpl = worker.doneFuture;
            return kafkaFutureImpl;
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof RequestConflictException) {
                log.info("{}: request conflict while creating worker {} for task {} with spec {}.", new Object[]{this.nodeName, workerId, taskId, spec});
                throw e.getCause();
            }
            log.info("{}: Error creating worker {} for task {} with spec {}", new Object[]{this.nodeName, workerId, taskId, spec, e});
            throw e.getCause();
        }
    }

    public void stopWorker(long workerId, boolean mustDestroy) throws Throwable {
        try (ShutdownManager.Reference ref = this.shutdownManager.takeReference();){
            this.stateChangeExecutor.submit(new StopWorker(workerId, mustDestroy)).get();
        }
        catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    public TreeMap<Long, WorkerState> workerStates() throws Exception {
        try (ShutdownManager.Reference ref = this.shutdownManager.takeReference();){
            TreeMap<Long, WorkerState> treeMap = this.stateChangeExecutor.submit(new GetWorkerStates()).get();
            return treeMap;
        }
    }

    public void beginShutdown() throws Exception {
        if (this.shutdownManager.shutdown()) {
            this.shutdownExecutor.submit(new Shutdown());
        }
    }

    public void waitForShutdown() throws Exception {
        while (!this.shutdownExecutor.isShutdown()) {
            this.shutdownExecutor.awaitTermination(1L, TimeUnit.DAYS);
        }
    }

    static class ShutdownManager {
        private boolean shutdown = false;
        private long refCount = 0L;

        ShutdownManager() {
        }

        synchronized Reference takeReference() {
            if (this.shutdown) {
                throw new KafkaException("WorkerManager is shut down.");
            }
            ++this.refCount;
            return new Reference();
        }

        synchronized boolean shutdown() {
            if (this.shutdown) {
                return false;
            }
            this.shutdown = true;
            if (this.refCount == 0L) {
                this.notifyAll();
            }
            return true;
        }

        synchronized void waitForQuiescence() throws InterruptedException {
            while (!this.shutdown || this.refCount > 0L) {
                this.wait();
            }
        }

        class Reference
        implements AutoCloseable {
            AtomicBoolean closed = new AtomicBoolean(false);

            Reference() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void close() {
                if (this.closed.compareAndSet(false, true)) {
                    ShutdownManager shutdownManager = ShutdownManager.this;
                    synchronized (shutdownManager) {
                        ShutdownManager.this.refCount--;
                        if (ShutdownManager.this.shutdown && ShutdownManager.this.refCount == 0L) {
                            ShutdownManager.this.notifyAll();
                        }
                    }
                }
            }
        }
    }

    class CreateWorker
    implements Callable<Worker> {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;
        private final long now;

        CreateWorker(long workerId, String taskId, TaskSpec spec, long now) {
            this.workerId = workerId;
            this.taskId = taskId;
            this.spec = spec;
            this.now = now;
        }

        @Override
        public Worker call() throws Exception {
            try {
                Worker worker = (Worker)WorkerManager.this.workers.get(this.workerId);
                if (worker != null) {
                    if (!worker.taskId().equals(this.taskId)) {
                        throw new RequestConflictException("There is already a worker ID " + this.workerId + " with a different task ID.");
                    }
                    if (!worker.spec().equals(this.spec)) {
                        throw new RequestConflictException("There is already a worker ID " + this.workerId + " with a different task spec.");
                    }
                    return worker;
                }
                worker = new Worker(this.workerId, this.taskId, this.spec, this.now);
                WorkerManager.this.workers.put(this.workerId, worker);
                log.info("{}: Created worker {} with spec {}", new Object[]{WorkerManager.this.nodeName, worker, this.spec});
                return worker;
            }
            catch (Exception e) {
                log.info("{}: unable to create worker {} for task {}, with spec {}", new Object[]{WorkerManager.this.nodeName, this.workerId, this.taskId, this.spec, e});
                throw e;
            }
        }
    }

    class Worker {
        private final long workerId;
        private final String taskId;
        private final TaskSpec spec;
        private final TaskWorker taskWorker;
        private final AgentWorkerStatusTracker status = new AgentWorkerStatusTracker();
        private final long startedMs;
        private State state = State.STARTING;
        private long doneMs = -1L;
        private String error = "";
        private Future<Void> timeoutFuture = null;
        private KafkaFutureImpl<String> doneFuture = null;
        private ShutdownManager.Reference reference;
        private boolean mustDestroy = false;

        Worker(long workerId, String taskId, TaskSpec spec, long now) {
            this.workerId = workerId;
            this.taskId = taskId;
            this.spec = spec;
            this.taskWorker = spec.newTaskWorker(taskId);
            this.startedMs = now;
            this.reference = WorkerManager.this.shutdownManager.takeReference();
        }

        long workerId() {
            return this.workerId;
        }

        String taskId() {
            return this.taskId;
        }

        TaskSpec spec() {
            return this.spec;
        }

        WorkerState state() {
            switch (this.state) {
                case STARTING: {
                    return new WorkerStarting(this.taskId, this.spec);
                }
                case RUNNING: {
                    return new WorkerRunning(this.taskId, this.spec, this.startedMs, this.status.get());
                }
                case CANCELLING: 
                case STOPPING: {
                    return new WorkerStopping(this.taskId, this.spec, this.startedMs, this.status.get());
                }
                case DONE: {
                    return new WorkerDone(this.taskId, this.spec, this.startedMs, this.doneMs, this.status.get(), this.error);
                }
            }
            throw new RuntimeException("unreachable");
        }

        void transitionToRunning() {
            this.state = State.RUNNING;
            this.timeoutFuture = WorkerManager.this.scheduler.schedule(WorkerManager.this.stateChangeExecutor, (Callable)new StopWorker(this.workerId, false), Math.max(0L, this.spec.endMs() - WorkerManager.this.time.milliseconds()));
        }

        Future<Void> transitionToStopping() {
            this.state = State.STOPPING;
            if (this.timeoutFuture != null) {
                this.timeoutFuture.cancel(false);
                this.timeoutFuture = null;
            }
            return WorkerManager.this.workerCleanupExecutor.submit(new HaltWorker(this));
        }

        void transitionToDone() {
            this.state = State.DONE;
            this.doneMs = WorkerManager.this.time.milliseconds();
            if (this.reference != null) {
                this.reference.close();
                this.reference = null;
            }
            this.doneFuture.complete((Object)this.error);
        }

        public String toString() {
            return String.format("%s_%d", this.taskId, this.workerId);
        }
    }

    class HandleWorkerHalting
    implements Callable<Void> {
        private final Worker worker;
        private final String failure;
        private final boolean startupHalt;

        HandleWorkerHalting(Worker worker, String failure, boolean startupHalt) {
            this.worker = worker;
            this.failure = failure;
            this.startupHalt = startupHalt;
        }

        @Override
        public Void call() throws Exception {
            if (this.worker.error.isEmpty()) {
                this.worker.error = this.failure;
            }
            String verb = this.worker.error.isEmpty() ? "halting" : "halting with error [" + this.worker.error + "]";
            switch (this.worker.state) {
                case STARTING: {
                    if (this.startupHalt) {
                        log.info("{}: Worker {} {} during startup.  Transitioning to DONE.", new Object[]{WorkerManager.this.nodeName, this.worker, verb});
                        this.worker.transitionToDone();
                        break;
                    }
                    log.info("{}: Worker {} {} during startup.  Transitioning to CANCELLING.", new Object[]{WorkerManager.this.nodeName, this.worker, verb});
                    this.worker.state = State.CANCELLING;
                    break;
                }
                case CANCELLING: {
                    log.info("{}: Cancelling worker {} {}.  ", new Object[]{WorkerManager.this.nodeName, this.worker, verb});
                    break;
                }
                case RUNNING: {
                    log.info("{}: Running worker {} {}.  Transitioning to STOPPING.", new Object[]{WorkerManager.this.nodeName, this.worker, verb});
                    this.worker.transitionToStopping();
                    break;
                }
                case STOPPING: {
                    log.info("{}: Stopping worker {} {}.", new Object[]{WorkerManager.this.nodeName, this.worker, verb});
                    break;
                }
                case DONE: {
                    log.info("{}: Can't halt worker {} because it is already DONE.", (Object)WorkerManager.this.nodeName, (Object)this.worker);
                }
            }
            return null;
        }
    }

    class FinishCreatingWorker
    implements Callable<Void> {
        private final Worker worker;

        FinishCreatingWorker(Worker worker) {
            this.worker = worker;
        }

        @Override
        public Void call() throws Exception {
            switch (this.worker.state) {
                case CANCELLING: {
                    log.info("{}: Worker {} was cancelled while it was starting up.  Transitioning to STOPPING.", (Object)WorkerManager.this.nodeName, (Object)this.worker);
                    this.worker.transitionToStopping();
                    break;
                }
                case STARTING: {
                    log.info("{}: Worker {} is now RUNNING.  Scheduled to stop in {} ms.", new Object[]{WorkerManager.this.nodeName, this.worker, this.worker.spec.durationMs()});
                    this.worker.transitionToRunning();
                    break;
                }
            }
            return null;
        }
    }

    class StopWorker
    implements Callable<Void> {
        private final long workerId;
        private final boolean mustDestroy;

        StopWorker(long workerId, boolean mustDestroy) {
            this.workerId = workerId;
            this.mustDestroy = mustDestroy;
        }

        @Override
        public Void call() throws Exception {
            Worker worker = (Worker)WorkerManager.this.workers.get(this.workerId);
            if (worker == null) {
                log.info("{}: Can't stop worker {} because there is no worker with that ID.", (Object)WorkerManager.this.nodeName, (Object)this.workerId);
                return null;
            }
            if (this.mustDestroy) {
                worker.mustDestroy = true;
            }
            switch (worker.state) {
                case STARTING: {
                    log.info("{}: Cancelling worker {} during its startup process.", (Object)WorkerManager.this.nodeName, (Object)worker);
                    worker.state = State.CANCELLING;
                    break;
                }
                case CANCELLING: {
                    log.info("{}: Can't stop worker {}, because it is already being cancelled.", (Object)WorkerManager.this.nodeName, (Object)worker);
                    break;
                }
                case RUNNING: {
                    log.info("{}: Stopping running worker {}.", (Object)WorkerManager.this.nodeName, (Object)worker);
                    worker.transitionToStopping();
                    break;
                }
                case STOPPING: {
                    log.info("{}: Can't stop worker {}, because it is already stopping.", (Object)WorkerManager.this.nodeName, (Object)worker);
                    break;
                }
                case DONE: {
                    if (worker.mustDestroy) {
                        log.info("{}: destroying worker {} with error {}", new Object[]{WorkerManager.this.nodeName, worker, worker.error});
                        WorkerManager.this.workers.remove(worker.workerId);
                        break;
                    }
                    log.debug("{}: Can't stop worker {}, because it is already done.", (Object)WorkerManager.this.nodeName, (Object)worker);
                }
            }
            return null;
        }
    }

    class GetWorkerStates
    implements Callable<TreeMap<Long, WorkerState>> {
        GetWorkerStates() {
        }

        @Override
        public TreeMap<Long, WorkerState> call() throws Exception {
            TreeMap<Long, WorkerState> workerMap = new TreeMap<Long, WorkerState>();
            for (Worker worker : WorkerManager.this.workers.values()) {
                workerMap.put(worker.workerId(), worker.state());
            }
            return workerMap;
        }
    }

    class Shutdown
    implements Callable<Void> {
        Shutdown() {
        }

        @Override
        public Void call() throws Exception {
            log.info("{}: Shutting down WorkerManager.", (Object)WorkerManager.this.nodeName);
            try {
                WorkerManager.this.stateChangeExecutor.submit(new DestroyAllWorkers()).get();
                log.info("{}: Waiting for shutdownManager quiescence...", (Object)WorkerManager.this.nodeName);
                WorkerManager.this.shutdownManager.waitForQuiescence();
                WorkerManager.this.workerCleanupExecutor.shutdownNow();
                WorkerManager.this.stateChangeExecutor.shutdownNow();
                log.info("{}: Waiting for workerCleanupExecutor to terminate...", (Object)WorkerManager.this.nodeName);
                WorkerManager.this.workerCleanupExecutor.awaitTermination(1L, TimeUnit.DAYS);
                log.info("{}: Waiting for stateChangeExecutor to terminate...", (Object)WorkerManager.this.nodeName);
                WorkerManager.this.stateChangeExecutor.awaitTermination(1L, TimeUnit.DAYS);
                log.info("{}: Shutting down shutdownExecutor.", (Object)WorkerManager.this.nodeName);
                WorkerManager.this.shutdownExecutor.shutdown();
            }
            catch (Exception e) {
                log.info("{}: Caught exception while shutting down WorkerManager", (Object)WorkerManager.this.nodeName, (Object)e);
                throw e;
            }
            return null;
        }
    }

    class DestroyAllWorkers
    implements Callable<Void> {
        DestroyAllWorkers() {
        }

        @Override
        public Void call() throws Exception {
            log.info("{}: Destroying all workers.", (Object)WorkerManager.this.nodeName);
            ArrayList workerIds = new ArrayList(WorkerManager.this.workers.keySet());
            Iterator iterator = workerIds.iterator();
            while (iterator.hasNext()) {
                long workerId = (Long)iterator.next();
                try {
                    new StopWorker(workerId, true).call();
                }
                catch (Exception e) {
                    log.error("Failed to stop worker {}", (Object)workerId, (Object)e);
                }
            }
            return null;
        }
    }

    class HaltWorker
    implements Callable<Void> {
        private final Worker worker;

        HaltWorker(Worker worker) {
            this.worker = worker;
        }

        @Override
        public Void call() throws Exception {
            String failure = "";
            try {
                this.worker.taskWorker.stop(WorkerManager.this.platform);
            }
            catch (Exception exception) {
                log.error("{}: worker.stop() exception", (Object)WorkerManager.this.nodeName, (Object)exception);
                failure = exception.getMessage();
            }
            WorkerManager.this.stateChangeExecutor.submit(new CompleteWorker(this.worker, failure));
            return null;
        }
    }

    class CompleteWorker
    implements Callable<Void> {
        private final Worker worker;
        private final String failure;

        CompleteWorker(Worker worker, String failure) {
            this.worker = worker;
            this.failure = failure;
        }

        @Override
        public Void call() throws Exception {
            if (this.worker.error.isEmpty() && !this.failure.isEmpty()) {
                this.worker.error = this.failure;
            }
            this.worker.transitionToDone();
            if (this.worker.mustDestroy) {
                log.info("{}: destroying worker {} with error {}", new Object[]{WorkerManager.this.nodeName, this.worker, this.worker.error});
                WorkerManager.this.workers.remove(this.worker.workerId);
            } else {
                log.info("{}: completed worker {} with error {}", new Object[]{WorkerManager.this.nodeName, this.worker, this.worker.error});
            }
            return null;
        }
    }

    static enum State {
        STARTING,
        CANCELLING,
        RUNNING,
        STOPPING,
        DONE;

    }
}

