Skip to content

Commit

Permalink
[improve] return ErrMaxConcurrentOpsReached when too many concurren…
Browse files Browse the repository at this point in the history
…t ops in transaction coordinator client (#1242)

### Motivation

Currently, the client will return an UnknownError when there are too many concurrent ops in transaction coordinator client

### Modifications

- Add new error `ErrMaxConcurrentOpsReached` 
- Return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client
  • Loading branch information
RobertIndie authored Jul 11, 2024
1 parent 64d1e00 commit e7a771f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ const (
// fenced. Applications are now supposed to close it and create a
// new producer
ProducerFenced
// MaxConcurrentOperationsReached indicates that the maximum number of concurrent operations
// has been reached. This means that no additional operations can be started until some
// of the current operations complete.
MaxConcurrentOperationsReached
// TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled.
// This error is returned when an operation that requires the transaction coordinator is attempted
// but the transaction coordinator feature is not enabled in the system or the transaction coordinator
Expand Down
3 changes: 2 additions & 1 deletion pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type transactionCoordinatorClient struct {
// where the TC located.
const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign"

var ErrMaxConcurrentOpsReached = newError(MaxConcurrentOperationsReached, "Max concurrent operations reached")
var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+
"the transaction coordinator, or the transaction coordinator has not initialized")

Expand Down Expand Up @@ -212,7 +213,7 @@ func getTCAssignTopicName(partition uint64) string {

func (tc *transactionCoordinatorClient) canSendRequest() error {
if !tc.semaphore.Acquire(context.Background()) {
return newError(UnknownError, "Failed to acquire semaphore")
return ErrMaxConcurrentOpsReached
}
return nil
}
Expand Down

0 comments on commit e7a771f

Please sign in to comment.