diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt index 35ae3298aed..352ea549847 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManager.kt @@ -1,6 +1,5 @@ package net.corda.flow.external.events.impl -import java.time.Instant import net.corda.data.flow.event.external.ExternalEvent import net.corda.data.flow.event.external.ExternalEventResponse import net.corda.data.flow.state.external.ExternalEventState @@ -8,6 +7,7 @@ import net.corda.flow.external.events.factory.ExternalEventFactory import net.corda.flow.external.events.factory.ExternalEventRecord import net.corda.messaging.api.records.Record import java.time.Duration +import java.time.Instant /** * [ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s. @@ -86,4 +86,16 @@ interface ExternalEventManager { instant: Instant, retryWindow: Duration ): Pair?> + + /** + * Get the external event to send for the transient error retry scenario. + * Returns the event as is from the state. No additional checks required. + * @param externalEventState The [ExternalEventState] to get the event from. + * @param instant The current time. Used to set timestamp. + * @return The external event request to resend + * */ + fun getRetryEvent( + externalEventState: ExternalEventState, + instant: Instant, + ): Record<*, *> } \ No newline at end of file diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt index 6927f481deb..15d08da0deb 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImpl.kt @@ -174,6 +174,13 @@ class ExternalEventManagerImpl( return externalEventState to record } + override fun getRetryEvent( + externalEventState: ExternalEventState, + instant: Instant, + ): Record<*, *> { + return generateRecord(externalEventState, instant) + } + private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) { when { (externalEventState.sendTimestamp + retryWindow) >= instant -> { diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt index 374f2fce4fc..0c4e8aaa676 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/messaging/mediator/FlowEventMediatorFactoryImpl.kt @@ -1,8 +1,10 @@ package net.corda.flow.messaging.mediator +import com.typesafe.config.ConfigValueFactory import net.corda.avro.serialization.CordaAvroSerializationFactory import net.corda.data.crypto.wire.ops.flow.FlowOpsRequest import net.corda.data.flow.event.FlowEvent +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.output.FlowStatus import net.corda.data.flow.state.checkpoint.Checkpoint @@ -30,6 +32,7 @@ import net.corda.messaging.api.mediator.RoutingDestination.Companion.routeTo import net.corda.messaging.api.mediator.RoutingDestination.Type.ASYNCHRONOUS import net.corda.messaging.api.mediator.RoutingDestination.Type.SYNCHRONOUS import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder +import net.corda.messaging.api.mediator.config.RetryConfig import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory import net.corda.messaging.api.mediator.factory.MessageRouterFactory @@ -47,12 +50,14 @@ import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_END import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT import net.corda.schema.configuration.BootConfig.WORKER_MEDIATOR_REPLICAS_FLOW_SESSION +import net.corda.schema.configuration.MessagingConfig.Bus.KAFKA_CONSUMER_MAX_POLL_RECORDS import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_THREAD_POOL_SIZE import org.osgi.service.component.annotations.Activate import org.osgi.service.component.annotations.Component import org.osgi.service.component.annotations.Reference import org.slf4j.LoggerFactory +import java.time.Instant import java.util.UUID @Suppress("LongParameterList") @@ -77,7 +82,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( private const val CONSUMER_GROUP = "FlowEventConsumer" private const val MESSAGE_BUS_CLIENT = "MessageBusClient" private const val RPC_CLIENT = "RpcClient" - + private const val RETRY_TOPIC_POLL_LIMIT = 5 + private const val RETRY_TOPIC = FLOW_EVENT_TOPIC + private const val TOKEN_RETRY = "TokenRetry" private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) } @@ -123,12 +130,65 @@ class FlowEventMediatorFactoryImpl @Activate constructor( .threadName("flow-event-mediator") .stateManager(stateManager) .minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT)) + .retryConfig(RetryConfig(RETRY_TOPIC, ::buildRetryRequest)) .build() + + /** + * Build a request to trigger a resend of external events via the flow event pipeline. + * Request id is calculated from the previous request payload when possible to allow for some validation in the pipeline. + * This validation is an enhancement and not strictly required. + * A new Timestamp is set on each request to ensure each request is unique for replay logic handling. + * @param key the key of the input record + * @param syncRpcRequest the previous sync request which failed. + * @return list of output retry events. + */ + private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage) : List> { + return try { + val requestId = getRequestId(syncRpcRequest) + val externalEventRetryRequest = ExternalEventRetryRequest.newBuilder() + .setRequestId(requestId) + .setTimestamp(Instant.now()) + .build() + val flowEvent = FlowEvent.newBuilder() + .setFlowId(key) + .setPayload(externalEventRetryRequest) + .build() + //ensure key is set correctly on new message destined for flow topic + val properties = syncRpcRequest.properties.toMutableMap().apply { this["key"] = key } + listOf(MediatorMessage(flowEvent, properties)) + } catch (ex: Exception) { + //In this scenario we failed to build the retry event. This will likely result in the flow hanging until the idle processor + // kicks in. This shouldn't be possible and is just a safety net. + logger.warn("Failed to generate a retry event for key $key. No retry will be triggered.", ex) + emptyList() + } + } + + /** + * Determine the external event request id where possible. + * Note, some token events have no request id as there is no response. + * For these use a hardcoded request id which will be ignored at the validation step. + * @param syncRpcRequest the previous request + * @return Request ID to set in the retry event. + */ + private fun getRequestId(syncRpcRequest: MediatorMessage): String { + return when (val entityRequest = deserializer.deserialize(syncRpcRequest.payload as ByteArray)) { + is EntityRequest -> entityRequest.flowExternalEventContext.requestId + is FlowOpsRequest -> entityRequest.flowExternalEventContext.requestId + is LedgerPersistenceRequest -> entityRequest.flowExternalEventContext.requestId + is TransactionVerificationRequest -> entityRequest.flowExternalEventContext.requestId + is UniquenessCheckRequestAvro -> entityRequest.flowExternalEventContext.requestId + is TokenPoolCacheEvent -> TOKEN_RETRY + else -> "InvalidEntityType" + } + } + private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List { + val retryTopicMessagingConfig = getRetryTopicConfig(messagingConfig) val mediatorConsumerFactory: MutableList = mutableListOf( mediatorConsumerFactory(FLOW_START, messagingConfig), - mediatorConsumerFactory(FLOW_EVENT_TOPIC, messagingConfig) + mediatorConsumerFactory(FLOW_EVENT_TOPIC, retryTopicMessagingConfig) ) val mediatorReplicas = bootConfig.getIntOrDefault(WORKER_MEDIATOR_REPLICAS_FLOW_SESSION, 1) @@ -140,6 +200,10 @@ class FlowEventMediatorFactoryImpl @Activate constructor( return mediatorConsumerFactory } + private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig { + return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT)) + } + private fun mediatorConsumerFactory( topic: String, messagingConfig: SmartConfig @@ -178,8 +242,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor( rpcEndpoint(VERIFICATION_WORKER_REST_ENDPOINT, VERIFICATION_PATH), SYNCHRONOUS) is UniquenessCheckRequestAvro -> routeTo(rpcClient, rpcEndpoint(UNIQUENESS_WORKER_REST_ENDPOINT, UNIQUENESS_PATH), SYNCHRONOUS) + //RETRIES will appear as FlowEvents is FlowEvent -> routeTo(messageBusClient, - FLOW_EVENT_TOPIC, ASYNCHRONOUS) + RETRY_TOPIC, ASYNCHRONOUS) is String -> routeTo(messageBusClient, // Handling external messaging message.properties[MSG_PROP_TOPIC] as String, ASYNCHRONOUS) else -> { diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt new file mode 100644 index 00000000000..b1d6cf8c8c5 --- /dev/null +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandler.kt @@ -0,0 +1,69 @@ +package net.corda.flow.pipeline.handlers.events + +import net.corda.data.flow.event.external.ExternalEventResponse +import net.corda.data.flow.event.external.ExternalEventRetryRequest +import net.corda.flow.pipeline.events.FlowEventContext +import net.corda.flow.pipeline.exceptions.FlowEventException +import net.corda.utilities.debug +import org.osgi.service.component.annotations.Component +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * Handles pre-processing of events that are intended to trigger a resend of an external event. + * This can be triggered by the mediator in the event of transient errors. + */ +@Component(service = [FlowEventHandler::class]) +class ExternalEventRetryRequestHandler : FlowEventHandler { + + private companion object { + val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + private const val TOKEN_RETRY = "TokenRetry" + } + + override val type = ExternalEventRetryRequest::class.java + + override fun preProcess(context: FlowEventContext): FlowEventContext { + val checkpoint = context.checkpoint + val externalEventRetryRequest = context.inputEventPayload + + if (!checkpoint.doesExist) { + log.debug { + "Received a ${ExternalEventRetryRequest::class.simpleName} for flow [${context.inputEvent.flowId}] that " + + "does not exist. The event will be discarded. ${ExternalEventRetryRequest::class.simpleName}: " + + externalEventRetryRequest + } + throw FlowEventException( + "ExternalEventRetryRequestHandler received a ${ExternalEventRetryRequest::class.simpleName} for flow" + + " [${context.inputEvent.flowId}] that does not exist" + ) + } + + val externalEventState = checkpoint.externalEventState + val retryRequestId: String = externalEventRetryRequest.requestId + val externalEventStateRequestId = externalEventState?.requestId + if (externalEventState == null) { + log.debug { + "Received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " + + "for an ${ExternalEventResponse::class.simpleName}. " + + "${ExternalEventRetryRequest::class.simpleName}: $externalEventRetryRequest" + } + throw FlowEventException( + "ExternalEventRetryRequestHandler received an ${ExternalEventRetryRequest::class.simpleName} with request id: " + + "$retryRequestId while flow [${context.inputEvent.flowId} is not waiting " + + "for an ${ExternalEventResponse::class.simpleName}" + ) + } + //Discard events not related. Some token requests do not contain the external event id so this validation will allow all token + // requests to be resent. e.g TokenForceClaimRelease + else if (externalEventStateRequestId != retryRequestId && retryRequestId != TOKEN_RETRY) { + throw FlowEventException( + "Discarding retry request received with requestId $retryRequestId. This is likely a stale record polled. Checkpoint " + + "is currently waiting to receive a response for requestId $externalEventStateRequestId" + ) + } + + return context + } +} diff --git a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt index 8b8801f3615..e9fa7b251a6 100644 --- a/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt +++ b/components/flow/flow-service/src/main/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImpl.kt @@ -1,7 +1,9 @@ package net.corda.flow.pipeline.impl +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.event.mapper.ScheduleCleanup +import net.corda.data.flow.state.external.ExternalEventState import net.corda.data.flow.state.session.SessionState import net.corda.data.flow.state.session.SessionStateType import net.corda.flow.external.events.impl.ExternalEventManager @@ -51,11 +53,11 @@ class FlowGlobalPostProcessorImpl @Activate constructor( postProcessPendingPlatformError(context) val outputRecords = getSessionEvents(context, now) + - getFlowMapperSessionCleanupEvents(context, now) + - getExternalEvent(context, now) + getFlowMapperSessionCleanupEvents(context, now) + + getExternalEvent(context, now) context.flowMetrics.flowEventCompleted(context.inputEvent.payload::class.java.name) - val metadata = getStateMetadata(context) + val metadata = updateFlowSessionMetadata(context) return context.copy( outputRecords = context.outputRecords + outputRecords, @@ -111,7 +113,7 @@ class FlowGlobalPostProcessorImpl @Activate constructor( if (!counterpartyExists) { val msg = "[${context.checkpoint.holdingIdentity.x500Name}] has failed to create a flow with counterparty: " + - "[${counterparty}] as the recipient doesn't exist in the network." + "[${counterparty}] as the recipient doesn't exist in the network." sessionManager.errorSession(sessionState) if (doesCheckpointExist) { log.debug { "$msg. Throwing FlowFatalException" } @@ -157,24 +159,26 @@ class FlowGlobalPostProcessorImpl @Activate constructor( * Check to see if any external events needs to be sent or resent. */ private fun getExternalEvent(context: FlowEventContext, now: Instant): List> { - val externalEventState = context.checkpoint.externalEventState - return if (externalEventState == null) { - listOf() - } else { - val retryWindow = context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW) - externalEventManager.getEventToSend(externalEventState, now, Duration.ofMillis(retryWindow)) - .let { (updatedExternalEventState, record) -> - context.checkpoint.externalEventState = updatedExternalEventState - if (record != null) { - listOf(record) - } else { - listOf() - } + val externalEventState = context.checkpoint.externalEventState ?: return emptyList() + + return when (context.inputEvent.payload) { + is ExternalEventRetryRequest -> getTransientRetryRequest(externalEventState, now) + else -> { + val retryWindow = Duration.ofMillis(context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW)) + externalEventManager.getEventToSend(externalEventState, now, retryWindow).let { (updatedState, record) -> + context.checkpoint.externalEventState = updatedState + listOfNotNull(record) } + } } } - private fun getStateMetadata(context: FlowEventContext): Metadata? { + private fun getTransientRetryRequest(externalEventState: ExternalEventState, now: Instant): + List> { + return listOf(externalEventManager.getRetryEvent(externalEventState, now)) + } + + private fun updateFlowSessionMetadata(context: FlowEventContext): Metadata? { val checkpoint = context.checkpoint // Find the earliest expiry time for any open sessions. val lastReceivedMessageTime = checkpoint.sessions.filter { diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt index 87bff35c100..4038583529f 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/external/events/impl/ExternalEventManagerImplTest.kt @@ -1,9 +1,5 @@ package net.corda.flow.external.events.impl -import java.nio.ByteBuffer -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.stream.Stream import net.corda.avro.serialization.CordaAvroDeserializer import net.corda.avro.serialization.CordaAvroSerializer import net.corda.data.ExceptionEnvelope @@ -34,7 +30,11 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.verifyNoInteractions import org.mockito.kotlin.whenever +import java.nio.ByteBuffer import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.stream.Stream class ExternalEventManagerImplTest { @@ -530,4 +530,35 @@ class ExternalEventManagerImplTest { assertEquals(null, record) } + + @Test + fun `getRetryEvent returns an external event`() { + val now = Instant.now().truncatedTo(ChronoUnit.MILLIS) + val key = ByteBuffer.wrap(KEY.toByteArray()) + val payload = ByteBuffer.wrap(byteArrayOf(1, 2, 3)) + + val externalEvent = ExternalEvent().apply { + this.topic = TOPIC + this.key = key + this.payload = payload + this.timestamp = now.minusSeconds(10) + } + + val externalEventState = ExternalEventState().apply { + requestId = REQUEST_ID_1 + eventToSend = externalEvent + sendTimestamp = null + status = ExternalEventStateStatus(ExternalEventStateType.OK, null) + } + + val record = externalEventManager.getRetryEvent( + externalEventState, + now, + ) + + + assertEquals(TOPIC, record.topic) + assertEquals(key.array(), record.key) + assertEquals(payload.array(), record.value) + } } \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt index 0eb33f47c40..df31c6b8e05 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/messaging/FlowEventMediatorFactoryImplTest.kt @@ -30,6 +30,7 @@ import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory import net.corda.messaging.api.mediator.factory.MessagingClientFactoryFactory import net.corda.messaging.api.mediator.factory.MessagingClientFinder import net.corda.messaging.api.mediator.factory.MultiSourceEventMediatorFactory +import net.corda.schema.Schemas.Flow.FLOW_EVENT_TOPIC import net.corda.schema.Schemas.Flow.FLOW_MAPPER_SESSION_OUT import net.corda.schema.Schemas.Flow.FLOW_STATUS_TOPIC import net.corda.schema.configuration.ConfigKeys @@ -115,6 +116,9 @@ class FlowEventMediatorFactoryImplTest { assertThat(router.getDestination(MediatorMessage(UniquenessCheckRequestAvro())).endpoint).isEqualTo( endpoint(UNIQUENESS_PATH) ) + assertThat(router.getDestination(MediatorMessage(FlowEvent())).endpoint).isEqualTo( + FLOW_EVENT_TOPIC + ) // External messaging val externalMessagingKafkaTopic = "custom.kafka.topic" diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt new file mode 100644 index 00000000000..0c9a18bdce9 --- /dev/null +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/handlers/events/ExternalEventRetryRequestHandlerTest.kt @@ -0,0 +1,74 @@ +package net.corda.flow.pipeline.handlers.events + +import net.corda.data.flow.event.external.ExternalEventRetryRequest +import net.corda.data.flow.state.external.ExternalEventState +import net.corda.flow.pipeline.exceptions.FlowEventException +import net.corda.flow.state.FlowCheckpoint +import net.corda.flow.test.utils.buildFlowEventContext +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever +import java.time.Instant + +class ExternalEventRetryRequestHandlerTest { + + private val externalEventRetryRequest = ExternalEventRetryRequest("requestId", Instant.now()) + + private val checkpoint = mock() + private val externalEventRetryRequestHandler = ExternalEventRetryRequestHandler() + + @Test + fun `does not throw a flow event exception if the checkpoint exists and it is the correct request id`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "requestId" }) + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + externalEventRetryRequestHandler.preProcess(context) + } + + @Test + fun `does not throw a flow event exception if the checkpoint exists and it a token request id`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "requestId" }) + + val context = buildFlowEventContext(checkpoint, ExternalEventRetryRequest("TokenRetry", Instant.now())) + externalEventRetryRequestHandler.preProcess(context) + } + + @Test + fun `throws a flow event exception if the checkpoint does not exist`() { + whenever(checkpoint.doesExist).thenReturn(false) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + + @Test + fun `throws a flow event exception if the flow is not waiting for an external event response`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(null) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + + @Test + fun `throws a flow event exception if the flow is waiting for a different external event response`() { + whenever(checkpoint.doesExist).thenReturn(true) + whenever(checkpoint.externalEventState).thenReturn(ExternalEventState().apply { requestId = "OtherRequestId" }) + + val context = buildFlowEventContext(checkpoint, externalEventRetryRequest) + + assertThrows { + externalEventRetryRequestHandler.preProcess(context) + } + } + +} \ No newline at end of file diff --git a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt index 38b41ed43a3..dd1daf6280f 100644 --- a/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt +++ b/components/flow/flow-service/src/test/kotlin/net/corda/flow/pipeline/impl/FlowGlobalPostProcessorImplTest.kt @@ -2,6 +2,7 @@ package net.corda.flow.pipeline.impl import net.corda.data.flow.FlowKey import net.corda.data.flow.event.SessionEvent +import net.corda.data.flow.event.external.ExternalEventRetryRequest import net.corda.data.flow.event.mapper.FlowMapperEvent import net.corda.data.flow.event.mapper.ScheduleCleanup import net.corda.data.flow.event.session.SessionData @@ -15,6 +16,7 @@ import net.corda.flow.BOB_X500_NAME import net.corda.flow.FLOW_ID_1 import net.corda.flow.REQUEST_ID_1 import net.corda.flow.external.events.impl.ExternalEventManager +import net.corda.flow.pipeline.events.FlowEventContext import net.corda.flow.pipeline.exceptions.FlowFatalException import net.corda.flow.pipeline.factory.FlowRecordFactory import net.corda.flow.state.FlowCheckpoint @@ -103,7 +105,7 @@ class FlowGlobalPostProcessorImplTest { private val membershipGroupReaderProvider = mock() private val membershipGroupReader = mock() private val checkpoint = mock() - private val testContext = buildFlowEventContext(checkpoint, Any()) + private lateinit var testContext: FlowEventContext private val flowGlobalPostProcessor = FlowGlobalPostProcessorImpl( externalEventManager, sessionManager, @@ -114,6 +116,7 @@ class FlowGlobalPostProcessorImplTest { @Suppress("Unused") @BeforeEach fun setup() { + testContext = buildFlowEventContext(checkpoint, Any()) whenever(checkpoint.sessions).thenReturn(listOf(sessionState1, sessionState2)) whenever(checkpoint.flowKey).thenReturn(FlowKey(FLOW_ID_1, ALICE_X500_HOLDING_IDENTITY)) whenever(checkpoint.holdingIdentity).thenReturn(ALICE_X500_HOLDING_IDENTITY.toCorda()) @@ -262,6 +265,20 @@ class FlowGlobalPostProcessorImplTest { verify(checkpoint).clearPendingPlatformError() } + @Test + fun `Adds external event record when there is a retry instruction`() { + val externalEventState = ExternalEventState() + + testContext = buildFlowEventContext(checkpoint, ExternalEventRetryRequest(REQUEST_ID_1, Instant.now())) + whenever(checkpoint.externalEventState).thenReturn(externalEventState) + whenever(externalEventManager.getRetryEvent(eq(externalEventState), any())) + .thenReturn(externalEventRecord) + + val outputContext = flowGlobalPostProcessor.postProcess(testContext) + + assertThat(outputContext.outputRecords).contains(externalEventRecord) + } + @Test fun `Adds external event record when there is an external event to send`() { val externalEventState = ExternalEventState() diff --git a/gradle.properties b/gradle.properties index 6afaca45353..6c6287a28ae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -33,13 +33,13 @@ activationVersion=1.2.0 ariesDynamicFrameworkExtensionVersion=1.3.6 antlrVersion=2.7.7 asmVersion=9.5 -avroVersion=1.11.3 +avroVersion=1.11.4 commonsVersion = 1.7 commonsLangVersion = 3.12.0 commonsTextVersion = 1.10.0 # Corda API libs revision (change in 4th digit indicates a breaking change) # Change to 5.2.1.xx-SNAPSHOT to pick up maven local published copy -cordaApiVersion=5.2.1.53-beta+ +cordaApiVersion=5.2.1.54-beta+ disruptorVersion=3.4.4 felixConfigAdminVersion=1.9.26 diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt index 446f5bcbab0..a0e3f7674f8 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/ConsumerProcessor.kt @@ -118,6 +118,8 @@ class ConsumerProcessor( metrics.processorTimer.recordCallable { try { val inputs = getInputs(consumer) + // If no records polled return early. + if (inputs.isEmpty()) return@recordCallable val outputs = processInputs(inputs) categorizeOutputs(outputs, failureCounts) commit(consumer, outputs, failureCounts) diff --git a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt index 7463fcf6136..24a67a269c7 100644 --- a/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt +++ b/libs/messaging/messaging-impl/src/main/kotlin/net/corda/messaging/mediator/processor/EventProcessor.kt @@ -15,6 +15,7 @@ import net.corda.messaging.api.records.Record import net.corda.messaging.mediator.StateManagerHelper import net.corda.messaging.mediator.metrics.EventMediatorMetrics import net.corda.tracing.addTraceContextToRecord +import org.slf4j.LoggerFactory /** * Class to process records received from the consumer. @@ -31,6 +32,9 @@ class EventProcessor( ) { private val metrics = EventMediatorMetrics(config.name) + private val retryConfig = config.retryConfig + private val logger = LoggerFactory.getLogger("${this.javaClass.name}-${config.name}") + /** * Process a group of events. @@ -74,13 +78,15 @@ class EventProcessor( var processorState = inputProcessorState val asyncOutputs = mutableMapOf, MutableList>>() val stateChangeAndOperation = try { + var isNoopState = false input.records.forEach { consumerInputEvent -> - val (updatedProcessorState, newAsyncOutputs) = processConsumerInput(consumerInputEvent, processorState, key) + val (updatedProcessorState, newAsyncOutputs, isNoop) = processConsumerInput(consumerInputEvent, processorState, key) processorState = updatedProcessorState asyncOutputs.addOutputs(consumerInputEvent, newAsyncOutputs) + if (isNoop) isNoopState = true } val state = stateManagerHelper.createOrUpdateState(key.toString(), inputState, processorState) - stateChangeAndOperation(inputState, state) + if (isNoopState) StateChangeAndOperation.Noop else stateChangeAndOperation(inputState, state) } catch (e: EventProcessorSyncEventsIntermittentException) { asyncOutputs.clear() StateChangeAndOperation.Transient @@ -127,10 +133,11 @@ class EventProcessor( consumerInputEvent: Record, processorState: StateAndEventProcessor.State?, key: K, - ): Pair?, List>> { + ): ConsumerInputOutput { var processorStateUpdated = processorState val newAsyncOutputs = mutableListOf>() val consumerInputHash = mediatorInputService.getHash(consumerInputEvent) + val isRetryTopic = consumerInputEvent.topic == config.retryConfig?.retryTopic val queue = ArrayDeque(listOf(consumerInputEvent)) while (queue.isNotEmpty()) { val event = getNextEvent(queue, consumerInputHash) @@ -141,14 +148,19 @@ class EventProcessor( } newAsyncOutputs.addAll(asyncEvents) try { - queue.addAll(processSyncEvents(key, syncEvents)) - } catch (e: CordaMessageAPIIntermittentException) { - throw EventProcessorSyncEventsIntermittentException(processorStateUpdated, e) + val (syncResponses, isNoopRetry, asyncOutputs) = processSyncEvents(key, syncEvents, isRetryTopic) + newAsyncOutputs.addAll(asyncOutputs) + queue.addAll(syncResponses) + if (isNoopRetry) { + // return early if no state update is needed + return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs, true) + } } catch (e: Exception) { + logger.warn("Failed process synchronous events for key $key", e) throw EventProcessorSyncEventsFatalException(processorStateUpdated, e) } } - return Pair(processorStateUpdated, newAsyncOutputs) + return ConsumerInputOutput(processorStateUpdated, newAsyncOutputs) } private fun getNextEvent( @@ -177,32 +189,52 @@ class EventProcessor( /** * Send any synchronous events immediately and feed results back onto the queue. - */ + * If a sync request returns from the RPC client with a transient error and retry is enabled + * then push a retry event onto the retry topic. + * If retrying again via the retry topic and transient errors occur, resend retry event and do not update state + **/ private fun processSyncEvents( key: K, - syncEvents: List> - ): List> { - return syncEvents.mapNotNull { message -> + syncEvents: List>, + isRetryTopic: Boolean + ): SyncProcessingOutput { + val outputEvents = syncEvents.mapNotNull { message -> val destination = messageRouter.getDestination(message) - @Suppress("UNCHECKED_CAST") - val reply = with(destination) { - message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) - client.send(message) as MediatorMessage? - } - reply?.let { - addTraceContextToRecord( - Record( - "", - key, - reply.payload, - 0, - listOf(Pair(SYNC_RESPONSE_HEADER, "true")) - ), - message.properties - ) + try { + @Suppress("UNCHECKED_CAST") + val reply = with(destination) { + message.addProperty(MessagingClient.MSG_PROP_ENDPOINT, endpoint) + client.send(message) as MediatorMessage? + } + reply?.let { + addTraceContextToRecord( + Record( + "", + key, + reply.payload, + 0, + listOf(Pair(SYNC_RESPONSE_HEADER, "true")) + ), + message.properties + ) + } + } catch (e: CordaMessageAPIIntermittentException) { + val asyncOutputEvents: MutableList> = mutableListOf() + if (retryConfig != null) { + retryConfig.buildRetryRequest?.let { it(key, message) }?.let { + asyncOutputEvents.addAll(it) + } + } + + // If we're on the retry topic and run into another transient error, exit early and do not update the state to save + // performance. + // If we are not on the retry topic then we need to save the state before adding the retry event. This will allow the + // flow cleanup processors to execute on an idle flow checkpoint + return SyncProcessingOutput(emptyList(), isRetryTopic, asyncOutputEvents) } } + return SyncProcessingOutput(outputEvents) } private fun convertToMessage(record: Record<*, *>): MediatorMessage { @@ -219,4 +251,27 @@ class EventProcessor( private fun List>.toMessageProperties() = associateTo(mutableMapOf()) { (key, value) -> key to (value as Any) } + + + /** + * The outputs from processing a single consumer input event from the bus. + * This will includes updates from all sync events which are processed as a result of a consumer input + * @property updatedState The state and event processor output state after processing + */ + data class ConsumerInputOutput( + val updatedState: StateAndEventProcessor.State?, + val outputEvents: List>, + val isNoop: Boolean = false + ) + + /** + * The outputs from processing a single consumer input event from the bus. + * This will includes updates from all sync events which are processed as a result of a consumer input + * @property updatedState The state and event processor output state after processing + */ + data class SyncProcessingOutput( + val syncResponses: List>, + val isNoopRetry: Boolean = false, + val asyncOutputs: List> = emptyList() + ) } \ No newline at end of file diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt index 928ebeea7b2..7180fa6f25e 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/TestUtils.kt @@ -50,10 +50,10 @@ fun generateMockCordaConsumerRecordList(numberOfRecords: Long, topic: String, pa /** * Generate [recordCount] string key/value records */ -fun getStringRecords(recordCount: Int, key: String): List> { +fun getStringRecords(recordCount: Int, key: String, topic: String = "topic"): List> { val records = mutableListOf>() for (j in 1..recordCount) { - records.add(Record("topic", key, j.toString())) + records.add(Record(topic, key, j.toString())) } return records diff --git a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt index 0969349bd3f..bd33f7cf4d8 100644 --- a/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt +++ b/libs/messaging/messaging-impl/src/test/kotlin/net/corda/messaging/mediator/processor/EventProcessorTest.kt @@ -1,6 +1,7 @@ package net.corda.messaging.mediator.processor import net.corda.libs.configuration.SmartConfigImpl +import net.corda.libs.statemanager.api.Metadata import net.corda.libs.statemanager.api.State import net.corda.messaging.api.exception.CordaMessageAPIFatalException import net.corda.messaging.api.exception.CordaMessageAPIIntermittentException @@ -10,6 +11,7 @@ import net.corda.messaging.api.mediator.MessageRouter import net.corda.messaging.api.mediator.MessagingClient import net.corda.messaging.api.mediator.RoutingDestination import net.corda.messaging.api.mediator.config.EventMediatorConfig +import net.corda.messaging.api.mediator.config.RetryConfig import net.corda.messaging.api.mediator.factory.MessageRouterFactory import net.corda.messaging.api.processor.StateAndEventProcessor import net.corda.messaging.api.processor.StateAndEventProcessor.Response @@ -34,6 +36,7 @@ import java.util.UUID @Execution(ExecutionMode.SAME_THREAD) class EventProcessorTest { private lateinit var eventMediatorConfig: EventMediatorConfig + private lateinit var eventMediatorRetryConfig: EventMediatorConfig private lateinit var stateManagerHelper: StateManagerHelper private lateinit var client: MessagingClient private lateinit var messageRouter: MessageRouter @@ -42,6 +45,7 @@ class EventProcessorTest { private lateinit var eventProcessor: EventProcessor private val inputState1: State = mock() + private val retryTopic: String = "flow.event" private val asyncMessage: String = "ASYNC_PAYLOAD" private val syncMessage: String = "SYNC_PAYLOAD" private val updatedProcessingState = StateAndEventProcessor.State("bar", null) @@ -65,6 +69,8 @@ class EventProcessorTest { } else RoutingDestination(client, "endpoint", RoutingDestination.Type.ASYNCHRONOUS) } eventMediatorConfig = buildTestConfig() + val retryConfig = RetryConfig(retryTopic, buildRetryRequest) + eventMediatorRetryConfig = buildTestConfig(retryConfig) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { Response( @@ -181,9 +187,11 @@ class EventProcessorTest { } @Test - fun `when sync processing fails with a transient error, a transient state change signal is sent`() { - val mockedState = mock() + fun `when sync processing fails with a transient error, retry is OFF, a CREATE state change signal is set and no retry event is sent` + () { val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key"), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), anyOrNull())).thenReturn(mockedState) whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { @@ -194,17 +202,61 @@ class EventProcessorTest { ) ) } - whenever(stateManagerHelper.failStateProcessing(any(), eq(null), any())).thenReturn(mockedState) - val outputMap = eventProcessor.processEvents(input) val output = outputMap["key"] assertEquals(emptyList>(), output?.asyncOutputs) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(mockedState) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) + } + + @Test + fun `when transient error while processing retry topic, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key", retryTopic), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), anyOrNull())).thenReturn(mockedState) + whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) + whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { + Response( + null, + listOf( + Record("", "key", syncMessage) + ) + ) + } + eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) + val outputMap = eventProcessor.processEvents(input) + + val output = outputMap["key"] + assertEquals(1, output?.asyncOutputs?.size) assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(null) - assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Transient::class.java) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Noop::class.java) + } + + @Test + fun `when transient error while processing event topic, retry is ON, a NOOP state change signal is set and a retry event is sent`() { + val input = mapOf("key" to EventProcessingInput("key", getStringRecords(1, "key", "flow.start"), null)) + val mockedState = mock() + whenever(stateManagerHelper.createOrUpdateState(any(), anyOrNull(), any())).thenReturn(mockedState) + whenever(client.send(any())).thenThrow(CordaMessageAPIIntermittentException("baz")) + whenever(stateAndEventProcessor.onNext(anyOrNull(), any())).thenAnswer { + Response( + StateAndEventProcessor.State("", Metadata(mapOf())), + listOf( + Record("", "key", syncMessage) + ) + ) + } + eventProcessor = EventProcessor(eventMediatorRetryConfig, stateManagerHelper, messageRouter, mediatorInputService) + val outputMap = eventProcessor.processEvents(input) + + val output = outputMap["key"] + assertEquals(1, output?.asyncOutputs?.size) + assertThat(output?.stateChangeAndOperation?.outputState).isEqualTo(mockedState) + assertThat(output?.stateChangeAndOperation).isInstanceOf(StateChangeAndOperation.Create::class.java) } - private fun buildTestConfig() = EventMediatorConfig( + private fun buildTestConfig(retryConfig: RetryConfig? = null) = EventMediatorConfig( "", SmartConfigImpl.empty(), emptyList(), @@ -214,6 +266,11 @@ class EventProcessorTest { 1, "", mock(), - 20 + 20, + retryConfig ) + + private val buildRetryRequest: ((String, MediatorMessage) -> List>) = { _, message -> + listOf(message) + } } diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt index 1d49c60bfb9..032ce9455b0 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfig.kt @@ -28,6 +28,8 @@ import java.time.Duration * @property stateManager State manager. * @property minGroupSize Minimum size for group of records passed to task manager for processing in a single thread. Does not block if * group size is not met by polled record count. + * @property retryConfig Topic to push retry events to as well as the function to build retry events. Set to null to not retry transient + * errors originating from the message pattern and fail early instead. */ data class EventMediatorConfig( val name: String, @@ -40,6 +42,7 @@ data class EventMediatorConfig( val threadName: String, val stateManager: StateManager, val minGroupSize: Int, + val retryConfig: RetryConfig? = null ) { /** * Timeout for polling consumers. diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt index a6e502a6b5c..b928ea3eb78 100644 --- a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/EventMediatorConfigBuilder.kt @@ -27,6 +27,7 @@ class EventMediatorConfigBuilder { private var threadName: String? = null private var stateManager: StateManager? = null private var minGroupSize: Int? = null + private var retryConfig: RetryConfig? = null /** Sets name for [MultiSourceEventMediator]. */ fun name(name: String) = @@ -74,6 +75,13 @@ class EventMediatorConfigBuilder { fun stateManager(stateManager: StateManager) = apply { this.stateManager = stateManager } + /** + * Sets the topic to push retry events triggered by transient errors in the message pattern when sending RPC calls. + * As well as setting how to build a retry event from the sync request + */ + fun retryConfig(retryConfig: RetryConfig) = + apply { this.retryConfig = retryConfig } + /** Builds [EventMediatorConfig]. */ fun build(): EventMediatorConfig { check(consumerFactories.isNotEmpty()) { "At least on consumer factory has to be set" } @@ -89,6 +97,8 @@ class EventMediatorConfigBuilder { threadName = checkNotNull(threadName) { "Thread name not set" }, stateManager = checkNotNull(stateManager) { "State manager not set" }, minGroupSize = checkNotNull(minGroupSize) { "Min group size not set" }, + retryConfig = retryConfig ) } + } \ No newline at end of file diff --git a/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt new file mode 100644 index 00000000000..bf7009abd8a --- /dev/null +++ b/libs/messaging/messaging/src/main/kotlin/net/corda/messaging/api/mediator/config/RetryConfig.kt @@ -0,0 +1,14 @@ +package net.corda.messaging.api.mediator.config + +import net.corda.messaging.api.mediator.MediatorMessage + +/** + * Config used to setup retry events in the mediator. + * These are used to retry transient errors that occur. + * @property retryTopic The topic to send retry events to + * @property buildRetryRequest lambda used to generate the retry requests + */ +data class RetryConfig( + val retryTopic: String, + val buildRetryRequest: ((K, MediatorMessage) -> List>)? = null, +)