package org.eclipse.hono.client.kafka.consumer;

import com.fasterxml.jackson.core.JsonLocation;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.11.3.jar:org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer.class */
public class AsyncHandlingAutoCommitKafkaConsumer extends HonoKafkaConsumer {
    public static final String CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS = "hono.offsets.skip.recommit.period.seconds";
    public static final String CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS = "hono.offsets.commit.record.completion.timeout.millis";
    public static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
    public static final Duration DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD = Duration.ofHours(1);
    public static final Duration DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT = Duration.ofMillis(300);
    public static final Duration MAX_POLL_PAUSE = Duration.ofMillis(200);
    public static final int THROTTLING_THRESHOLD_PERCENTAGE_OF_MAX_POLL_RECORDS = 50;
    private final int throttlingThreshold;
    private final int throttlingResumeDelta;
    private final long commitIntervalMillis;
    private final long skipOffsetRecommitPeriodSeconds;
    private final long offsetsCommitRecordCompletionTimeoutMillis;
    private final Map<TopicPartition, TopicPartitionOffsets> offsetsMap;
    private final Map<TopicPartition, Long> lastKnownCommittedOffsets;
    private final AtomicBoolean periodicCommitInvocationInProgress;
    private final AtomicBoolean periodicCommitRetryAfterRebalanceNeeded;
    private final AtomicBoolean skipPeriodicCommit;
    private final AtomicInteger recordsInProcessingCounter;
    private final AtomicInteger recordsLeftInBatchCounter;
    private final AtomicReference<UncompletedRecordsCompletionLatch> uncompletedRecordsCompletionLatchRef;
    private Instant fetchingPauseStartTime;
    private Long periodicCommitTimerId;
    private Instant lastPollInstant;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.11.3.jar:org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer$OffsetsQueueEntry.class */
    public static class OffsetsQueueEntry {
        private final long offset;
        private final AtomicBoolean handlingComplete = new AtomicBoolean();

        OffsetsQueueEntry(long j) {
            this.offset = j;
        }

        public long getOffset() {
            return this.offset;
        }

        public void setHandlingComplete() {
            this.handlingComplete.set(true);
        }

        public boolean isHandlingComplete() {
            return this.handlingComplete.get();
        }

