diff --git a/pulsar/error.go b/pulsar/error.go index ccadb72436..1a9345b0c3 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -121,6 +121,10 @@ const ( // fenced. Applications are now supposed to close it and create a // new producer ProducerFenced + // MaxConcurrentOperationsReached indicates that the maximum number of concurrent operations + // has been reached. This means that no additional operations can be started until some + // of the current operations complete. + MaxConcurrentOperationsReached // TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled. // This error is returned when an operation that requires the transaction coordinator is attempted // but the transaction coordinator feature is not enabled in the system or the transaction coordinator diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index d402d7ec41..c4b7a6a200 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -41,6 +41,7 @@ type transactionCoordinatorClient struct { // where the TC located. const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign" +var ErrMaxConcurrentOpsReached = newError(MaxConcurrentOperationsReached, "Max concurrent operations reached") var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+ "the transaction coordinator, or the transaction coordinator has not initialized") @@ -212,7 +213,7 @@ func getTCAssignTopicName(partition uint64) string { func (tc *transactionCoordinatorClient) canSendRequest() error { if !tc.semaphore.Acquire(context.Background()) { - return newError(UnknownError, "Failed to acquire semaphore") + return ErrMaxConcurrentOpsReached } return nil }