/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.ThroughputThrottler;

public class VerifiableProducer
implements AutoCloseable {
    private final ObjectMapper mapper = new ObjectMapper();
    private final String topic;
    private final Producer<String, String> producer;
    private long maxMessages = -1L;
    private long numAcked = 0L;
    private long numSent = 0L;
    private final long throughput;
    private boolean stopProducing = false;
    private final Integer valuePrefix;
    private final Integer repeatingKeys;
    private int keyCounter;
    private Long createTime;
    private final Long startTime;

    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix, Long createTime, Integer repeatingKeys) {
        this.topic = topic;
        this.throughput = throughput;
        this.maxMessages = maxMessages;
        this.producer = producer;
        this.valuePrefix = valuePrefix;
        this.createTime = createTime;
        this.startTime = System.currentTimeMillis();
        this.repeatingKeys = repeatingKeys;
    }

    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser((String)"verifiable-producer").defaultHelp(true).description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
        parser.addArgument(new String[]{"--topic"}).action((ArgumentAction)Arguments.store()).required(true).type(String.class).metavar(new String[]{"TOPIC"}).help("Produce messages to this topic.");
        MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group").description("Group of arguments for connection to brokers").required(true);
        connectionGroup.addArgument(new String[]{"--bootstrap-server"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"HOST1:PORT1[,HOST2:PORT2[...]]"}).dest("bootstrapServer").help("REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
        connectionGroup.addArgument(new String[]{"--broker-list"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"HOST1:PORT1[,HOST2:PORT2[...]]"}).dest("brokerList").help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified.  Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
        parser.addArgument(new String[]{"--max-messages"}).action((ArgumentAction)Arguments.store()).required(false).setDefault((Object)-1).type(Integer.class).metavar(new String[]{"MAX-MESSAGES"}).dest("maxMessages").help("Produce this many messages. If -1, produce messages until the process is killed externally.");
        parser.addArgument(new String[]{"--throughput"}).action((ArgumentAction)Arguments.store()).required(false).setDefault((Object)-1).type(Integer.class).metavar(new String[]{"THROUGHPUT"}).help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
        parser.addArgument(new String[]{"--acks"}).action((ArgumentAction)Arguments.store()).required(false).setDefault((Object)-1).type(Integer.class).choices((Object[])new Integer[]{0, 1, -1}).metavar(new String[]{"ACKS"}).help("Acks required on each produced message. See Kafka docs on acks for details.");
        parser.addArgument(new String[]{"--producer.config"}).action((ArgumentAction)Arguments.store()).required(false).type(String.class).metavar(new String[]{"CONFIG_FILE"}).help("Producer config properties file.");
        parser.addArgument(new String[]{"--message-create-time"}).action((ArgumentAction)Arguments.store()).required(false).setDefault((Object)-1L).type(Long.class).metavar(new String[]{"CREATETIME"}).dest("createTime").help("Send messages with creation time starting at the arguments value, in milliseconds since epoch");
        parser.addArgument(new String[]{"--value-prefix"}).action((ArgumentAction)Arguments.store()).required(false).type(Integer.class).metavar(new String[]{"VALUE-PREFIX"}).dest("valuePrefix").help("If specified, each produced value will have this prefix with a dot separator");
        parser.addArgument(new String[]{"--repeating-keys"}).action((ArgumentAction)Arguments.store()).required(false).type(Integer.class).metavar(new String[]{"REPEATING-KEYS"}).dest("repeatingKeys").help("If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again");
        return parser;
    }

    public static Properties loadProps(String filename) throws IOException {
        Properties props = new Properties();
        try (InputStream propStream = Files.newInputStream(Paths.get(filename, new String[0]), new OpenOption[0]);){
            props.load(propStream);
        }
        return props;
    }

    public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
        Namespace res = parser.parseArgs(args);
        int maxMessages = res.getInt("maxMessages");
        String topic = res.getString("topic");
        int throughput = res.getInt("throughput");
        String configFile = res.getString("producer.config");
        Integer valuePrefix = res.getInt("valuePrefix");
        Long createTime = res.getLong("createTime");
        Integer repeatingKeys = res.getInt("repeatingKeys");
        if (createTime == -1L) {
            createTime = null;
        }
        Properties producerProps = new Properties();
        if (res.get("bootstrapServer") != null) {
            producerProps.put("bootstrap.servers", res.getString("bootstrapServer"));
        } else if (res.getString("brokerList") != null) {
            producerProps.put("bootstrap.servers", res.getString("brokerList"));
        } else {
            parser.printHelp();
            System.exit(0);
        }
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("acks", Integer.toString(res.getInt("acks")));
        producerProps.put("retries", "0");
        if (configFile != null) {
            try {
                producerProps.putAll((Map<?, ?>)VerifiableProducer.loadProps(configFile));
            }
            catch (IOException e) {
                throw new ArgumentParserException(e.getMessage(), parser);
            }
        }
        StringSerializer serializer = new StringSerializer();
        KafkaProducer producer = new KafkaProducer(producerProps, (Serializer)serializer, (Serializer)serializer);
        return new VerifiableProducer((KafkaProducer<String, String>)producer, topic, throughput, maxMessages, valuePrefix, createTime, repeatingKeys);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void send(String key, String value) {
        ProducerRecord record;
        if (this.createTime != null) {
            record = new ProducerRecord(this.topic, null, this.createTime, (Object)key, (Object)value);
            this.createTime = this.createTime + (System.currentTimeMillis() - this.startTime);
        } else {
            record = new ProducerRecord(this.topic, (Object)key, (Object)value);
        }
        ++this.numSent;
        try {
            this.producer.send(record, (Callback)new PrintInfoCallback(key, value));
        }
        catch (Exception e) {
            PrintStream printStream = System.out;
            synchronized (printStream) {
                this.printJson(new FailedSend(key, value, this.topic, e));
            }
        }
    }

    public String getValue(long val) {
        if (this.valuePrefix != null) {
            return String.format("%d.%d", this.valuePrefix, val);
        }
        return String.format("%d", val);
    }

    public String getKey() {
        String key = null;
        if (this.repeatingKeys != null) {
            key = Integer.toString(this.keyCounter++);
            if (this.keyCounter == this.repeatingKeys) {
                this.keyCounter = 0;
            }
        }
        return key;
    }

    @Override
    public void close() {
        this.producer.close();
        this.printJson(new ShutdownComplete());
    }

    private void printJson(Object data) {
        try {
            System.out.println(this.mapper.writeValueAsString(data));
        }
        catch (JsonProcessingException e) {
            System.out.println("Bad data can't be written as json: " + e.getMessage());
        }
    }

    public void run(ThroughputThrottler throttler) {
        this.printJson(new StartupComplete());
        long maxMessages = this.maxMessages < 0L ? Long.MAX_VALUE : this.maxMessages;
        for (long i = 0L; i < maxMessages && !this.stopProducing; ++i) {
            long sendStartMs = System.currentTimeMillis();
            this.send(this.getKey(), this.getValue(i));
            if (!throttler.shouldThrottle(i, sendStartMs)) continue;
            throttler.throttle();
        }
    }

    public static void main(String[] args) {
        ArgumentParser parser = VerifiableProducer.argParser();
        if (args.length == 0) {
            parser.printHelp();
            System.exit(0);
        }
        try {
            VerifiableProducer producer = VerifiableProducer.createFromArgs(parser, args);
            long startMs = System.currentTimeMillis();
            ThroughputThrottler throttler = new ThroughputThrottler((double)producer.throughput, startMs);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                producer.stopProducing = true;
                producer.close();
                long stopMs = System.currentTimeMillis();
                double avgThroughput = 1000.0 * ((double)producer.numAcked / (double)(stopMs - startMs));
                producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
            }, "verifiable-producer-shutdown-hook"));
            producer.run(throttler);
        }
        catch (ArgumentParserException e) {
            parser.handleError(e);
            System.exit(1);
        }
    }

    private class PrintInfoCallback
    implements Callback {
        private final String key;
        private final String value;

        PrintInfoCallback(String key, String value) {
            this.key = key;
            this.value = value;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            PrintStream printStream = System.out;
            synchronized (printStream) {
                if (e == null) {
                    VerifiableProducer.this.numAcked++;
                    VerifiableProducer.this.printJson(new SuccessfulSend(this.key, this.value, recordMetadata));
                } else {
                    VerifiableProducer.this.printJson(new FailedSend(this.key, this.value, VerifiableProducer.this.topic, e));
                }
            }
        }
    }

    private static class FailedSend
    extends ProducerEvent {
        private final String topic;
        private final String key;
        private final String value;
        private final Exception exception;

        public FailedSend(String key, String value, String topic, Exception exception) {
            assert (exception != null) : "Expected non-null exception.";
            this.key = key;
            this.value = value;
            this.topic = topic;
            this.exception = exception;
        }

        @Override
        public String name() {
            return "producer_send_error";
        }

        @JsonProperty
        public String key() {
            return this.key;
        }

        @JsonProperty
        public String value() {
            return this.value;
        }

        @JsonProperty
        public String topic() {
            return this.topic;
        }

        @JsonProperty
        public String exception() {
            return this.exception.getClass().toString();
        }

        @JsonProperty
        public String message() {
            return this.exception.getMessage();
        }
    }

    private static class ShutdownComplete
    extends ProducerEvent {
        private ShutdownComplete() {
        }

        @Override
        public String name() {
            return "shutdown_complete";
        }
    }

    private static class StartupComplete
    extends ProducerEvent {
        private StartupComplete() {
        }

        @Override
        public String name() {
            return "startup_complete";
        }
    }

    private static class ToolData
    extends ProducerEvent {
        private final long sent;
        private final long acked;
        private final long targetThroughput;
        private final double avgThroughput;

        public ToolData(long sent, long acked, long targetThroughput, double avgThroughput) {
            this.sent = sent;
            this.acked = acked;
            this.targetThroughput = targetThroughput;
            this.avgThroughput = avgThroughput;
        }

        @Override
        public String name() {
            return "tool_data";
        }

        @JsonProperty
        public long sent() {
            return this.sent;
        }

        @JsonProperty
        public long acked() {
            return this.acked;
        }

        @JsonProperty(value="target_throughput")
        public long targetThroughput() {
            return this.targetThroughput;
        }

        @JsonProperty(value="avg_throughput")
        public double avgThroughput() {
            return this.avgThroughput;
        }
    }

    private static class SuccessfulSend
    extends ProducerEvent {
        private final String key;
        private final String value;
        private final RecordMetadata recordMetadata;

        public SuccessfulSend(String key, String value, RecordMetadata recordMetadata) {
            assert (recordMetadata != null) : "Expected non-null recordMetadata object.";
            this.key = key;
            this.value = value;
            this.recordMetadata = recordMetadata;
        }

        @Override
        public String name() {
            return "producer_send_success";
        }

        @JsonProperty
        public String key() {
            return this.key;
        }

        @JsonProperty
        public String value() {
            return this.value;
        }

        @JsonProperty
        public String topic() {
            return this.recordMetadata.topic();
        }

        @JsonProperty
        public int partition() {
            return this.recordMetadata.partition();
        }

        @JsonProperty
        public long offset() {
            return this.recordMetadata.offset();
        }
    }

    @JsonPropertyOrder(value={"timestamp", "name"})
    private static abstract class ProducerEvent {
        private final long timestamp = System.currentTimeMillis();

        private ProducerEvent() {
        }

        @JsonProperty
        public abstract String name();

        @JsonProperty
        public long timestamp() {
            return this.timestamp;
        }
    }
}