        public String toString() {
            long j = this.offset;
            if (this.handlingComplete.get()) {
            }
            return j + j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.11.3.jar:org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer$TopicPartitionOffsets.class */
    /**
     * Tracks the offsets of the records received for a single partition.
     * <p>
     * Received offsets are appended to a queue. Entries are removed from the head of the queue once
     * their handling is complete; the offset of the entry removed last is the "last sequentially
     * completed offset", i.e. the highest offset up to which all records have been handled.
     */
    public class TopicPartitionOffsets {
        /** Marker value meaning that no offset is known yet. */
        private static final long UNDEFINED_OFFSET = -2;
        private final TopicPartition topicPartition;
        private final Deque<OffsetsQueueEntry> queue = new LinkedList<>();
        private long lastSequentiallyCompletedOffset = UNDEFINED_OFFSET;
        private long lastCommittedOffset = UNDEFINED_OFFSET;
        private Instant lastCommitTime;

        /**
         * Creates an instance without any known completed or committed offset.
         *
         * @param topicPartition The partition the offsets belong to.
         * @throws NullPointerException if topicPartition is {@code null}.
         */
        TopicPartitionOffsets(TopicPartition topicPartition) {
            this.topicPartition = Objects.requireNonNull(topicPartition);
        }

        /**
         * Creates an instance that treats the offset preceding the given position as already completed.
         *
         * @param asyncHandlingAutoCommitKafkaConsumer Not used by this constructor.
         * @param topicPartition The partition the offsets belong to.
         * @param j The position; {@code j - 1} becomes the last sequentially completed offset.
         * @param z {@code true} if {@code j - 1} shall also be regarded as already committed.
         * @throws NullPointerException if topicPartition is {@code null}.
         */
        TopicPartitionOffsets(AsyncHandlingAutoCommitKafkaConsumer asyncHandlingAutoCommitKafkaConsumer, TopicPartition topicPartition, long j, boolean z) {
            this(topicPartition);
            this.lastSequentiallyCompletedOffset = j - 1;
            if (z) {
                this.lastCommittedOffset = this.lastSequentiallyCompletedOffset;
            }
        }

        /**
         * Appends an entry for a newly received record offset to the queue.
         *
         * @param j The offset of the received record.
         * @return The created queue entry, to be marked as completed by the caller later on.
         */
        public OffsetsQueueEntry addOffset(long j) {
            cleanupAndUpdateLastCompletedOffset();
            final OffsetsQueueEntry newEntry = new OffsetsQueueEntry(j);
            this.queue.add(newEntry);
            return newEntry;
        }

        /**
         * Gets the last sequentially completed offset if it ought to be committed.
         * <p>
         * That is the case if it differs from the last committed offset, or if the last commit
         * happened longer ago than the configured skip-recommit period.
         *
         * @return The offset or an empty Optional if there is nothing to commit.
         */
        public Optional<Long> getLastSequentiallyCompletedOffsetForCommit() {
            cleanupAndUpdateLastCompletedOffset();
            if (this.lastSequentiallyCompletedOffset == UNDEFINED_OFFSET) {
                return Optional.empty();
            }
            if (!this.queue.isEmpty()) {
                AsyncHandlingAutoCommitKafkaConsumer.this.log.trace("getOffsetsToCommit: offset {} to use for commit is {} entries behind last received offset {}; partition [{}]",
                        this.lastSequentiallyCompletedOffset, this.queue.size(), this.queue.getLast().getOffset(), this.topicPartition);
            }
            if (this.lastSequentiallyCompletedOffset != this.lastCommittedOffset) {
                return Optional.of(this.lastSequentiallyCompletedOffset);
            }
            // offset is unchanged since the last commit - only commit it again if that commit is too old
            final boolean recommitDue = this.lastCommitTime != null
                    && this.lastCommitTime.isBefore(Instant.now().minusSeconds(AsyncHandlingAutoCommitKafkaConsumer.this.skipOffsetRecommitPeriodSeconds));
            if (!recommitDue) {
                return Optional.empty();
            }
            AsyncHandlingAutoCommitKafkaConsumer.this.log.trace("getOffsetsToCommit: offset {} will be recommitted (last commit {} too long ago); partition [{}]",
                    this.lastSequentiallyCompletedOffset, this.lastCommitTime, this.topicPartition);
            return Optional.of(this.lastSequentiallyCompletedOffset);
        }

        /**
         * Removes all completed entries from the head of the queue, remembering the offset of
         * the entry removed last as the last sequentially completed offset.
         */
        private void cleanupAndUpdateLastCompletedOffset() {
            OffsetsQueueEntry head = this.queue.peek();
            while (head != null && head.isHandlingComplete()) {
                this.lastSequentiallyCompletedOffset = this.queue.remove().getOffset();
                head = this.queue.peek();
            }
        }

        /**
         * Records a committed offset along with the current time. Values lower than the
         * currently stored committed offset are ignored.
         *
         * @param j The committed offset.
         */
        public void setLastCommittedOffset(long j) {
            if (j < this.lastCommittedOffset) {
                return;
            }
            this.lastCommitTime = Instant.now();
            this.lastCommittedOffset = j;
        }

        /**
         * Checks whether the handling of all received records is complete.
         *
         * @return {@code true} if no uncompleted entry is left in the queue.
         */
        public boolean allCompleted() {
            cleanupAndUpdateLastCompletedOffset();
            return this.queue.isEmpty();
        }

        /**
         * Checks whether there is a sequentially completed offset that has not been committed yet.
         *
         * @return {@code true} if a completed offset is known and differs from the last committed offset.
         */
        public boolean needsCommit() {
            cleanupAndUpdateLastCompletedOffset();
            return this.lastSequentiallyCompletedOffset != UNDEFINED_OFFSET
                    && this.lastSequentiallyCompletedOffset != this.lastCommittedOffset;
        }

        /**
         * Gets a textual summary of the tracked state. The queue content is only included
         * if the queue has at most 20 entries, otherwise just its size.
         *
         * @return The state information.
         */
        public String getStateInfo() {
            cleanupAndUpdateLastCompletedOffset();
            final String queueInfo = this.queue.size() <= 20
                    ? ", queue=" + this.queue
                    : ", queue.size=" + this.queue.size();
            return "{lastSequentiallyCompletedOffset=" + getOffsetString(this.lastSequentiallyCompletedOffset)
                    + ", lastCommittedOffset=" + getOffsetString(this.lastCommittedOffset)
                    + queueInfo + "}";
        }

        /** Renders an offset, using "undefined" for the undefined-offset marker value. */
        private String getOffsetString(long j) {
            if (j == UNDEFINED_OFFSET) {
                return "undefined";
            }
            return Long.toString(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/hono-client-kafka-common-1.11.3.jar:org/eclipse/hono/client/kafka/consumer/AsyncHandlingAutoCommitKafkaConsumer$UncompletedRecordsCompletionLatch.class */
    /**
     * A latch that gets released once record handling has been completed for all partitions
     * contained in the map given at construction time.
     */
    public static class UncompletedRecordsCompletionLatch {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Map<TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions;

        /**
         * Creates the latch.
         *
         * @param map The partitions with uncompleted records. The map is used as is (not copied)
         *            and entries get removed from it as partitions become completed.
         */
        UncompletedRecordsCompletionLatch(Map<TopicPartition, TopicPartitionOffsets> map) {
            this.uncompletedRecordsPartitions = map;
        }

        /**
         * To be invoked when the handling of a record has been completed.
         * <p>
         * If the given partition is tracked here and all its records are completed, the partition
         * is removed; the latch is released when no tracked partition is left.
         *
         * @param topicPartition The partition of the completed record.
         */
        public void onRecordHandlingCompleted(TopicPartition topicPartition) {
            final TopicPartitionOffsets partitionOffsets = this.uncompletedRecordsPartitions.get(topicPartition);
            if (partitionOffsets != null && partitionOffsets.allCompleted()) {
                this.uncompletedRecordsPartitions.remove(topicPartition);
                if (this.uncompletedRecordsPartitions.isEmpty()) {
                    this.latch.countDown();
                }
            }
        }

        /**
         * Waits for the latch to be released.
         *
         * @param j The maximum time to wait.
         * @param timeUnit The unit of the wait time.
         * @return {@code true} if the latch was released, {@code false} if the wait time elapsed before.
         * @throws InterruptedException if the current thread is interrupted while waiting.
         */
        public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.latch.await(j, timeUnit);
        }
    }

    /**
     * Creates a consumer for a fixed set of strings (presumably topic names - passed on to the super
     * class unchanged).
     *
     * @param vertx The Vert.x instance to use.
     * @param set The set passed to the super class in place of a topic pattern.
     * @param function The function invoked for each received record; completion of the returned future
     *                 marks the record as handled.
     * @param map The Kafka consumer configuration.
     * @throws IllegalArgumentException if the configuration contains no "group.id" entry.
     */
    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Set<String> set, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> function, Map<String, String> map) {
        this(new AtomicReference<>(), vertx, set, null, function, map);
    }

    /**
     * Creates a consumer for a pattern (passed on to the super class unchanged, in place of a fixed set).
     *
     * @param vertx The Vert.x instance to use.
     * @param pattern The pattern passed to the super class.
     * @param function The function invoked for each received record; completion of the returned future
     *                 marks the record as handled.
     * @param map The Kafka consumer configuration.
     * @throws IllegalArgumentException if the configuration contains no "group.id" entry.
     */
    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Pattern pattern, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> function, Map<String, String> map) {
        this(new AtomicReference<>(), vertx, null, pattern, function, map);
    }

    private AsyncHandlingAutoCommitKafkaConsumer(AtomicReference<AsyncHandlingAutoCommitKafkaConsumer> atomicReference, Vertx vertx, Set<String> set, Pattern pattern, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> function, Map<String, String> map) {
        super(vertx, set, pattern, kafkaConsumerRecord -> {
            ((AsyncHandlingAutoCommitKafkaConsumer) atomicReference.getPlain()).handleRecord(kafkaConsumerRecord, function);
        }, validateAndAdaptConsumerConfig(map));
        this.offsetsMap = new HashMap();
        this.lastKnownCommittedOffsets = new HashMap();
        this.periodicCommitInvocationInProgress = new AtomicBoolean();
        this.periodicCommitRetryAfterRebalanceNeeded = new AtomicBoolean();
        this.skipPeriodicCommit = new AtomicBoolean();
        this.recordsInProcessingCounter = new AtomicInteger();
        this.recordsLeftInBatchCounter = new AtomicInteger();
        this.uncompletedRecordsCompletionLatchRef = new AtomicReference<>();
        this.fetchingPauseStartTime = Instant.MAX;
        this.lastPollInstant = Instant.EPOCH;
        atomicReference.setPlain(this);
        this.throttlingThreshold = Math.max((getMaxPollRecordsConfig(map) * 50) / 100, 1);
        this.throttlingResumeDelta = (this.throttlingThreshold * 5) / 100;
        this.commitIntervalMillis = getCommitInterval(map);
        this.skipOffsetRecommitPeriodSeconds = getSkipOffsetRecommitPeriodSeconds(map);
        this.offsetsCommitRecordCompletionTimeoutMillis = getOffsetsCommitRecordCompletionTimeoutMillis(map);
    }

    /**
     * Stores the size of the received batch as the number of records still left to be handled
     * and remembers the current time as the instant of the last poll.
     *
     * @param kafkaConsumerRecords The received batch of records.
     */
    @Override
    protected final void onBatchOfRecordsReceived(KafkaConsumerRecords<String, Buffer> kafkaConsumerRecords) {
        final int batchSize = kafkaConsumerRecords.size();
        this.recordsLeftInBatchCounter.set(batchSize);
        this.lastPollInstant = Instant.now();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a received record: applies throttling if too many records are in processing,
     * registers the record offset and invokes the given handler function.
     * <p>
     * The record is marked as handled when the future returned by the function completes
     * (successfully or not) or when the function throws an exception.
     *
     * @param kafkaConsumerRecord The received record.
     * @param function The function to handle the record with.
     */
    public void handleRecord(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord, Function<KafkaConsumerRecord<String, Buffer>, Future<Void>> function) {
        final int recordsLeftInBatch = this.recordsLeftInBatchCounter.decrementAndGet();
        final int recordsInProcessing = this.recordsInProcessingCounter.incrementAndGet();
        if (recordsInProcessing >= this.throttlingThreshold) {
            final boolean pollPauseStillAllowed = this.lastPollInstant.plus(MAX_POLL_PAUSE).isAfter(Instant.now());
            if (pollPauseStillAllowed
                    && (recordsLeftInBatch > 0 || recordsInProcessing == this.throttlingThreshold)
                    && pauseRecordHandlingAndPolling(MAX_POLL_PAUSE)) {
                this.log.debug("paused consumer record handling/polling; no. of records in processing: {}, throttling threshold: {} [client-id: {}]",
                        recordsInProcessing, this.throttlingThreshold, getClientId());
            } else if (recordsLeftInBatch == 0 && pauseRecordFetching()) {
                // last record of the batch: stop fetching further records until processing has caught up
                this.log.info("suspending record fetching; no. of records in processing: {}, throttling threshold: {} [client-id: {}]",
                        recordsInProcessing, this.throttlingThreshold, getClientId());
                this.fetchingPauseStartTime = Instant.now();
            }
        }
        final TopicPartition topicPartition = new TopicPartition(kafkaConsumerRecord.topic(), kafkaConsumerRecord.partition());
        final OffsetsQueueEntry offsetsQueueEntry = setRecordReceived(kafkaConsumerRecord.offset(), topicPartition);
        try {
            // io.vertx.core.Future declares onComplete (the decompiled "onComplete2" does not exist there)
            function.apply(kafkaConsumerRecord)
                    .onComplete(handlingResult -> setRecordHandlingComplete(offsetsQueueEntry, topicPartition));
        } catch (Exception e) {
            this.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}] [client-id: {}]",
                    kafkaConsumerRecord.topic(), kafkaConsumerRecord.partition(), kafkaConsumerRecord.offset(),
                    kafkaConsumerRecord.headers(), getClientId(), e);
            // a record whose handler failed synchronously must not block the commit of later offsets
            setRecordHandlingComplete(offsetsQueueEntry, topicPartition);
        }
    }

