Skip to content

Commit

Permalink
Fix transaction coordinator client cann't reconnect to the broker (#1237
Browse files Browse the repository at this point in the history
)

Fixes #1227

### Motivation

There are some issues with the `transactionCoordinatorClient` in the go client. When using the transaction, if there are any reconnection happens during the transaction operation. The connection to the transaction coordinator won't be reconnected. This causes all following operations to fail with the `connection closed` error.

### Modifications

- Introduced `transactionHandler` to manage reconnections and handle requests.
- Fix the tc client would crash if the broker doesn't enable the transaction
- Improved the error handling in the tc client.
  • Loading branch information
RobertIndie authored Jul 11, 2024
1 parent e7a771f commit 50dce7e
Show file tree
Hide file tree
Showing 4 changed files with 398 additions and 100 deletions.
5 changes: 5 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Connection interface {
ID() string
GetMaxMessageSize() int32
Close()
WaitForClose() <-chan interface{}
IsProxied() bool
}

Expand Down Expand Up @@ -1049,6 +1050,10 @@ func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
return time.Since(c.lastActive) > maxIdleTime
}

func (c *connection) WaitForClose() <-chan interface{} {
return c.closeCh
}

// Close closes the connection by
// closing underlying socket connection and closeCh.
// This also triggers callbacks to the ConnectionClosed listeners.
Expand Down
13 changes: 13 additions & 0 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)

// GetConnections get all connections in the pool.
GetConnections() map[string]Connection

// Close all the connections in the pool
Close()
}
Expand Down Expand Up @@ -124,6 +127,16 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
return conn, err
}

func (p *connectionPool) GetConnections() map[string]Connection {
p.Lock()
conns := make(map[string]Connection)
for k, c := range p.connections {
conns[k] = c
}
p.Unlock()
return conns
}

func (p *connectionPool) Close() {
p.Lock()
close(p.closeCh)
Expand Down
Loading

0 comments on commit 50dce7e

Please sign in to comment.