Skip to content

Commit

Permalink
CORE-20867 Implement retry topic to handle persistent transient RPC C…
Browse files Browse the repository at this point in the history
…lient errors (#6385)

The current mediator messaging pattern in Corda can encounter a retry loop when transient errors are received from other Corda workers. This retry loop blocks flow topic partitions from progressing, and it has been observed that the affected Corda cluster can become permanently unstable due to the effects of consumer lag. This pattern is used by the flow worker to perform synchronous HTTP calls to various workers, including verification, token, crypto, uniqueness, and persistence workers.

To address this issue, a separate Kafka topic is dedicated to handling retries. This will allow the primary ingestion topics to continue processing unaffected flows, while introducing finite retry logic for flows impacted by transient errors.

Additionally, the Avro version is bumped to fix a vulnerability.
  • Loading branch information
LWogan authored Nov 14, 2024
1 parent 37233f6 commit 2434c4a
Show file tree
Hide file tree
Showing 17 changed files with 489 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package net.corda.flow.external.events.impl

import java.time.Instant
import net.corda.data.flow.event.external.ExternalEvent
import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.state.external.ExternalEventState
import net.corda.flow.external.events.factory.ExternalEventFactory
import net.corda.flow.external.events.factory.ExternalEventRecord
import net.corda.messaging.api.records.Record
import java.time.Duration
import java.time.Instant

/**
* [ExternalEventManager] encapsulates external event behaviour by creating and modifying [ExternalEventState]s.
Expand Down Expand Up @@ -86,4 +86,16 @@ interface ExternalEventManager {
instant: Instant,
retryWindow: Duration
): Pair<ExternalEventState, Record<*, *>?>

/**
 * Get the external event to send for the transient error retry scenario.
 *
 * Returns the event as is from the state. No additional checks are required, so this method takes no
 * retry window and returns only the record to send (no updated state is returned to the caller).
 *
 * @param externalEventState The [ExternalEventState] to get the event from.
 * @param instant The current time. Used to set timestamp.
 * @return The external event request to resend.
 */
fun getRetryEvent(
externalEventState: ExternalEventState,
instant: Instant,
): Record<*, *>
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ class ExternalEventManagerImpl(
return externalEventState to record
}

// Transient-error retry path: the record is built directly from the state by generateRecord,
// without going through the retry-window check.
override fun getRetryEvent(
    externalEventState: ExternalEventState,
    instant: Instant,
): Record<*, *> = generateRecord(externalEventState, instant)

private fun checkRetry(externalEventState: ExternalEventState, instant: Instant, retryWindow: Duration) {
when {
(externalEventState.sendTimestamp + retryWindow) >= instant -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package net.corda.flow.messaging.mediator

import com.typesafe.config.ConfigValueFactory
import net.corda.avro.serialization.CordaAvroSerializationFactory
import net.corda.data.crypto.wire.ops.flow.FlowOpsRequest
import net.corda.data.flow.event.FlowEvent
import net.corda.data.flow.event.external.ExternalEventRetryRequest
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.output.FlowStatus
import net.corda.data.flow.state.checkpoint.Checkpoint
Expand Down Expand Up @@ -30,6 +32,7 @@ import net.corda.messaging.api.mediator.RoutingDestination.Companion.routeTo
import net.corda.messaging.api.mediator.RoutingDestination.Type.ASYNCHRONOUS
import net.corda.messaging.api.mediator.RoutingDestination.Type.SYNCHRONOUS
import net.corda.messaging.api.mediator.config.EventMediatorConfigBuilder
import net.corda.messaging.api.mediator.config.RetryConfig
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactory
import net.corda.messaging.api.mediator.factory.MediatorConsumerFactoryFactory
import net.corda.messaging.api.mediator.factory.MessageRouterFactory
Expand All @@ -47,12 +50,14 @@ import net.corda.schema.configuration.BootConfig.TOKEN_SELECTION_WORKER_REST_END
import net.corda.schema.configuration.BootConfig.UNIQUENESS_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.VERIFICATION_WORKER_REST_ENDPOINT
import net.corda.schema.configuration.BootConfig.WORKER_MEDIATOR_REPLICAS_FLOW_SESSION
import net.corda.schema.configuration.MessagingConfig.Bus.KAFKA_CONSUMER_MAX_POLL_RECORDS
import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT
import net.corda.schema.configuration.MessagingConfig.Subscription.MEDIATOR_PROCESSING_THREAD_POOL_SIZE
import org.osgi.service.component.annotations.Activate
import org.osgi.service.component.annotations.Component
import org.osgi.service.component.annotations.Reference
import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.UUID

@Suppress("LongParameterList")
Expand All @@ -77,7 +82,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
private const val CONSUMER_GROUP = "FlowEventConsumer"
private const val MESSAGE_BUS_CLIENT = "MessageBusClient"
private const val RPC_CLIENT = "RpcClient"

private const val RETRY_TOPIC_POLL_LIMIT = 5
private const val RETRY_TOPIC = FLOW_EVENT_TOPIC
private const val TOKEN_RETRY = "TokenRetry"
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

Expand Down Expand Up @@ -123,12 +130,65 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
.threadName("flow-event-mediator")
.stateManager(stateManager)
.minGroupSize(messagingConfig.getInt(MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT))
.retryConfig(RetryConfig(RETRY_TOPIC, ::buildRetryRequest))
.build()


/**
 * Creates the message that asks the flow event pipeline to resend an external event after a synchronous
 * RPC request has failed.
 *
 * Where possible the request id is taken from the failed request's payload so the pipeline can validate the
 * retry against the checkpoint; that validation is an enhancement rather than a hard requirement.
 * Every retry request carries a fresh timestamp so that each one is unique for replay logic handling.
 *
 * @param key the key of the input record; used as the flow id and as the "key" property of the output message.
 * @param syncRpcRequest the previous sync request which failed.
 * @return a single-element list holding the retry message, or an empty list if the message could not be built.
 */
private fun buildRetryRequest(key: String, syncRpcRequest: MediatorMessage<Any>): List<MediatorMessage<Any>> =
    try {
        val retryPayload = ExternalEventRetryRequest.newBuilder()
            .setRequestId(getRequestId(syncRpcRequest))
            .setTimestamp(Instant.now())
            .build()
        val retryEvent = FlowEvent.newBuilder()
            .setFlowId(key)
            .setPayload(retryPayload)
            .build()
        // Copy the failed request's properties and make sure the key is set correctly on the
        // new message destined for the flow topic.
        val retryProperties = syncRpcRequest.properties.toMutableMap().also { it["key"] = key }
        listOf(MediatorMessage(retryEvent, retryProperties))
    } catch (ex: Exception) {
        // Safety net only: building the retry event is not expected to fail. If it does, the flow will
        // likely hang until the idle processor kicks in.
        logger.warn("Failed to generate a retry event for key $key. No retry will be triggered.", ex)
        emptyList()
    }

/**
 * Works out which request id to place in the retry event for a failed synchronous request.
 *
 * The request payload is deserialized and, for request types that carry a flow external event context,
 * the id is read from that context. Some token events have no request id because there is no response;
 * those get a hardcoded id which is ignored at the validation step. Any other payload maps to a fixed
 * placeholder id.
 *
 * @param syncRpcRequest the previous request
 * @return Request ID to set in the retry event.
 */
private fun getRequestId(syncRpcRequest: MediatorMessage<Any>): String {
    val rawPayload = syncRpcRequest.payload as ByteArray
    return when (val request = deserializer.deserialize(rawPayload)) {
        is EntityRequest -> request.flowExternalEventContext.requestId
        is FlowOpsRequest -> request.flowExternalEventContext.requestId
        is LedgerPersistenceRequest -> request.flowExternalEventContext.requestId
        is TransactionVerificationRequest -> request.flowExternalEventContext.requestId
        is UniquenessCheckRequestAvro -> request.flowExternalEventContext.requestId
        is TokenPoolCacheEvent -> TOKEN_RETRY
        else -> "InvalidEntityType"
    }
}

private fun createMediatorConsumerFactories(messagingConfig: SmartConfig, bootConfig: SmartConfig): List<MediatorConsumerFactory> {
val retryTopicMessagingConfig = getRetryTopicConfig(messagingConfig)
val mediatorConsumerFactory: MutableList<MediatorConsumerFactory> = mutableListOf(
mediatorConsumerFactory(FLOW_START, messagingConfig),
mediatorConsumerFactory(FLOW_EVENT_TOPIC, messagingConfig)
mediatorConsumerFactory(FLOW_EVENT_TOPIC, retryTopicMessagingConfig)
)

val mediatorReplicas = bootConfig.getIntOrDefault(WORKER_MEDIATOR_REPLICAS_FLOW_SESSION, 1)
Expand All @@ -140,6 +200,10 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
return mediatorConsumerFactory
}

// Derives the consumer config used for the retry topic: identical to the given messaging config except
// that the Kafka consumer max poll records value is overridden with RETRY_TOPIC_POLL_LIMIT.
private fun getRetryTopicConfig(messagingConfig: SmartConfig): SmartConfig {
    val retryPollLimit = ConfigValueFactory.fromAnyRef(RETRY_TOPIC_POLL_LIMIT)
    return messagingConfig.withValue(KAFKA_CONSUMER_MAX_POLL_RECORDS, retryPollLimit)
}

private fun mediatorConsumerFactory(
topic: String,
messagingConfig: SmartConfig
Expand Down Expand Up @@ -178,8 +242,9 @@ class FlowEventMediatorFactoryImpl @Activate constructor(
rpcEndpoint(VERIFICATION_WORKER_REST_ENDPOINT, VERIFICATION_PATH), SYNCHRONOUS)
is UniquenessCheckRequestAvro -> routeTo(rpcClient,
rpcEndpoint(UNIQUENESS_WORKER_REST_ENDPOINT, UNIQUENESS_PATH), SYNCHRONOUS)
//RETRIES will appear as FlowEvents
is FlowEvent -> routeTo(messageBusClient,
FLOW_EVENT_TOPIC, ASYNCHRONOUS)
RETRY_TOPIC, ASYNCHRONOUS)
is String -> routeTo(messageBusClient, // Handling external messaging
message.properties[MSG_PROP_TOPIC] as String, ASYNCHRONOUS)
else -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package net.corda.flow.pipeline.handlers.events

import net.corda.data.flow.event.external.ExternalEventResponse
import net.corda.data.flow.event.external.ExternalEventRetryRequest
import net.corda.flow.pipeline.events.FlowEventContext
import net.corda.flow.pipeline.exceptions.FlowEventException
import net.corda.utilities.debug
import org.osgi.service.component.annotations.Component
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
 * Handles pre-processing of events that are intended to trigger a resend of an external event.
 * This can be triggered by the mediator in the event of transient errors.
 *
 * The handler only validates the retry request against the checkpoint; it returns the context unchanged
 * when the request is accepted and throws [FlowEventException] when the request must be discarded.
 */
@Component(service = [FlowEventHandler::class])
class ExternalEventRetryRequestHandler : FlowEventHandler<ExternalEventRetryRequest> {

    private companion object {
        val log: Logger = LoggerFactory.getLogger(this::class.java.enclosingClass)

        // Request id carried by retries of token requests, which have no external event request id.
        private const val TOKEN_RETRY = "TokenRetry"
    }

    override val type = ExternalEventRetryRequest::class.java

    /**
     * Validates an [ExternalEventRetryRequest] before it is processed.
     *
     * @param context the context of the retry request event.
     * @return the same [context], unmodified, when the retry request is valid.
     * @throws FlowEventException if the flow's checkpoint does not exist, if the flow is not waiting on an
     * external event, or if the request id does not match the one the checkpoint is waiting on (token
     * retries are exempt from the id check).
     */
    override fun preProcess(context: FlowEventContext<ExternalEventRetryRequest>): FlowEventContext<ExternalEventRetryRequest> {
        val checkpoint = context.checkpoint
        val externalEventRetryRequest = context.inputEventPayload

        if (!checkpoint.doesExist) {
            log.debug {
                "Received a ${ExternalEventRetryRequest::class.simpleName} for flow [${context.inputEvent.flowId}] that " +
                    "does not exist. The event will be discarded. ${ExternalEventRetryRequest::class.simpleName}: " +
                    externalEventRetryRequest
            }
            throw FlowEventException(
                "ExternalEventRetryRequestHandler received a ${ExternalEventRetryRequest::class.simpleName} for flow" +
                    " [${context.inputEvent.flowId}] that does not exist"
            )
        }

        val externalEventState = checkpoint.externalEventState
        val retryRequestId: String = externalEventRetryRequest.requestId
        val externalEventStateRequestId = externalEventState?.requestId
        if (externalEventState == null) {
            // The closing bracket after the flow id was missing from both messages below.
            log.debug {
                "Received an ${ExternalEventRetryRequest::class.simpleName} with request id: " +
                    "$retryRequestId while flow [${context.inputEvent.flowId}] is not waiting " +
                    "for an ${ExternalEventResponse::class.simpleName}. " +
                    "${ExternalEventRetryRequest::class.simpleName}: $externalEventRetryRequest"
            }
            throw FlowEventException(
                "ExternalEventRetryRequestHandler received an ${ExternalEventRetryRequest::class.simpleName} with request id: " +
                    "$retryRequestId while flow [${context.inputEvent.flowId}] is not waiting " +
                    "for an ${ExternalEventResponse::class.simpleName}"
            )
        }
        // Discard events not related. Some token requests do not contain the external event id so this validation
        // will allow all token requests to be resent. e.g TokenForceClaimRelease
        else if (externalEventStateRequestId != retryRequestId && retryRequestId != TOKEN_RETRY) {
            throw FlowEventException(
                "Discarding retry request received with requestId $retryRequestId. This is likely a stale record polled. Checkpoint " +
                    "is currently waiting to receive a response for requestId $externalEventStateRequestId"
            )
        }

        return context
    }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package net.corda.flow.pipeline.impl

import net.corda.data.flow.event.external.ExternalEventRetryRequest
import net.corda.data.flow.event.mapper.FlowMapperEvent
import net.corda.data.flow.event.mapper.ScheduleCleanup
import net.corda.data.flow.state.external.ExternalEventState
import net.corda.data.flow.state.session.SessionState
import net.corda.data.flow.state.session.SessionStateType
import net.corda.flow.external.events.impl.ExternalEventManager
Expand Down Expand Up @@ -51,11 +53,11 @@ class FlowGlobalPostProcessorImpl @Activate constructor(
postProcessPendingPlatformError(context)

val outputRecords = getSessionEvents(context, now) +
getFlowMapperSessionCleanupEvents(context, now) +
getExternalEvent(context, now)
getFlowMapperSessionCleanupEvents(context, now) +
getExternalEvent(context, now)

context.flowMetrics.flowEventCompleted(context.inputEvent.payload::class.java.name)
val metadata = getStateMetadata(context)
val metadata = updateFlowSessionMetadata(context)

return context.copy(
outputRecords = context.outputRecords + outputRecords,
Expand Down Expand Up @@ -111,7 +113,7 @@ class FlowGlobalPostProcessorImpl @Activate constructor(
if (!counterpartyExists) {
val msg =
"[${context.checkpoint.holdingIdentity.x500Name}] has failed to create a flow with counterparty: " +
"[${counterparty}] as the recipient doesn't exist in the network."
"[${counterparty}] as the recipient doesn't exist in the network."
sessionManager.errorSession(sessionState)
if (doesCheckpointExist) {
log.debug { "$msg. Throwing FlowFatalException" }
Expand Down Expand Up @@ -157,24 +159,26 @@ class FlowGlobalPostProcessorImpl @Activate constructor(
* Check to see if any external events needs to be sent or resent.
*/
private fun getExternalEvent(context: FlowEventContext<Any>, now: Instant): List<Record<*, *>> {
val externalEventState = context.checkpoint.externalEventState
return if (externalEventState == null) {
listOf()
} else {
val retryWindow = context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW)
externalEventManager.getEventToSend(externalEventState, now, Duration.ofMillis(retryWindow))
.let { (updatedExternalEventState, record) ->
context.checkpoint.externalEventState = updatedExternalEventState
if (record != null) {
listOf(record)
} else {
listOf()
}
val externalEventState = context.checkpoint.externalEventState ?: return emptyList()

return when (context.inputEvent.payload) {
is ExternalEventRetryRequest -> getTransientRetryRequest(externalEventState, now)
else -> {
val retryWindow = Duration.ofMillis(context.flowConfig.getLong(EXTERNAL_EVENT_MESSAGE_RESEND_WINDOW))
externalEventManager.getEventToSend(externalEventState, now, retryWindow).let { (updatedState, record) ->
context.checkpoint.externalEventState = updatedState
listOfNotNull(record)
}
}
}
}

private fun getStateMetadata(context: FlowEventContext<Any>): Metadata? {
// Wraps the record produced by the external event manager's transient-retry path in a single-element list.
private fun getTransientRetryRequest(
    externalEventState: ExternalEventState,
    now: Instant,
): List<Record<*, *>> = listOf(externalEventManager.getRetryEvent(externalEventState, now))

private fun updateFlowSessionMetadata(context: FlowEventContext<Any>): Metadata? {
val checkpoint = context.checkpoint
// Find the earliest expiry time for any open sessions.
val lastReceivedMessageTime = checkpoint.sessions.filter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package net.corda.flow.external.events.impl

import java.nio.ByteBuffer
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.stream.Stream
import net.corda.avro.serialization.CordaAvroDeserializer
import net.corda.avro.serialization.CordaAvroSerializer
import net.corda.data.ExceptionEnvelope
Expand Down Expand Up @@ -34,7 +30,11 @@ import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoInteractions
import org.mockito.kotlin.whenever
import java.nio.ByteBuffer
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.stream.Stream

class ExternalEventManagerImplTest {

Expand Down Expand Up @@ -530,4 +530,35 @@ class ExternalEventManagerImplTest {

assertEquals(null, record)
}

@Test
fun `getRetryEvent returns an external event`() {
    // Arrange: a state holding an event that has not been sent yet (no send timestamp).
    val currentTime = Instant.now().truncatedTo(ChronoUnit.MILLIS)
    val keyBuffer = ByteBuffer.wrap(KEY.toByteArray())
    val payloadBuffer = ByteBuffer.wrap(byteArrayOf(1, 2, 3))

    val eventToResend = ExternalEvent().also { event ->
        event.topic = TOPIC
        event.key = keyBuffer
        event.payload = payloadBuffer
        event.timestamp = currentTime.minusSeconds(10)
    }

    val state = ExternalEventState().also { eventState ->
        eventState.requestId = REQUEST_ID_1
        eventState.eventToSend = eventToResend
        eventState.sendTimestamp = null
        eventState.status = ExternalEventStateStatus(ExternalEventStateType.OK, null)
    }

    // Act
    val retryRecord = externalEventManager.getRetryEvent(state, currentTime)

    // Assert: the record mirrors the topic, key and payload stored in the state.
    // NOTE(review): if assertEquals is JUnit's, arrays are compared by reference, so these checks pass only
    // when the record reuses the buffers' backing arrays - confirm, and consider assertArrayEquals.
    assertEquals(TOPIC, retryRecord.topic)
    assertEquals(keyBuffer.array(), retryRecord.key)
    assertEquals(payloadBuffer.array(), retryRecord.value)
}
}
Loading

0 comments on commit 2434c4a

Please sign in to comment.