    /**
     * Marks the given queue entry as handled, notifies a currently active completion latch (if any)
     * and resumes record fetching or polling if the number of records in processing allows it.
     *
     * @param offsetsQueueEntry The entry of the record whose handling is complete.
     * @param topicPartition The partition of the record.
     */
    private void setRecordHandlingComplete(OffsetsQueueEntry offsetsQueueEntry, TopicPartition topicPartition) {
        offsetsQueueEntry.setHandlingComplete();
        synchronized (this.uncompletedRecordsCompletionLatchRef) {
            final UncompletedRecordsCompletionLatch activeLatch = this.uncompletedRecordsCompletionLatchRef.get();
            if (activeLatch != null) {
                activeLatch.onRecordHandlingCompleted(topicPartition);
            }
        }
        final int recordsStillInProcessing = this.recordsInProcessingCounter.decrementAndGet();
        if (recordsStillInProcessing <= this.throttlingThreshold && resumeRecordFetching()) {
            final long pauseMillis = Duration.between(this.fetchingPauseStartTime, Instant.now()).toMillis();
            this.log.info("resumed consumer record fetching after {}ms; current no. of records in processing: {} [client-id: {}]",
                    pauseMillis, recordsStillInProcessing, getClientId());
        } else if (recordsStillInProcessing <= this.throttlingThreshold - this.throttlingResumeDelta
                && resumeRecordHandlingAndPolling()) {
            this.log.debug("resumed consumer record polling; current no. of records in processing: {} [client-id: {}]",
                    recordsStillInProcessing, getClientId());
        }
    }

