diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index b6aab4ee44f..1f1f56332ac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) { if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) { Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY}; - connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null); + connectSender(queue, queue.getAddress().toString(), null, null,null, null, null, dispatchCapability, null); connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability); } else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { - connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null); + connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null, null); } else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { connectReceiver(protonRemotingConnection, session, sessionContext, queue); } @@ -450,21 +451,25 @@ private void doConnect() { final Queue queue = server.locateQueue(getMirrorSNF(replica)); final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica); - final Symbol[] desiredCapabilities; + ArrayList desiredCapabilitiesList = new ArrayList<>(); + desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); if (coreTunnelingEnabled) { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}; - } else { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + if (replica.getNoForward()) { + desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } - final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{}); + + final Symbol[] requiredOfferedCapabilities = replica.getNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; connectSender(queue, queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), + (r) -> mirrorControllerSource.filterMessage(r), server.getNodeID().toString(), desiredCapabilities, null, @@ -771,6 +776,7 @@ private void connectSender(Queue queue, String targetName, java.util.function.Consumer senderConsumer, java.util.function.Consumer beforeDeliver, + java.util.function.Predicate beforeDeliverFiltering, String brokerID, Symbol[] desiredCapabilities, Symbol[] targetCapabilities, @@ -831,7 +837,7 @@ private void connectSender(Queue queue, // Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> { - ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver); + ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering); try { if (!cancelled.get()) { if (futureTimeout != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 0d31363c6f1..36d62e2b4ed 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -53,6 +53,7 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.transport.Target; import org.apache.qpid.proton.engine.Sender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,9 +90,15 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im // Capabilities public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror"); public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint"); + public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward"); + public static final Symbol NO_FORWARD_SOURCE = Symbol.getSymbol("amq.no.forward.source"); + public static final Symbol RECEIVER_ID_FILTER = Symbol.getSymbol("amq.receiver.id.filter"); public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString()); + public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString()); + public static final SimpleString INTERNAL_NO_FORWARD_SOURCE = SimpleString.of(NO_FORWARD_SOURCE.toString()); + public static final SimpleString INTERNAL_RECEIVER_ID_FILTER = SimpleString.of(RECEIVER_ID_FILTER.toString()); private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null)); @@ -230,12 +237,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception { public void deleteAddress(AddressInfo addressInfo) throws Exception { logger.trace("{} deleteAddress {}", server, addressInfo); + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) { return; } if (ignoreAddress(addressInfo.getName())) { return; } + if (deleteQueues) { Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON()); routeMirrorCommand(server, message); @@ -246,6 +258,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception { public void createQueue(QueueConfiguration queueConfiguration) throws Exception { logger.trace("{} createQueue {}", server, queueConfiguration); + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) { if (logger.isTraceEnabled()) { logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse()); @@ -264,6 +280,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } return; } + if (addQueues) { Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON()); routeMirrorCommand(server, message); @@ -276,6 +293,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti logger.trace("{} deleteQueue {}/{}", server, address, queue); } + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse())) { return; } @@ -310,6 +331,14 @@ private boolean invalidTarget(MirrorController controller) { return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); } + private boolean isBlockedByNoForward() { + return getControllerInUse() != null && getControllerInUse().isNoForward(); + } + + private boolean isBlockedByNoForward(Message message) { + return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD)); + } + private boolean ignoreAddress(SimpleString address) { if (address.startsWith(server.getConfiguration().getManagementAddress())) { return true; @@ -338,6 +367,12 @@ Message copyMessageForPaging(Message message) { public void sendMessage(Transaction tx, Message message, RoutingContext context) { SimpleString address = context.getAddress(message); + if (isBlockedByNoForward(message)) { + String remoteID = getRemoteMirrorId(); + logger.trace("sendMessage::server {} is discarding the message because its source is setting a noForward policy", server); + return; + } + if (context.isInternal()) { logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); return; @@ -353,6 +388,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context) return; } + logger.trace("sendMessage::{} send message {}", server, message); + try { context.setReusable(false); @@ -467,6 +504,28 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, } } + /** + * Checks if the message ref should be filtered or not. + * @param ref the message to filter + * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value + * that the remoteMirrorID, false otherwise. + */ + public boolean filterMessage(MessageReference ref) { + Object filterID = ref.getMessage().getAnnotation(INTERNAL_RECEIVER_ID_FILTER); + if (filterID != null) { + String remoteMirrorId = getRemoteMirrorId(); + if (remoteMirrorId != null) { + if(remoteMirrorId.equals(filterID)){ + return false; + } else { + return true; + } + } + return false; + } + return false; + } + /** This method will return the brokerID used by the message */ private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) { String brokerID = referenceIDSupplier.getServerID(ref); @@ -543,6 +602,17 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason); } + SimpleString noForwardSource = null; + if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) { + noForwardSource = (SimpleString) ref.getMessage().getBrokerProperty(INTERNAL_NO_FORWARD_SOURCE); + String remoteMirrorId = getRemoteMirrorId(); + if (remoteMirrorId != null) { + if (!SimpleString.of(remoteMirrorId).equals(noForwardSource)) { + return; + } + } + } + MirrorController controllerInUse = getControllerInUse(); // Retried ACKs are not forwarded. @@ -578,6 +648,9 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker. long internalID = idSupplier.getID(ref); Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason); + if (noForwardSource != null) { + messageCommand.setBrokerProperty(INTERNAL_RECEIVER_ID_FILTER, noForwardSource); + } if (sync) { OperationContext operationContext; operationContext = OperationContextImpl.getContext(server.getExecutorFactory()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 27177f6ab30..d121a86cf85 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -39,7 +39,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver; import org.apache.activemq.artemis.utils.ByteUtil; @@ -77,8 +78,11 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD_SOURCE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; @@ -86,20 +90,27 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; -public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController { +public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); - public static void setControllerInUse(MirrorController controller) { + public static void setControllerInUse(TargetMirrorController controller) { CONTROLLER_THREAD_LOCAL.set(controller); } - public static MirrorController getControllerInUse() { + public static TargetMirrorController getControllerInUse() { return CONTROLLER_THREAD_LOCAL.get(); } + private boolean noMessageForwarding = false; + + @Override + public boolean isNoForward() { + return noMessageForwarding; + } + /** * Objects of this class can be used by either transaction or by OperationContext. * It is important that when you're using the transactions you clear any references to @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI, this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); + this.noMessageForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD); } @Override @@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID); message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID); + if (noMessageForwarding) { + message.setBrokerProperty(INTERNAL_NO_FORWARD, true); + message.setBrokerProperty(INTERNAL_NO_FORWARD_SOURCE, getRemoteMirrorId()); + } if (internalAddress != null) { message.setAddress(internalAddress); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 0ef1a9b6497..b5138420e4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -54,7 +54,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; @@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap> acksToRetry) { - MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); + TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); logger.trace("retrying address {} on server {}", address, server); try { AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController); @@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap map) { - private static class DisabledAckMirrorController implements MirrorController { + private static class DisabledAckMirrorController implements TargetMirrorController { @Override public boolean isRetryACK() { @@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso public String getRemoteMirrorId() { return null; } + + @Override + public boolean isNoForward() { + return false; + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java index 114cf9ad6f8..d825beb648c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.slf4j.Logger; @@ -34,7 +34,7 @@ public class MirrorTransaction extends TransactionImpl { boolean allowPageTransaction; - MirrorController controlInUse; + TargetMirrorController controlInUse; public MirrorTransaction(StorageManager storageManager) { super(storageManager); @@ -44,7 +44,7 @@ public MirrorTransaction(StorageManager storageManager) { @Override protected synchronized void afterCommit(List operationsToComplete) { - MirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse(); + TargetMirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse(); AMQPMirrorControllerTarget.setControllerInUse(controlInUse); try { super.afterCommit(operationsToComplete); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 17b54cf15b2..a7a93cd2f94 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec return; } + ArrayList offeredCapabilitiesList = new ArrayList<>(); + offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); // We need to check if the remote desires to send us tunneled core messages or not, and if // we support that we need to offer that back so it knows it can actually do core tunneling. if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) { - receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}); - } else { - receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); + offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + + // If the remote wants us to not forward any messages to other mirrors we need to offer that capability + if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)) { + offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } + receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{})); protonSession.addReplicaTarget(receiver); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ad0d42c20e2..f9826dea15d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -89,6 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private int credits = 0; private AtomicInteger pending = new AtomicInteger(0); private java.util.function.Consumer beforeDelivery; + private java.util.function.Predicate beforeDeliveryFiltering; protected volatile Runnable afterDelivery; protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER; @@ -120,6 +121,11 @@ public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer beforeDeliveryFiltering) { + this.beforeDeliveryFiltering = beforeDeliveryFiltering; + return this; + } + public ServerConsumer getBrokerConsumer() { return brokerConsumer; } @@ -475,6 +481,13 @@ public int deliverMessage(final MessageReference messageReference, final ServerC beforeDelivery.accept(messageReference); } + if (beforeDeliveryFiltering != null) { + boolean test = beforeDeliveryFiltering.test(messageReference); + if (test){ + return 0; + } + } + synchronized (creditsLock) { if (sender.getLocalState() == EndpointState.CLOSED) { return 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java index 944099c2a00..daf1be233ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme boolean queueCreation = true; + boolean noForward = false; + boolean queueRemoval = true; boolean messageAcknowledgements = true; @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation) return this; } + public boolean getNoForward() { + return noForward; + } + + public AMQPMirrorBrokerConnectionElement setNoForward(boolean noForward) { + this.noForward = noForward; + return this; + } + public boolean isQueueRemoval() { return queueRemoval; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6819b307ade..ea9656b399c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2194,10 +2194,11 @@ private void parseAMQPBrokerConnections(final Element e, boolean durable = getBooleanAttribute(e2, "durable", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); boolean sync = getBooleanAttribute(e2, "sync", false); + boolean noForward = !getBooleanAttribute(e2, "no-forward", false); String addressFilter = getAttributeValue(e2, "address-filter"); AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); - amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync); + amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setNoForward(noForward); connectionElement = amqpMirrorConnectionElement; connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java new file mode 100644 index 00000000000..bbddf19571c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.mirror; + +public interface TargetMirrorController extends MirrorController { + boolean isNoForward(); +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index afd809ad1a9..4d0e01c5994 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2306,6 +2306,14 @@ + + + + If this is true, the mirror at the opposite end of the link will not forward messages or instructions coming from this broker to any other mirrors down the line. + This is false by default. + + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java index 9796fe00417..0dbefbbbe36 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java @@ -20,6 +20,7 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_FORCED; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; @@ -27,6 +28,7 @@ import java.lang.invoke.MethodHandles; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -43,11 +45,14 @@ import javax.jms.Topic; import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; @@ -186,6 +191,43 @@ public void testBrokerHandlesSenderLinkOmitsMirrorCapability() throws Exception } } + @Test + @Timeout(20) + public void testBrokerHandlesSenderLinkOmitsNoForwardCapability() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS"); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString(), AMQPMirrorControllerSource.NO_FORWARD.toString()) + .respond() + .withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + peer.expectClose().withError(CONNECTION_FORCED.toString()).optional(); // Can hit the wire in rare instances. + peer.expectConnectionToDrop(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + // No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); + assertEquals(1, loggerHandler.countText("AMQ119018")); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.stop(); + } + } + } + @Test @Timeout(20) public void testBrokerAddsAddressAndQueue() throws Exception { @@ -229,6 +271,127 @@ public void testBrokerAddsAddressAndQueue() throws Exception { } } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + final Map brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()) + .respond() + .withOfferedCapabilities("amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final int AMQP_PORT_2 = BROKER_PORT_NUM + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setQueueCreation(true).setAddressFilter(getQueueName() + ",sometest")); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + BROKER_PORT_NUM).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(50); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + BROKER_PORT_NUM+"?amqp.idleTimeout=-1"); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2+"?amqp.idleTimeout=-1"); + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage("message")); + producer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + try (Connection conn = factory_2.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message", message.getText()); + consumer.close(); + } + + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.utils.Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + + // give some time to peer_3 to receive messages (if any) + Thread.sleep(100); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages are received here, this should error out + + // Then send messages on the broker directly connected to the peer, the messages should make it to the peer. + // Receiving these 3 messages in that order confirms that no previous data reched the Peer, therefore validating + // the test. + peer_3.expectTransfer().accept(); // Address create + peer_3.expectTransfer().accept(); // Queue create + peer_3.expectTransfer().withMessageFormat(AMQP_TUNNELED_CORE_MESSAGE_FORMAT).accept(); // Producer Message + + server_2.addAddressInfo(new AddressInfo(SimpleString.of("sometest"), RoutingType.ANYCAST)); + server_2.createQueue(QueueConfiguration.of("sometest").setRoutingType(RoutingType.ANYCAST)); + + try (Connection connection = factory_2.createConnection()) { + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createQueue("sometest")); + final TextMessage message = session.createTextMessage("test"); + + connection.start(); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.send(message); + } + + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.stop(); + server_2.stop(); + } + } + @Test @Timeout(20) public void testCreateDurableConsumerReplicatesAddressAndQueue() throws Exception { @@ -349,20 +512,41 @@ public void testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageF doTestProducerMessageIsMirroredWithCorrectMessageFormat(false); } + @Test + @Timeout(20) + public void testProducerMessageIsMirroredWithNoForwardAndTunneling() throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(true, true); + } + + @Test + @Timeout(20) + public void testProducerMessageIsMirroredWithNoForwardAndTunelingAndWithoutTunneling() throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(false, true); + } + private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling) throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(tunneling, false); + } + + private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling, boolean noForward) throws Exception { final Map brokerProperties = new HashMap<>(); brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); final String[] capabilities; + ArrayList capabilitiesList = new ArrayList<>(); final int messageFormat; + capabilitiesList.add("amq.mirror"); if (tunneling) { - capabilities = new String[] {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}; + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; } else { - capabilities = new String[] {"amq.mirror"}; messageFormat = 0; // AMQP default } + if (noForward) { + capabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD.toString()); + } + capabilities = capabilitiesList.toArray(new String[]{}); try (ProtonTestServer peer = new ProtonTestServer()) { peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); @@ -387,6 +571,7 @@ private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tun AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement(); mirrorElement.addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)); mirrorElement.setQueueCreation(true); + mirrorElement.setNoForward(noForward); AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java new file mode 100644 index 00000000000..94faf14bae8 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + + ActiveMQServer server_2; + ActiveMQServer server_3; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testNoForward() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + + /** + * + * the mirroring topology: + * + * v---------| + * 1 ------> 2 The link between 1 and 2 is noForward=true + * ^ + * v + * 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + + // send from 1, 2 receives, 3 don't. + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage("message ")); + } + Wait.assertEquals(1L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(1L, q2::getMessageCount, 1000, 100); + + // consume from 2, 1 and 2 counters go back to 0 + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message ", message.getText()); + consumer.close(); + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + + Thread.sleep(100); // some time to allow eventual loops + + // queue creation was originated on server, with noForward in place, + // the messages never reached server_3, for the rest of the test suite, + // we need server_3 to have access to the queue + createAddressAndQueues(server_3); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Queue q3 = server_3.locateQueue(getQueueName()); + assertNotNull(q3); + + // produce on 2. 1, 2 and 3 receive messages. + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 1. 1, 2, and 3 counters are back to 0 + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // produce on 2. 1, 2 and 3 receive messages. + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 3. 1, 2 counters are still at 10 and 3 is at 0. + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // consume on 2. 1, 2 and 3 counters are back to 0 + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // produce on 3. only 3 has messages. + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 3. 1, 2, and 3 counters are back to 0 + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java new file mode 100644 index 00000000000..e1d4d034d89 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPSquareMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_3.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_4.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + server_4.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server.locateQueue(getQueueName()); + assertNotNull(q2); + + Queue q3 = server.locateQueue(getQueueName()); + assertNotNull(q3); + + Queue q4 = server.locateQueue(getQueueName()); + assertNotNull(q4); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + ConnectionFactory factory4 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_4); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Thread.sleep(100); // some time to allow eventual loops + + Wait.assertEquals(40L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(30L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(20L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + } +} \ No newline at end of file