/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.reassign;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.admin.AdminUtils;
import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.Json;
import org.apache.kafka.server.util.json.DecodeJson;
import org.apache.kafka.server.util.json.JsonObject;
import org.apache.kafka.server.util.json.JsonValue;
import org.apache.kafka.tools.TerseException;
import org.apache.kafka.tools.ToolsUtils;
import org.apache.kafka.tools.reassign.ActiveMoveState;
import org.apache.kafka.tools.reassign.CancelledMoveState;
import org.apache.kafka.tools.reassign.CompletedMoveState;
import org.apache.kafka.tools.reassign.LogDirMoveState;
import org.apache.kafka.tools.reassign.MissingLogDirMoveState;
import org.apache.kafka.tools.reassign.MissingReplicaMoveState;
import org.apache.kafka.tools.reassign.PartitionMove;
import org.apache.kafka.tools.reassign.PartitionReassignmentState;
import org.apache.kafka.tools.reassign.ReassignPartitionsCommandOptions;
import org.apache.kafka.tools.reassign.VerifyAssignmentResult;

public class ReassignPartitionsCommand {
    private static final String ANY_LOG_DIR = "any";
    static final String HELP_TEXT = "This tool helps to move topic partitions between replicas.";
    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
    private static final DecodeJson<List<Integer>> INT_LIST = DecodeJson.decodeList((DecodeJson)INT);
    private static final DecodeJson<List<String>> STRING_LIST = DecodeJson.decodeList((DecodeJson)STRING);
    static final int EARLIEST_VERSION = 1;
    static final int EARLIEST_TOPICS_JSON_VERSION = 1;
    static final List<String> BROKER_LEVEL_THROTTLES = Arrays.asList("leader.replication.throttled.rate", "follower.replication.throttled.rate", "replica.alter.log.dirs.io.max.bytes.per.second");
    private static final List<String> TOPIC_LEVEL_THROTTLES = Arrays.asList("leader.replication.throttled.replicas", "follower.replication.throttled.replicas");
    private static final String CANNOT_EXECUTE_BECAUSE_OF_EXISTING_MESSAGE = "Cannot execute because there is an existing partition assignment.  Use --additional to override this and create a new partition assignment in addition to the existing one. The --additional flag can also be used to change the throttle by resubmitting the current reassignment.";
    private static final String YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE = "Warning: You must run --verify periodically, until the reassignment completes, to ensure the throttle is removed.";

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) {
        ReassignPartitionsCommandOptions opts = ReassignPartitionsCommand.validateAndParseArgs(args);
        boolean failed = true;
        try (Admin adminClient = null;){
            Properties props;
            Properties properties = props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps((String)((String)opts.options.valueOf(opts.commandConfigOpt))) : new Properties();
            if (opts.options.has(opts.bootstrapControllerOpt)) {
                props.put("bootstrap.controllers", opts.options.valueOf(opts.bootstrapControllerOpt));
            } else {
                props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
            }
            props.putIfAbsent("client.id", "reassign-partitions-tool");
            adminClient = Admin.create((Properties)props);
            ReassignPartitionsCommand.handleAction(adminClient, opts);
            failed = false;
        }
        if (failed) {
            Exit.exit((int)1);
        }
    }

    private static void handleAction(Admin adminClient, ReassignPartitionsCommandOptions opts) throws IOException, ExecutionException, InterruptedException, TerseException {
        if (opts.options.has(opts.verifyOpt)) {
            ReassignPartitionsCommand.verifyAssignment(adminClient, Utils.readFileAsString((String)((String)opts.options.valueOf(opts.reassignmentJsonFileOpt))), opts.options.has(opts.preserveThrottlesOpt));
        } else if (opts.options.has(opts.generateOpt)) {
            ReassignPartitionsCommand.generateAssignment(adminClient, Utils.readFileAsString((String)((String)opts.options.valueOf(opts.topicsToMoveJsonFileOpt))), (String)opts.options.valueOf(opts.brokerListOpt), !opts.options.has(opts.disableRackAware));
        } else if (opts.options.has(opts.executeOpt)) {
            ReassignPartitionsCommand.executeAssignment(adminClient, opts.options.has(opts.additionalOpt), Utils.readFileAsString((String)((String)opts.options.valueOf(opts.reassignmentJsonFileOpt))), (Long)opts.options.valueOf(opts.interBrokerThrottleOpt), (Long)opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt), (Long)opts.options.valueOf(opts.timeoutOpt), Time.SYSTEM);
        } else if (opts.options.has(opts.cancelOpt)) {
            ReassignPartitionsCommand.cancelAssignment(adminClient, Utils.readFileAsString((String)((String)opts.options.valueOf(opts.reassignmentJsonFileOpt))), opts.options.has(opts.preserveThrottlesOpt), (Long)opts.options.valueOf(opts.timeoutOpt), Time.SYSTEM);
        } else if (opts.options.has(opts.listOpt)) {
            ReassignPartitionsCommand.listReassignments(adminClient);
        } else {
            throw new RuntimeException("Unsupported action.");
        }
    }

    static VerifyAssignmentResult verifyAssignment(Admin adminClient, String jsonString, Boolean preserveThrottles) throws ExecutionException, InterruptedException, JsonProcessingException {
        Map.Entry<List<Map.Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> t0 = ReassignPartitionsCommand.parsePartitionReassignmentData(jsonString);
        List<Map.Entry<TopicPartition, List<Integer>>> targetParts = t0.getKey();
        Map<TopicPartitionReplica, String> targetLogDirs = t0.getValue();
        Map.Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> t1 = ReassignPartitionsCommand.verifyPartitionAssignments(adminClient, targetParts);
        Map<TopicPartition, PartitionReassignmentState> partStates = t1.getKey();
        Boolean partsOngoing = t1.getValue();
        Map.Entry<Map<TopicPartitionReplica, LogDirMoveState>, Boolean> t2 = ReassignPartitionsCommand.verifyReplicaMoves(adminClient, targetLogDirs);
        Map<TopicPartitionReplica, LogDirMoveState> moveStates = t2.getKey();
        Boolean movesOngoing = t2.getValue();
        if (!(partsOngoing.booleanValue() || movesOngoing.booleanValue() || preserveThrottles.booleanValue())) {
            ReassignPartitionsCommand.clearAllThrottles(adminClient, targetParts);
        }
        return new VerifyAssignmentResult(partStates, partsOngoing, moveStates, movesOngoing);
    }

    private static Map.Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> verifyPartitionAssignments(Admin adminClient, List<Map.Entry<TopicPartition, List<Integer>>> targets) throws ExecutionException, InterruptedException {
        Map.Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> t0 = ReassignPartitionsCommand.findPartitionReassignmentStates(adminClient, targets);
        System.out.println(ReassignPartitionsCommand.partitionReassignmentStatesToString(t0.getKey()));
        return t0;
    }

    static int compareTopicPartitions(TopicPartition a, TopicPartition b) {
        int topicOrder = Objects.compare(a.topic(), b.topic(), String::compareTo);
        return topicOrder == 0 ? Integer.compare(a.partition(), b.partition()) : topicOrder;
    }

    static int compareTopicPartitionReplicas(TopicPartitionReplica a, TopicPartitionReplica b) {
        int brokerOrder = Integer.compare(a.brokerId(), b.brokerId());
        if (brokerOrder != 0) {
            return brokerOrder;
        }
        int topicOrder = Objects.compare(a.topic(), b.topic(), String::compareTo);
        return topicOrder == 0 ? Integer.compare(a.partition(), b.partition()) : topicOrder;
    }

    static String partitionReassignmentStatesToString(Map<TopicPartition, PartitionReassignmentState> states) {
        ArrayList<String> bld = new ArrayList<String>();
        bld.add("Status of partition reassignment:");
        states.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).forEach(topicPartition -> {
            PartitionReassignmentState state = (PartitionReassignmentState)states.get(topicPartition);
            if (state.done) {
                if (state.currentReplicas.equals(state.targetReplicas)) {
                    bld.add(String.format("Reassignment of partition %s is completed.", topicPartition));
                } else {
                    String currentReplicaStr = state.currentReplicas.stream().map(String::valueOf).collect(Collectors.joining(","));
                    String targetReplicaStr = state.targetReplicas.stream().map(String::valueOf).collect(Collectors.joining(","));
                    bld.add("There is no active reassignment of partition " + String.valueOf(topicPartition) + ", but replica set is " + currentReplicaStr + " rather than " + targetReplicaStr + ".");
                }
            } else {
                bld.add(String.format("Reassignment of partition %s is still in progress.", topicPartition));
            }
        });
        return bld.stream().collect(Collectors.joining(System.lineSeparator()));
    }

    static Map.Entry<Map<TopicPartition, PartitionReassignmentState>, Boolean> findPartitionReassignmentStates(Admin adminClient, List<Map.Entry<TopicPartition, List<Integer>>> targetReassignments) throws ExecutionException, InterruptedException {
        Map currentReassignments = (Map)adminClient.listPartitionReassignments().reassignments().get();
        ArrayList foundReassignments = new ArrayList();
        ArrayList notFoundReassignments = new ArrayList();
        targetReassignments.forEach(reassignment -> {
            if (currentReassignments.containsKey(reassignment.getKey())) {
                foundReassignments.add(reassignment);
            } else {
                notFoundReassignments.add(reassignment);
            }
        });
        List<Map.Entry> foundResults = foundReassignments.stream().map(e -> {
            TopicPartition part = (TopicPartition)e.getKey();
            List targetReplicas = (List)e.getValue();
            return new AbstractMap.SimpleImmutableEntry<TopicPartition, PartitionReassignmentState>(part, new PartitionReassignmentState(((PartitionReassignment)currentReassignments.get(part)).replicas(), targetReplicas, false));
        }).collect(Collectors.toList());
        Set topicNamesToLookUp = notFoundReassignments.stream().map(e -> (TopicPartition)e.getKey()).filter(part -> !currentReassignments.containsKey(part)).map(TopicPartition::topic).collect(Collectors.toSet());
        Map topicDescriptions = adminClient.describeTopics(topicNamesToLookUp).topicNameValues();
        ArrayList<AbstractMap.SimpleImmutableEntry<TopicPartition, PartitionReassignmentState>> notFoundResults = new ArrayList<AbstractMap.SimpleImmutableEntry<TopicPartition, PartitionReassignmentState>>();
        for (Map.Entry e2 : notFoundReassignments) {
            TopicPartition part2 = (TopicPartition)e2.getKey();
            List targetReplicas = (List)e2.getValue();
            if (currentReassignments.containsKey(part2)) {
                PartitionReassignment reassignment2 = (PartitionReassignment)currentReassignments.get(part2);
                notFoundResults.add(new AbstractMap.SimpleImmutableEntry<TopicPartition, PartitionReassignmentState>(part2, new PartitionReassignmentState(reassignment2.replicas(), targetReplicas, false)));
                continue;
            }
            notFoundResults.add(new AbstractMap.SimpleImmutableEntry<TopicPartition, PartitionReassignmentState>(part2, ReassignPartitionsCommand.topicDescriptionFutureToState(part2.partition(), (KafkaFuture<TopicDescription>)((KafkaFuture)topicDescriptions.get(part2.topic())), targetReplicas)));
        }
        HashMap allResults = new HashMap();
        foundResults.forEach(e -> allResults.put((TopicPartition)e.getKey(), (PartitionReassignmentState)e.getValue()));
        notFoundResults.forEach(e -> allResults.put((TopicPartition)e.getKey(), (PartitionReassignmentState)e.getValue()));
        return new AbstractMap.SimpleImmutableEntry<Map<TopicPartition, PartitionReassignmentState>, Boolean>(allResults, !currentReassignments.isEmpty());
    }

    private static PartitionReassignmentState topicDescriptionFutureToState(int partition, KafkaFuture<TopicDescription> future, List<Integer> targetReplicas) throws InterruptedException, ExecutionException {
        try {
            TopicDescription topicDescription = (TopicDescription)future.get();
            if (topicDescription.partitions().size() < partition) {
                throw new ExecutionException("Too few partitions found", (Throwable)new UnknownTopicOrPartitionException());
            }
            return new PartitionReassignmentState(((TopicPartitionInfo)topicDescription.partitions().get(partition)).replicas().stream().map(Node::id).collect(Collectors.toList()), targetReplicas, true);
        }
        catch (ExecutionException t) {
            if (t.getCause() instanceof UnknownTopicOrPartitionException) {
                return new PartitionReassignmentState(Collections.emptyList(), targetReplicas, true);
            }
            throw t;
        }
    }

    private static Map.Entry<Map<TopicPartitionReplica, LogDirMoveState>, Boolean> verifyReplicaMoves(Admin adminClient, Map<TopicPartitionReplica, String> targetReassignments) throws ExecutionException, InterruptedException {
        Map<TopicPartitionReplica, LogDirMoveState> moveStates = ReassignPartitionsCommand.findLogDirMoveStates(adminClient, targetReassignments);
        System.out.println(ReassignPartitionsCommand.replicaMoveStatesToString(moveStates));
        return new AbstractMap.SimpleImmutableEntry<Map<TopicPartitionReplica, LogDirMoveState>, Boolean>(moveStates, !moveStates.values().stream().allMatch(LogDirMoveState::done));
    }

    static Map<TopicPartitionReplica, LogDirMoveState> findLogDirMoveStates(Admin adminClient, Map<TopicPartitionReplica, String> targetMoves) throws ExecutionException, InterruptedException {
        Map replicaLogDirInfos = (Map)adminClient.describeReplicaLogDirs(targetMoves.keySet()).all().get();
        return targetMoves.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
            TopicPartitionReplica replica = (TopicPartitionReplica)e.getKey();
            String targetLogDir = (String)e.getValue();
            if (!replicaLogDirInfos.containsKey(replica)) {
                return new MissingReplicaMoveState(targetLogDir);
            }
            DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = (DescribeReplicaLogDirsResult.ReplicaLogDirInfo)replicaLogDirInfos.get(replica);
            if (info.getCurrentReplicaLogDir() == null) {
                return new MissingLogDirMoveState(targetLogDir);
            }
            if (info.getFutureReplicaLogDir() == null) {
                if (info.getCurrentReplicaLogDir().equals(targetLogDir)) {
                    return new CompletedMoveState(targetLogDir);
                }
                return new CancelledMoveState(info.getCurrentReplicaLogDir(), targetLogDir);
            }
            return new ActiveMoveState(info.getCurrentReplicaLogDir(), targetLogDir, info.getFutureReplicaLogDir());
        }));
    }

    static String replicaMoveStatesToString(Map<TopicPartitionReplica, LogDirMoveState> states) {
        ArrayList bld = new ArrayList();
        states.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas).forEach(replica -> {
            LogDirMoveState state = (LogDirMoveState)states.get(replica);
            if (state instanceof MissingLogDirMoveState) {
                bld.add("Partition " + replica.topic() + "-" + replica.partition() + " is not found in any live log dir on broker " + replica.brokerId() + ". There is likely an offline log directory on the broker.");
            } else if (state instanceof MissingReplicaMoveState) {
                bld.add("Partition " + replica.topic() + "-" + replica.partition() + " cannot be found in any live log directory on broker " + replica.brokerId() + ".");
            } else if (state instanceof ActiveMoveState) {
                String targetLogDir = ((ActiveMoveState)state).targetLogDir;
                String futureLogDir = ((ActiveMoveState)state).futureLogDir;
                if (targetLogDir.equals(futureLogDir)) {
                    bld.add("Reassignment of replica " + String.valueOf(replica) + " is still in progress.");
                } else {
                    bld.add("Partition " + replica.topic() + "-" + replica.partition() + " on broker " + replica.brokerId() + " is being moved to log dir " + futureLogDir + " instead of " + targetLogDir + ".");
                }
            } else if (state instanceof CancelledMoveState) {
                String targetLogDir = ((CancelledMoveState)state).targetLogDir;
                String currentLogDir = ((CancelledMoveState)state).currentLogDir;
                bld.add("Partition " + replica.topic() + "-" + replica.partition() + " on broker " + replica.brokerId() + " is not being moved from log dir " + currentLogDir + " to " + targetLogDir + ".");
            } else if (state instanceof CompletedMoveState) {
                bld.add("Reassignment of replica " + String.valueOf(replica) + " completed successfully.");
            }
        });
        return bld.stream().collect(Collectors.joining(System.lineSeparator()));
    }

    private static void clearAllThrottles(Admin adminClient, List<Map.Entry<TopicPartition, List<Integer>>> targetParts) throws ExecutionException, InterruptedException {
        Set<Integer> brokers = ((Collection)adminClient.describeCluster().nodes().get()).stream().map(Node::id).collect(Collectors.toSet());
        targetParts.forEach(t -> brokers.addAll((Collection)t.getValue()));
        System.out.printf("Clearing broker-level throttles on broker%s %s%n", brokers.size() == 1 ? "" : "s", brokers.stream().map(Object::toString).collect(Collectors.joining(",")));
        ReassignPartitionsCommand.clearBrokerLevelThrottles(adminClient, brokers);
        Set<String> topics = targetParts.stream().map(t -> ((TopicPartition)t.getKey()).topic()).collect(Collectors.toSet());
        System.out.printf("Clearing topic-level throttles on topic%s %s%n", topics.size() == 1 ? "" : "s", String.join((CharSequence)",", topics));
        ReassignPartitionsCommand.clearTopicLevelThrottles(adminClient, topics);
    }

    private static void clearBrokerLevelThrottles(Admin adminClient, Set<Integer> brokers) throws ExecutionException, InterruptedException {
        HashMap configOps = new HashMap();
        brokers.forEach(brokerId -> configOps.put(new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString()), (Collection)BROKER_LEVEL_THROTTLES.stream().map(throttle -> new AlterConfigOp(new ConfigEntry(throttle, null), AlterConfigOp.OpType.DELETE)).collect(Collectors.toList())));
        adminClient.incrementalAlterConfigs(configOps).all().get();
    }

    private static void clearTopicLevelThrottles(Admin adminClient, Set<String> topics) throws ExecutionException, InterruptedException {
        Map<ConfigResource, Collection> configOps = topics.stream().collect(Collectors.toMap(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName), topicName -> TOPIC_LEVEL_THROTTLES.stream().map(throttle -> new AlterConfigOp(new ConfigEntry(throttle, null), AlterConfigOp.OpType.DELETE)).collect(Collectors.toList())));
        adminClient.incrementalAlterConfigs(configOps).all().get();
    }

    public static Map.Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>> generateAssignment(Admin adminClient, String reassignmentJson, String brokerListString, Boolean enableRackAwareness) throws ExecutionException, InterruptedException, JsonProcessingException {
        Map.Entry<List<Integer>, List<String>> t0 = ReassignPartitionsCommand.parseGenerateAssignmentArgs(reassignmentJson, brokerListString);
        List<Integer> brokersToReassign = t0.getKey();
        List<String> topicsToReassign = t0.getValue();
        Map<TopicPartition, List<Integer>> currentAssignments = ReassignPartitionsCommand.getReplicaAssignmentForTopics(adminClient, topicsToReassign);
        List<BrokerMetadata> brokerMetadatas = ReassignPartitionsCommand.getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
        Map<TopicPartition, List<Integer>> proposedAssignments = ReassignPartitionsCommand.calculateAssignment(currentAssignments, brokerMetadatas);
        System.out.printf("Current partition replica assignment%n%s%n%n", ReassignPartitionsCommand.formatAsReassignmentJson(currentAssignments, Collections.emptyMap()));
        System.out.printf("Proposed partition reassignment configuration%n%s%n", ReassignPartitionsCommand.formatAsReassignmentJson(proposedAssignments, Collections.emptyMap()));
        return new AbstractMap.SimpleImmutableEntry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List<Integer>>>(proposedAssignments, currentAssignments);
    }

    private static Map<TopicPartition, List<Integer>> calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment, List<BrokerMetadata> brokerMetadatas) {
        HashMap<String, List> groupedByTopic = new HashMap<String, List>();
        for (Map.Entry<TopicPartition, List<Integer>> e : currentAssignment.entrySet()) {
            groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new ArrayList()).add(e);
        }
        HashMap<TopicPartition, List<Integer>> proposedAssignments = new HashMap<TopicPartition, List<Integer>>();
        groupedByTopic.forEach((topic, assignment) -> {
            List replicas = (List)((Map.Entry)assignment.get(0)).getValue();
            Map assignedReplicas = AdminUtils.assignReplicasToBrokers((Collection)brokerMetadatas, (int)assignment.size(), (int)replicas.size());
            assignedReplicas.forEach((partition, replicas0) -> proposedAssignments.put(new TopicPartition(topic, partition.intValue()), (List<Integer>)replicas0));
        });
        return proposedAssignments;
    }

    private static Map<String, TopicDescription> describeTopics(Admin adminClient, Set<String> topics) throws ExecutionException, InterruptedException {
        Map futures = adminClient.describeTopics(topics).topicNameValues();
        HashMap<String, TopicDescription> res = new HashMap<String, TopicDescription>();
        for (Map.Entry e : futures.entrySet()) {
            String topicName = (String)e.getKey();
            KafkaFuture topicDescriptionFuture = (KafkaFuture)e.getValue();
            try {
                res.put(topicName, (TopicDescription)topicDescriptionFuture.get());
            }
            catch (ExecutionException t) {
                if (t.getCause() instanceof UnknownTopicOrPartitionException) {
                    throw new ExecutionException((Throwable)new UnknownTopicOrPartitionException("Topic " + topicName + " not found."));
                }
                throw t;
            }
        }
        return res;
    }

    static Map<TopicPartition, List<Integer>> getReplicaAssignmentForTopics(Admin adminClient, List<String> topics) throws ExecutionException, InterruptedException {
        HashMap<TopicPartition, List<Integer>> res = new HashMap<TopicPartition, List<Integer>>();
        ReassignPartitionsCommand.describeTopics(adminClient, new HashSet<String>(topics)).forEach((topicName, topicDescription) -> topicDescription.partitions().forEach(info -> res.put(new TopicPartition(topicName, info.partition()), info.replicas().stream().map(Node::id).collect(Collectors.toList()))));
        return res;
    }

    static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admin adminClient, Set<TopicPartition> partitions) throws ExecutionException, InterruptedException {
        HashMap<TopicPartition, List<Integer>> res = new HashMap<TopicPartition, List<Integer>>();
        ReassignPartitionsCommand.describeTopics(adminClient, partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet())).forEach((topicName, topicDescription) -> topicDescription.partitions().forEach(info -> {
            TopicPartition tp = new TopicPartition(topicName, info.partition());
            if (partitions.contains(tp)) {
                res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList()));
            }
        }));
        if (!res.keySet().equals(partitions)) {
            HashSet<TopicPartition> missingPartitions = new HashSet<TopicPartition>(partitions);
            missingPartitions.removeAll(res.keySet());
            throw new ExecutionException((Throwable)new UnknownTopicOrPartitionException("Unable to find partition: " + missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))));
        }
        return res;
    }

    static List<BrokerMetadata> getBrokerMetadata(Admin adminClient, List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, InterruptedException {
        HashSet<Integer> brokerSet = new HashSet<Integer>(brokers);
        List<BrokerMetadata> results = ((Collection)adminClient.describeCluster().nodes().get()).stream().filter(node -> brokerSet.contains(node.id())).map(node -> enableRackAwareness && node.rack() != null ? new BrokerMetadata(node.id(), Optional.of(node.rack())) : new BrokerMetadata(node.id(), Optional.empty())).collect(Collectors.toList());
        long numRackless = results.stream().filter(m -> !m.rack.isPresent()).count();
        if (enableRackAwareness && numRackless != 0L && numRackless != (long)results.size()) {
            throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.");
        }
        return results;
    }

    static Map.Entry<List<Integer>, List<String>> parseGenerateAssignmentArgs(String reassignmentJson, String brokerList) throws JsonMappingException {
        List brokerListToReassign = Arrays.stream(brokerList.split(",")).map(Integer::parseInt).collect(Collectors.toList());
        Set duplicateReassignments = ToolsUtils.duplicates(brokerListToReassign);
        if (!duplicateReassignments.isEmpty()) {
            throw new AdminCommandFailedException(String.format("Broker list contains duplicate entries: %s", duplicateReassignments));
        }
        List<String> topicsToReassign = ReassignPartitionsCommand.parseTopicsData(reassignmentJson);
        Set<String> duplicateTopicsToReassign = ToolsUtils.duplicates(topicsToReassign);
        if (!duplicateTopicsToReassign.isEmpty()) {
            throw new AdminCommandFailedException(String.format("List of topics to reassign contains duplicate entries: %s", duplicateTopicsToReassign));
        }
        return new AbstractMap.SimpleImmutableEntry<List<Integer>, List<String>>(brokerListToReassign, topicsToReassign);
    }

    public static void executeAssignment(Admin adminClient, Boolean additional, String reassignmentJson, Long interBrokerThrottle, Long logDirThrottle, Long timeoutMs, Time time) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException {
        Map<TopicPartition, Throwable> errors;
        Map.Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, String>> t0 = ReassignPartitionsCommand.parseExecuteAssignmentArgs(reassignmentJson);
        Map<TopicPartition, List<Integer>> proposedParts = t0.getKey();
        Map<TopicPartitionReplica, String> proposedReplicas = t0.getValue();
        Map currentReassignments = (Map)adminClient.listPartitionReassignments().reassignments().get();
        if (!additional.booleanValue() && !currentReassignments.isEmpty()) {
            throw new TerseException(CANNOT_EXECUTE_BECAUSE_OF_EXISTING_MESSAGE);
        }
        HashSet<Integer> brokers = new HashSet<Integer>();
        proposedParts.values().forEach(brokers::addAll);
        ReassignPartitionsCommand.verifyBrokerIds(adminClient, brokers);
        Map<TopicPartition, List<Integer>> currentParts = ReassignPartitionsCommand.getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
        System.out.println(ReassignPartitionsCommand.currentPartitionReplicaAssignmentToString(proposedParts, currentParts));
        if (interBrokerThrottle >= 0L || logDirThrottle >= 0L) {
            System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
            if (interBrokerThrottle >= 0L) {
                Map<String, Map<Integer, PartitionMove>> moveMap = ReassignPartitionsCommand.calculateProposedMoveMap(currentReassignments, proposedParts, currentParts);
                ReassignPartitionsCommand.modifyReassignmentThrottle(adminClient, moveMap, interBrokerThrottle);
            }
            if (logDirThrottle >= 0L) {
                Set<Integer> movingBrokers = ReassignPartitionsCommand.calculateMovingBrokers(proposedReplicas.keySet());
                ReassignPartitionsCommand.modifyLogDirThrottle(adminClient, movingBrokers, logDirThrottle);
            }
        }
        if (!(errors = ReassignPartitionsCommand.alterPartitionReassignments(adminClient, proposedParts)).isEmpty()) {
            throw new TerseException(String.format("Error reassigning partition(s):%n%s", errors.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).map(part -> String.valueOf(part) + ": " + ((Throwable)errors.get(part)).getMessage()).collect(Collectors.joining(System.lineSeparator()))));
        }
        System.out.printf("Successfully started partition reassignment%s for %s%n", proposedParts.size() == 1 ? "" : "s", proposedParts.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).map(Objects::toString).collect(Collectors.joining(",")));
        if (!proposedReplicas.isEmpty()) {
            ReassignPartitionsCommand.executeMoves(adminClient, proposedReplicas, timeoutMs, time);
        }
    }

    private static void executeMoves(Admin adminClient, Map<TopicPartitionReplica, String> proposedReplicas, Long timeoutMs, Time time) throws InterruptedException, TerseException {
        long startTimeMs = time.milliseconds();
        HashMap<TopicPartitionReplica, String> pendingReplicas = new HashMap<TopicPartitionReplica, String>(proposedReplicas);
        boolean done = false;
        do {
            Set<TopicPartitionReplica> completed;
            if (!(completed = ReassignPartitionsCommand.alterReplicaLogDirs(adminClient, pendingReplicas)).isEmpty()) {
                completed.stream().sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas).forEach(replica -> System.out.printf("Successfully started moving log directory to %s for replica %s-%s with broker %s %n", pendingReplicas.get(replica), replica.topic(), replica.partition(), replica.brokerId()));
            }
            completed.forEach(pendingReplicas::remove);
            if (pendingReplicas.isEmpty()) {
                done = true;
                continue;
            }
            if (time.milliseconds() >= startTimeMs + timeoutMs) {
                throw new TerseException(String.format("Timed out before log directory move%s could be started for: %s", pendingReplicas.size() == 1 ? "" : "s", pendingReplicas.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas).map(Object::toString).collect(Collectors.joining(","))));
            }
            time.sleep(100L);
        } while (!done);
    }

    private static void listReassignments(Admin adminClient) throws ExecutionException, InterruptedException {
        System.out.println(ReassignPartitionsCommand.curReassignmentsToString(adminClient));
    }

    static String curReassignmentsToString(Admin adminClient) throws ExecutionException, InterruptedException {
        Map currentReassignments = (Map)adminClient.listPartitionReassignments().reassignments().get();
        String text = currentReassignments.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).map(part -> {
            PartitionReassignment reassignment = (PartitionReassignment)currentReassignments.get(part);
            List replicas = reassignment.replicas();
            List addingReplicas = reassignment.addingReplicas();
            List removingReplicas = reassignment.removingReplicas();
            return String.format("%s: replicas: %s.%s%s", part, replicas.stream().map(Object::toString).collect(Collectors.joining(",")), addingReplicas.isEmpty() ? "" : String.format(" adding: %s.", addingReplicas.stream().map(Object::toString).collect(Collectors.joining(","))), removingReplicas.isEmpty() ? "" : String.format(" removing: %s.", removingReplicas.stream().map(Object::toString).collect(Collectors.joining(","))));
        }).collect(Collectors.joining(System.lineSeparator()));
        return text.isEmpty() ? "No partition reassignments found." : String.format("Current partition reassignments:%n%s", text);
    }

    private static void verifyBrokerIds(Admin adminClient, Set<Integer> brokers) throws ExecutionException, InterruptedException {
        Set allNodeIds = ((Collection)adminClient.describeCluster().nodes().get()).stream().map(Node::id).collect(Collectors.toSet());
        Optional<Integer> unknown = brokers.stream().filter(brokerId -> !allNodeIds.contains(brokerId)).findFirst();
        if (unknown.isPresent()) {
            throw new AdminCommandFailedException("Unknown broker id " + String.valueOf(unknown.get()));
        }
    }

    static String currentPartitionReplicaAssignmentToString(Map<TopicPartition, List<Integer>> proposedParts, Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException {
        Map<TopicPartition, List<Integer>> partitionsToBeReassigned = currentParts.entrySet().stream().filter(e -> proposedParts.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return String.format("Current partition replica assignment%n%n%s%n%nSave this to use as the %s", ReassignPartitionsCommand.formatAsReassignmentJson(partitionsToBeReassigned, Collections.emptyMap()), "--reassignment-json-file option during rollback");
    }

    static Map<TopicPartition, Throwable> alterPartitionReassignments(Admin adminClient, Map<TopicPartition, List<Integer>> reassignments) throws InterruptedException {
        HashMap args = new HashMap();
        reassignments.forEach((part, replicas) -> args.put(part, Optional.of(new NewPartitionReassignment(replicas))));
        Map results = adminClient.alterPartitionReassignments(args).values();
        HashMap<TopicPartition, Throwable> errors = new HashMap<TopicPartition, Throwable>();
        for (Map.Entry e : results.entrySet()) {
            try {
                ((KafkaFuture)e.getValue()).get();
            }
            catch (ExecutionException t) {
                errors.put((TopicPartition)e.getKey(), t.getCause());
            }
        }
        return errors;
    }

    static Map<TopicPartition, Throwable> cancelPartitionReassignments(Admin adminClient, Set<TopicPartition> reassignments) throws InterruptedException {
        HashMap args = new HashMap();
        reassignments.forEach(part -> args.put(part, Optional.empty()));
        Map results = adminClient.alterPartitionReassignments(args).values();
        HashMap<TopicPartition, Throwable> errors = new HashMap<TopicPartition, Throwable>();
        for (Map.Entry e : results.entrySet()) {
            try {
                ((KafkaFuture)e.getValue()).get();
            }
            catch (ExecutionException t) {
                errors.put((TopicPartition)e.getKey(), t.getCause());
            }
        }
        return errors;
    }

    private static Map<String, Map<Integer, PartitionMove>> calculateCurrentMoveMap(Map<TopicPartition, PartitionReassignment> currentReassignments) {
        HashMap<String, Map<Integer, PartitionMove>> moveMap = new HashMap<String, Map<Integer, PartitionMove>>();
        currentReassignments.forEach((part, reassignment) -> {
            List allReplicas = reassignment.replicas();
            List addingReplicas = reassignment.addingReplicas();
            HashSet<Integer> sources = new HashSet<Integer>(allReplicas);
            addingReplicas.forEach(sources::remove);
            HashSet<Integer> destinations = new HashSet<Integer>(addingReplicas);
            Map partMoves = moveMap.computeIfAbsent(part.topic(), k -> new HashMap());
            partMoves.put(part.partition(), new PartitionMove(sources, destinations));
        });
        return moveMap;
    }

    static Map<String, Map<Integer, PartitionMove>> calculateProposedMoveMap(Map<TopicPartition, PartitionReassignment> currentReassignments, Map<TopicPartition, List<Integer>> proposedParts, Map<TopicPartition, List<Integer>> currentParts) {
        Map<String, Map<Integer, PartitionMove>> moveMap = ReassignPartitionsCommand.calculateCurrentMoveMap(currentReassignments);
        for (Map.Entry<TopicPartition, List<Integer>> e : proposedParts.entrySet()) {
            TopicPartition part = e.getKey();
            List<Integer> replicas = e.getValue();
            Map partMoves = moveMap.computeIfAbsent(part.topic(), k -> new HashMap());
            HashSet<Integer> sources = new HashSet<Integer>();
            if (partMoves.containsKey(part.partition())) {
                PartitionMove move = (PartitionMove)partMoves.get(part.partition());
                sources.addAll(move.sources);
            } else if (currentParts.containsKey(part)) {
                sources.addAll((Collection)currentParts.get(part));
            } else {
                throw new RuntimeException("Trying to reassign a topic partition " + String.valueOf(part) + " with 0 replicas");
            }
            HashSet<Integer> destinations = new HashSet<Integer>(replicas);
            destinations.removeAll(sources);
            partMoves.put(part.partition(), new PartitionMove(sources, destinations));
        }
        return moveMap;
    }

    static Map<String, String> calculateLeaderThrottles(Map<String, Map<Integer, PartitionMove>> moveMap) {
        HashMap<String, String> results = new HashMap<String, String>();
        moveMap.forEach((topicName, partMoveMap) -> {
            TreeSet components = new TreeSet();
            partMoveMap.forEach((partId, move) -> move.sources.forEach(source -> components.add(String.format("%d:%d", partId, source))));
            results.put((String)topicName, String.join((CharSequence)",", components));
        });
        return results;
    }

    static Map<String, String> calculateFollowerThrottles(Map<String, Map<Integer, PartitionMove>> moveMap) {
        HashMap<String, String> results = new HashMap<String, String>();
        moveMap.forEach((topicName, partMoveMap) -> {
            TreeSet components = new TreeSet();
            partMoveMap.forEach((partId, move) -> move.destinations.forEach(destination -> {
                if (!move.sources.contains(destination)) {
                    components.add(String.format("%d:%d", partId, destination));
                }
            }));
            results.put((String)topicName, String.join((CharSequence)",", components));
        });
        return results;
    }

    static Set<Integer> calculateReassigningBrokers(Map<String, Map<Integer, PartitionMove>> moveMap) {
        TreeSet<Integer> reassigningBrokers = new TreeSet<Integer>();
        moveMap.values().forEach(partMoveMap -> partMoveMap.values().forEach(partMove -> {
            reassigningBrokers.addAll(partMove.sources);
            reassigningBrokers.addAll(partMove.destinations);
        }));
        return reassigningBrokers;
    }

    static Set<Integer> calculateMovingBrokers(Set<TopicPartitionReplica> replicaMoves) {
        return replicaMoves.stream().map(TopicPartitionReplica::brokerId).collect(Collectors.toSet());
    }

    static void modifyTopicThrottles(Admin adminClient, Map<String, String> leaderThrottles, Map<String, String> followerThrottles) throws ExecutionException, InterruptedException {
        HashMap configs = new HashMap();
        HashSet<String> topicNames = new HashSet<String>(leaderThrottles.keySet());
        topicNames.addAll(followerThrottles.keySet());
        topicNames.forEach(topicName -> {
            ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
            if (leaderThrottles.containsKey(topicName)) {
                ops.add(new AlterConfigOp(new ConfigEntry("leader.replication.throttled.replicas", (String)leaderThrottles.get(topicName)), AlterConfigOp.OpType.SET));
            }
            if (followerThrottles.containsKey(topicName)) {
                ops.add(new AlterConfigOp(new ConfigEntry("follower.replication.throttled.replicas", (String)followerThrottles.get(topicName)), AlterConfigOp.OpType.SET));
            }
            if (!ops.isEmpty()) {
                configs.put(new ConfigResource(ConfigResource.Type.TOPIC, topicName), ops);
            }
        });
        adminClient.incrementalAlterConfigs(configs).all().get();
    }

    private static void modifyReassignmentThrottle(Admin admin, Map<String, Map<Integer, PartitionMove>> moveMap, Long interBrokerThrottle) throws ExecutionException, InterruptedException {
        Map<String, String> leaderThrottles = ReassignPartitionsCommand.calculateLeaderThrottles(moveMap);
        Map<String, String> followerThrottles = ReassignPartitionsCommand.calculateFollowerThrottles(moveMap);
        ReassignPartitionsCommand.modifyTopicThrottles(admin, leaderThrottles, followerThrottles);
        Set<Integer> reassigningBrokers = ReassignPartitionsCommand.calculateReassigningBrokers(moveMap);
        ReassignPartitionsCommand.modifyInterBrokerThrottle(admin, reassigningBrokers, interBrokerThrottle);
    }

    static void modifyInterBrokerThrottle(Admin adminClient, Set<Integer> reassigningBrokers, long interBrokerThrottle) throws ExecutionException, InterruptedException {
        if (interBrokerThrottle >= 0L) {
            HashMap configs = new HashMap();
            reassigningBrokers.forEach(brokerId -> {
                ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
                ops.add(new AlterConfigOp(new ConfigEntry("leader.replication.throttled.rate", Long.toString(interBrokerThrottle)), AlterConfigOp.OpType.SET));
                ops.add(new AlterConfigOp(new ConfigEntry("follower.replication.throttled.rate", Long.toString(interBrokerThrottle)), AlterConfigOp.OpType.SET));
                configs.put(new ConfigResource(ConfigResource.Type.BROKER, Long.toString(brokerId.intValue())), ops);
            });
            adminClient.incrementalAlterConfigs(configs).all().get();
            System.out.println("The inter-broker throttle limit was set to " + interBrokerThrottle + " B/s");
        }
    }

    static void modifyLogDirThrottle(Admin admin, Set<Integer> movingBrokers, long logDirThrottle) throws ExecutionException, InterruptedException {
        if (logDirThrottle >= 0L) {
            HashMap configs = new HashMap();
            movingBrokers.forEach(brokerId -> {
                ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
                ops.add(new AlterConfigOp(new ConfigEntry("replica.alter.log.dirs.io.max.bytes.per.second", Long.toString(logDirThrottle)), AlterConfigOp.OpType.SET));
                configs.put(new ConfigResource(ConfigResource.Type.BROKER, Long.toString(brokerId.intValue())), ops);
            });
            admin.incrementalAlterConfigs(configs).all().get();
            System.out.println("The replica-alter-dir throttle limit was set to " + logDirThrottle + " B/s");
        }
    }

    static Map.Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, String>> parseExecuteAssignmentArgs(String reassignmentJson) throws JsonProcessingException {
        Map.Entry<List<Map.Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> t0 = ReassignPartitionsCommand.parsePartitionReassignmentData(reassignmentJson);
        List<Map.Entry<TopicPartition, List<Integer>>> partitionsToBeReassigned = t0.getKey();
        Map<TopicPartitionReplica, String> replicaAssignment = t0.getValue();
        if (partitionsToBeReassigned.isEmpty()) {
            throw new AdminCommandFailedException("Partition reassignment list cannot be empty");
        }
        if (partitionsToBeReassigned.stream().anyMatch(t -> ((List)t.getValue()).isEmpty())) {
            throw new AdminCommandFailedException("Partition replica list cannot be empty");
        }
        Set duplicateReassignedPartitions = ToolsUtils.duplicates(partitionsToBeReassigned.stream().map(t -> (TopicPartition)t.getKey()).collect(Collectors.toList()));
        if (!duplicateReassignedPartitions.isEmpty()) {
            throw new AdminCommandFailedException(String.format("Partition reassignment contains duplicate topic partitions: %s", duplicateReassignedPartitions.stream().map(Object::toString).collect(Collectors.joining(","))));
        }
        List duplicateEntries = partitionsToBeReassigned.stream().map(t -> new AbstractMap.SimpleImmutableEntry((TopicPartition)t.getKey(), ToolsUtils.duplicates((List)t.getValue()))).filter(t -> !((Set)t.getValue()).isEmpty()).collect(Collectors.toList());
        if (!duplicateEntries.isEmpty()) {
            String duplicatesMsg = duplicateEntries.stream().map(t -> String.format("%s contains multiple entries for %s", t.getKey(), ((Set)t.getValue()).stream().map(Object::toString).collect(Collectors.joining(",")))).collect(Collectors.joining(". "));
            throw new AdminCommandFailedException(String.format("Partition replica lists may not contain duplicate entries: %s", duplicatesMsg));
        }
        return new AbstractMap.SimpleImmutableEntry<Map<TopicPartition, List<Integer>>, Map<TopicPartitionReplica, String>>(partitionsToBeReassigned.stream().collect(Collectors.toMap(t -> (TopicPartition)t.getKey(), t -> (List)t.getValue())), replicaAssignment);
    }

    static Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> cancelAssignment(Admin adminClient, String jsonString, Boolean preserveThrottles, Long timeoutMs, Time time) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException {
        Map.Entry<List<Map.Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> t0 = ReassignPartitionsCommand.parsePartitionReassignmentData(jsonString);
        List<Map.Entry<TopicPartition, List<Integer>>> targetParts = t0.getKey();
        Map<TopicPartitionReplica, String> targetReplicas = t0.getValue();
        Set targetPartsSet = targetParts.stream().map(t -> (TopicPartition)t.getKey()).collect(Collectors.toSet());
        HashSet<TopicPartition> curReassigningParts = new HashSet<TopicPartition>();
        ((Map)adminClient.listPartitionReassignments(targetPartsSet).reassignments().get()).forEach((part, reassignment) -> {
            if (reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty()) {
                curReassigningParts.add((TopicPartition)part);
            }
        });
        if (!curReassigningParts.isEmpty()) {
            Map<TopicPartition, Throwable> errors = ReassignPartitionsCommand.cancelPartitionReassignments(adminClient, curReassigningParts);
            if (!errors.isEmpty()) {
                throw new TerseException(String.format("Error cancelling partition reassignment%s for:%n%s", errors.size() == 1 ? "" : "s", errors.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).map(part -> String.valueOf(part) + ": " + ((Throwable)errors.get(part)).getMessage()).collect(Collectors.joining(System.lineSeparator()))));
            }
            System.out.printf("Successfully cancelled partition reassignment%s for: %s%n", curReassigningParts.size() == 1 ? "" : "s", curReassigningParts.stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).map(Object::toString).collect(Collectors.joining(",")));
        } else {
            System.out.println("None of the specified partition reassignments are active.");
        }
        HashMap<TopicPartitionReplica, String> curMovingParts = new HashMap<TopicPartitionReplica, String>();
        ReassignPartitionsCommand.findLogDirMoveStates(adminClient, targetReplicas).forEach((part, moveState) -> {
            if (moveState instanceof ActiveMoveState) {
                curMovingParts.put((TopicPartitionReplica)part, ((ActiveMoveState)moveState).currentLogDir);
            }
        });
        if (curMovingParts.isEmpty()) {
            System.out.print("None of the specified partition moves are active.");
        } else {
            ReassignPartitionsCommand.executeMoves(adminClient, curMovingParts, timeoutMs, time);
        }
        if (!preserveThrottles.booleanValue()) {
            ReassignPartitionsCommand.clearAllThrottles(adminClient, targetParts);
        }
        return new AbstractMap.SimpleImmutableEntry<Set<TopicPartition>, Set<TopicPartitionReplica>>(curReassigningParts, curMovingParts.keySet());
    }

    public static String formatAsReassignmentJson(Map<TopicPartition, List<Integer>> partitionsToBeReassigned, Map<TopicPartitionReplica, String> replicaLogDirAssignment) throws JsonProcessingException {
        ArrayList partitions = new ArrayList();
        partitionsToBeReassigned.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).forEach(tp -> {
            List replicas = (List)partitionsToBeReassigned.get(tp);
            LinkedHashMap<String, Object> data = new LinkedHashMap<String, Object>();
            data.put("topic", tp.topic());
            data.put("partition", tp.partition());
            data.put("replicas", replicas);
            data.put("log_dirs", replicas.stream().map(r -> replicaLogDirAssignment.getOrDefault(new TopicPartitionReplica(tp.topic(), tp.partition(), r.intValue()), ANY_LOG_DIR)).collect(Collectors.toList()));
            partitions.add(data);
        });
        LinkedHashMap<String, Serializable> results = new LinkedHashMap<String, Serializable>();
        results.put("version", Integer.valueOf(1));
        results.put("partitions", partitions);
        return Json.encodeAsString(results);
    }

    private static List<String> parseTopicsData(String jsonData) throws JsonMappingException {
        Optional parsed = Json.parseFull((String)jsonData);
        if (parsed.isPresent()) {
            JsonValue js = (JsonValue)parsed.get();
            Optional version = js.asJsonObject().get("version");
            return ReassignPartitionsCommand.parseTopicsData(version.isPresent() ? (Integer)((JsonValue)version.get()).to((DecodeJson)INT) : 1, js);
        }
        throw new AdminOperationException("The input string is not a valid JSON");
    }

    private static List<String> parseTopicsData(int version, JsonValue js) throws JsonMappingException {
        switch (version) {
            case 1: {
                ArrayList<String> results = new ArrayList<String>();
                Optional partitionsSeq = js.asJsonObject().get("topics");
                if (partitionsSeq.isPresent()) {
                    Iterator iter = ((JsonValue)partitionsSeq.get()).asJsonArray().iterator();
                    while (iter.hasNext()) {
                        results.add((String)((JsonValue)iter.next()).asJsonObject().apply("topic").to((DecodeJson)STRING));
                    }
                }
                return results;
            }
        }
        throw new AdminOperationException("Not supported version field value " + version);
    }

    private static Map.Entry<List<Map.Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(String jsonData) throws JsonProcessingException {
        JsonValue js;
        try {
            js = Json.tryParseFull((String)jsonData);
        }
        catch (JsonParseException f) {
            throw new AdminOperationException((Throwable)f);
        }
        Optional version = js.asJsonObject().get("version");
        return ReassignPartitionsCommand.parsePartitionReassignmentData(version.isPresent() ? (Integer)((JsonValue)version.get()).to((DecodeJson)INT) : 1, js);
    }

    private static Map.Entry<List<Map.Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(int version, JsonValue jsonData) throws JsonMappingException {
        switch (version) {
            case 1: {
                ArrayList<AbstractMap.SimpleImmutableEntry<TopicPartition, List>> partitionAssignment = new ArrayList<AbstractMap.SimpleImmutableEntry<TopicPartition, List>>();
                HashMap<TopicPartitionReplica, String> replicaAssignment = new HashMap<TopicPartitionReplica, String>();
                Optional partitionsSeq = jsonData.asJsonObject().get("partitions");
                if (partitionsSeq.isPresent()) {
                    Iterator iter = ((JsonValue)partitionsSeq.get()).asJsonArray().iterator();
                    while (iter.hasNext()) {
                        JsonObject partitionFields = ((JsonValue)iter.next()).asJsonObject();
                        String topic = (String)partitionFields.apply("topic").to((DecodeJson)STRING);
                        int partition = (Integer)partitionFields.apply("partition").to((DecodeJson)INT);
                        List newReplicas = (List)partitionFields.apply("replicas").to(INT_LIST);
                        Optional logDirsOpts = partitionFields.get("log_dirs");
                        List newLogDirs = logDirsOpts.isPresent() ? (List)((JsonValue)logDirsOpts.get()).to(STRING_LIST) : newReplicas.stream().map(r -> ANY_LOG_DIR).collect(Collectors.toList());
                        if (newReplicas.size() != newLogDirs.size()) {
                            throw new AdminCommandFailedException("Size of replicas list " + String.valueOf(newReplicas) + " is different from size of log dirs list " + String.valueOf(newLogDirs) + " for partition " + String.valueOf(new TopicPartition(topic, partition)));
                        }
                        partitionAssignment.add(new AbstractMap.SimpleImmutableEntry<TopicPartition, List>(new TopicPartition(topic, partition), newReplicas));
                        for (int i = 0; i < newLogDirs.size(); ++i) {
                            Integer replica = (Integer)newReplicas.get(i);
                            String logDir = (String)newLogDirs.get(i);
                            if (logDir.equals(ANY_LOG_DIR)) continue;
                            replicaAssignment.put(new TopicPartitionReplica(topic, partition, replica.intValue()), logDir);
                        }
                    }
                }
                return new AbstractMap.SimpleImmutableEntry<List<Map.Entry<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>>(partitionAssignment, replicaAssignment);
            }
        }
        throw new AdminOperationException("Not supported version field value " + version);
    }

    static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
        ReassignPartitionsCommandOptions opts = new ReassignPartitionsCommandOptions(args);
        CommandLineUtils.maybePrintHelpOrVersion((CommandDefaultOptions)opts, (String)HELP_TEXT);
        List<OptionSpec> validActions = Arrays.asList(opts.generateOpt, opts.executeOpt, opts.verifyOpt, opts.cancelOpt, opts.listOpt);
        List allActions = validActions.stream().filter(a -> opts.options.has(a)).collect(Collectors.toList());
        if (allActions.size() != 1) {
            CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)String.format("Command must include exactly one action: %s", validActions.stream().map(a -> "--" + (String)a.options().get(0)).collect(Collectors.joining(", "))));
        }
        OptionSpec action = (OptionSpec)allActions.get(0);
        if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt)) {
            CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)"Please don't specify both --bootstrap-server and --bootstrap-controller");
        } else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt)) {
            CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)"Please specify either --bootstrap-server or --bootstrap-controller");
        }
        boolean isBootstrapServer = opts.options.has(opts.bootstrapServerOpt);
        HashMap requiredArgs = new HashMap();
        requiredArgs.put(opts.verifyOpt, Collections.singletonList(opts.reassignmentJsonFileOpt));
        requiredArgs.put(opts.generateOpt, Arrays.asList(opts.topicsToMoveJsonFileOpt, opts.brokerListOpt));
        requiredArgs.put(opts.executeOpt, Collections.singletonList(opts.reassignmentJsonFileOpt));
        requiredArgs.put(opts.cancelOpt, Collections.singletonList(opts.reassignmentJsonFileOpt));
        requiredArgs.put(opts.listOpt, Collections.emptyList());
        CommandLineUtils.checkRequiredArgs((OptionParser)opts.parser, (OptionSet)opts.options, (OptionSpec[])((List)requiredArgs.get(action)).toArray(new OptionSpec[0]));
        HashMap permittedArgs = new HashMap();
        permittedArgs.put(opts.verifyOpt, Arrays.asList(opts.bootstrapServerOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt));
        permittedArgs.put(opts.generateOpt, Arrays.asList(opts.bootstrapServerOpt, opts.brokerListOpt, opts.commandConfigOpt, opts.disableRackAware));
        permittedArgs.put(opts.executeOpt, Arrays.asList(opts.additionalOpt, opts.bootstrapServerOpt, opts.commandConfigOpt, opts.interBrokerThrottleOpt, opts.replicaAlterLogDirsThrottleOpt, opts.timeoutOpt));
        permittedArgs.put(opts.cancelOpt, Arrays.asList(isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt, opts.commandConfigOpt, opts.preserveThrottlesOpt, opts.timeoutOpt));
        permittedArgs.put(opts.listOpt, Arrays.asList(isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt, opts.commandConfigOpt));
        opts.options.specs().forEach(opt -> {
            if (!(opt.equals(action) || requiredArgs.getOrDefault(action, Collections.emptyList()).contains(opt) || permittedArgs.getOrDefault(action, Collections.emptyList()).contains(opt))) {
                CommandLineUtils.printUsageAndExit((OptionParser)opts.parser, (String)String.format("Option \"%s\" can't be used with action \"%s\"", opt, action));
            }
        });
        return opts;
    }

    static Set<TopicPartitionReplica> alterReplicaLogDirs(Admin adminClient, Map<TopicPartitionReplica, String> assignment) throws InterruptedException {
        HashSet<TopicPartitionReplica> results = new HashSet<TopicPartitionReplica>();
        Map values = adminClient.alterReplicaLogDirs(assignment).values();
        for (Map.Entry e : values.entrySet()) {
            TopicPartitionReplica replica = (TopicPartitionReplica)e.getKey();
            KafkaFuture future = (KafkaFuture)e.getValue();
            try {
                future.get();
                results.add(replica);
            }
            catch (ExecutionException t) {
                if (t.getCause() instanceof ReplicaNotAvailableException) continue;
                throw new AdminCommandFailedException("Failed to alter dir for " + String.valueOf(replica), (Throwable)t);
            }
        }
        return results;
    }
}