    /**
     * Validates the given consumer configuration and returns an adapted copy of it in which
     * Kafka's own auto-commit is disabled.
     * <p>
     * The given map is left unmodified, so immutable maps are supported as well.
     *
     * @param map The consumer configuration.
     * @return A new map with the given entries and "enable.auto.commit" set to "false".
     * @throws NullPointerException if map is {@code null}.
     * @throws IllegalArgumentException if the configuration contains no (non-empty) "group.id" entry.
     */
    private static Map<String, String> validateAndAdaptConsumerConfig(Map<String, String> map) {
        Objects.requireNonNull(map, "consumer config must not be null");
        if (Strings.isNullOrEmpty(map.get(ConsumerConfig.GROUP_ID_CONFIG))) {
            throw new IllegalArgumentException("group.id config entry has to be set");
        }
        final Map<String, String> adaptedConfig = new HashMap<>(map);
        adaptedConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return adaptedConfig;
    }

    /**
     * Gets the "max.poll.records" value from the given consumer configuration.
     *
     * @param map The consumer configuration.
     * @return The configured value or 500 (the Kafka consumer default) if there is no such entry.
     * @throws NumberFormatException if the configured value is not a valid integer.
     */
    private int getMaxPollRecordsConfig(Map<String, String> map) {
        // spelled out here instead of borrowing an unrelated constant that merely has the same value
        final int kafkaDefaultMaxPollRecords = 500;
        final String configuredValue = map.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
        return configuredValue != null ? Integer.parseInt(configuredValue) : kafkaDefaultMaxPollRecords;
    }

