/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.share;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.share.PersisterStateBatchCombiner;
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
import org.apache.kafka.coordinator.share.ShareCoordinatorOffsetsManager;
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.share.ShareGroupOffset;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;

/**
 * Share coordinator shard: keeps the in-memory share-partition state that is rebuilt by
 * replaying {@link CoordinatorRecord}s, and serves write/read state requests against it.
 */
public class ShareCoordinatorShard
implements CoordinatorShard<CoordinatorRecord> {
    // Logger created from the LogContext handed to the constructor.
    private final Logger log;
    // Supplies shareCoordinatorSnapshotUpdateRecordsPerSnapshot(), the update-count threshold for snapshotting.
    private final ShareCoordinatorConfig config;
    // Used to activate/deactivate this shard's metrics in onLoaded/onUnloaded.
    private final CoordinatorMetrics coordinatorMetrics;
    // Per-shard metrics; "ShareCoordinatorWrite" is recorded on it by the write paths.
    private final CoordinatorMetricsShard metricsShard;
    // Latest known state per share partition: the last replayed snapshot with later updates merged in.
    private final TimelineHashMap<SharePartitionKey, ShareGroupOffset> shareStateMap;
    // Highest leader epoch replayed so far per share partition.
    private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
    // Counter advanced on each replayed share update (the first update stores 0); a replayed
    // snapshot resets it to 0 once it has reached the configured threshold.
    private final TimelineHashMap<SharePartitionKey, Integer> snapshotUpdateCount;
    // Highest state epoch replayed so far per share partition (only snapshot records update it).
    private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
    // Most recent image received via onNewMetadataImage; stays null until that callback runs.
    private MetadataImage metadataImage;
    // Told about the offset of every replayed snapshot; queried by lastRedundantOffset().
    private final ShareCoordinatorOffsetsManager offsetsManager;
    // Pre-built exceptions whose messages are returned in INVALID_REQUEST error responses.
    public static final Exception NULL_TOPIC_ID = new Exception("The topic id cannot be null.");
    public static final Exception NEGATIVE_PARTITION_ID = new Exception("The partition id cannot be a negative number.");

    /**
     * Creates a shard that uses a newly constructed {@link ShareCoordinatorOffsetsManager}
     * built on the same snapshot registry as the shard's own state.
     */
    ShareCoordinatorShard(
        LogContext logContext,
        ShareCoordinatorConfig config,
        CoordinatorMetrics coordinatorMetrics,
        CoordinatorMetricsShard metricsShard,
        SnapshotRegistry snapshotRegistry
    ) {
        this(
            logContext,
            config,
            coordinatorMetrics,
            metricsShard,
            snapshotRegistry,
            new ShareCoordinatorOffsetsManager(snapshotRegistry)
        );
    }

    /**
     * Creates a shard with empty state maps registered on the given snapshot registry.
     *
     * @param logContext         source of this shard's logger
     * @param config             share coordinator configuration
     * @param coordinatorMetrics metrics registry the shard is activated on and deactivated from
     * @param metricsShard       this shard's metrics
     * @param snapshotRegistry   registry backing all timeline maps of this shard
     * @param offsetsManager     manager informed of replayed snapshot offsets
     */
    ShareCoordinatorShard(
        LogContext logContext,
        ShareCoordinatorConfig config,
        CoordinatorMetrics coordinatorMetrics,
        CoordinatorMetricsShard metricsShard,
        SnapshotRegistry snapshotRegistry,
        ShareCoordinatorOffsetsManager offsetsManager
    ) {
        this.log = logContext.logger(ShareCoordinatorShard.class);
        this.config = config;
        this.coordinatorMetrics = coordinatorMetrics;
        this.metricsShard = metricsShard;
        // Diamond instead of raw types so the assignments are checked against the field types.
        this.shareStateMap = new TimelineHashMap<>(snapshotRegistry, 0);
        this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
        this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
        this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
        this.offsetsManager = offsetsManager;
    }

    /**
     * Activates this shard's metrics. The supplied image is not used here.
     *
     * @param newImage metadata image passed by the caller (ignored)
     */
    public void onLoaded(MetadataImage newImage) {
        coordinatorMetrics.activateMetricsShard(metricsShard);
    }

    /**
     * Remembers the latest metadata image; request validation looks topics and partitions up in it.
     *
     * @param newImage the image to keep
     * @param delta    change set accompanying the image (not used here)
     */
    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
        metadataImage = newImage;
    }

    /**
     * Deactivates this shard's metrics.
     */
    public void onUnloaded() {
        coordinatorMetrics.deactivateMetricsShard(metricsShard);
    }

    /**
     * Applies one record to the in-memory state, dispatching on the record key's version:
     * version 0 is handled as a share snapshot, version 1 as a share update. Records with any
     * other key version are ignored.
     *
     * @param offset        offset of the record; forwarded to the snapshot handler
     * @param producerId    not used by this shard
     * @param producerEpoch not used by this shard
     * @param record        the record to apply
     */
    public void replay(long offset, long producerId, short producerEpoch, CoordinatorRecord record) throws RuntimeException {
        ApiMessageAndVersion recordKey = record.key();
        ApiMessageAndVersion recordValue = record.value();

        int keyVersion = recordKey.version();
        if (keyVersion == 0) {
            handleShareSnapshot(
                (ShareSnapshotKey) recordKey.message(),
                (ShareSnapshotValue) messageOrNull(recordValue),
                offset
            );
        } else if (keyVersion == 1) {
            handleShareUpdate(
                (ShareUpdateKey) recordKey.message(),
                (ShareUpdateValue) messageOrNull(recordValue)
            );
        }
    }

    /**
     * Replays a share snapshot record: raises the tracked leader/state epochs if needed, replaces
     * the stored state for the share partition with the snapshot, resets the update counter once
     * it has reached the configured threshold, and reports the record offset to the offsets manager.
     *
     * @param key    snapshot record key identifying the share partition
     * @param value  snapshot record value
     * @param offset offset of the record being replayed
     */
    private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value, long offset) {
        // NOTE(review): replay() passes messageOrNull(...), so value can be null for a record
        // without a value; it is dereferenced unconditionally below - confirm such records
        // cannot reach this path.
        SharePartitionKey mapKey = SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
        maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
        maybeUpdateStateEpochMap(mapKey, value.stateEpoch());

        // A snapshot is the complete state, so it overwrites whatever was stored before.
        // Keys and values are passed with their declared types; Object-typed arguments do not
        // satisfy the typed put(K, V) signature.
        ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
        shareStateMap.put(mapKey, offsetRecord);

        // Only reset the counter when it exists and has reached the threshold.
        if (snapshotUpdateCount.containsKey(mapKey)
            && snapshotUpdateCount.get(mapKey) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
            snapshotUpdateCount.put(mapKey, 0);
        }

        offsetsManager.updateState(mapKey, offset);
    }

    /**
     * Replays a share update record: raises the tracked leader epoch if needed, merges the update
     * into the stored state (or stores it as-is when nothing is stored yet), and advances the
     * per-partition update counter.
     *
     * @param key   update record key identifying the share partition
     * @param value update record value
     */
    private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) {
        SharePartitionKey mapKey = SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
        maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());

        ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
        // The key is passed with its declared type; an Object-typed key does not satisfy compute(K, ...).
        shareStateMap.compute(mapKey, (k, existing) -> existing == null ? offsetRecord : merge(existing, value));
        // The first update for a key stores 0; every later one increments the stored count.
        snapshotUpdateCount.compute(mapKey, (k, count) -> count == null ? 0 : count + 1);
    }

    /**
     * Records {@code leaderEpoch} for the share partition when none is recorded yet or when it
     * is greater than the recorded one; otherwise leaves the map untouched.
     *
     * @param mapKey      share partition key
     * @param leaderEpoch leader epoch carried by the replayed record
     */
    private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int leaderEpoch) {
        // Single lookup; arguments keep their declared types so the typed put(K, V) is satisfied.
        Integer recorded = leaderEpochMap.get(mapKey);
        if (recorded == null || recorded < leaderEpoch) {
            leaderEpochMap.put(mapKey, leaderEpoch);
        }
    }

    /**
     * Records {@code stateEpoch} for the share partition when none is recorded yet or when it
     * is greater than the recorded one; otherwise leaves the map untouched.
     *
     * @param mapKey     share partition key
     * @param stateEpoch state epoch carried by the replayed record
     */
    private void maybeUpdateStateEpochMap(SharePartitionKey mapKey, int stateEpoch) {
        // Single lookup; arguments keep their declared types so the typed put(K, V) is satisfied.
        Integer recorded = stateEpochMap.get(mapKey);
        if (recorded == null || recorded < stateEpoch) {
            stateEpochMap.put(mapKey, stateEpoch);
        }
    }

    /**
     * Delegates to the default implementation inherited from {@link CoordinatorShard}.
     *
     * @param producerId    producer id of the transaction
     * @param producerEpoch producer epoch of the transaction
     * @param result        result of the transaction
     */
    @Override
    public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException {
        // An interface default method must be invoked through InterfaceName.super; a bare
        // "super." would resolve against Object, which has no such method.
        CoordinatorShard.super.replayEndTransactionMarker(producerId, producerEpoch, result);
    }

    /**
     * Handles a write-state request for the first partition of the first topic in the request.
     * Records the "ShareCoordinatorWrite" metric, returns the validation error result if the
     * request fails validation, and otherwise returns one generated state record together with a
     * response for that topic partition.
     *
     * @param request the write-state request
     * @return result holding either an error response with no records, or one record plus the response
     */
    public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeState(WriteShareGroupStateRequestData request) {
        metricsShard.record("ShareCoordinatorWrite");

        Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> validationFailure =
            maybeGetWriteStateError(request);
        if (validationFailure.isPresent()) {
            return validationFailure.get();
        }

        WriteShareGroupStateRequestData.WriteStateData topic = request.topics().get(0);
        WriteShareGroupStateRequestData.PartitionData partition = topic.partitions().get(0);
        SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topic.topicId(), partition.partition());

        CoordinatorRecord stateRecord = generateShareStateRecord(partition, key);

        WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
            .setResults(Collections.singletonList(
                WriteShareGroupStateResponse.toResponseWriteStateResult(
                    key.topicId(),
                    Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(key.partition()))
                )
            ));

        return new CoordinatorResult<>(Collections.singletonList(stateRecord), responseData);
    }

    /**
     * Handles a read-state request for the first partition of the first topic in the request.
     * <p>
     * Returns the validation error (with no records) if the request is invalid. Otherwise builds
     * the response from the stored state, or from defaults (start offset -1, state epoch 0, no
     * batches) when nothing is stored. If the request's leader epoch is -1 or equals the recorded
     * one, no record is produced; otherwise the "ShareCoordinatorWrite" metric is recorded and a
     * state record carrying the new leader epoch is returned alongside the response.
     *
     * @param request the read-state request
     * @return result holding the response and zero or one record
     */
    public CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> readStateAndMaybeUpdateLeaderEpoch(ReadShareGroupStateRequestData request) {
        Optional<ReadShareGroupStateResponseData> validationFailure = maybeGetReadStateError(request);
        if (validationFailure.isPresent()) {
            return new CoordinatorResult<>(Collections.emptyList(), validationFailure.get());
        }

        ReadShareGroupStateRequestData.ReadStateData topic = request.topics().get(0);
        ReadShareGroupStateRequestData.PartitionData partition = topic.partitions().get(0);
        Uuid topicId = topic.topicId();
        int partitionId = partition.partition();
        int leaderEpoch = partition.leaderEpoch();
        SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);

        ReadShareGroupStateResponseData responseData;
        if (shareStateMap.containsKey(key)) {
            ShareGroupOffset storedState = shareStateMap.get(key);

            List<ReadShareGroupStateResponseData.StateBatch> responseBatches;
            if (storedState.stateBatches() == null || storedState.stateBatches().isEmpty()) {
                responseBatches = Collections.emptyList();
            } else {
                responseBatches = storedState.stateBatches().stream()
                    .map(batch -> new ReadShareGroupStateResponseData.StateBatch()
                        .setFirstOffset(batch.firstOffset())
                        .setLastOffset(batch.lastOffset())
                        .setDeliveryState(batch.deliveryState())
                        .setDeliveryCount(batch.deliveryCount()))
                    .collect(Collectors.toList());
            }
            responseData = ReadShareGroupStateResponse.toResponseData(
                topicId, partitionId, storedState.startOffset(), storedState.stateEpoch(), responseBatches);
        } else {
            // Nothing stored for this share partition yet: answer with defaults.
            responseData = ReadShareGroupStateResponse.toResponseData(
                topicId, partitionId, -1L, 0, Collections.emptyList());
        }

        // No record is needed when the request carries no leader epoch or repeats the recorded one.
        Integer recordedLeaderEpoch = leaderEpochMap.get(key);
        boolean sameAsRecorded = recordedLeaderEpoch != null && recordedLeaderEpoch.intValue() == leaderEpoch;
        if (leaderEpoch == -1 || sameAsRecorded) {
            return new CoordinatorResult<>(Collections.emptyList(), responseData);
        }

        log.info("Read with leader epoch update call for key {} having new leader epoch {}.", key, leaderEpoch);
        metricsShard.record("ShareCoordinatorWrite");

        // Carry over the start offset and state epoch just put into the response; only the
        // leader epoch is new, and no state batches are attached.
        ReadShareGroupStateResponseData.PartitionResult partitionResult =
            responseData.results().get(0).partitions().get(0);
        WriteShareGroupStateRequestData.PartitionData writePartitionData = new WriteShareGroupStateRequestData.PartitionData()
            .setPartition(partitionId)
            .setLeaderEpoch(leaderEpoch)
            .setStateBatches(Collections.emptyList())
            .setStartOffset(partitionResult.startOffset())
            .setStateEpoch(partitionResult.stateEpoch());

        CoordinatorRecord stateRecord = generateShareStateRecord(writePartitionData, key);
        return new CoordinatorResult<>(Collections.singletonList(stateRecord), responseData);
    }

    /**
     * Returns the offsets manager's last redundant offset, wrapped in a result that carries no records.
     *
     * @return result whose response is the value of {@code offsetsManager.lastRedundantOffset()}
     */
    public CoordinatorResult<Optional<Long>, CoordinatorRecord> lastRedundantOffset() {
        Optional<Long> redundantOffset = offsetsManager.lastRedundantOffset();
        return new CoordinatorResult<>(Collections.emptyList(), redundantOffset);
    }

    /**
     * Builds the record to write for the given partition data.
     * <ul>
     *   <li>No stored state for the key: a snapshot record with snapshot epoch 0, built from the
     *       request alone.</li>
     *   <li>Update counter has reached the configured threshold: a snapshot record with the
     *       snapshot epoch incremented, merging the stored batches with the request's batches;
     *       request fields equal to -1 fall back to the stored values.</li>
     *   <li>Otherwise: an update record holding only the request's data, tagged with the current
     *       snapshot epoch. No state epoch is set on it.</li>
     * </ul>
     *
     * @param partitionData partition data from the write request
     * @param key           share partition key the record is for
     * @return the snapshot or update record
     */
    private CoordinatorRecord generateShareStateRecord(WriteShareGroupStateRequestData.PartitionData partitionData, SharePartitionKey key) {
        if (!shareStateMap.containsKey(key)) {
            return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
                key.groupId(),
                key.topicId(),
                partitionData.partition(),
                new ShareGroupOffset.Builder()
                    .setSnapshotEpoch(0)
                    .setStartOffset(partitionData.startOffset())
                    .setLeaderEpoch(partitionData.leaderEpoch())
                    .setStateEpoch(partitionData.stateEpoch())
                    .setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
                    .build());
        }

        ShareGroupOffset currentState = shareStateMap.get(key);

        // The default is passed as an Integer-compatible literal; an Object-typed default does
        // not satisfy getOrDefault(Object, V).
        int updatesSinceSnapshot = snapshotUpdateCount.getOrDefault(key, 0);
        if (updatesSinceSnapshot >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
            // -1 in the request means "not provided": keep the stored value.
            int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch();
            int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch();
            long newStartOffset = partitionData.startOffset() == -1L ? currentState.startOffset() : partitionData.startOffset();

            return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
                key.groupId(),
                key.topicId(),
                partitionData.partition(),
                new ShareGroupOffset.Builder()
                    .setSnapshotEpoch(currentState.snapshotEpoch() + 1)
                    .setStartOffset(newStartOffset)
                    .setLeaderEpoch(newLeaderEpoch)
                    .setStateEpoch(newStateEpoch)
                    .setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset))
                    .build());
        }

        return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
            key.groupId(),
            key.topicId(),
            partitionData.partition(),
            new ShareGroupOffset.Builder()
                .setSnapshotEpoch(currentState.snapshotEpoch())
                .setStartOffset(partitionData.startOffset())
                .setLeaderEpoch(partitionData.leaderEpoch())
                .setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
                .build());
    }

    /**
     * Combines {@code soFar} with the request's state batches, using the request's own start offset.
     *
     * @param soFar         batches accumulated so far
     * @param partitionData partition data supplying the new batches and the start offset
     * @return the combined batches
     */
    private List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> soFar, WriteShareGroupStateRequestData.PartitionData partitionData) {
        long requestStartOffset = partitionData.startOffset();
        return mergeBatches(soFar, partitionData, requestStartOffset);
    }

    /**
     * Converts the request's state batches to {@link PersisterStateBatch} and combines them with
     * {@code soFar} through a {@link PersisterStateBatchCombiner} for the given start offset.
     *
     * @param soFar         batches accumulated so far
     * @param partitionData partition data supplying the new batches
     * @param startOffset   start offset handed to the combiner
     * @return the combined batches
     */
    private List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> soFar, WriteShareGroupStateRequestData.PartitionData partitionData, long startOffset) {
        List<PersisterStateBatch> incoming = partitionData.stateBatches().stream()
            .map(PersisterStateBatch::from)
            .collect(Collectors.toList());
        PersisterStateBatchCombiner combiner = new PersisterStateBatchCombiner(soFar, incoming, startOffset);
        return combiner.combineStateBatches();
    }

    /**
     * Validates a write-state request, looking only at the first partition of the first topic.
     * Checks, in order: null topic id, negative partition id, leader epoch lower than recorded,
     * state epoch lower than recorded, missing metadata image, and topic/partition absent from
     * the metadata image. An epoch of -1 in the request skips the corresponding epoch check.
     *
     * @param request the write-state request
     * @return the error result for the first failed check, or empty if the request is valid
     */
    private Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> maybeGetWriteStateError(WriteShareGroupStateRequestData request) {
        WriteShareGroupStateRequestData.WriteStateData topic = request.topics().get(0);
        WriteShareGroupStateRequestData.PartitionData partition = topic.partitions().get(0);
        String groupId = request.groupId();
        Uuid topicId = topic.topicId();
        int partitionId = partition.partition();

        if (topicId == null) {
            return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, NULL_TOPIC_ID, null, partitionId));
        }
        if (partitionId < 0) {
            return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID, topicId, partitionId));
        }

        SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);

        int requestLeaderEpoch = partition.leaderEpoch();
        if (requestLeaderEpoch != -1
            && leaderEpochMap.containsKey(mapKey)
            && leaderEpochMap.get(mapKey) > requestLeaderEpoch) {
            log.error("Request leader epoch smaller than last recorded.");
            return Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
        }

        int requestStateEpoch = partition.stateEpoch();
        if (requestStateEpoch != -1
            && stateEpochMap.containsKey(mapKey)
            && stateEpochMap.get(mapKey) > requestStateEpoch) {
            log.error("Request state epoch smaller than last recorded.");
            return Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
        }

        if (metadataImage == null) {
            log.error("Metadata image is null");
            return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
        }

        boolean topicMissing = metadataImage.topics().getTopic(topicId) == null;
        if (topicMissing || metadataImage.topics().getPartition(topicId, partitionId) == null) {
            log.error("Topic/TopicPartition not found in metadata image.");
            return Optional.of(getWriteErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, null, topicId, partitionId));
        }

        return Optional.empty();
    }

    /**
     * Validates a read-state request, looking only at the first partition of the first topic.
     * Checks, in order: null topic id, negative partition id, leader epoch lower than recorded,
     * missing metadata image, and topic/partition absent from the metadata image.
     *
     * @param request the read-state request
     * @return the error response for the first failed check, or empty if the request is valid
     */
    private Optional<ReadShareGroupStateResponseData> maybeGetReadStateError(ReadShareGroupStateRequestData request) {
        ReadShareGroupStateRequestData.ReadStateData topic = request.topics().get(0);
        ReadShareGroupStateRequestData.PartitionData partition = topic.partitions().get(0);
        String groupId = request.groupId();
        Uuid topicId = topic.topicId();
        int partitionId = partition.partition();

        if (topicId == null) {
            log.error("Request topic id is null.");
            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
                null, partitionId, Errors.INVALID_REQUEST, NULL_TOPIC_ID.getMessage()));
        }
        if (partitionId < 0) {
            log.error("Request partition id is negative.");
            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
                topicId, partitionId, Errors.INVALID_REQUEST, NEGATIVE_PARTITION_ID.getMessage()));
        }

        SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);

        // NOTE(review): unlike the write-path validation, a request leader epoch of -1 is not
        // exempted from this check - confirm that is intended.
        if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partition.leaderEpoch()) {
            log.error("Request leader epoch id is smaller than last recorded.");
            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
                topicId, partitionId, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message()));
        }

        if (metadataImage == null) {
            log.error("Metadata image is null");
            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
                topicId, partitionId, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
        }

        boolean topicMissing = metadataImage.topics().getTopic(topicId) == null;
        if (topicMissing || metadataImage.topics().getPartition(topicId, partitionId) == null) {
            log.error("Topic/TopicPartition not found in metadata image.");
            return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(
                topicId, partitionId, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
        }

        return Optional.empty();
    }

    /**
     * Builds a write-state error result that carries no records.
     *
     * @param error       error to report
     * @param exception   optional source of the error message; when null, {@code error.message()} is used
     * @param topicId     topic id for the response (may be null)
     * @param partitionId partition id for the response
     * @return result wrapping the error response
     */
    private CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> getWriteErrorResponse(Errors error, Exception exception, Uuid topicId, int partitionId) {
        String message;
        if (exception != null) {
            message = exception.getMessage();
        } else {
            message = error.message();
        }
        WriteShareGroupStateResponseData errorResponse =
            WriteShareGroupStateResponse.toErrorResponseData(topicId, partitionId, error, message);
        return new CoordinatorResult<>(Collections.emptyList(), errorResponse);
    }

    /**
     * Returns the leader epoch recorded for the key, or null when none is recorded.
     */
    Integer getLeaderMapValue(SharePartitionKey key) {
        return leaderEpochMap.get(key);
    }

    /**
     * Returns the state epoch recorded for the key, or null when none is recorded.
     */
    Integer getStateEpochMapValue(SharePartitionKey key) {
        return stateEpochMap.get(key);
    }

    /**
     * Returns the state stored for the key, or null when none is stored.
     */
    ShareGroupOffset getShareStateMapValue(SharePartitionKey key) {
        return shareStateMap.get(key);
    }

    /**
     * Returns the metrics shard this coordinator shard was constructed with.
     */
    CoordinatorMetricsShard getMetricsShard() {
        return metricsShard;
    }

    /**
     * Merges a share update into existing state. The snapshot epoch and state epoch are kept from
     * {@code soFar}; the start offset and leader epoch come from {@code newData} unless they are
     * -1, in which case the existing values are kept; state batches are combined through a
     * {@link PersisterStateBatchCombiner} using the resulting start offset.
     *
     * @param soFar   existing state
     * @param newData update to merge in
     * @return the merged state
     */
    private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue newData) {
        long mergedStartOffset = newData.startOffset();
        if (mergedStartOffset == -1L) {
            mergedStartOffset = soFar.startOffset();
        }

        int mergedLeaderEpoch = newData.leaderEpoch();
        if (mergedLeaderEpoch == -1) {
            mergedLeaderEpoch = soFar.leaderEpoch();
        }

        List<PersisterStateBatch> incoming = newData.stateBatches().stream()
            .map(ShareCoordinatorShard::toPersisterStateBatch)
            .collect(Collectors.toList());
        List<PersisterStateBatch> combined =
            new PersisterStateBatchCombiner(soFar.stateBatches(), incoming, mergedStartOffset).combineStateBatches();

        return new ShareGroupOffset.Builder()
            .setSnapshotEpoch(soFar.snapshotEpoch())
            .setStateEpoch(soFar.stateEpoch())
            .setStartOffset(mergedStartOffset)
            .setLeaderEpoch(mergedLeaderEpoch)
            .setStateBatches(combined)
            .build();
    }

    /**
     * Null-safe accessor for the message wrapped by an {@link ApiMessageAndVersion}.
     *
     * @param apiMessageAndVersion wrapper, possibly null
     * @return the wrapped message, or null when the wrapper itself is null
     */
    private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
        return apiMessageAndVersion != null ? apiMessageAndVersion.message() : null;
    }

    /**
     * Copies the four fields of a share update state batch into a {@link PersisterStateBatch}.
     *
     * @param batch state batch taken from a share update value
     * @return equivalent persister state batch
     */
    private static PersisterStateBatch toPersisterStateBatch(ShareUpdateValue.StateBatch batch) {
        return new PersisterStateBatch(
            batch.firstOffset(),
            batch.lastOffset(),
            batch.deliveryState(),
            batch.deliveryCount()
        );
    }

    /**
     * Builder for {@link ShareCoordinatorShard}. The config is supplied at construction; the
     * snapshot registry, coordinator metrics and topic partition must be set before
     * {@link #build()}. Time, timer and executor are accepted but not used.
     */
    public static class Builder
    implements CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
        private ShareCoordinatorConfig config;
        private LogContext logContext;
        private SnapshotRegistry snapshotRegistry;
        private CoordinatorMetrics coordinatorMetrics;
        private TopicPartition topicPartition;

        /**
         * @param config share coordinator configuration handed to the built shard
         */
        public Builder(ShareCoordinatorConfig config) {
            this.config = config;
        }

        /** Sets the snapshot registry used by the shard and its metrics shard. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        /** Sets the log context; when left unset, {@link #build()} creates a default one. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /** Accepted but ignored. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withTime(Time time) {
            return this;
        }

        /** Accepted but ignored. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
            return this;
        }

        /** Accepted but ignored. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withExecutor(CoordinatorExecutor<CoordinatorRecord> executor) {
            return this;
        }

        /** Sets the coordinator metrics; must be a {@link ShareCoordinatorMetrics} for {@link #build()} to succeed. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withCoordinatorMetrics(CoordinatorMetrics coordinatorMetrics) {
            this.coordinatorMetrics = coordinatorMetrics;
            return this;
        }

        /** Sets the topic partition used to create the metrics shard. */
        public CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> withTopicPartition(TopicPartition topicPartition) {
            this.topicPartition = topicPartition;
            return this;
        }

        /**
         * Validates the collected settings and creates the shard together with its metrics shard.
         *
         * @return the new shard
         * @throws IllegalArgumentException if the config, snapshot registry or topic partition is
         *         missing, or the coordinator metrics is missing or not a {@link ShareCoordinatorMetrics}
         */
        public ShareCoordinatorShard build() {
            if (logContext == null) {
                logContext = new LogContext();
            }
            if (config == null) {
                throw new IllegalArgumentException("Config must be set.");
            }
            if (snapshotRegistry == null) {
                throw new IllegalArgumentException("SnapshotRegistry must be set.");
            }
            // instanceof is false for null, so this one test covers both the unset and wrong-type cases.
            if (!(coordinatorMetrics instanceof ShareCoordinatorMetrics)) {
                throw new IllegalArgumentException("CoordinatorMetrics must be set and be of type ShareCoordinatorMetrics.");
            }
            if (topicPartition == null) {
                throw new IllegalArgumentException("TopicPartition must be set.");
            }

            ShareCoordinatorMetrics shareMetrics = (ShareCoordinatorMetrics) coordinatorMetrics;
            ShareCoordinatorMetricsShard metricsShard = shareMetrics.newMetricsShard(snapshotRegistry, topicPartition);
            return new ShareCoordinatorShard(logContext, config, coordinatorMetrics, metricsShard, snapshotRegistry);
        }
    }
}

