From 2434c4a0eae61aaa07466a77f2b140d5c0bc0c4c Mon Sep 17 00:00:00 2001 From: Lorcan Wogan <69468264+LWogan@users.noreply.github.com> Date: Thu, 14 Nov 2024 13:31:19 +0000 Subject: [PATCH] CORE-20867 Implement retry topic to handle persistent transient RPC Client errors (#6385) The current mediator messaging pattern in Corda can encounter a retry loop when transient errors are received from other Corda workers. This retry loop blocks flow topic partitions from progressing, and it has been observed that the affected Corda cluster can become permanently unstable due to the effects of consumer lag. This pattern is used by the flow worker to perform synchronous HTTP calls to various workers, including verification, token, crypto, uniqueness, and persistence workers. To address this issue, a separate Kafka topic is dedicated to handling retries. This will allow the primary ingestion topics to continue processing unaffected flows, while introducing finite retry logic for flows impacted by transient errors. 
Additionally AVRO version is bumped to fix a vulnerability --- .../events/impl/ExternalEventManager.kt | 14 ++- .../events/impl/ExternalEventManagerImpl.kt | 7 ++ .../mediator/FlowEventMediatorFactoryImpl.kt | 71 +++++++++++- .../ExternalEventRetryRequestHandler.kt | 69 +++++++++++ .../impl/FlowGlobalPostProcessorImpl.kt | 40 ++++--- .../impl/ExternalEventManagerImplTest.kt | 39 ++++++- .../FlowEventMediatorFactoryImplTest.kt | 4 + .../ExternalEventRetryRequestHandlerTest.kt | 74 ++++++++++++ .../impl/FlowGlobalPostProcessorImplTest.kt | 19 ++- gradle.properties | 4 +- .../mediator/processor/ConsumerProcessor.kt | 2 + .../mediator/processor/EventProcessor.kt | 109 +++++++++++++----- .../kotlin/net/corda/messaging/TestUtils.kt | 4 +- .../mediator/processor/EventProcessorTest.kt | 71 ++++++++++-- .../mediator/config/EventMediatorConfig.kt | 3 + .../config/EventMediatorConfigBuilder.kt | 10 ++ .../api/mediator/config/RetryConfig.kt | 14 +++ 17 files changed, 489 insertions(+), 65 deletions(-) create mode 100644 components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt create mode 100644 components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt create mode 100644 libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt index 35ae3298aed..352ea549847 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt @@ -1,6 +1,5 @@ package net.corda.flow.external.events.impl -import java.time.Instant import 
net.corda.data.flow.event.external.ExternalEvent import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.state.external.ExternalEventState @@ -8,6 +7,7 @@ import net.corda.flow.external.events.factory.ExternalEventFactory import net.corda.flow.external.events.factory.ExternalEventRecord import net.corda.messaging.api.records.Record import java.time.Duration +import java.time.Instant /** * [ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s. @@ -86,4 +86,16 @@ interface ExternalEventManager { instant: Instant, retryWindow: Duration ): Pair?> + + /** + * Get the external event to send for the transient error retry scenario. + * Returns the event as is from the state. No additional checks required. + * @param externalEventState The [ExternalEventState] to get the event from. + * @param instant The current time. Used to set timestamp. + * @return The external event request to resend + * */ + fun getRetryEvent( + externalEventState: ExternalEventState, + instant: Instant, + ): Record<*, *> } \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt index 6927f481deb..15d08da0deb 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt @@ -174,6 +174,13 @@ class ExternalEventManagerImpl( return externalEventState to record } + override fun getRetryEvent( + externalEventState: ExternalEventState, + instant: Instant, + ): Record<*, *> { + return generateRecord(externalEventState, instant) + } + private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) { when { 
(externalEventState.sendTimestamp + retryWindow) >= instant -> { diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 374f2fce4fc..0c4e8aaa676 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -1,8 +1,10 @@ package net.corda.flow.messaging.mediator +import com.typesafe.config.ConfigValueFactory import net.corda.avro.serialization.CordaAvroSerializationFactory import net.corda.data.crypto.wire.ops.flow.FlowOpsRequest import net.corda.data.flow.event.FlowEvent +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.output.FlowStatus import net.corda.data.flow.state.checkpoint.Checkpoint @@ -30,6 +32,7 @@ import net.corda.messaging.api.mediator.RoutingDestination.Companion.routeTo import net.corda.messaging.api.mediator.RoutingDestination.Type.ASYNCHRONOUS import net.corda.messaging.api.mediator.RoutingDestination.Type.SYNCHRONOUS import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder +import net.corda.messaging.api.mediator.config.RetryConfig import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory import net.corda.messaging.api.mediator.factory.MessageRouterFactory @@ -47,12 +50,14 @@ import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_END import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT import 
net.corda.schema.configuration.BootConfig.WORKER_MEDIATOR_REPLICAS_FLOW_SESSION +import net.corda.schema.configuration.MessagingConfig.Bus.KAFKA_CONSUMER_MAX_POLL_RECORDS import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_THREAD_POOL_SIZE import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.LoggerFactory +import java.time.Instant import java.util.UUID @Suppress("LongParameterList") @@ -77,7 +82,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( private const val CONSUMER_GROUP = "FlowEventConsumer" private const val MESSAGE_BUS_CLIENT = "MessageBusClient" private const val RPC_CLIENT = "RpcClient" - + private const val RETRY_TOPIC_POLL_LIMIT = 5 + private const val RETRY_TOPIC = FLOW_EVENT_TOPIC + private const val TOKEN_RETRY = "TokenRetry" private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } @@ -123,12 +130,65 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .threadName("flow-event-mediator") .stateManager(stateManager) .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) + .retryConfig(RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) .build() + + /** + * Build a request to trigger a resend of external events via the flow event pipeline. + * Request id is calculated from the previous request payload when possible to allow for some validation in the pipeline. + * This validation is an enhancement and not strictly required. + * A new Timestamp is set on each request to ensure each request is unique for replay logic handling. + * @param key the key of the input record + * @param syncRpcRequest the previous sync request which failed. + * @return list of output retry events. 
+ */ + private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : List> { + return try { + val requestId = getRequestId(syncRpcRequest) + val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() + .setRequestId(requestId) + .setTimestamp(Instant.now()) + .build() + val flowEvent = FlowEvent.newBuilder() + .setFlowId(key) + .setPayload(externalEventRetryRequest) + .build() + //ensure key is set correctly on new message destined for flow topic + val properties = syncRpcRequest.properties.toMutableMap().apply { this["key"] = key } + listOf(MediatorMessage(flowEvent, properties)) + } catch (ex: Exception) { + //In this scenario we failed to build the retry event. This will likely result in the flow hanging until the idle processor + // kicks in. This shouldn't be possible and is just a safety net. + logger.warn("Failed to generate a retry event for key $key. No retry will be triggered.", ex) + emptyList() + } + } + + /** + * Determine the external event request id where possible. + * Note, some token events have no request id as there is no response. + * For these use a hardcoded request id which will be ignored at the validation step. + * @param syncRpcRequest the previous request + * @return Request ID to set in the retry event. 
+ */ + private fun getRequestId(syncRpcRequest: MediatorMessage): String { + return when (val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray)) { + is EntityRequest -> entityRequest.flowExternalEventContext.requestId + is FlowOpsRequest -> entityRequest.flowExternalEventContext.requestId + is LedgerPersistenceRequest -> entityRequest.flowExternalEventContext.requestId + is TransactionVerificationRequest -> entityRequest.flowExternalEventContext.requestId + is UniquenessCheckRequestAvro -> entityRequest.flowExternalEventContext.requestId + is TokenPoolCacheEvent -> TOKEN_RETRY + else -> "InvalidEntityType" + } + } + private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List { + val retryTopicMessagingConfig = getRetryTopicConfig(messagingConfig) val mediatorConsumerFactory: MutableList = mutableListOf( mediatorConsumerFactory(FLOW_START, messagingConfig), - mediatorConsumerFactory(FLOW_EVENT_TOPIC, messagingConfig) + mediatorConsumerFactory(FLOW_EVENT_TOPIC, retryTopicMessagingConfig) ) val mediatorReplicas = bootConfig.getIntOrDefault(WORKER_MEDIATOR_REPLICAS_FLOW_SESSION, 1) @@ -140,6 +200,10 @@ class FlowEventMediatorFactoryImpl @Activate constructor( return mediatorConsumerFactory } + private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig { + return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT)) + } + private fun mediatorConsumerFactory( topic: String, messagingConfig: SmartConfig @@ -178,8 +242,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( rpcEndpoint(VERIFICATION_WORKER_REST_ENDPOINT, VERIFICATION_PATH), SYNCHRONOUS) is UniquenessCheckRequestAvro -> routeTo(rpcClient, rpcEndpoint(UNIQUENESS_WORKER_REST_ENDPOINT, UNIQUENESS_PATH), SYNCHRONOUS) + //RETRIES will appear as FlowEvents is FlowEvent -> routeTo(messageBusClient, - FLOW_EVENT_TOPIC, ASYNCHRONOUS) + RETRY_TOPIC, ASYNCHRONOUS) 
is String -> routeTo(messageBusClient, // Handling external messaging message.properties[MSG_PROP_TOPIC] as String, ASYNCHRONOUS) else -> { diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt new file mode 100644 index 00000000000..b1d6cf8c8c5 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -0,0 +1,69 @@ +package net.corda.flow.pipeline.handlers.events + +import net.corda.data.flow.event.external.ExternalEventResponse +import net.corda.data.flow.event.external.ExternalEventRetryRequest +import net.corda.flow.pipeline.events.FlowEventContext +import net.corda.flow.pipeline.exceptions.FlowEventException +import net.corda.utilities.debug +import org.osgi.service.component.annotations.Component +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * Handles pre-processing of events that are intended to trigger a resend of an external event. + * This can be triggered by the mediator in the event of transient errors. + */ +@Component(service = [FlowEventHandler::class]) +class ExternalEventRetryRequestHandler : FlowEventHandler { + + private companion object { + val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + private const val TOKEN_RETRY = "TokenRetry" + } + + override val type = ExternalEventRetryRequest::class.java + + override fun preProcess(context: FlowEventContext): FlowEventContext { + val checkpoint = context.checkpoint + val externalEventRetryRequest = context.inputEventPayload + + if (!checkpoint.doesExist) { + log.debug { + "Received a ${ExternalEventRetryRequest::class.simpleName} for flow [${context.inputEvent.flowId}] that " + + "does not exist. The event will be discarded. 
${ExternalEventRetryRequest::class.simpleName}: " + + externalEventRetryRequest + } + throw FlowEventException( + "ExternalEventRetryRequestHandler received a ${ExternalEventRetryRequest::class.simpleName} for flow" + + " [${context.inputEvent.flowId}] that does not exist" + ) + } + + val externalEventState = checkpoint.externalEventState + val retryRequestId: String = externalEventRetryRequest.requestId + val externalEventStateRequestId = externalEventState?.requestId + if (externalEventState == null) { + log.debug { + "Received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " + + "for an ${ExternalEventResponse::class.simpleName}. " + + "${ExternalEventRetryRequest::class.simpleName}: $externalEventRetryRequest" + } + throw FlowEventException( + "ExternalEventRetryRequestHandler received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " + + "for an ${ExternalEventResponse::class.simpleName}" + ) + } + //Discard events not related. Some token requests do not contain the external event id so this validation will allow all token + // requests to be resent. e.g TokenForceClaimRelease + else if (externalEventStateRequestId != retryRequestId && retryRequestId != TOKEN_RETRY) { + throw FlowEventException( + "Discarding retry request received with requestId $retryRequestId. This is likely a stale record polled. 
Checkpoint " + + "is currently waiting to receive a response for requestId $externalEventStateRequestId" + ) + } + + return context + } +} diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt index 8b8801f3615..e9fa7b251a6 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt @@ -1,7 +1,9 @@ package net.corda.flow.pipeline.impl +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.event.mapper.ScheduleCleanup +import net.corda.data.flow.state.external.ExternalEventState import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.flow.external.events.impl.ExternalEventManager @@ -51,11 +53,11 @@ class FlowGlobalPostProcessorImpl @Activate constructor( postProcessPendingPlatformError(context) val outputRecords = getSessionEvents(context, now) + - getFlowMapperSessionCleanupEvents(context, now) + - getExternalEvent(context, now) + getFlowMapperSessionCleanupEvents(context, now) + + getExternalEvent(context, now) context.flowMetrics.flowEventCompleted(context.inputEvent.payload::class.java.name) - val metadata = getStateMetadata(context) + val metadata = updateFlowSessionMetadata(context) return context.copy( outputRecords = context.outputRecords + outputRecords, @@ -111,7 +113,7 @@ class FlowGlobalPostProcessorImpl @Activate constructor( if (!counterpartyExists) { val msg = "[${context.checkpoint.holdingIdentity.x500Name}] has failed to create a flow with counterparty: " + - "[${counterparty}] as the recipient doesn't exist in the network." 
+ "[${counterparty}] as the recipient doesn't exist in the network." sessionManager.errorSession(sessionState) if (doesCheckpointExist) { log.debug { "$msg. Throwing FlowFatalException" } @@ -157,24 +159,26 @@ class FlowGlobalPostProcessorImpl @Activate constructor( * Check to see if any external events needs to be sent or resent. */ private fun getExternalEvent(context: FlowEventContext, now: Instant): List> { - val externalEventState = context.checkpoint.externalEventState - return if (externalEventState == null) { - listOf() - } else { - val retryWindow = context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW) - externalEventManager.getEventToSend(externalEventState, now, Duration.ofMillis(retryWindow)) - .let { (updatedExternalEventState, record) -> - context.checkpoint.externalEventState = updatedExternalEventState - if (record != null) { - listOf(record) - } else { - listOf() - } + val externalEventState = context.checkpoint.externalEventState ?: return emptyList() + + return when (context.inputEvent.payload) { + is ExternalEventRetryRequest -> getTransientRetryRequest(externalEventState, now) + else -> { + val retryWindow = Duration.ofMillis(context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW)) + externalEventManager.getEventToSend(externalEventState, now, retryWindow).let { (updatedState, record) -> + context.checkpoint.externalEventState = updatedState + listOfNotNull(record) } + } } } - private fun getStateMetadata(context: FlowEventContext): Metadata? { + private fun getTransientRetryRequest(externalEventState: ExternalEventState, now: Instant): + List> { + return listOf(externalEventManager.getRetryEvent(externalEventState, now)) + } + + private fun updateFlowSessionMetadata(context: FlowEventContext): Metadata? { val checkpoint = context.checkpoint // Find the earliest expiry time for any open sessions. 
val lastReceivedMessageTime = checkpoint.sessions.filter { diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt index 87bff35c100..4038583529f 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt @@ -1,9 +1,5 @@ package net.corda.flow.external.events.impl -import java.nio.ByteBuffer -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.stream.Stream import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer import net.corda.data.ExceptionEnvelope @@ -34,7 +30,11 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.verifyNoInteractions import org.mockito.kotlin.whenever +import java.nio.ByteBuffer import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.stream.Stream class ExternalEventManagerImplTest { @@ -530,4 +530,35 @@ class ExternalEventManagerImplTest { assertEquals(null, record) } + + @Test + fun `getRetryEvent returns an external event`() { + val now = Instant.now().truncatedTo(ChronoUnit.MILLIS) + val key = ByteBuffer.wrap(KEY.toByteArray()) + val payload = ByteBuffer.wrap(byteArrayOf(1, 2, 3)) + + val externalEvent = ExternalEvent().apply { + this.topic = TOPIC + this.key = key + this.payload = payload + this.timestamp = now.minusSeconds(10) + } + + val externalEventState = ExternalEventState().apply { + requestId = REQUEST_ID_1 + eventToSend = externalEvent + sendTimestamp = null + status = ExternalEventStateStatus(ExternalEventStateType.OK, null) + } + + val record = externalEventManager.getRetryEvent( + 
externalEventState, + now, + ) + + + assertEquals(TOPIC, record.topic) + assertEquals(key.array(), record.key) + assertEquals(payload.array(), record.value) + } } \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt index 0eb33f47c40..df31c6b8e05 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt @@ -30,6 +30,7 @@ import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MessagingClientFinder import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.configuration.ConfigKeys @@ -115,6 +116,9 @@ class FlowEventMediatorFactoryImplTest { assertThat(router.getDestination(MediatorMessage(UniquenessCheckRequestAvro())).endpoint).isEqualTo( endpoint(UNIQUENESS_PATH) ) + assertThat(router.getDestination(MediatorMessage(FlowEvent())).endpoint).isEqualTo( + FLOW_EVENT_TOPIC + ) // External messaging val externalMessagingKafkaTopic = "custom.kafka.topic" diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt new file mode 100644 index 00000000000..0c9a18bdce9 --- /dev/null +++ 
b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt @@ -0,0 +1,74 @@ +package net.corda.flow.pipeline.handlers.events + +import net.corda.data.flow.event.external.ExternalEventRetryRequest +import net.corda.data.flow.state.external.ExternalEventState +import net.corda.flow.pipeline.exceptions.FlowEventException +import net.corda.flow.state.FlowCheckpoint +import net.corda.flow.test.utils.buildFlowEventContext +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever +import java.time.Instant + +class ExternalEventRetryRequestHandlerTest { + + private val externalEventRetryRequest = ExternalEventRetryRequest("requestId", Instant.now()) + + private val checkpoint = mock() + private val externalEventRetryRequestHandler = ExternalEventRetryRequestHandler() + + @Test + fun `does not throw a flow event exception if the checkpoint exists and it is the correct request id`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "requestId" }) + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + externalEventRetryRequestHandler.preProcess(context) + } + + @Test + fun `does not throw a flow event exception if the checkpoint exists and it a token request id`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "requestId" }) + + val context = buildFlowEventContext(checkpoint, ExternalEventRetryRequest("TokenRetry", Instant.now())) + externalEventRetryRequestHandler.preProcess(context) + } + + @Test + fun `throws a flow event exception if the checkpoint does not exist`() { + whenever(checkpoint.doesExist).thenReturn(false) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + 
assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + + @Test + fun `throws a flow event exception if the flow is not waiting for an external event response`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(null) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + + @Test + fun `throws a flow event exception if the flow is waiting for a different external event response`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "OtherRequestId" }) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + +} \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt index 38b41ed43a3..dd1daf6280f 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt @@ -2,6 +2,7 @@ package net.corda.flow.pipeline.impl import net.corda.data.flow.FlowKey import net.corda.data.flow.event.SessionEvent +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.event.session.SessionData @@ -15,6 +16,7 @@ import net.corda.flow.BOB_X500_NAME import net.corda.flow.FLOW_ID_1 import net.corda.flow.REQUEST_ID_1 import net.corda.flow.external.events.impl.ExternalEventManager +import 
net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.factory.FlowRecordFactory import net.corda.flow.state.FlowCheckpoint @@ -103,7 +105,7 @@ class FlowGlobalPostProcessorImplTest { private val membershipGroupReaderProvider = mock() private val membershipGroupReader = mock() private val checkpoint = mock() - private val testContext = buildFlowEventContext(checkpoint, Any()) + private lateinit var testContext: FlowEventContext private val flowGlobalPostProcessor = FlowGlobalPostProcessorImpl( externalEventManager, sessionManager, @@ -114,6 +116,7 @@ class FlowGlobalPostProcessorImplTest { @Suppress("Unused") @BeforeEach fun setup() { + testContext = buildFlowEventContext(checkpoint, Any()) whenever(checkpoint.sessions).thenReturn(listOf(sessionState1, sessionState2)) whenever(checkpoint.flowKey).thenReturn(FlowKey(FLOW_ID_1, ALICE_X500_HOLDING_IDENTITY)) whenever(checkpoint.holdingIdentity).thenReturn(ALICE_X500_HOLDING_IDENTITY.toCorda()) @@ -262,6 +265,20 @@ class FlowGlobalPostProcessorImplTest { verify(checkpoint).clearPendingPlatformError() } + @Test + fun `Adds external event record when there is a retry instruction`() { + val externalEventState = ExternalEventState() + + testContext = buildFlowEventContext(checkpoint, ExternalEventRetryRequest(REQUEST_ID_1, Instant.now())) + whenever(checkpoint.externalEventState).thenReturn(externalEventState) + whenever(externalEventManager.getRetryEvent(eq(externalEventState), any())) + .thenReturn(externalEventRecord) + + val outputContext = flowGlobalPostProcessor.postProcess(testContext) + + assertThat(outputContext.outputRecords).contains(externalEventRecord) + } + @Test fun `Adds external event record when there is an external event to send`() { val externalEventState = ExternalEventState() diff --git a/gradle.properties b/gradle.properties index 6afaca45353..6c6287a28ae 100644 --- a/gradle.properties +++ b/gradle.properties @@ 
-33,13 +33,13 @@ activationVersion=1.2.0 ariesDynamicFrameworkExtensionVersion=1.3.6 antlrVersion=2.7.7 asmVersion=9.5 -avroVersion=1.11.3 +avroVersion=1.11.4 commonsVersion = 1.7 commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.1.53-beta+ +cordaApiVersion=5.2.1.54-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt index 446f5bcbab0..a0e3f7674f8 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt @@ -118,6 +118,8 @@ class ConsumerProcessor( metrics.processorTimer.recordCallable { try { val inputs = getInputs(consumer) + // If no records polled return early. 
+ if (inputs.isEmpty()) return@recordCallable val outputs = processInputs(inputs) categorizeOutputs(outputs, failureCounts) commit(consumer, outputs, failureCounts) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index 7463fcf6136..24a67a269c7 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -15,6 +15,7 @@ import net.corda.messaging.api.records.Record import net.corda.messaging.mediator.StateManagerHelper import net.corda.messaging.mediator.metrics.EventMediatorMetrics import net.corda.tracing.addTraceContextToRecord +import org.slf4j.LoggerFactory /** * Class to process records received from the consumer. @@ -31,6 +32,9 @@ class EventProcessor( ) { private val metrics = EventMediatorMetrics(config.name) + private val retryConfig = config.retryConfig + private val logger = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}") + /** * Process a group of events. 
@@ -74,13 +78,15 @@ class EventProcessor( var processorState = inputProcessorState val asyncOutputs = mutableMapOf, MutableList>>() val stateChangeAndOperation = try { + var isNoopState = false input.records.forEach { consumerInputEvent -> - val (updatedProcessorState, newAsyncOutputs) = processConsumerInput(consumerInputEvent, processorState, key) + val (updatedProcessorState, newAsyncOutputs, isNoop) = processConsumerInput(consumerInputEvent, processorState, key) processorState = updatedProcessorState asyncOutputs.addOutputs(consumerInputEvent, newAsyncOutputs) + if (isNoop) isNoopState = true } val state = stateManagerHelper.createOrUpdateState(key.toString(), inputState, processorState) - stateChangeAndOperation(inputState, state) + if (isNoopState) StateChangeAndOperation.Noop else stateChangeAndOperation(inputState, state) } catch (e: EventProcessorSyncEventsIntermittentException) { asyncOutputs.clear() StateChangeAndOperation.Transient @@ -127,10 +133,11 @@ class EventProcessor( consumerInputEvent: Record, processorState: StateAndEventProcessor.State?, key: K, - ): Pair?, List>> { + ): ConsumerInputOutput { var processorStateUpdated = processorState val newAsyncOutputs = mutableListOf>() val consumerInputHash = mediatorInputService.getHash(consumerInputEvent) + val isRetryTopic = consumerInputEvent.topic == config.retryConfig?.retryTopic val queue = ArrayDeque(listOf(consumerInputEvent)) while (queue.isNotEmpty()) { val event = getNextEvent(queue, consumerInputHash) @@ -141,14 +148,19 @@ class EventProcessor( } newAsyncOutputs.addAll(asyncEvents) try { - queue.addAll(processSyncEvents(key, syncEvents)) - } catch (e: CordaMessageAPIIntermittentException) { - throw EventProcessorSyncEventsIntermittentException(processorStateUpdated, e) + val (syncResponses, isNoopRetry, asyncOutputs) = processSyncEvents(key, syncEvents, isRetryTopic) + newAsyncOutputs.addAll(asyncOutputs) + queue.addAll(syncResponses) + if (isNoopRetry) { + // return early if no state update 
is needed + return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs, true) + } } catch (e: Exception) { + logger.warn("Failed process synchronous events for key $key", e) throw EventProcessorSyncEventsFatalException(processorStateUpdated, e) } } - return Pair(processorStateUpdated, newAsyncOutputs) + return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs) } private fun getNextEvent( @@ -177,32 +189,52 @@ class EventProcessor( /** * Send any synchronous events immediately and feed results back onto the queue. - */ + * If a sync request returns from the RPC client with a transient error and retry is enabled + * then push a retry event onto the retry topic. + * If retrying again via the retry topic and transient errors occur, resend retry event and do not update state + **/ private fun processSyncEvents( key: K, - syncEvents: List> - ): List> { - return syncEvents.mapNotNull { message -> + syncEvents: List>, + isRetryTopic: Boolean + ): SyncProcessingOutput { + val outputEvents = syncEvents.mapNotNull { message -> val destination = messageRouter.getDestination(message) - @Suppress("UNCHECKED_CAST") - val reply = with(destination) { - message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) - client.send(message) as MediatorMessage? - } - reply?.let { - addTraceContextToRecord( - Record( - "", - key, - reply.payload, - 0, - listOf(Pair(SYNC_RESPONSE_HEADER, "true")) - ), - message.properties - ) + try { + @Suppress("UNCHECKED_CAST") + val reply = with(destination) { + message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) + client.send(message) as MediatorMessage? 
+ } + reply?.let { + addTraceContextToRecord( + Record( + "", + key, + reply.payload, + 0, + listOf(Pair(SYNC_RESPONSE_HEADER, "true")) + ), + message.properties + ) + } + } catch (e: CordaMessageAPIIntermittentException) { + val asyncOutputEvents: MutableList> = mutableListOf() + if (retryConfig != null) { + retryConfig.buildRetryRequest?.let { it(key, message) }?.let { + asyncOutputEvents.addAll(it) + } + } + + // If we're on the retry topic and run into another transient error, exit early and do not update the state to save + // performance. + // If we are not on the retry topic then we need to save the state before adding the retry event. This will allow the + // flow cleanup processors to execute on an idle flow checkpoint + return SyncProcessingOutput(emptyList(), isRetryTopic, asyncOutputEvents) } } + return SyncProcessingOutput(outputEvents) } private fun convertToMessage(record: Record<*, *>): MediatorMessage { @@ -219,4 +251,27 @@ class EventProcessor( private fun List>.toMessageProperties() = associateTo(mutableMapOf()) { (key, value) -> key to (value as Any) } + + + /** + * The outputs from processing a single consumer input event from the bus. + * This includes updates from all sync events which are processed as a result of a consumer input + * @property updatedState The state and event processor output state after processing + */ + data class ConsumerInputOutput( + val updatedState: StateAndEventProcessor.State?, + val outputEvents: List>, + val isNoop: Boolean = false + ) + + /** + * The outputs from sending the sync events raised while processing a single consumer input event. 
+ * This includes the sync responses, plus any retry events built when a sync request hits a transient error + * @property syncResponses The responses to the sync events, to be fed back onto the processing queue + */ + data class SyncProcessingOutput( + val syncResponses: List>, + val isNoopRetry: Boolean = false, + val asyncOutputs: List> = emptyList() + ) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt index 928ebeea7b2..7180fa6f25e 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt @@ -50,10 +50,10 @@ fun generateMockCordaConsumerRecordList(numberOfRecords: Long, topic: String, pa /** * Generate [recordCount] string key/value records */ -fun getStringRecords(recordCount: Int, key: String): List> { +fun getStringRecords(recordCount: Int, key: String, topic: String = "topic"): List> { val records = mutableListOf>() for (j in 1..recordCount) { - records.add(Record("topic", key, j.toString())) + records.add(Record(topic, key, j.toString())) } return records diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index 0969349bd3f..bd33f7cf4d8 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -1,6 +1,7 @@ package net.corda.messaging.mediator.processor import net.corda.libs.configuration.SmartConfigImpl +import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import 
net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException @@ -10,6 +11,7 @@ import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.RoutingDestination import net.corda.messaging.api.mediator.config.EventMediatorConfig +import net.corda.messaging.api.mediator.config.RetryConfig import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.Response @@ -34,6 +36,7 @@ import java.util.UUID @Execution(ExecutionMode.SAME_THREAD) class EventProcessorTest { private lateinit var eventMediatorConfig: EventMediatorConfig + private lateinit var eventMediatorRetryConfig: EventMediatorConfig private lateinit var stateManagerHelper: StateManagerHelper private lateinit var client: MessagingClient private lateinit var messageRouter: MessageRouter @@ -42,6 +45,7 @@ class EventProcessorTest { private lateinit var eventProcessor: EventProcessor private val inputState1: State = mock() + private val retryTopic: String = "flow.event" private val asyncMessage: String = "ASYNC_PAYLOAD" private val syncMessage: String = "SYNC_PAYLOAD" private val updatedProcessingState = StateAndEventProcessor.State("bar", null) @@ -65,6 +69,8 @@ class EventProcessorTest { } else RoutingDestination(client, "endpoint", RoutingDestination.Type.ASYNCHRONOUS) } eventMediatorConfig = buildTestConfig() + val retryConfig = RetryConfig(retryTopic, buildRetryRequest) + eventMediatorRetryConfig = buildTestConfig(retryConfig) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { Response( @@ -181,9 +187,11 @@ class EventProcessorTest { } @Test - fun `when sync processing fails with a transient error, a transient state change signal is sent`() { - val mockedState = mock() + fun `when sync 
processing fails with a transient error, retry is OFF, a CREATE state change signal is set and no retry event is sent` + () { val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key"), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), anyOrNull())).thenReturn(mockedState) whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { @@ -194,17 +202,61 @@ class EventProcessorTest { ) ) } - whenever(stateManagerHelper.failStateProcessing(any(), eq(null), any())).thenReturn(mockedState) - val outputMap = eventProcessor.processEvents(input) val output = outputMap["key"] assertEquals(emptyList>(), output?.asyncOutputs) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(mockedState) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) + } + + @Test + fun `when transient error while processing retry topic, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key", retryTopic), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), anyOrNull())).thenReturn(mockedState) + whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) + whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { + Response( + null, + listOf( + Record("", "key", syncMessage) + ) + ) + } + eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) + val outputMap = eventProcessor.processEvents(input) + + val output = outputMap["key"] + assertEquals(1, output?.asyncOutputs?.size) assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(null) - 
assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Transient::class.java) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Noop::class.java) + } + + @Test + fun `when transient error while processing event topic, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key", "flow.start"), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), any())).thenReturn(mockedState) + whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) + whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { + Response( + StateAndEventProcessor.State("", Metadata(mapOf())), + listOf( + Record("", "key", syncMessage) + ) + ) + } + eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) + val outputMap = eventProcessor.processEvents(input) + + val output = outputMap["key"] + assertEquals(1, output?.asyncOutputs?.size) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(mockedState) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) } - private fun buildTestConfig() = EventMediatorConfig( + private fun buildTestConfig(retryConfig: RetryConfig? 
= null) = EventMediatorConfig( "", SmartConfigImpl.empty(), emptyList(), @@ -214,6 +266,11 @@ class EventProcessorTest { 1, "", mock(), - 20 + 20, + retryConfig ) + + private val buildRetryRequest: ((String, MediatorMessage) -> List>) = { _, message -> + listOf(message) + } } diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt index 1d49c60bfb9..032ce9455b0 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt @@ -28,6 +28,8 @@ import java.time.Duration * @property stateManager State manager. * @property minGroupSize Minimum size for group of records passed to task manager for processing in a single thread. Does not block if * group size is not met by polled record count. + * @property retryConfig Topic to push retry events to as well as the function to build retry events. Set to null to not retry transient + * errors originating from the message pattern and fail early instead. */ data class EventMediatorConfig( val name: String, @@ -40,6 +42,7 @@ data class EventMediatorConfig( val threadName: String, val stateManager: StateManager, val minGroupSize: Int, + val retryConfig: RetryConfig? = null ) { /** * Timeout for polling consumers. 
diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt index a6e502a6b5c..b928ea3eb78 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt @@ -27,6 +27,7 @@ class EventMediatorConfigBuilder { private var threadName: String? = null private var stateManager: StateManager? = null private var minGroupSize: Int? = null + private var retryConfig: RetryConfig? = null /** Sets name for [MultiSourceEventMediator]. */ fun name(name: String) = @@ -74,6 +75,13 @@ class EventMediatorConfigBuilder { fun stateManager(stateManager: StateManager) = apply { this.stateManager = stateManager } + /** + * Sets the retry configuration: the topic to push retry events to when RPC calls fail with transient errors in the + * message pattern, and the function used to build a retry event from the sync request. + */ + fun retryConfig(retryConfig: RetryConfig) = + apply { this.retryConfig = retryConfig } + /** Builds [EventMediatorConfig]. 
*/ fun build(): EventMediatorConfig { check(consumerFactories.isNotEmpty()) { "At least on consumer factory has to be set" } @@ -89,6 +97,8 @@ class EventMediatorConfigBuilder { threadName = checkNotNull(threadName) { "Thread name not set" }, stateManager = checkNotNull(stateManager) { "State manager not set" }, minGroupSize = checkNotNull(minGroupSize) { "Min group size not set" }, + retryConfig = retryConfig ) } + } \ No newline at end of file diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt new file mode 100644 index 00000000000..bf7009abd8a --- /dev/null +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt @@ -0,0 +1,14 @@ +package net.corda.messaging.api.mediator.config + +import net.corda.messaging.api.mediator.MediatorMessage + +/** + * Config used to setup retry events in the mediator. + * These are used to retry transient errors that occur. + * @property retryTopic The topic to send retry events to + * @property buildRetryRequest lambda used to generate the retry requests + */ +data class RetryConfig( + val retryTopic: String, + val buildRetryRequest: ((K, MediatorMessage) -> List>)? = null, +)