    /**
     * Gets the commit interval from the "auto.commit.interval.ms" entry of the given configuration.
     *
     * @param map The consumer configuration.
     * @return The configured interval in milliseconds or the value of {@link #DEFAULT_COMMIT_INTERVAL}
     *         if there is no such entry.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getCommitInterval(Map<String, String> map) {
        final String configuredInterval = map.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
        if (configuredInterval == null) {
            return DEFAULT_COMMIT_INTERVAL.toMillis();
        }
        return Long.parseLong(configuredInterval);
    }

    /**
     * Gets the period during which an unchanged offset is not committed again.
     *
     * @param map The consumer configuration.
     * @return The value of the {@link #CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS} entry in seconds or
     *         the value of {@link #DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD} if there is no such entry.
     * @throws NumberFormatException if the configured value is not a valid long.
     */
    private static long getSkipOffsetRecommitPeriodSeconds(Map<String, String> map) {
        final String configuredPeriod = map.get(CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS);
        if (configuredPeriod == null) {
            return DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD.toSeconds();
        }
        return Long.parseLong(configuredPeriod);
    }

    /**
     * Gets the timeout for waiting on the completion of record handling before offsets get committed.
     *
     * @param map The consumer configuration.
     * @return The value of the {@link #CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS} entry
     *         (parsed as an unsigned long) or the value of
     *         {@link #DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT} if there is no such entry.
     * @throws NumberFormatException if the configured value is not a valid unsigned long.
     */
    private static long getOffsetsCommitRecordCompletionTimeoutMillis(Map<String, String> map) {
        final String configuredTimeout = map.get(CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS);
        if (configuredTimeout == null) {
            return DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT.toMillis();
        }
        return Long.parseUnsignedLong(configuredTimeout);
    }

