Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved monitoring of replica state during large binlog load #151

Merged
merged 12 commits into from
Dec 24, 2024
4 changes: 3 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ func (app *App) removeMaintenanceFile() {
// separate goroutine performing health checks
func (app *App) healthChecker(ctx context.Context) {
ticker := time.NewTicker(app.config.HealthCheckInterval)
var oldBinLogPos string
for {
select {
case <-ticker.C:
hc := app.getLocalNodeState()
oldBinLogPos = hc.UpdateBinlogStatus(oldBinLogPos)
app.logger.Infof("healthcheck: %v", hc)
err := app.dcs.SetEphemeral(dcs.JoinPath(pathHealthPrefix, app.config.Hostname), hc)
if err != nil {
Expand Down Expand Up @@ -963,7 +965,7 @@ func (app *App) calcActiveNodesChanges(clusterState map[string]*NodeState, activ
slaveState := clusterState[host].SlaveState
dataLag := calcLagBytes(masterBinlogs, slaveState.MasterLogFile, slaveState.MasterLogPos)
if dataLag > app.config.SemiSyncEnableLag {
newBinLogPos := fmt.Sprintf("%s%019d", slaveState.MasterLogFile, slaveState.MasterLogPos)
newBinLogPos := slaveState.GetCurrentBinlogPosition()
oldBinLogPos := app.slaveReadPositions[host]

if newBinLogPos <= oldBinLogPos {
Expand Down
33 changes: 24 additions & 9 deletions internal/app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type NodeState struct {
IsOffline bool `json:"is_offline"`
IsCascade bool `json:"is_cascade"`
IsFileSystemReadonly bool `json:"is_file_system_readonly"`
IsLoadingBinlog bool `json:"is_loading_binlog"`
Error string `json:"error"`
DiskState *DiskState `json:"disk_state"`
DaemonState *DaemonState `json:"daemon_state"`
Expand Down Expand Up @@ -144,6 +145,20 @@ func (ns *NodeState) CalcGTIDDiffWithMaster() (string, error) {
return gtids.GTIDDiff(replicaGTID, sourceGTID)
}

func (ss *SlaveState) GetCurrentBinlogPosition() string {
return fmt.Sprintf("%s%019d", ss.MasterLogFile, ss.MasterLogPos)
}

func (ns *NodeState) UpdateBinlogStatus(oldBinloPos string) (newBinlogPos string) {
if ns.SlaveState != nil {
newBinlogPos = ns.SlaveState.GetCurrentBinlogPosition()

ns.IsLoadingBinlog = newBinlogPos > oldBinloPos
}

return
}

func (ns *NodeState) String() string {
ping := "ok"
if !ns.PingOk {
Expand Down Expand Up @@ -249,15 +264,15 @@ type SlaveState struct {
LastSQLErrno int `json:"last_sql_errno"`
}

func (ns *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) {
ns.ExecutedGtidSet = replStatus.GetExecutedGtidSet()
ns.RetrievedGtidSet = replStatus.GetRetrievedGtidSet()
ns.MasterHost = replStatus.GetMasterHost()
ns.ReplicationState = replStatus.ReplicationState()
ns.MasterLogFile = replStatus.GetMasterLogFile()
ns.MasterLogPos = replStatus.GetReadMasterLogPos()
ns.LastIOErrno = replStatus.GetLastIOErrno()
ns.LastSQLErrno = replStatus.GetLastSQLErrno()
func (ss *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) {
ss.ExecutedGtidSet = replStatus.GetExecutedGtidSet()
ss.RetrievedGtidSet = replStatus.GetRetrievedGtidSet()
ss.MasterHost = replStatus.GetMasterHost()
ss.ReplicationState = replStatus.ReplicationState()
ss.MasterLogFile = replStatus.GetMasterLogFile()
ss.MasterLogPos = replStatus.GetReadMasterLogPos()
ss.LastIOErrno = replStatus.GetLastIOErrno()
ss.LastSQLErrno = replStatus.GetLastSQLErrno()
}

// SemiSyncState contains semi sync host settings
Expand Down
56 changes: 48 additions & 8 deletions internal/app/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app
import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestStringerWorksOnNodeState(t *testing.T) {
Expand All @@ -19,15 +21,21 @@ func TestStringerWorksOnNodeState(t *testing.T) {
ns.SlaveState.ExecutedGtidSet = "6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-40"

nsStr = fmt.Sprintf("%v", ns)
if nsStr != "<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-40>" {
t.Errorf("%s", ns)
}

require.Equal(
t,
"<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-40>",
nsStr,
)

ns.ShowOnlyGTIDDiff = true
nsStr = fmt.Sprintf("%v", ns)
if nsStr != "<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=source ahead on: 6dbc0b04-4b09-43dc-86cc-9af852ded919:41-101>" {
t.Errorf("%s", ns)
}

require.Equal(
t,
"<ping=ERR repl= sync=??? ro=false offline=false lag=NaN du=??? cr=??? gtid=source ahead on: 6dbc0b04-4b09-43dc-86cc-9af852ded919:41-101>",
nsStr,
)
}

func TestStringerWorksOnNodeStateMap(t *testing.T) {
Expand All @@ -37,7 +45,39 @@ func TestStringerWorksOnNodeStateMap(t *testing.T) {
m["c"] = &NodeState{}

mStr := fmt.Sprintf("%v", m)
if mStr != "map[a:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> b:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> c:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???>]" {
t.Errorf("%s", mStr)

require.Equal(
t,
"map[a:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> b:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???> c:<ping=ERR repl=??? sync=??? ro=false offline=false lag=0.00 du=??? cr=??? gtid=???>]",
mStr,
)
}

func newMockNodeState() *NodeState {
return &NodeState{
SlaveState: &SlaveState{
MasterLogFile: "test_master_log_file",
MasterLogPos: 2,
},
}
}

func TestUpdateBinlogWithChanges(t *testing.T) {
oldBinlogPostion := "test_master_log_file0000000000000000001"
ns := newMockNodeState()

newBinlogPos := ns.UpdateBinlogStatus(oldBinlogPostion)

require.Equal(t, "test_master_log_file0000000000000000002", newBinlogPos)
require.Equal(t, true, ns.IsLoadingBinlog)
}

func TestUpdateBinlogWithoutChanges(t *testing.T) {
oldBinlogPostion := "test_master_log_file0000000000000000002"
ns := newMockNodeState()

newBinlogPos := ns.UpdateBinlogStatus(oldBinlogPostion)

require.Equal(t, "test_master_log_file0000000000000000002", newBinlogPos)
require.Equal(t, false, ns.IsLoadingBinlog)
}
28 changes: 28 additions & 0 deletions tests/features/repl_mon.feature
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ Feature: repl_mon tests
"""
[{"res":1}]
"""
And mysql host "mysql2" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql2" should match json within "20" seconds
"""
{
"is_loading_binlog": true
}
"""
And mysql host "mysql3" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql3" should match json within "20" seconds
"""
{
"is_loading_binlog": true
}
"""

Scenario: repl_mon disabled
Given cluster environment is
Expand All @@ -51,3 +65,17 @@ Feature: repl_mon tests
"""
SELECT ts FROM mysql.mysync_repl_mon
"""
And mysql host "mysql2" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql2" should match json within "20" seconds
"""
{
"is_loading_binlog": false
}
"""
And mysql host "mysql3" should be replica of "mysql1"
Then zookeeper node "/test/health/mysql3" should match json within "20" seconds
"""
{
"is_loading_binlog": false
}
"""
4 changes: 2 additions & 2 deletions tests/images/base/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ubuntu:jammy
ENV container docker
ENV DEBIAN_FRONTEND noninteractive
ENV container=docker
ENV DEBIAN_FRONTEND=noninteractive
ENV ZK_VERSION=3.7.1
ARG MYSQL_VERSION=""
ENV MYSQL_VERSION="${MYSQL_VERSION}"
Expand Down
Loading