Skip to content

Commit

Permalink
[feat][admin] PIP-330: getMessagesById gets all messages (apache#21918)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>

(cherry picked from commit 22f7323)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Dec 18, 2024
1 parent 89b300c commit 0ff335c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,35 @@
*/
package org.apache.pulsar.broker.admin;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

@Test(groups = "broker-admin")
public class AdminTopicApiTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AdminTopicApiTest.class);
Expand Down Expand Up @@ -97,4 +107,57 @@ public void testPeekMessages() throws Exception {
Assert.assertEquals(new String(messages.get(3).getValue(), UTF_8), "value-3");
Assert.assertEquals(new String(messages.get(4).getValue(), UTF_8), "value-4");
}

@Test
public void testGetMessagesId() throws PulsarClientException, ExecutionException, InterruptedException {
String topic = newTopicName();

int numMessages = 10;
int batchingMaxMessages = numMessages / 2;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(batchingMaxMessages)
.batchingMaxPublishDelay(60, TimeUnit.SECONDS)
.create();

List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
futures.add(producer.sendAsync(("msg-" + i).getBytes(UTF_8)));
}
FutureUtil.waitForAll(futures).get();

Map<MessageIdImpl, Integer> messageIdMap = new HashMap<>();
futures.forEach(n -> {
try {
MessageId messageId = n.get();
if (messageId instanceof MessageIdImpl) {
MessageIdImpl impl = (MessageIdImpl) messageId;
MessageIdImpl key = new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1);
Integer i = messageIdMap.computeIfAbsent(key, __ -> 0);
messageIdMap.put(key, i + 1);
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

messageIdMap.forEach((key, value) -> {
assertEquals(value, Integer.valueOf(batchingMaxMessages));
try {
List<Message<byte[]>> messages = admin.topics().getMessagesById(topic,
key.getLedgerId(), key.getEntryId());
assertNotNull(messages);
assertEquals(messages.size(), batchingMaxMessages);
} catch (PulsarAdminException e) {
throw new RuntimeException(e);
}
});

// The message id doesn't exist.
assertThrows(PulsarAdminException.NotFoundException.class, () -> admin.topics()
.getMessagesById(topic, 1024, 2048));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,9 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
* @deprecated Using {@link #getMessagesById(String, long, long)} instead.
*/
@Deprecated
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
Expand All @@ -1602,9 +1604,32 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
* @deprecated Using {@link #getMessagesByIdAsync(String, long, long)} instead.
*/
@Deprecated
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
* Get the messages by messageId.
*
* @param topic Topic name
* @param ledgerId Ledger id
* @param entryId Entry id
* @return A set of messages.
* @throws PulsarAdminException Unexpected error
*/
List<Message<byte[]>> getMessagesById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
* Get the messages by messageId asynchronously.
*
* @param topic Topic name
* @param ledgerId Ledger id
* @param entryId Entry id
* @return A future that can be used to track when a set of messages is returned.
*/
CompletableFuture<List<Message<byte[]>>> getMessagesByIdAsync(String topic, long ledgerId, long entryId);

/**
* Get message ID published at or just after this absolute timestamp (in ms).
* @param topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,20 +1075,16 @@ public CompletableFuture<Void> truncateAsync(String topic) {
}

@Override
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
return getRemoteMessageById(topic, ledgerId, entryId);
}

private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
public CompletableFuture<List<Message<byte[]>>> getMessagesByIdAsync(String topic, long ledgerId, long entryId) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
final CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0));
future.complete(getMessagesFromHttpResponse(topicName.toString(), response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
Expand All @@ -1102,6 +1098,19 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public List<Message<byte[]>> getMessagesById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
return sync(() -> getMessagesByIdAsync(topic, ledgerId, entryId));
}

@Deprecated
@Override
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
return getMessagesByIdAsync(topic, ledgerId, entryId).thenApply(n -> n.get(0));
}

@Deprecated
@Override
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
Expand Down

0 comments on commit 0ff335c

Please sign in to comment.