Skip to content

Commit

Permalink
ARTEMIS-5184 STOMP noLocal is scoped to session not subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Dec 20, 2024
1 parent 2b77373 commit f596686
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ protected void sendServerMessage(ICoreMessage message, String txID) throws Activ
try {
StompSession stompSession = getSession(txID);

if (stompSession.isNoLocal()) {
// only set the connection ID property if we have a noLocal subscription
if (stompSession.getNoLocalSubscriptionCount() > 0) {
message.putStringProperty(CONNECTION_ID_PROPERTY_NAME_STRING, getID().toString());
}
if (isEnableMessageID()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,12 @@ public StompPostReceiptFunction subscribe(StompConnection connection,
boolean noLocal,
Integer consumerWindowSize) throws Exception {
StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) {
throw new ActiveMQStompException(connection, "There already is a subscription for: " + subscriptionID +
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, consumerWindowSize);
return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize);
}

public void unsubscribe(StompConnection connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class StompSession implements SessionCallback {
// key = consumer ID and message ID, value = frame length
private final Map<Pair<Long, Long>, Integer> messagesToAck = new ConcurrentHashMap<>();

private volatile boolean noLocal = false;
private volatile int noLocalSubscriptionCount = 0;

private boolean txPending = false;

Expand Down Expand Up @@ -231,6 +231,9 @@ public void closed() {
public void disconnect(ServerConsumer consumerId, String errorDescription) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());
if (stompSubscription != null) {
if (stompSubscription.isNoLocal()) {
noLocalSubscriptionCount--;
}
StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR);
frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
frame.setBody("consumer with ID " + consumerId + " disconnected by server");
Expand Down Expand Up @@ -306,6 +309,7 @@ public StompPostReceiptFunction addSubscription(long consumerID,
String destination,
String selector,
String ack,
boolean noLocal,
Integer consumerWindowSize) throws Exception {
SimpleString address = SimpleString.of(destination);
SimpleString queueName = SimpleString.of(destination);
Expand Down Expand Up @@ -342,8 +346,11 @@ public StompPostReceiptFunction addSubscription(long consumerID,
session.createQueue(QueueConfiguration.of(queueName).setAddress(address).setFilterString(selectorSimple).setDurable(false).setTemporary(true));
}
}
if (noLocal) {
noLocalSubscriptionCount++;
}
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast, finalConsumerWindowSize);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast, noLocal, finalConsumerWindowSize);
subscriptions.put(consumerID, subscription);
session.start();
/*
Expand Down Expand Up @@ -402,12 +409,8 @@ public OperationContext getContext() {
return sessionContext;
}

public boolean isNoLocal() {
return noLocal;
}

public void setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
public int getNoLocalSubscriptionCount() {
return noLocalSubscriptionCount;
}

public void sendInternal(Message message, boolean direct) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,16 @@ public class StompSubscription {
// whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
private final boolean multicast;

private final boolean noLocal;

private final int consumerWindowSize;

public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast, int consumerWindowSize) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast, boolean noLocal, int consumerWindowSize) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.multicast = multicast;
this.noLocal = noLocal;
this.consumerWindowSize = consumerWindowSize;
}

Expand All @@ -55,13 +58,16 @@ public boolean isMulticast() {
return multicast;
}

public boolean isNoLocal() {
return noLocal;
}

public int getConsumerWindowSize() {
return consumerWindowSize;
}

@Override
public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + ", consumerWindowSize=" + consumerWindowSize + "]";
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + ", noLocal=" + noLocal + ", consumerWindowSize=" + consumerWindowSize + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,26 @@ public void testSubscribeToTopicWithNoLocal() throws Exception {
conn.disconnect();
}

@Test
public void testSubscribeToTopicWithNoLocalAndNormal() throws Exception {
conn.connect(defUser, defPass);
subscribeTopic(conn, RandomUtil.randomString(), null, null, true, true);
subscribeTopic(conn, RandomUtil.randomString(), null, null, true, false);
int normalCount = 0;
int noLocalCount = 0;
for (Binding binding : server.getPostOffice().getBindingsForAddress(SimpleString.of(getTopicPrefix() + getTopicName())).getBindings()) {
if (binding.getFilter() != null && binding.getFilter().getFilterString().toString().contains("__AMQ_CID")) {
noLocalCount++;
} else {
normalCount++;
}
}
assertEquals(1, noLocalCount);
assertEquals(1, normalCount);

conn.disconnect();
}

@Test
public void testSubscribeToTopicWithNoLocalAndSelector() throws Exception {
conn.connect(defUser, defPass);
Expand Down

0 comments on commit f596686

Please sign in to comment.