Skip to content

Commit

Permalink
[fix][client] Cannot access message data inside ProducerInterceptor#o…
Browse files Browse the repository at this point in the history
…nSendAcknowledgement
  • Loading branch information
Shawyeok committed Dec 27, 2024
1 parent 369c352 commit ea703c8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -216,6 +218,48 @@ public void onSendAcknowledgement(Producer<String> producer, Message<String> mes
producer.close();
}

@Test
public void testProducerInterceptorAccessMessageData() throws PulsarClientException {
List<String> messageDataInBeforeSend = Collections.synchronizedList(new ArrayList<>());
List<String> messageDataOnSendAcknowledgement = Collections.synchronizedList(new ArrayList<>());
ProducerInterceptor<String> interceptor = new ProducerInterceptor<>() {
@Override
public void close() {
}

@Override
public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
messageDataInBeforeSend.add(new String(message.getData()));
return message;
}

@Override
public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId,
Throwable exception) {
messageDataOnSendAcknowledgement.add(new String(message.getData()));
}
};
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.intercept(interceptor)
.create();

final String messageValue = UUID.randomUUID().toString();
try {
producer.newMessage().value(messageValue).send();
} catch (Exception ignore) {
}
Assert.assertEquals(messageDataInBeforeSend.size(), 1,
"Message data should be available in beforeSend");
Assert.assertEquals(messageDataInBeforeSend.get(0), messageValue,
"Message data should be available in beforeSend");
Assert.assertEquals(messageDataOnSendAcknowledgement.size(), 1,
"Message data should be available in onSendAcknowledgement");
Assert.assertEquals(messageDataOnSendAcknowledgement.get(0), messageValue,
"Message data should be available in onSendAcknowledgement");
}

@Test
public void testConsumerInterceptorWithErrors() throws PulsarClientException {
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,20 +436,22 @@ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl<
if (payload == null) {
log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.",
topic, producerName);
} else {
ReferenceCountUtil.safeRelease(payload);
}
if (e != null) {
latencyHistogram.recordFailure(latencyNanos);
stats.incrementSendFailed();
onSendAcknowledgement(msg, null, e);
sendCallback.getFuture().completeExceptionally(e);
} else {
latencyHistogram.recordSuccess(latencyNanos);
publishedBytesCounter.add(msgSize);
stats.incrementNumAcksReceived(latencyNanos);
onSendAcknowledgement(msg, msg.getMessageId(), null);
sendCallback.getFuture().complete(msg.getMessageId());
try {
if (e != null) {
latencyHistogram.recordFailure(latencyNanos);
stats.incrementSendFailed();
onSendAcknowledgement(msg, null, e);
sendCallback.getFuture().completeExceptionally(e);
} else {
latencyHistogram.recordSuccess(latencyNanos);
publishedBytesCounter.add(msgSize);
stats.incrementNumAcksReceived(latencyNanos);
onSendAcknowledgement(msg, msg.getMessageId(), null);
sendCallback.getFuture().complete(msg.getMessageId());
}
} finally {
ReferenceCountUtil.safeRelease(payload);
}
}

Expand Down

0 comments on commit ea703c8

Please sign in to comment.