Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AKCORE-250: Additional tracing and testing of ShareConsumeRequestManager #1421

Open
wants to merge 2 commits into
base: kip-932
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;

import java.io.Closeable;
Expand Down Expand Up @@ -158,7 +159,8 @@ public PollResult poll(long currentTimeMs) {
log.trace("Skipping fetch for partition {} because previous fetch request to {} has not been processed", partition, node.id());
} else {
// if there is a leader and no in-flight requests, issue a new fetch
ShareSessionHandler handler = handlerMap.computeIfAbsent(node, k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
ShareSessionHandler handler = handlerMap.computeIfAbsent(node,
k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));

TopicIdPartition tip = new TopicIdPartition(topicId, partition);
Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip);
Expand All @@ -167,7 +169,7 @@ public PollResult poll(long currentTimeMs) {
}
handler.addPartitionToFetch(tip, acknowledgementsToSend);

log.debug("Added fetch request for partition {} to node {}", partition, node);
log.debug("Added fetch request for partition {} to node {}", partition, node.id());
}
}

Expand All @@ -178,6 +180,7 @@ public PollResult poll(long currentTimeMs) {

List<UnsentRequest> requests = builderMap.entrySet().stream().map(entry -> {
Node target = entry.getKey();
log.trace("Building ShareFetch request to send to node {}", target.id());
ShareFetchRequest.Builder requestBuilder = entry.getValue();

nodesWithPendingRequests.add(target.id());
Expand Down Expand Up @@ -247,6 +250,7 @@ private PollResult processAcknowledgements(long currentTimeMs) {
pollResult = PollResult.EMPTY;
} else if (closing) {
if (!closeFuture.isDone()) {
log.trace("Completing acknowledgement on close");
closeFuture.complete(null);
}
pollResult = PollResult.EMPTY;
Expand Down Expand Up @@ -283,7 +287,7 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node);
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
Expand Down Expand Up @@ -326,7 +330,7 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added acknowledge request for partition {} to node {}", tip.topicPartition(), node);
log.debug("Added async acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
Expand All @@ -344,8 +348,6 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
));
}
});

resultHandler.completeIfEmpty();
}

/**
Expand Down Expand Up @@ -376,7 +378,7 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node);
log.debug("Added closing acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
Expand All @@ -396,14 +398,14 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
}
});

resultHandler.completeIfEmpty();
return closeFuture;
}

private void handleShareFetchSuccess(Node fetchTarget,
ShareFetchRequestData requestData,
@SuppressWarnings("unused") ShareFetchRequestData requestData,
ClientResponse resp) {
try {
log.debug("Completed ShareFetch request from node {} successfully", fetchTarget.id());
final ShareFetchResponse response = (ShareFetchResponse) resp.responseBody();
final ShareSessionHandler handler = sessionHandler(fetchTarget.id());

Expand Down Expand Up @@ -467,7 +469,7 @@ private void handleShareFetchSuccess(Node fetchTarget,

metricsManager.recordLatency(resp.requestLatencyMs());
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget);
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
}
}
Expand All @@ -476,6 +478,7 @@ private void handleShareFetchFailure(Node fetchTarget,
ShareFetchRequestData requestData,
Throwable error) {
try {
log.debug("Completed ShareFetch request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
final ShareSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(error);
Expand All @@ -496,7 +499,7 @@ private void handleShareFetchFailure(Node fetchTarget,
}
}));
} finally {
log.debug("Removing pending request for node {} - failed", fetchTarget);
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
}
}
Expand All @@ -507,6 +510,7 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
ClientResponse resp,
long currentTimeMs) {
try {
log.debug("Completed ShareAcknowledge request from node {} successfully", fetchTarget.id());
final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody();
final ShareSessionHandler handler = acknowledgeRequestState.sessionHandler();

Expand Down Expand Up @@ -543,7 +547,7 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,

metricsManager.recordLatency(resp.requestLatencyMs());
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget);
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
}
}
Expand All @@ -554,17 +558,18 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
Throwable error,
long currentTimeMs) {
try {
log.debug("Completed ShareAcknowledge request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
acknowledgeRequestState.sessionHandler().handleError(error);

requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
partition.partitionIndex(),
metadata.topicNames().get(topic.topicId()));
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
}));
} finally {
log.debug("Removing pending request for node {} - failed", fetchTarget);
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
}
}
Expand All @@ -575,6 +580,7 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget,
ClientResponse resp,
long currentTimeMs) {
try {
log.debug("Completed ShareAcknowledge on close request from node {} successfully", fetchTarget.id());
final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody();

response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> {
Expand All @@ -590,7 +596,7 @@ private void handleShareAcknowledgeCloseSuccess(Node fetchTarget,
metricsManager.recordLatency(resp.requestLatencyMs());
acknowledgeRequestState.processingComplete();
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget);
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
Expand All @@ -602,6 +608,7 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget,
Throwable error,
long currentTimeMs) {
try {
log.debug("Completed ShareAcknowledge on close request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error));
acknowledgeRequestState.sessionHandler().handleError(error);

requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> {
Expand All @@ -612,7 +619,7 @@ private void handleShareAcknowledgeCloseFailure(Node fetchTarget,
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error));
}));
} finally {
log.debug("Removing pending request for node {} - failed", fetchTarget);
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
sessionHandlers.remove(fetchTarget.id());
}
Expand Down Expand Up @@ -751,9 +758,11 @@ UnsentRequest buildRequest(long currentTimeMs) {
};

if (requestBuilder == null) {
log.trace("Building ShareAcknowledge request to send to node {} failed", nodeToSend.id());
handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
return null;
} else {
log.trace("Building ShareAcknowledge request to send to node {}", nodeToSend.id());
return new UnsentRequest(requestBuilder, Optional.of(nodeToSend)).whenComplete(responseHandler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ public void testCommitSync() {
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}

@Test
Expand Down Expand Up @@ -339,6 +342,9 @@ public void testCommitAsync() {
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}

@Test
Expand Down Expand Up @@ -368,6 +374,45 @@ public void testAcknowledgeOnClose() {
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}

@Test
public void testAcknowledgeOnCloseWithPendingCommitAsync() {
buildRequestManager();

assignFromSubscribed(Collections.singleton(tp0));

// normal fetch
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());

client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

Acknowledgements acknowledgements = Acknowledgements.empty();
acknowledgements.add(1L, AcknowledgeType.ACCEPT);
acknowledgements.add(2L, AcknowledgeType.ACCEPT);
acknowledgements.add(3L, AcknowledgeType.REJECT);

shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, acknowledgements));
shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
calculateDeadlineMs(time.timer(100)));

assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));

client.prepareResponse(emptyAcknowledgeResponse());
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());

assertEquals(Collections.singletonMap(tip0, acknowledgements), completedAcknowledgements.get(0));
completedAcknowledgements.clear();
}

@Test
Expand Down Expand Up @@ -753,6 +798,11 @@ private ShareFetchResponse fullFetchResponse(TopicIdPartition tp,
return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
}

private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Collections.emptyMap();
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
}

private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Collections.singletonMap(tp,
partitionDataForAcknowledge(tp, error));
Expand Down