diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 6b11081681ac2..7427edbb88069 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; + import org.slf4j.Logger; import java.io.Closeable; @@ -158,7 +159,8 @@ public PollResult poll(long currentTimeMs) { log.trace("Skipping fetch for partition {} because previous fetch request to {} has not been processed", partition, node.id()); } else { // if there is a leader and no in-flight requests, issue a new fetch - ShareSessionHandler handler = handlerMap.computeIfAbsent(node, k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); + ShareSessionHandler handler = handlerMap.computeIfAbsent(node, + k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip); @@ -167,7 +169,7 @@ public PollResult poll(long currentTimeMs) { } handler.addPartitionToFetch(tip, acknowledgementsToSend); - log.debug("Added fetch request for partition {} to node {}", partition, node); + log.debug("Added fetch request for partition {} to node {}", partition, node.id()); } } @@ -178,6 +180,7 @@ public PollResult poll(long currentTimeMs) { List requests = builderMap.entrySet().stream().map(entry -> { Node target = entry.getKey(); + log.trace("Building ShareFetch request to send to node {}", target.id()); ShareFetchRequest.Builder requestBuilder = entry.getValue(); nodesWithPendingRequests.add(target.id()); @@ -247,6 +250,7 @@ private PollResult processAcknowledgements(long currentTimeMs) { pollResult = PollResult.EMPTY; } else if (closing) { if (!closeFuture.isDone()) { + log.trace("Completing acknowledgement on close"); closeFuture.complete(null); } pollResult = PollResult.EMPTY; @@ -283,7 +287,7 @@ public CompletableFuture> commitSync( acknowledgementsMapForNode.put(tip, acknowledgements); metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); resultCount.incrementAndGet(); } } @@ -326,7 +330,7 @@ public void commitAsync(final Map acknowledg acknowledgementsMapForNode.put(tip, acknowledgements); metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node); + log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); resultCount.incrementAndGet(); } } @@ -344,8 +348,6 @@ public void commitAsync(final Map acknowledg )); } }); - - resultHandler.completeIfEmpty(); } /** @@ -376,7 +378,7 @@ public CompletableFuture acknowledgeOnClose(final Map acknowledgeOnClose(final Map topic.partitions().forEach(partition -> { TopicIdPartition tip = new TopicIdPartition(topic.topicId(), partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip)); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); })); } finally { - log.debug("Removing pending request for node {} - failed", fetchTarget); + log.debug("Removing pending request for node {} - failed", fetchTarget.id()); nodesWithPendingRequests.remove(fetchTarget.id()); } } @@ -575,6 +580,7 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, ClientResponse resp, long currentTimeMs) { try { + log.debug("Completed ShareAcknowledge on close request from node {} successfully", fetchTarget.id()); final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody(); response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { @@ -590,7 +596,7 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, metricsManager.recordLatency(resp.requestLatencyMs()); acknowledgeRequestState.processingComplete(); } finally { - log.debug("Removing pending request for node {} - success", fetchTarget); + log.debug("Removing pending request for node {} - success", fetchTarget.id()); nodesWithPendingRequests.remove(fetchTarget.id()); sessionHandlers.remove(fetchTarget.id()); } @@ -602,6 +608,7 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget, Throwable error, long currentTimeMs) { try { + log.debug("Completed ShareAcknowledge on close request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error)); acknowledgeRequestState.sessionHandler().handleError(error); requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { @@ -612,7 +619,7 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget, acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); })); } finally { - log.debug("Removing pending request for node {} - failed", fetchTarget); + log.debug("Removing pending request for node {} - failed", fetchTarget.id()); nodesWithPendingRequests.remove(fetchTarget.id()); sessionHandlers.remove(fetchTarget.id()); } @@ -751,9 +758,11 @@ UnsentRequest buildRequest(long currentTimeMs) { }; if (requestBuilder == null) { + log.trace("Building ShareAcknowledge request to send to node {} failed", nodeToSend.id()); handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND); return null; } else { + log.trace("Building ShareAcknowledge request to send to node {}", nodeToSend.id()); return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 4e75652e376fc..deb7951d3b199 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -311,6 +311,9 @@ public void testCommitSync() { client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0)); + completedAcknowledgements.clear(); } @Test @@ -339,6 +342,9 @@ public void testCommitAsync() { client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0)); + completedAcknowledgements.clear(); } @Test @@ -368,6 +374,45 @@ public void testAcknowledgeOnClose() { client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0)); + completedAcknowledgements.clear(); + } + + @Test + public void testAcknowledgeOnCloseWithPendingCommitAsync() { + buildRequestManager(); + + assignFromSubscribed(Collections.singleton(tp0)); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + acknowledgements.add(3L, AcknowledgeType.REJECT); + + shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements)); + shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(), + calculateDeadlineMs(time.timer(100))); + + assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + + client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + + client.prepareResponse(emptyAcknowledgeResponse()); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0)); + completedAcknowledgements.clear(); } @Test @@ -753,6 +798,11 @@ private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); } + private ShareAcknowledgeResponse emptyAcknowledgeResponse() { + Map partitions = Collections.emptyMap(); + return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); + } + private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error) { Map partitions = Collections.singletonMap(tp, partitionDataForAcknowledge(tp, error));