From 41141c11c36930a19da727fd25a4876bd56f76a6 Mon Sep 17 00:00:00 2001 From: lengyuexuexuan <2428366760@qq.com> Date: Mon, 19 Feb 2024 10:59:14 +0800 Subject: [PATCH 1/6] Fix the bug that the go client wouldn't update conf when one replica coredump. Add one feature that the client would forward to the priamry when the metalist of client don't contain the primary. --- go-client/pegasus/table_connector.go | 1 + go-client/pegasus/table_connector_test.go | 8 ++- go-client/session/meta_call.go | 70 ++++++++++++++++++----- go-client/session/meta_session.go | 18 +++++- go-client/session/meta_session_test.go | 19 +++++- 5 files changed, 99 insertions(+), 17 deletions(-) diff --git a/go-client/pegasus/table_connector.go b/go-client/pegasus/table_connector.go index d1074a7bf3..c83461e393 100644 --- a/go-client/pegasus/table_connector.go +++ b/go-client/pegasus/table_connector.go @@ -703,6 +703,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R case base.ERR_TIMEOUT: case context.DeadlineExceeded: + confUpdate = true case context.Canceled: // timeout will not trigger a configuration update diff --git a/go-client/pegasus/table_connector_test.go b/go-client/pegasus/table_connector_test.go index 1b28747655..b4016748ea 100644 --- a/go-client/pegasus/table_connector_test.go +++ b/go-client/pegasus/table_connector_test.go @@ -269,8 +269,14 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) { assert.True(t, confUpdate) assert.False(t, retry) + confUpdate, retry, err = ptb.handleReplicaError(context.DeadlineExceeded, nil) + <-ptb.confUpdateCh + assert.Error(t, err) + assert.True(t, confUpdate) + assert.False(t, retry) + { // Ensure: The following errors should not trigger configuration update - errorTypes := []error{base.ERR_TIMEOUT, context.DeadlineExceeded, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT} + errorTypes := []error{base.ERR_TIMEOUT, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT} for _, err := range errorTypes { channelEmpty := false diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index 2db6179ab1..fc64073f6a 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -21,11 +21,12 @@ package session import ( "context" + "github.com/apache/incubator-pegasus/go-client/idl/base" + "github.com/apache/incubator-pegasus/go-client/idl/replication" + "github.com/apache/incubator-pegasus/go-client/pegalog" "sync" "sync/atomic" "time" - - "github.com/apache/incubator-pegasus/go-client/idl/base" ) type metaCallFunc func(context.Context, *metaSession) (metaResponse, error) @@ -42,21 +43,23 @@ type metaCall struct { backupCh chan interface{} callFunc metaCallFunc - metas []*metaSession - lead int + metaIPAddrs []string + metas []*metaSession + lead int // After a Run successfully ends, the current leader will be set in this field. // If there is no meta failover, `newLead` equals to `lead`. newLead uint32 } -func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc) *metaCall { +func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc, meatIPAddr []string) *metaCall { return &metaCall{ - metas: metas, - lead: lead, - newLead: uint32(lead), - respCh: make(chan metaResponse), - callFunc: callFunc, - backupCh: make(chan interface{}), + metas: metas, + metaIPAddrs: meatIPAddr, + lead: lead, + newLead: uint32(lead), + respCh: make(chan metaResponse), + callFunc: callFunc, + backupCh: make(chan interface{}), } } @@ -106,14 +109,40 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, error) { } // issueSingleMeta returns false if we should try another meta -func (c *metaCall) issueSingleMeta(ctx context.Context, i int) bool { - meta := c.metas[i] +func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool { + meta := c.metas[curLeader] resp, err := c.callFunc(ctx, meta) + + if err == nil && resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() { + forwardAddr := c.getMetaServiceForwardAddress(resp) + if forwardAddr == nil { + return false + } + addr := forwardAddr.GetAddress() + found := false + for i := range c.metaIPAddrs { + if addr == c.metaIPAddrs[i] { + found = true + break + } + } + if !found { + c.metaIPAddrs = append(c.metaIPAddrs, addr) + c.metas = append(c.metas, &metaSession{ + NodeSession: newNodeSession(addr, NodeTypeMeta), + logger: pegalog.GetLogger(), + }) + curLeader = len(c.metas) - 1 + c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr) + } + resp, err = c.callFunc(ctx, c.metas[curLeader]) + } + if err != nil || resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() { return false } // the RPC succeeds, this meta becomes the new leader now. - atomic.StoreUint32(&c.newLead, uint32(i)) + atomic.StoreUint32(&c.newLead, uint32(curLeader)) select { case <-ctx.Done(): case c.respCh <- resp: @@ -133,3 +162,16 @@ func (c *metaCall) issueBackupMetas(ctx context.Context) { }(i) } } + +func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress { + rep, ok := resp.(*replication.QueryCfgResponse) + if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() { + return nil + } else if rep.GetPartitions() == nil || len(rep.GetPartitions()) == 0 { + return nil + } else { + return rep.Partitions[0].Primary + + } + +} diff --git a/go-client/session/meta_session.go b/go-client/session/meta_session.go index c209cb8488..b0e962d1d9 100644 --- a/go-client/session/meta_session.go +++ b/go-client/session/meta_session.go @@ -94,10 +94,12 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager { func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) { lead := m.getCurrentLeader() - call := newMetaCall(lead, m.metas, callFunc) + call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs) resp, err := call.Run(ctx) if err == nil { m.setCurrentLeader(int(call.newLead)) + m.setNewMetas(call.metas) + m.setMetaIPAddrs(call.metaIPAddrs) } return resp, err } @@ -131,6 +133,20 @@ func (m *MetaManager) setCurrentLeader(lead int) { m.currentLeader = lead } +func (m *MetaManager) setNewMetas(metas []*metaSession) { + m.mu.Lock() + defer m.mu.Unlock() + + m.metas = metas +} + +func (m *MetaManager) setMetaIPAddrs(metaIPAddrs []string) { + m.mu.Lock() + defer m.mu.Unlock() + + m.metaIPAddrs = metaIPAddrs +} + // Close the sessions. func (m *MetaManager) Close() error { funcs := make([]func() error, len(m.metas)) diff --git a/go-client/session/meta_session_test.go b/go-client/session/meta_session_test.go index d2cbf6cc3d..0c6f54ffe0 100644 --- a/go-client/session/meta_session_test.go +++ b/go-client/session/meta_session_test.go @@ -118,7 +118,7 @@ func TestMetaManager_FirstMetaDead(t *testing.T) { for i := 0; i < 3; i++ { call := newMetaCall(mm.currentLeader, mm.metas, func(rpcCtx context.Context, ms *metaSession) (metaResponse, error) { return ms.queryConfig(rpcCtx, "temp") - }) + }, []string{"0.0.0.0:12345", "0.0.0.0:34603", "0.0.0.0:34602", "0.0.0.0:34601"}) // This a trick for testing. If metaCall issue to other meta, not only to the leader, this nil channel will cause panic. call.backupCh = nil metaResp, err := call.Run(context.Background()) @@ -126,3 +126,20 @@ func TestMetaManager_FirstMetaDead(t *testing.T) { assert.Equal(t, metaResp.GetErr().Errno, base.ERR_OK.String()) } } + +// This case mocks the case that the server primary meta is not in the client metalist. +// And the client will forward to the primary meta automatically. +func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) { + defer leaktest.Check(t)() + + metaList := []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"} + + for i := 0; i < 3; i++ { + mm := NewMetaManager(metaList[i:i+1], NewNodeSession) + defer mm.Close() + resp, err := mm.QueryConfig(context.Background(), "temp") + println(resp) + assert.Nil(t, err) + assert.Equal(t, resp.Err.Errno, base.ERR_OK.String()) + } +} From 54b478a300675ef146c9b1ce884a88e4bfc5176a Mon Sep 17 00:00:00 2001 From: lengyuexuexuan <2428366760@qq.com> Date: Thu, 22 Feb 2024 14:13:52 +0800 Subject: [PATCH 2/6] fix go client bug --- go-client/session/meta_call.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index fc64073f6a..8d425fce1a 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -134,8 +134,8 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool { }) curLeader = len(c.metas) - 1 c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr) + resp, err = c.callFunc(ctx, c.metas[curLeader]) } - resp, err = c.callFunc(ctx, c.metas[curLeader]) } if err != nil || resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() { From 90caeba1dceb8fb62681b2eb435d24d04306c086 Mon Sep 17 00:00:00 2001 From: lengyuexuexuan <2428366760@qq.com> Date: Thu, 22 Feb 2024 14:25:28 +0800 Subject: [PATCH 3/6] Solve the problem of error "File is not `goimports`-ed (goimports)" in the CI process --- go-client/session/meta_call.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index 8d425fce1a..6f8a03c0f0 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -21,12 +21,13 @@ package session import ( "context" - "github.com/apache/incubator-pegasus/go-client/idl/base" - "github.com/apache/incubator-pegasus/go-client/idl/replication" - "github.com/apache/incubator-pegasus/go-client/pegalog" "sync" "sync/atomic" "time" + + "github.com/apache/incubator-pegasus/go-client/idl/base" + "github.com/apache/incubator-pegasus/go-client/idl/replication" + "github.com/apache/incubator-pegasus/go-client/pegalog" ) type metaCallFunc func(context.Context, *metaSession) (metaResponse, error) From 0b39c960e97087a4cdc873cbf0e8c2f6110a42aa Mon Sep 17 00:00:00 2001 From: lengyuexuexuan <2428366760@qq.com> Date: Mon, 4 Mar 2024 19:36:12 +0800 Subject: [PATCH 4/6] Fix go client format issues --- go-client/session/meta_call.go | 4 +--- go-client/session/meta_session_test.go | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index 6f8a03c0f0..b0306ba79c 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -172,7 +172,5 @@ func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddr return nil } else { return rep.Partitions[0].Primary - } - -} +} \ No newline at end of file diff --git a/go-client/session/meta_session_test.go b/go-client/session/meta_session_test.go index 0c6f54ffe0..9402a132cf 100644 --- a/go-client/session/meta_session_test.go +++ b/go-client/session/meta_session_test.go @@ -138,8 +138,7 @@ func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) { mm := NewMetaManager(metaList[i:i+1], NewNodeSession) defer mm.Close() resp, err := mm.QueryConfig(context.Background(), "temp") - println(resp) assert.Nil(t, err) assert.Equal(t, resp.Err.Errno, base.ERR_OK.String()) } -} +} \ No newline at end of file From 1bdcd079460db573266cfe9e52e71c08b6590bff Mon Sep 17 00:00:00 2001 From: lengyuexuexuan <2428366760@qq.com> Date: Mon, 4 Mar 2024 19:42:38 +0800 Subject: [PATCH 5/6] Fix go client format issues --- go-client/session/meta_call.go | 2 +- go-client/session/meta_session_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index b0306ba79c..1f3c842e4e 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -173,4 +173,4 @@ func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddr } else { return rep.Partitions[0].Primary } -} \ No newline at end of file +} diff --git a/go-client/session/meta_session_test.go b/go-client/session/meta_session_test.go index 9402a132cf..5014a4680e 100644 --- a/go-client/session/meta_session_test.go +++ b/go-client/session/meta_session_test.go @@ -141,4 +141,4 @@ func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) { assert.Nil(t, err) assert.Equal(t, resp.Err.Errno, base.ERR_OK.String()) } -} \ No newline at end of file +} From de72a05cbad779847d99eef9652cb23f37d95fea Mon Sep 17 00:00:00 2001 From: lengyuexuexuan <2428366760@qq.com> Date: Wed, 13 Mar 2024 16:28:31 +0800 Subject: [PATCH 6/6] fix data race --- go-client/session/meta_call.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go index 1f3c842e4e..d846aa09b8 100644 --- a/go-client/session/meta_call.go +++ b/go-client/session/meta_call.go @@ -50,6 +50,7 @@ type metaCall struct { // After a Run successfully ends, the current leader will be set in this field. // If there is no meta failover, `newLead` equals to `lead`. newLead uint32 + lock sync.RWMutex } func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc, meatIPAddr []string) *metaCall { @@ -121,18 +122,22 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool { } addr := forwardAddr.GetAddress() found := false + c.lock.Lock() for i := range c.metaIPAddrs { if addr == c.metaIPAddrs[i] { found = true break } } + c.lock.Unlock() if !found { + c.lock.Lock() c.metaIPAddrs = append(c.metaIPAddrs, addr) c.metas = append(c.metas, &metaSession{ NodeSession: newNodeSession(addr, NodeTypeMeta), logger: pegalog.GetLogger(), }) + c.lock.Unlock() curLeader = len(c.metas) - 1 c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr) resp, err = c.callFunc(ctx, c.metas[curLeader])