Skip to content

Commit

Permalink
ARTEMIS-5037: option to limit mirror propagation
Browse files Browse the repository at this point in the history
Add a new option in the Mirror settings to prevent a broker from
propagating messages.

When working with a topology where 4 nodes are forming a square and
where each node in that square mirrors its two neighbors: a message
leaving a corner can reach the opposite corner of the square by two
different routes. This is causing the message ordering to get broken.

example:
1 <-> 2
^     ^
|     |
v     v
4 <-> 3

A message from 1 will reach 3 by 2 and 4. Message duplication checks
will prevent the message from being duplicated but won't help regarding
the order of the messages. This is because a either the route by 2 or 4
can be faster than the other, so whomever wins the race sets the message
first.

Fixing the example:
Using the new option to not forward messages coming from a link, we
break the possibilities to have two routes to reach the opposite corner.

The above example is updated as followed:
* 2 never forwards messages coming from 1
* 1 never forwards messages coming from 2
* 3 never forwards messages coming from 4
* 4 never forwards messages coming from 3

Now, when a messages leaves 1:
* it reaches 2 and stops there
* it reaches 4
* it reaches 3 through 4 and stops there

Now, when a messages leaves 2:
* it reaches 1 and stops there
* it reaches 3
* it reaches 4 through 3 and stops there

Now, when a messages leaves 3:
* it reaches 4 and stops there
* it reaches 2
* it reaches 1 through 2 and stops there

Now, when a messages leaves 4:
* it reaches 3 and stops there
* it reaches 1
* it reaches 2 through 1 and stops there

The new test AMQPSquareMirroringTest.java is testing this exact setup.
  • Loading branch information
lavocatt committed Sep 27, 2024
1 parent 5808985 commit d74aa05
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -430,16 +431,19 @@ private void doConnect() {
final Queue queue = server.locateQueue(getMirrorSNF(replica));

final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}
if (replica.getNoForward()) {
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});

final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};

connectSender(queue,
queue.getName().toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down Expand Up @@ -236,6 +238,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
if (ignoreAddress(addressInfo.getName())) {
return;
}
if (isBlockedByNoForward()) {
return;
}

if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
routeMirrorCommand(server, message);
Expand Down Expand Up @@ -264,6 +270,10 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}
return;
}
if (isBlockedByNoForward()) {
return;
}

if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -283,6 +293,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
if (ignoreAddress(address)) {
return;
}
if (isBlockedByNoForward()) {
return;
}


if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
Expand Down Expand Up @@ -310,6 +324,14 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean isBlockedByNoForward() {
return getControllerInUse() != null && getControllerInUse().getNoForward();
}

private boolean isBlockedByNoForward(Message message) {
return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
Expand Down Expand Up @@ -353,6 +375,12 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

logger.trace("sendMessage::{} send message {}", server, message);

if (isBlockedByNoForward(message)) {
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
return;
}
try {
context.setReusable(false);

Expand Down Expand Up @@ -559,6 +587,10 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
return;
}

if (isBlockedByNoForward()) {
return;
}

if ((ref.getQueue() != null && (ref.getQueue().isInternalQueue() || ref.getQueue().isMirrorController()))) {
if (logger.isDebugEnabled()) {
logger.debug("preAcknowledge::{} rejecting preAcknowledge queue={}, ref={} to avoid infinite loop with the mirror (reflection)", server, ref.getQueue().getName(), ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.NoForwardMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
Expand All @@ -53,10 +53,12 @@
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand All @@ -77,29 +79,38 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;

public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements NoForwardMirrorController {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<NoForwardMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

public static void setControllerInUse(MirrorController controller) {
public static void setControllerInUse(NoForwardMirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}

public static MirrorController getControllerInUse() {
public static NoForwardMirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}

private boolean noMessageForwarding = false;

@Override
public boolean getNoForward() {
return noMessageForwarding;
}

/**
* Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
Expand Down Expand Up @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD);
}

@Override
Expand Down Expand Up @@ -534,6 +546,9 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (noMessageForwarding) {
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
}

if (internalAddress != null) {
message.setAddress(internalAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.NoForwardMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
Expand Down Expand Up @@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Que

// to be used with the same executor as the PagingStore executor
public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
NoForwardMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
logger.trace("retrying address {} on server {}", address, server);
try {
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
Expand Down Expand Up @@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap<AckRetry, AckRetry, Queue> map) {



private static class DisabledAckMirrorController implements MirrorController {
private static class DisabledAckMirrorController implements NoForwardMirrorController {

@Override
public boolean isRetryACK() {
Expand Down Expand Up @@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso
public String getRemoteMirrorId() {
return null;
}

@Override
public boolean getNoForward() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.activemq.artemis.core.io.OperationConsistencyLevel;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.NoForwardMirrorController;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.slf4j.Logger;
Expand All @@ -34,7 +34,7 @@ public class MirrorTransaction extends TransactionImpl {

boolean allowPageTransaction;

MirrorController controlInUse;
NoForwardMirrorController controlInUse;

public MirrorTransaction(StorageManager storageManager) {
super(storageManager);
Expand All @@ -44,7 +44,7 @@ public MirrorTransaction(StorageManager storageManager) {

@Override
protected synchronized void afterCommit(List<TransactionOperation> operationsToComplete) {
MirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse();
NoForwardMirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse();
AMQPMirrorControllerTarget.setControllerInUse(controlInUse);
try {
super.afterCommit(operationsToComplete);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec
return;
}

ArrayList<Symbol> offeredCapabilitiesList = new ArrayList<>();
offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
// We need to check if the remote desires to send us tunneled core messages or not, and if
// we support that we need to offer that back so it knows it can actually do core tunneling.
if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) {
receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT});
} else {
receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}

// If the remote wants us to not forward any messages to other mirrors we need to offer that capability
if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)){
offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}
receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{}));

protonSession.addReplicaTarget(receiver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme

boolean queueCreation = true;

boolean noForward = false;

boolean queueRemoval = true;

boolean messageAcknowledgements = true;
Expand Down Expand Up @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation)
return this;
}

public boolean getNoForward() {
return noForward;
}

public AMQPMirrorBrokerConnectionElement setNoForward(boolean noForward) {
this.noForward = noForward;
return this;
}

public boolean isQueueRemoval() {
return queueRemoval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2194,10 +2194,11 @@ private void parseAMQPBrokerConnections(final Element e,
boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
boolean sync = getBooleanAttribute(e2, "sync", false);
boolean noForward = !getBooleanAttribute(e2, "no-forwarding", false);
String addressFilter = getAttributeValue(e2, "address-filter");

AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync);
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setNoForward(noForward);
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.activemq.artemis.core.server.mirror;

public interface NoForwardMirrorController extends MirrorController{
public boolean getNoForward();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2306,6 +2306,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="no-forwarding" type="xsd:boolean" use="optional" default="false">
<xsd:annotation>
<xsd:documentation>
If this is true, the mirror at the opposite end of the link will not forward data coming from that link to any other mirrors down the line.
This is false by default.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Expand Down
Loading

0 comments on commit d74aa05

Please sign in to comment.