Skip to content

Commit

Permalink
Improved monitoring of replica state during large binlog load (#151)
Browse files Browse the repository at this point in the history
* Add new field `IsLoadingBinlog` to NodeState

* Add unit and feature tests for new flag `IsLoadingBinlog` of nodes
- Some review of old unit tests

* Update Dockerfile:
- removed 2 warnings

* Add new func GetCurrentBinlogPosition() to SlaveState

* Some fixes during PR

* Rename variable in data.go for correct linting

* Add testing binlog loading to repl_mon.feature testing

* Microfix for linter

* microfixes for linter

* Remove gaps

* Remove other gaps
  • Loading branch information
WithSoull authored Dec 24, 2024
1 parent c91061f commit 3eeb942
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 20 deletions.
4 changes: 3 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ func (app *App) removeMaintenanceFile() {
// separate goroutine performing health checks
func (app *App) healthChecker(ctx context.Context) {
ticker := time.NewTicker(app.config.HealthCheckInterval)
var oldBinLogPos string
for {
select {
case <-ticker.C:
hc := app.getLocalNodeState()
oldBinLogPos = hc.UpdateBinlogStatus(oldBinLogPos)
app.logger.Infof("healthcheck: %v", hc)
err := app.dcs.SetEphemeral(dcs.JoinPath(pathHealthPrefix, app.config.Hostname), hc)
if err != nil {
Expand Down Expand Up @@ -963,7 +965,7 @@ func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activ
slaveState := clusterState[host].SlaveState
dataLag := calcLagBytes(masterBinlogs, slaveState.MasterLogFile, slaveState.MasterLogPos)
if dataLag > app.config.SemiSyncEnableLag {
newBinLogPos := fmt.Sprintf("%s%019d", slaveState.MasterLogFile, slaveState.MasterLogPos)
newBinLogPos := slaveState.GetCurrentBinlogPosition()
oldBinLogPos := app.slaveReadPositions[host]

if newBinLogPos <= oldBinLogPos {
Expand Down
33 changes: 24 additions & 9 deletions internal/app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type NodeState struct {
IsOffline bool `json:"is_offline"`
IsCascade bool `json:"is_cascade"`
IsFileSystemReadonly bool `json:"is_file_system_readonly"`
IsLoadingBinlog bool `json:"is_loading_binlog"`
Error string `json:"error"`
DiskState *DiskState `json:"disk_state"`
DaemonState *DaemonState `json:"daemon_state"`
Expand Down Expand Up @@ -144,6 +145,20 @@ func (ns *NodeState) CalcGTIDDiffWithMaster() (string, error) {
return gtids.GTIDDiff(replicaGTID, sourceGTID)
}

// GetCurrentBinlogPosition renders the replica's source binlog coordinates as
// one string: MasterLogFile immediately followed by MasterLogPos zero-padded
// to 19 digits.
//
// The fixed-width padding makes positions inside the same log file order
// correctly under plain string comparison, which is how callers compare them.
func (ss *SlaveState) GetCurrentBinlogPosition() string {
	const positionFormat = "%s%019d"

	logFile, logPos := ss.MasterLogFile, ss.MasterLogPos

	return fmt.Sprintf(positionFormat, logFile, logPos)
}

// UpdateBinlogStatus refreshes IsLoadingBinlog by comparing the replica's
// current binlog position with the position observed on the previous call.
//
// oldBinlogPos is the value returned by the previous call; callers start with
// an empty string. The returned newBinlogPos is meant to be passed back in on
// the next call.
//
// If the node has no SlaveState, IsLoadingBinlog is left untouched and an
// empty position is returned. Otherwise IsLoadingBinlog is set to true only
// when the current position compares strictly greater (as a string) than
// oldBinlogPos.
func (ns *NodeState) UpdateBinlogStatus(oldBinlogPos string) (newBinlogPos string) {
	if ns.SlaveState == nil {
		return ""
	}

	newBinlogPos = ns.SlaveState.GetCurrentBinlogPosition()
	ns.IsLoadingBinlog = newBinlogPos > oldBinlogPos

	return newBinlogPos
}

func (ns *NodeState) String() string {
ping := "ok"
if !ns.PingOk {
Expand Down Expand Up @@ -249,15 +264,15 @@ type SlaveState struct {
LastSQLErrno int `json:"last_sql_errno"`
}

func (ns *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) {
ns.ExecutedGtidSet = replStatus.GetExecutedGtidSet()
ns.RetrievedGtidSet = replStatus.GetRetrievedGtidSet()
ns.MasterHost = replStatus.GetMasterHost()
ns.ReplicationState = replStatus.ReplicationState()
ns.MasterLogFile = replStatus.GetMasterLogFile()
ns.MasterLogPos = replStatus.GetReadMasterLogPos()
ns.LastIOErrno = replStatus.GetLastIOErrno()
ns.LastSQLErrno = replStatus.GetLastSQLErrno()
// FromReplicaStatus fills ss from replStatus: executed and retrieved GTID
// sets, master host, replication state, master log file and position, and the
// last IO and SQL error numbers. Only the fields assigned below are written.
func (ss *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) {
ss.ExecutedGtidSet = replStatus.GetExecutedGtidSet()
ss.RetrievedGtidSet = replStatus.GetRetrievedGtidSet()
ss.MasterHost = replStatus.GetMasterHost()
ss.ReplicationState = replStatus.ReplicationState()
ss.MasterLogFile = replStatus.GetMasterLogFile()
// MasterLogPos is taken from the *read* master log position getter.
ss.MasterLogPos = replStatus.GetReadMasterLogPos()
ss.LastIOErrno = replStatus.GetLastIOErrno()
ss.LastSQLErrno = replStatus.GetLastSQLErrno()
}

// SemiSyncState contains semi sync host settings
Expand Down
56 changes: 48 additions & 8 deletions internal/app/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app
import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestStringerWorksOnNodeState(t *testing.T) {
Expand All @@ -19,15 +21,21 @@ func TestStringerWorksOnNodeState(t *testing.T) {
ns.SlaveState.ExecutedGtidSet = "6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-40"

nsStr = fmt.Sprintf("%v", ns)
if nsStr != "<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-40>" {
t.Errorf("%s", ns)
}

require.Equal(
t,
"<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-40>",
nsStr,
)

ns.ShowOnlyGTIDDiff = true
nsStr = fmt.Sprintf("%v", ns)
if nsStr != "<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=source ahead on: 6dbc0b04-4b09-43dc-86cc-9af852ded919:41-101>" {
t.Errorf("%s", ns)
}

require.Equal(
t,
"<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=source ahead on: 6dbc0b04-4b09-43dc-86cc-9af852ded919:41-101>",
nsStr,
)
}

func TestStringerWorksOnNodeStateMap(t *testing.T) {
Expand All @@ -37,7 +45,39 @@ func TestStringerWorksOnNodeStateMap(t *testing.T) {
m["c"] = &NodeState{}

mStr := fmt.Sprintf("%v", m)
if mStr != "map[a:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> b:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> c:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???>]" {
t.Errorf("%s", mStr)

require.Equal(
t,
"map[a:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> b:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> c:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???>]",
mStr,
)
}

// newMockNodeState builds a fresh NodeState fixture whose replica state points
// at log file "test_master_log_file", position 2.
func newMockNodeState() *NodeState {
	replica := &SlaveState{}
	replica.MasterLogFile = "test_master_log_file"
	replica.MasterLogPos = 2

	return &NodeState{SlaveState: replica}
}

// TestUpdateBinlogWithChanges checks that a replica whose binlog position is
// ahead of the previously observed one is reported as loading binlog, and that
// the current position is returned.
func TestUpdateBinlogWithChanges(t *testing.T) {
	// The fixture is at position 2, so the previous position is one behind.
	oldBinlogPosition := "test_master_log_file0000000000000000001"
	ns := newMockNodeState()

	newBinlogPos := ns.UpdateBinlogStatus(oldBinlogPosition)

	require.Equal(t, "test_master_log_file0000000000000000002", newBinlogPos)
	require.True(t, ns.IsLoadingBinlog)
}

// TestUpdateBinlogWithoutChanges checks that a replica whose binlog position
// equals the previously observed one is not reported as loading binlog, and
// that the current position is still returned.
func TestUpdateBinlogWithoutChanges(t *testing.T) {
	// The fixture is at position 2, identical to the previous position.
	oldBinlogPosition := "test_master_log_file0000000000000000002"
	ns := newMockNodeState()

	newBinlogPos := ns.UpdateBinlogStatus(oldBinlogPosition)

	require.Equal(t, "test_master_log_file0000000000000000002", newBinlogPos)
	require.False(t, ns.IsLoadingBinlog)
}
28 changes: 28 additions & 0 deletions tests/features/repl_mon.feature
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ Feature: repl_mon tests
"""
[{"res":1}]
"""
And mysql host "mysql2" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql2" should match json within "20" seconds
"""
{
"is_loading_binlog": true
}
"""
And mysql host "mysql3" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql3" should match json within "20" seconds
"""
{
"is_loading_binlog": true
}
"""

Scenario: repl_mon disabled
Given cluster environment is
Expand All @@ -51,3 +65,17 @@ Feature: repl_mon tests
"""
SELECT ts FROM mysql.mysync_repl_mon
"""
And mysql host "mysql2" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql2" should match json within "20" seconds
"""
{
"is_loading_binlog": false
}
"""
And mysql host "mysql3" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql3" should match json within "20" seconds
"""
{
"is_loading_binlog": false
}
"""
4 changes: 2 additions & 2 deletions tests/images/base/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ubuntu:jammy
ENV container docker
ENV DEBIAN_FRONTEND noninteractive
ENV container=docker
ENV DEBIAN_FRONTEND=noninteractive
ENV ZK_VERSION=3.7.1
ARG MYSQL_VERSION=""
ENV MYSQL_VERSION="${MYSQL_VERSION}"
Expand Down

0 comments on commit 3eeb942

Please sign in to comment.