Skip to content

Commit

Permalink
Bug fix for replication repair on long queries (#66)
Browse files: browse the repository at this point in the history
* bug fix

* bug fix: full GTID replaced with channel GTID

* Resolve Issues

* Small performance improvement

* Fix

---------

Co-authored-by: Evgeniy <[email protected]>
  • Loading branch information
noname0443 and noname0443 authored Jan 10, 2024
1 parent a9b6d27 commit 68b6a93
Showing 1 changed file with 38 additions and 11 deletions.
49 changes: 38 additions & 11 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/yandex/mysync/internal/mysql"
"github.com/yandex/mysync/internal/mysql/gtids"
)

// RepairReplicationAlgorithm is the function signature shared by replication
// repair routines. An implementation receives the application, a MySQL node,
// a master string (presumably the master's host name; confirm against callers)
// and a replication channel name, and reports failure via the returned error.
type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string, channel string) error
Expand All @@ -17,8 +18,9 @@ const (
)

// ReplicationRepairState is the per-key bookkeeping record stored in
// app.replRepairState for replication repair attempts.
//
// NOTE(review): this text is a rendered diff without +/- markers, so the
// LastAttempt/History pair appears twice; the first pair looks like the
// pre-change lines and the second pair like the same fields re-aligned next to
// the newly added LastGTIDExecuted. Confirm against the repository.
type ReplicationRepairState struct {
// LastAttempt is set to time.Now() when the state is created and is compared
// against a cooldown threshold by cooldownPassed.
LastAttempt time.Time
// History maps an algorithm type to an int counter; createRepairState
// initialises every entry to 0.
History map[ReplicationRepairAlgorithmType]int
LastAttempt time.Time
History map[ReplicationRepairAlgorithmType]int
// LastGTIDExecuted holds the executed GTID set string reported by the replica
// status at state creation; MarkReplicationRunning parses it and compares it
// with the current executed GTID set before deleting the state.
LastGTIDExecuted string
}

func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
Expand All @@ -31,12 +33,26 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
}

if replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
delete(app.replRepairState, key)
status, err := node.ReplicaStatusWithTimeout(app.config.DBTimeout, channel)
if err != nil {
return
}

newGtidSet := gtids.ParseGtidSet(status.GetExecutedGtidSet())
oldGtidSet := gtids.ParseGtidSet(replState.LastGTIDExecuted)

if !isGTIDLessOrEqual(newGtidSet, oldGtidSet) {
delete(app.replRepairState, key)
}
}
}

func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) {
replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel))
replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host(), channel)
if err != nil {
app.logger.Errorf("repair error: host %s, %v", node.Host(), err)
return
}

if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
return
Expand Down Expand Up @@ -138,29 +154,40 @@ func (state *ReplicationRepairState) cooldownPassed(replicationRepairCooldown ti
return state.LastAttempt.Before(cooldown)
}

// getOrCreateHostRepairState returns the repair state stored in
// app.replRepairState under stateKey, creating and storing a new one via
// createRepairState when no entry exists yet.
//
// In the two-value form, a failure of createRepairState is returned as
// (nil, err) and nothing is stored in the map.
//
// NOTE(review): this text is a rendered diff without +/- markers. The line
// directly below appears to be the pre-change signature and the line after it
// its replacement; the same old/new pairing shows up for the createRepairState
// call and for the final return. Confirm against the repository.
func (app *App) getOrCreateHostRepairState(stateKey string) *ReplicationRepairState {
func (app *App) getOrCreateHostRepairState(stateKey, hostname, channel string) (*ReplicationRepairState, error) {
var replState *ReplicationRepairState
// Reuse the existing entry for this key when there is one.
if state, ok := app.replRepairState[stateKey]; ok {
replState = state
} else {
replState = app.createRepairState()
// err is declared separately so that the assignment below writes to the
// outer replState instead of shadowing it with :=.
var err error
replState, err = app.createRepairState(hostname, channel)
if err != nil {
return nil, err
}

// Remember the freshly created state so later calls find it.
app.replRepairState[stateKey] = replState
}

return replState
return replState, nil
}

// createRepairState builds a new ReplicationRepairState with LastAttempt set
// to the current time and a History map whose entries are initialised to 0.
//
// In the two-value form it first reads the replica status of hostname for the
// given channel (bounded by app.config.DBTimeout) and records the reported
// executed GTID set in LastGTIDExecuted; if that read fails, the error is
// returned and no state is created.
//
// NOTE(review): this text is a rendered diff without +/- markers. The line
// directly below appears to be the pre-change signature and the line after it
// its replacement; the struct literal fields and the final return are likewise
// shown in both old and new form. Confirm against the repository.
func (app *App) createRepairState() *ReplicationRepairState {
func (app *App) createRepairState(hostname, channel string) (*ReplicationRepairState, error) {
// NOTE(review): the result of app.cluster.Get(hostname) is used without a nil
// check — confirm it cannot be nil for the hosts passed in here.
status, err := app.cluster.Get(hostname).ReplicaStatusWithTimeout(app.config.DBTimeout, channel)
if err != nil {
return nil, err
}

result := ReplicationRepairState{
LastAttempt: time.Now(),
History: make(map[ReplicationRepairAlgorithmType]int),
LastAttempt: time.Now(),
History: make(map[ReplicationRepairAlgorithmType]int),
LastGTIDExecuted: status.GetExecutedGtidSet(),
}

// Seed one zero counter per entry of the algorithm order.
// NOTE(review): with a single loop variable, range yields the index (assuming
// getAlgorithmOrder returns a slice, as defaultOrder is), so the map key is
// the position converted to ReplicationRepairAlgorithmType, not the element
// value — confirm that positions and type values coincide.
for i := range app.getAlgorithmOrder() {
result.History[ReplicationRepairAlgorithmType(i)] = 0
}

return &result
return &result, nil
}

var defaultOrder = []ReplicationRepairAlgorithmType{
Expand Down

0 comments on commit 68b6a93

Please sign in to comment.