/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.queue;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.slf4j.Logger;

public final class KafkaEventQueue
implements EventQueue {
    public static final String EVENT_HANDLER_THREAD_SUFFIX = "event-handler";
    private final Time time;
    private final EventQueue.Event cleanupEvent;
    private final ReentrantLock lock;
    private final Logger log;
    private final EventHandler eventHandler;
    private final Thread eventHandlerThread;
    private boolean shuttingDown;
    private boolean interrupted;

    public KafkaEventQueue(Time time, LogContext logContext, String threadNamePrefix) {
        this(time, logContext, threadNamePrefix, EventQueue.VoidEvent::new);
    }

    public KafkaEventQueue(Time time, LogContext logContext, String threadNamePrefix, EventQueue.Event cleanupEvent) {
        this.time = time;
        this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
        this.lock = new ReentrantLock();
        this.log = logContext.logger(KafkaEventQueue.class);
        this.eventHandler = new EventHandler();
        this.eventHandlerThread = new KafkaThread(threadNamePrefix + EVENT_HANDLER_THREAD_SUFFIX, (Runnable)this.eventHandler, false);
        this.shuttingDown = false;
        this.interrupted = false;
        this.eventHandlerThread.start();
    }

    public Time time() {
        return this.time;
    }

    @Override
    public void enqueue(EventQueue.EventInsertionType insertionType, String tag, Function<OptionalLong, OptionalLong> deadlineNsCalculator, EventQueue.Event event) {
        EventContext eventContext = new EventContext(event, insertionType, tag);
        Exception e = this.eventHandler.enqueue(eventContext, deadlineNsCalculator);
        if (e != null) {
            eventContext.completeWithException(this.log, e);
        }
    }

    @Override
    public void cancelDeferred(String tag) {
        this.eventHandler.cancelDeferred(tag);
    }

    @Override
    public void beginShutdown(String source) {
        this.lock.lock();
        try {
            if (this.shuttingDown) {
                this.log.debug("{}: Event queue is already shutting down.", (Object)source);
                return;
            }
            this.log.info("{}: shutting down event queue.", (Object)source);
            this.shuttingDown = true;
            this.eventHandler.cond.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public int size() {
        return this.eventHandler.size();
    }

    @Override
    public void wakeup() {
        this.eventHandler.wakeUp();
    }

    @Override
    public void close() throws InterruptedException {
        this.beginShutdown("KafkaEventQueue#close");
        this.eventHandlerThread.join();
        this.log.info("closed event queue.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Optional<EventQueue.Event> firstDeferredIfIdling() {
        this.lock.lock();
        try {
            if (this.eventHandler.head.next != this.eventHandler.head) {
                Optional<EventQueue.Event> optional = Optional.empty();
                return optional;
            }
            Map.Entry entry = this.eventHandler.deadlineMap.firstEntry();
            if (entry == null) {
                Optional<EventQueue.Event> optional = Optional.empty();
                return optional;
            }
            EventContext eventContext = (EventContext)entry.getValue();
            if (eventContext.insertionType != EventQueue.EventInsertionType.DEFERRED) {
                Optional<EventQueue.Event> optional = Optional.empty();
                return optional;
            }
            Optional<EventQueue.Event> optional = Optional.of(eventContext.event);
            return optional;
        }
        finally {
            this.lock.unlock();
        }
    }

    private class EventHandler
    implements Runnable {
        private int size = 0;
        private final Map<String, EventContext> tagToEventContext = new HashMap<String, EventContext>();
        private final EventContext head = new EventContext(null, null, null);
        private final TreeMap<Long, EventContext> deadlineMap = new TreeMap();
        private final Condition cond = KafkaEventQueue.access$000(KafkaEventQueue.this).newCondition();

        private EventHandler() {
        }

        @Override
        public void run() {
            try {
                this.handleEvents();
            }
            catch (Throwable e) {
                KafkaEventQueue.this.log.warn("event handler thread exiting with exception", e);
            }
            try {
                KafkaEventQueue.this.cleanupEvent.run();
            }
            catch (Throwable e) {
                KafkaEventQueue.this.log.warn("cleanup event threw exception", e);
            }
        }

        private void remove(EventContext eventContext) {
            eventContext.remove();
            if (eventContext.deadlineNs.isPresent()) {
                this.deadlineMap.remove(eventContext.deadlineNs.getAsLong());
                eventContext.deadlineNs = OptionalLong.empty();
            }
            if (eventContext.tag != null) {
                this.tagToEventContext.remove(eventContext.tag, eventContext);
                eventContext.tag = null;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handleEvents() {
            Exception toDeliver = null;
            EventContext toRun = null;
            boolean wasInterrupted = false;
            while (true) {
                if (toRun != null) {
                    wasInterrupted = toRun.run(KafkaEventQueue.this.log, toDeliver);
                }
                KafkaEventQueue.this.lock.lock();
                try {
                    if (toRun != null) {
                        --this.size;
                        if (wasInterrupted) {
                            KafkaEventQueue.this.interrupted = wasInterrupted;
                        }
                        toDeliver = null;
                        toRun = null;
                        wasInterrupted = false;
                    }
                    long awaitNs = Long.MAX_VALUE;
                    Map.Entry<Long, EventContext> entry = this.deadlineMap.firstEntry();
                    if (entry != null) {
                        long now = KafkaEventQueue.this.time.nanoseconds();
                        long timeoutNs = entry.getKey();
                        EventContext eventContext = entry.getValue();
                        if (timeoutNs <= now) {
                            if (eventContext.insertionType == EventQueue.EventInsertionType.DEFERRED) {
                                this.remove(eventContext);
                                toDeliver = null;
                                toRun = eventContext;
                                continue;
                            }
                            this.remove(eventContext);
                            toDeliver = new TimeoutException();
                            toRun = eventContext;
                            continue;
                        }
                        if (KafkaEventQueue.this.interrupted) {
                            this.remove(eventContext);
                            toDeliver = new InterruptedException("The event handler thread is interrupted");
                            toRun = eventContext;
                            continue;
                        }
                        if (KafkaEventQueue.this.shuttingDown) {
                            this.remove(eventContext);
                            toDeliver = new RejectedExecutionException("The event queue is shutting down");
                            toRun = eventContext;
                            continue;
                        }
                        awaitNs = timeoutNs - now;
                    }
                    if (this.head.next == this.head) {
                        if (this.deadlineMap.isEmpty() && (KafkaEventQueue.this.shuttingDown || KafkaEventQueue.this.interrupted)) {
                            return;
                        }
                    } else {
                        toDeliver = KafkaEventQueue.this.interrupted ? new InterruptedException("The event handler thread is interrupted") : null;
                        toRun = this.head.next;
                        this.remove(toRun);
                        continue;
                    }
                    if (awaitNs == Long.MAX_VALUE) {
                        try {
                            this.cond.await();
                        }
                        catch (InterruptedException e) {
                            KafkaEventQueue.this.log.warn("Interrupted while waiting for a new event. Shutting down event queue");
                            KafkaEventQueue.this.interrupted = true;
                        }
                        continue;
                    }
                    try {
                        this.cond.awaitNanos(awaitNs);
                    }
                    catch (InterruptedException e) {
                        KafkaEventQueue.this.log.warn("Interrupted while waiting for a deferred event. Shutting down event queue");
                        KafkaEventQueue.this.interrupted = true;
                    }
                    continue;
                }
                finally {
                    KafkaEventQueue.this.lock.unlock();
                    continue;
                }
                break;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Exception enqueue(EventContext eventContext, Function<OptionalLong, OptionalLong> deadlineNsCalculator) {
            KafkaEventQueue.this.lock.lock();
            try {
                EventContext toRemove;
                if (KafkaEventQueue.this.shuttingDown) {
                    RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("The event queue is shutting down");
                    return rejectedExecutionException;
                }
                if (KafkaEventQueue.this.interrupted) {
                    InterruptedException interruptedException = new InterruptedException("The event handler thread is interrupted");
                    return interruptedException;
                }
                OptionalLong existingDeadlineNs = OptionalLong.empty();
                if (eventContext.tag != null && (toRemove = this.tagToEventContext.put(eventContext.tag, eventContext)) != null) {
                    existingDeadlineNs = toRemove.deadlineNs;
                    this.remove(toRemove);
                    --this.size;
                }
                OptionalLong deadlineNs = deadlineNsCalculator.apply(existingDeadlineNs);
                boolean queueWasEmpty = this.head.isSingleton();
                boolean shouldSignal = false;
                switch (eventContext.insertionType) {
                    case APPEND: {
                        this.head.insertBefore(eventContext);
                        if (!queueWasEmpty) break;
                        shouldSignal = true;
                        break;
                    }
                    case PREPEND: {
                        this.head.insertAfter(eventContext);
                        if (!queueWasEmpty) break;
                        shouldSignal = true;
                        break;
                    }
                    case DEFERRED: {
                        if (deadlineNs.isPresent()) break;
                        RuntimeException runtimeException = new RuntimeException("You must specify a deadline for deferred events.");
                        return runtimeException;
                    }
                }
                if (deadlineNs.isPresent()) {
                    long prevStartNs;
                    long insertNs = deadlineNs.getAsLong();
                    long l = prevStartNs = this.deadlineMap.isEmpty() ? Long.MAX_VALUE : this.deadlineMap.firstKey();
                    while (this.deadlineMap.putIfAbsent(insertNs, eventContext) != null) {
                        ++insertNs;
                    }
                    eventContext.deadlineNs = OptionalLong.of(insertNs);
                    if (insertNs <= prevStartNs) {
                        shouldSignal = true;
                    }
                }
                ++this.size;
                if (shouldSignal) {
                    this.cond.signal();
                }
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
            return null;
        }

        void cancelDeferred(String tag) {
            KafkaEventQueue.this.lock.lock();
            try {
                EventContext eventContext = this.tagToEventContext.get(tag);
                if (eventContext != null) {
                    this.remove(eventContext);
                    --this.size;
                }
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        void wakeUp() {
            KafkaEventQueue.this.lock.lock();
            try {
                ((KafkaEventQueue)KafkaEventQueue.this).eventHandler.cond.signal();
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        int size() {
            KafkaEventQueue.this.lock.lock();
            try {
                int n = this.size;
                return n;
            }
            finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }
    }

    private static class EventContext {
        private final EventQueue.Event event;
        private final EventQueue.EventInsertionType insertionType;
        private EventContext prev = this;
        private EventContext next = this;
        private OptionalLong deadlineNs = OptionalLong.empty();
        private String tag;

        EventContext(EventQueue.Event event, EventQueue.EventInsertionType insertionType, String tag) {
            this.event = event;
            this.insertionType = insertionType;
            this.tag = tag;
        }

        void insertAfter(EventContext other) {
            this.next.prev = other;
            other.next = this.next;
            other.prev = this;
            this.next = other;
        }

        void insertBefore(EventContext other) {
            this.prev.next = other;
            other.prev = this.prev;
            other.next = this;
            this.prev = other;
        }

        void remove() {
            this.prev.next = this.next;
            this.next.prev = this.prev;
            this.prev = this;
            this.next = this;
        }

        boolean isSingleton() {
            return this.prev == this && this.next == this;
        }

        boolean run(Logger log, Throwable exceptionToDeliver) {
            if (exceptionToDeliver == null) {
                try {
                    this.event.run();
                }
                catch (InterruptedException e) {
                    log.warn("Interrupted while running event. Shutting down event queue");
                    return true;
                }
                catch (Throwable e) {
                    log.debug("Got exception while running {}. Invoking handleException.", (Object)this.event, (Object)e);
                    exceptionToDeliver = e;
                }
            }
            if (exceptionToDeliver != null) {
                this.completeWithException(log, exceptionToDeliver);
            }
            return Thread.currentThread().isInterrupted();
        }

        void completeWithException(Logger log, Throwable t) {
            try {
                this.event.handleException(t);
            }
            catch (Exception e) {
                log.error("Unexpected exception in handleException", (Throwable)e);
            }
        }
    }
}

