/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RackUtils {
    private static final Logger LOG = LoggerFactory.getLogger(RackUtils.class);

    private RackUtils() {
    }

    public static void annotateTopicPartitionsWithRackInfo(Cluster cluster, InternalTopicManager internalTopicManager, Set<DefaultTaskTopicPartition> topicPartitions) {
        Set<String> topicsToDescribe = topicPartitions.stream().filter(tp -> !tp.isSource()).map(topicPartition -> topicPartition.topicPartition().topic()).collect(Collectors.toSet());
        Set<TopicPartition> nonChangelogTopics = topicPartitions.stream().filter(taskTopicPartition -> !taskTopicPartition.isChangelog()).map(TaskTopicPartition::topicPartition).collect(Collectors.toSet());
        topicsToDescribe.addAll(RackUtils.topicsWithMissingMetadata(cluster, nonChangelogTopics));
        Map<String, List<TopicPartitionInfo>> freshTopicPartitionInfo = RackUtils.describeTopics(internalTopicManager, topicsToDescribe);
        Set<TopicPartition> topicsWithUpToDateMetadata = topicPartitions.stream().map(TaskTopicPartition::topicPartition).filter(topicPartition -> !topicsToDescribe.contains(topicPartition.topic())).collect(Collectors.toSet());
        Map<TopicPartition, Set<String>> racksForTopicPartition = RackUtils.knownRacksForPartition(cluster, topicsWithUpToDateMetadata);
        freshTopicPartitionInfo.forEach((topic, partitionInfos) -> {
            for (TopicPartitionInfo partitionInfo : partitionInfos) {
                int partition = partitionInfo.partition();
                TopicPartition topicPartition = new TopicPartition(topic, partition);
                List replicas = partitionInfo.replicas();
                if (replicas == null || replicas.isEmpty()) {
                    LOG.error("No replicas found for topic partition {}: {}", topic, (Object)partition);
                    continue;
                }
                Set racks = replicas.stream().filter(Node::hasRack).map(Node::rack).collect(Collectors.toSet());
                racksForTopicPartition.computeIfAbsent(topicPartition, k -> new HashSet());
                ((Set)racksForTopicPartition.get(topicPartition)).addAll(racks);
            }
        });
        for (DefaultTaskTopicPartition topicPartition2 : topicPartitions) {
            if (!racksForTopicPartition.containsKey(topicPartition2.topicPartition())) continue;
            Set<String> racks = racksForTopicPartition.get(topicPartition2.topicPartition());
            topicPartition2.annotateWithRackIds(racks);
        }
    }

    public static Set<String> topicsWithMissingMetadata(Cluster cluster, Set<TopicPartition> topicPartitions) {
        HashSet<String> topicsWithStaleMetadata = new HashSet<String>();
        for (TopicPartition topicPartition : topicPartitions) {
            PartitionInfo partitionInfo = cluster.partition(topicPartition);
            if (partitionInfo == null) {
                LOG.error("TopicPartition {} doesn't exist in cluster", (Object)topicPartition);
                continue;
            }
            Node[] replica = partitionInfo.replicas();
            if (replica != null && replica.length != 0) continue;
            topicsWithStaleMetadata.add(topicPartition.topic());
        }
        return topicsWithStaleMetadata;
    }

    public static Map<TopicPartition, Set<String>> knownRacksForPartition(Cluster cluster, Set<TopicPartition> topicPartitions) {
        HashMap<TopicPartition, Set<String>> racksForPartition = new HashMap<TopicPartition, Set<String>>();
        for (TopicPartition topicPartition : topicPartitions) {
            PartitionInfo partitionInfo = cluster.partition(topicPartition);
            if (partitionInfo == null) {
                LOG.error("TopicPartition {} doesn't exist in cluster", (Object)topicPartition);
                continue;
            }
            Node[] replicas = partitionInfo.replicas();
            if (replicas == null || replicas.length == 0) continue;
            Arrays.stream(replicas).filter(node -> !node.hasRack()).forEach(node -> LOG.warn("Node {} for topic partition {} doesn't have rack", node, (Object)topicPartition));
            Set racks = Arrays.stream(replicas).filter(Node::hasRack).map(Node::rack).collect(Collectors.toSet());
            racksForPartition.put(topicPartition, racks);
        }
        return racksForPartition;
    }

    private static Map<String, List<TopicPartitionInfo>> describeTopics(InternalTopicManager internalTopicManager, Set<String> topicsToDescribe) {
        if (topicsToDescribe.isEmpty()) {
            return new HashMap<String, List<TopicPartitionInfo>>();
        }
        LOG.info("Describing topics for rack information: {}", (Object)Arrays.toString(topicsToDescribe.toArray()));
        try {
            Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
            if (topicsToDescribe.size() > topicPartitionInfo.size()) {
                topicsToDescribe.removeAll(topicPartitionInfo.keySet());
                LOG.error("Failed to describe topic for {}", topicsToDescribe);
            }
            return topicPartitionInfo;
        }
        catch (Exception e) {
            LOG.error("Failed to describe topics {}", topicsToDescribe, (Object)e);
            return new HashMap<String, List<TopicPartitionInfo>>();
        }
    }
}

