Skip to content

Commit

Permalink
ARTEMIS-5037: limit mirror propagation
Browse files Browse the repository at this point in the history
Add a new option in the Mirror settings to prevent a broker from
propagating messages.

On a topology such as:

```
1 ---> 2 ---> 3
^______|
```

Where 1 is connected to 2 via a noForward link, the behavior is as
follows:

* Every command from 1 are reaching 2 and are stopping at 2 not reaching
  3.
* If a message is produced on 1 and consumed on 2, the message is
  acknowledged on 1 and 2. No ack is reaching 3.
* Every message produced on 2 are mirrored on 1 and 3.
* If a message is produced on 2 and consumed on 1, it is acked on 1 2
  and 3
* If a message is produced on 3 and consumed on 3, it is acked only on
  3.
  • Loading branch information
lavocatt committed Nov 26, 2024
1 parent a3028f1 commit 7b146bd
Show file tree
Hide file tree
Showing 14 changed files with 927 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection
public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null);
connectSender(queue, queue.getAddress().toString(), null, null,null, null, null, dispatchCapability, null);
connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null, null);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
}
Expand Down Expand Up @@ -450,21 +451,25 @@ private void doConnect() {
final Queue queue = server.locateQueue(getMirrorSNF(replica));

final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}
if (replica.getNoForward()) {
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});

final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};

connectSender(queue,
queue.getName().toString(),
mirrorControllerSource::setLink,
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
(r) -> mirrorControllerSource.filterMessage(r),
server.getNodeID().toString(),
desiredCapabilities,
null,
Expand Down Expand Up @@ -771,6 +776,7 @@ private void connectSender(Queue queue,
String targetName,
java.util.function.Consumer<Sender> senderConsumer,
java.util.function.Consumer<? super MessageReference> beforeDeliver,
java.util.function.Predicate<? super MessageReference> beforeDeliverFiltering,
String brokerID,
Symbol[] desiredCapabilities,
Symbol[] targetCapabilities,
Expand Down Expand Up @@ -831,7 +837,7 @@ private void connectSender(Queue queue,

// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver);
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering);
try {
if (!cancelled.get()) {
if (futureTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,9 +90,15 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");
public static final Symbol NO_FORWARD_SOURCE = Symbol.getSymbol("amq.no.forward.source");
public static final Symbol RECEIVER_ID_FILTER = Symbol.getSymbol("amq.receiver.id.filter");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());
public static final SimpleString INTERNAL_NO_FORWARD_SOURCE = SimpleString.of(NO_FORWARD_SOURCE.toString());
public static final SimpleString INTERNAL_RECEIVER_ID_FILTER = SimpleString.of(RECEIVER_ID_FILTER.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down Expand Up @@ -230,12 +237,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception {
public void deleteAddress(AddressInfo addressInfo) throws Exception {
logger.trace("{} deleteAddress {}", server, addressInfo);

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
return;
}
if (ignoreAddress(addressInfo.getName())) {
return;
}

if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -246,6 +258,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
logger.trace("{} createQueue {}", server, queueConfiguration);

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse());
Expand All @@ -264,6 +280,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}
return;
}

if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -276,6 +293,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
logger.trace("{} deleteQueue {}/{}", server, address, queue);
}

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse())) {
return;
}
Expand Down Expand Up @@ -310,6 +331,14 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean isBlockedByNoForward() {
return getControllerInUse() != null && getControllerInUse().isNoForward();
}

private boolean isBlockedByNoForward(Message message) {
return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
Expand Down Expand Up @@ -338,6 +367,12 @@ Message copyMessageForPaging(Message message) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);

if (isBlockedByNoForward(message)) {
String remoteID = getRemoteMirrorId();
logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server);
return;
}

if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
return;
Expand All @@ -353,6 +388,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

logger.trace("sendMessage::{} send message {}", server, message);

try {
context.setReusable(false);

Expand Down Expand Up @@ -467,6 +504,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
}
}

/**
* Checks if the message ref should be filtered or not.
* @param ref the message to filter
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
* that the remoteMirrorID, false otherwise.
*/
public boolean filterMessage(MessageReference ref) {
Object filterID = ref.getMessage().getAnnotation(INTERNAL_RECEIVER_ID_FILTER);
if (filterID != null) {
String remoteMirrorId = getRemoteMirrorId();
if (remoteMirrorId != null) {
if(remoteMirrorId.equals(filterID)){
return false;
} else {
return true;
}
}
return false;
}
return false;
}

/** This method will return the brokerID used by the message */
private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) {
String brokerID = referenceIDSupplier.getServerID(ref);
Expand Down Expand Up @@ -543,6 +602,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
}

SimpleString noForwardSource = null;
if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE);
String remoteMirrorId = getRemoteMirrorId();
if (remoteMirrorId != null) {
if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) {
return;
}
}
}

MirrorController controllerInUse = getControllerInUse();

// Retried ACKs are not forwarded.
Expand Down Expand Up @@ -578,6 +648,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);
if (noForwardSource != null) {
messageCommand.setBrokerProperty(INTERNAL_RECEIVER_ID_FILTER, noForwardSource);
}
if (sync) {
OperationContext operationContext;
operationContext = OperationContextImpl.getContext(server.getExecutorFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
Expand All @@ -53,6 +53,7 @@
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
Expand All @@ -77,29 +78,39 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD_SOURCE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;

public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

public static void setControllerInUse(MirrorController controller) {
public static void setControllerInUse(TargetMirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}

public static MirrorController getControllerInUse() {
public static TargetMirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}

private boolean noMessageForwarding = false;

@Override
public boolean isNoForward() {
return noMessageForwarding;
}

/**
* Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
Expand Down Expand Up @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD);
}

@Override
Expand Down Expand Up @@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (noMessageForwarding) {
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
message.setBrokerProperty(INTERNAL_NO_FORWARD_SOURCE, getRemoteMirrorId());
}

if (internalAddress != null) {
message.setAddress(internalAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
Expand Down Expand Up @@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Que

// to be used with the same executor as the PagingStore executor
public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse();
logger.trace("retrying address {} on server {}", address, server);
try {
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
Expand Down Expand Up @@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap<AckRetry, AckRetry, Queue> map) {



private static class DisabledAckMirrorController implements MirrorController {
private static class DisabledAckMirrorController implements TargetMirrorController {

@Override
public boolean isRetryACK() {
Expand Down Expand Up @@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso
public String getRemoteMirrorId() {
return null;
}

@Override
public boolean isNoForward() {
return false;
}
}
}
Loading

0 comments on commit 7b146bd

Please sign in to comment.