    /**
     * Registers the offset of the given record and immediately marks it as handled.
     *
     * @param kafkaConsumerRecord The record for which the record handler was skipped.
     */
    @Override
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        final TopicPartition topicPartition = new TopicPartition(kafkaConsumerRecord.topic(), kafkaConsumerRecord.partition());
        final OffsetsQueueEntry queueEntry = setRecordReceived(kafkaConsumerRecord.offset(), topicPartition);
        queueEntry.setHandlingComplete();
    }

    /**
     * Starts the consumer and, once the start attempt of the super class has completed,
     * installs the timer that triggers the periodic offset commits.
     * <p>
     * NOTE: the timer is installed regardless of whether the start attempt succeeded.
     *
     * @return The future returned by the super class's start method.
     */
    @Override
    public Future<Void> start() {
        // io.vertx.core.Future declares onComplete (the decompiled "onComplete2" does not exist there)
        return super.start().onComplete(startAttempt -> {
            this.periodicCommitTimerId = this.vertx.setPeriodic(this.commitIntervalMillis, timerId -> doPeriodicCommit());
        });
    }

    /**
     * Cancels the periodic commit timer (if one was installed) and stops the consumer.
     * All tracked partition offsets are discarded once the stop attempt of the super class
     * has completed.
     *
     * @return The future returned by the super class's stop method.
     */
    @Override
    public Future<Void> stop() {
        if (this.periodicCommitTimerId != null) {
            this.vertx.cancelTimer(this.periodicCommitTimerId);
        }
        // io.vertx.core.Future declares onComplete (the decompiled "onComplete2" does not exist there)
        return super.stop().onComplete(stopAttempt -> {
            // no partition is assigned anymore, so every tracked partition is obsolete
            clearObsoleteTopicPartitionOffsets(List.of());
        });
    }

    /**
     * Updates the tracked offsets after partitions have been assigned: drops the state of partitions that
     * are not assigned anymore, prepares state for newly assigned partitions (only if the auto offset
     * reset config is "latest"), re-enables periodic commits and schedules a periodic commit retry
     * if one is pending.
     *
     * @param set The newly assigned partitions.
     */
    @Override
    protected void onPartitionsAssignedBlocking(Set<io.vertx.kafka.client.common.TopicPartition> set) {
        clearObsoleteTopicPartitionOffsets(getUnderlyingConsumer().assignment());
        if (this.topicPattern != null) {
            // forget committed offsets of topics that are not matched by the subscription anymore
            final Set<String> subscribedTopics = getSubscribedTopicPatternTopics();
            this.lastKnownCommittedOffsets.keySet()
                    .removeIf(knownPartition -> !subscribedTopics.contains(knownPartition.topic()));
        }
        final boolean partitionsAdded = !set.isEmpty();
        if (partitionsAdded && isAutoOffsetResetConfigLatest()) {
            ensureOffsetCommitsExistForNewlyAssignedPartitions(set);
        }
        this.skipPeriodicCommit.set(false);
        if (this.periodicCommitRetryAfterRebalanceNeeded.get()) {
            runOnContext(v -> {
                // re-check: the flag may have been reset in the meantime
                if (this.periodicCommitRetryAfterRebalanceNeeded.compareAndSet(true, false)) {
                    doPeriodicCommit();
                }
            });
        }
    }

    /**
     * Creates offsets tracking state for those of the given partitions that are not tracked yet,
     * using the current consumer position as starting point.
     * <p>
     * If the position of such a partition differs from its last known committed offset (or no committed
     * offset is known), the state is created such that the position is included in the next offset commit.
     *
     * @param set The newly assigned partitions.
     */
    private synchronized void ensureOffsetCommitsExistForNewlyAssignedPartitions(Set<io.vertx.kafka.client.common.TopicPartition> set) {
        final List<TopicPartition> partitionsForNextCommit = new LinkedList<>();
        // the committed offsets only need to be fetched for untracked partitions without a known committed offset
        final Set<TopicPartition> partitionsWithUnknownCommittedOffset = set.stream()
                .map(vertxPartition -> Helper.to(vertxPartition))
                .filter(partition -> !this.offsetsMap.containsKey(partition)
                        && this.lastKnownCommittedOffsets.get(partition) == null)
                .collect(Collectors.toSet());
        fetchCommittedOffsetsOnPartitionsAssigned(partitionsWithUnknownCommittedOffset);

        for (final io.vertx.kafka.client.common.TopicPartition vertxPartition : set) {
            final TopicPartition partition = Helper.to(vertxPartition);
            if (this.offsetsMap.containsKey(partition)) {
                continue;
            }
            try {
                final long position = getUnderlyingConsumer().position(partition);
                final Long knownCommittedOffset = this.lastKnownCommittedOffsets.get(partition);
                final boolean positionAlreadyCommitted = knownCommittedOffset != null
                        && knownCommittedOffset.longValue() == position;
                if (!positionAlreadyCommitted) {
                    partitionsForNextCommit.add(partition);
                }
                this.offsetsMap.put(partition, new TopicPartitionOffsets(this, partition, position, positionAlreadyCommitted));
            } catch (Exception e) {
                this.log.warn("error fetching position for newly assigned partition [{}] [client-id: {}]", partition, getClientId(), e);
            }
        }
        if (this.log.isDebugEnabled() && !partitionsForNextCommit.isEmpty()) {
            this.log.debug("onPartitionsAssigned: partitions to be part of next offset commit: [{}]",
                    HonoKafkaConsumerHelper.getPartitionsDebugString(partitionsForNextCommit));
        }
    }

    /**
     * Fetches the committed offsets of the given partitions from the underlying consumer and
     * stores them as last known committed offsets. The entry of a partition without a committed
     * offset is removed. A failure to fetch the offsets is only logged.
     *
     * @param set The partitions to fetch the committed offsets for.
     */
    private void fetchCommittedOffsetsOnPartitionsAssigned(Set<TopicPartition> set) {
        if (set.isEmpty()) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("onPartitionsAssigned: fetching committed offsets for [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(set));
        }
        try {
            final Map<TopicPartition, OffsetAndMetadata> committedOffsets = getUnderlyingConsumer().committed(set);
            for (final Map.Entry<TopicPartition, OffsetAndMetadata> committed : committedOffsets.entrySet()) {
                final OffsetAndMetadata offsetAndMetadata = committed.getValue();
                if (offsetAndMetadata == null) {
                    this.lastKnownCommittedOffsets.remove(committed.getKey());
                } else {
                    this.lastKnownCommittedOffsets.put(committed.getKey(), offsetAndMetadata.offset());
                }
            }
        } catch (Exception e) {
            this.log.warn("error fetching committed offsets for newly assigned partitions [{}] [client-id: {}]",
                    HonoKafkaConsumerHelper.getPartitionsDebugString(set), getClientId(), e);
        }
    }

    /**
     * Disables periodic commits and synchronously commits the offsets after partitions have been revoked.
     * <p>
     * If records of the revoked partitions are still being handled (and a positive completion timeout is
     * configured), the commit is preceded by waiting for the completion of these records, for at most
     * the configured timeout.
     *
     * @param set The revoked partitions.
     */
    @Override
    protected void onPartitionsRevokedBlocking(Set<io.vertx.kafka.client.common.TopicPartition> set) {
        this.skipPeriodicCommit.set(true);
        if (!set.isEmpty() && this.offsetsCommitRecordCompletionTimeoutMillis > 0) {
            UncompletedRecordsCompletionLatch completionLatch = null;
            synchronized (this.uncompletedRecordsCompletionLatchRef) {
                final Map<TopicPartition, TopicPartitionOffsets> uncompletedPartitions = getUncompletedRecordsPartitions(Helper.to(set));
                if (!uncompletedPartitions.isEmpty()) {
                    // don't flood the log with a long list of partitions
                    final Object partitionsInfo = uncompletedPartitions.size() <= 10
                            ? uncompletedPartitions.keySet()
                            : uncompletedPartitions.size() + " partitions";
                    this.log.info("init latch to wait up to {}ms for the completion of record handling concerning {} [client-id: {}]",
                            this.offsetsCommitRecordCompletionTimeoutMillis, partitionsInfo, getClientId());
                    completionLatch = new UncompletedRecordsCompletionLatch(uncompletedPartitions);
                    this.uncompletedRecordsCompletionLatchRef.set(completionLatch);
                }
            }
            if (completionLatch != null) {
                try {
                    final boolean releasedInTime = completionLatch.await(this.offsetsCommitRecordCompletionTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (releasedInTime) {
                        this.log.trace("latch to wait for the completion of record handling was released in time");
                    } else {
                        this.log.info("timed out waiting for record handling to finish after {}ms [client-id: {}]",
                                this.offsetsCommitRecordCompletionTimeoutMillis, getClientId());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // the latch is obsolete now, whatever the outcome of waiting was
                    this.uncompletedRecordsCompletionLatchRef.set(null);
                }
            }
        }
        commitOffsetsSync();
    }

    /**
     * Synchronously commits the offsets that are due for a commit, if any.
     * A failed commit is only logged.
     *
     * @throws IllegalStateException if invoked while a Vert.x context is associated with the current thread.
     */
    private void commitOffsetsSync() {
        if (Vertx.currentContext() != null) {
            throw new IllegalStateException("must be run on the polling thread");
        }
        final Map<TopicPartition, OffsetAndMetadata> dueOffsets = getOffsetsToCommit();
        if (!dueOffsets.isEmpty()) {
            try {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("commitSync; offsets: [{}]", HonoKafkaConsumerHelper.getOffsetsDebugString(dueOffsets));
                }
                getUnderlyingConsumer().commitSync(dueOffsets);
                setCommittedOffsets(dueOffsets);
                this.log.trace("commitSync succeeded");
            } catch (Exception e) {
                this.log.warn("commit failed: {} [client-id: {}]", e, getClientId());
            }
        } else {
            this.log.trace("skip commitSync - no offsets to commit");
        }
    }

    /**
     * Asynchronously commits the offsets that are due for a commit, if any.
     * <p>
     * Does nothing while periodic commits are disabled or while a previous invocation is still in
     * progress. If the commit fails because of a rebalance in progress and cooperative rebalancing
     * is configured, the commit is flagged to be retried after the rebalance.
     */
    private void doPeriodicCommit() {
        if (this.skipPeriodicCommit.get()) {
            return;
        }
        this.periodicCommitRetryAfterRebalanceNeeded.set(false);
        if (!this.periodicCommitInvocationInProgress.compareAndSet(false, true)) {
            this.log.trace("periodic commit already triggered, skipping invocation");
            return;
        }
        runOnKafkaWorkerThread(v -> {
            try {
                final Map<TopicPartition, OffsetAndMetadata> dueOffsets = getOffsetsToCommit();
                if (dueOffsets.isEmpty()) {
                    this.log.trace("skip periodic commit - no offsets to commit");
                    return;
                }
                if (this.log.isTraceEnabled()) {
                    this.log.trace("do periodic commit; offsets: [{}]", HonoKafkaConsumerHelper.getOffsetsDebugString(dueOffsets));
                }
                try {
                    getUnderlyingConsumer().commitAsync(dueOffsets, (committedOffsets, error) -> {
                        if (error instanceof RebalanceInProgressException) {
                            this.log.debug("could not do periodic commit: {} [client-id: {}]", error, getClientId());
                            if (isCooperativeRebalancingConfigured()) {
                                this.periodicCommitRetryAfterRebalanceNeeded.set(true);
                            }
                        } else if (error != null) {
                            this.log.info("periodic commit failed: {} [client-id: {}]", error, getClientId());
                        } else {
                            this.log.trace("periodic commit succeeded");
                            setCommittedOffsets(committedOffsets);
                        }
                    });
                } catch (Exception e) {
                    this.log.error("error doing periodic commit [client-id: {}]", getClientId(), e);
                }
            } finally {
                // always reset the flag - otherwise an unexpected exception would disable periodic commits for good
                this.periodicCommitInvocationInProgress.set(false);
            }
        });
    }

    /**
     * Gets the tracked offsets of those of the given partitions for which the handling of
     * at least one received record is not completed yet.
     *
     * @param set The partitions to check.
     * @return A new, modifiable map with the partitions having uncompleted records.
     */
    private synchronized Map<TopicPartition, TopicPartitionOffsets> getUncompletedRecordsPartitions(Set<TopicPartition> set) {
        final Map<TopicPartition, TopicPartitionOffsets> uncompleted = new HashMap<>();
        for (final Map.Entry<TopicPartition, TopicPartitionOffsets> tracked : this.offsetsMap.entrySet()) {
            if (set.contains(tracked.getKey()) && !tracked.getValue().allCompleted()) {
                uncompleted.put(tracked.getKey(), tracked.getValue());
            }
        }
        return uncompleted;
    }

    /**
     * Registers the offset of a received record, creating the tracking state of the partition if needed.
     *
     * @param j The offset of the received record.
     * @param topicPartition The partition of the received record.
     * @return The queue entry created for the offset.
     */
    private synchronized OffsetsQueueEntry setRecordReceived(long j, TopicPartition topicPartition) {
        TopicPartitionOffsets partitionOffsets = this.offsetsMap.get(topicPartition);
        if (partitionOffsets == null) {
            partitionOffsets = new TopicPartitionOffsets(topicPartition);
            this.offsetsMap.put(topicPartition, partitionOffsets);
        }
        return partitionOffsets.addOffset(j);
    }

    /**
     * Removes the tracking state of all partitions that are not contained in the given collection,
     * logging the state of each removed partition (with a warning if a handled offset is left uncommitted).
     *
     * @param collection The partitions whose tracking state is to be kept.
     * @throws NullPointerException if collection is {@code null}.
     */
    private synchronized void clearObsoleteTopicPartitionOffsets(Collection<TopicPartition> collection) {
        Objects.requireNonNull(collection);
        this.offsetsMap.entrySet().removeIf(tracked -> {
            final TopicPartition partition = tracked.getKey();
            if (collection.contains(partition)) {
                return false;
            }
            final TopicPartitionOffsets partitionOffsets = tracked.getValue();
            if (partitionOffsets.needsCommit()) {
                this.log.warn("partition [{}] not assigned to consumer [{}] anymore but latest handled record offset hasn't been committed yet! {}",
                        partition, getClientId(), partitionOffsets.getStateInfo());
            } else if (partitionOffsets.allCompleted()) {
                this.log.trace("partition [{}] not assigned to consumer anymore; no still outstanding offset commits there", partition);
            } else {
                this.log.debug("partition [{}] not assigned to consumer [{}] anymore but not all read records have been fully processed yet! {}",
                        partition, getClientId(), partitionOffsets.getStateInfo());
            }
            return true;
        });
    }

    /**
     * Collects the offsets that are due for a commit.
     * <p>
     * The offset to commit for a partition is the last sequentially completed offset plus one.
     *
     * @return A new map with the offsets to commit; empty if there is nothing to commit.
     */
    private synchronized Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (final Map.Entry<TopicPartition, TopicPartitionOffsets> tracked : this.offsetsMap.entrySet()) {
            tracked.getValue().getLastSequentiallyCompletedOffsetForCommit()
                    .ifPresent(completedOffset -> offsetsToCommit.put(tracked.getKey(),
                            new OffsetAndMetadata(completedOffset + 1, "")));
        }
        return offsetsToCommit;
    }

    /**
     * Stores the given offsets as committed: in the tracking state of the corresponding partitions
     * (if still tracked) and as last known committed offsets.
     *
     * @param map The committed offsets.
     */
    private synchronized void setCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> committed : map.entrySet()) {
            final TopicPartition partition = committed.getKey();
            final long committedOffset = committed.getValue().offset();
            final TopicPartitionOffsets partitionOffsets = this.offsetsMap.get(partition);
            if (partitionOffsets != null) {
                // the committed offset is the offset of the last handled record plus one
                partitionOffsets.setLastCommittedOffset(committedOffset - 1);
            }
            this.lastKnownCommittedOffsets.put(partition, committedOffset);
        }
    }
}
