From 4fa59d73df62abf37f46688190bee6837e7f87ff Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 22 Sep 2023 01:15:22 +0800 Subject: [PATCH 1/5] style: improve zipper log readability --- cli/README.md | 2 +- core/client.go | 1 + core/client_test.go | 4 +++ core/context.go | 8 ------ core/server.go | 32 +++++++++--------------- example/4-cascading-zipper/zipper_1.yaml | 4 +-- example/4-cascading-zipper/zipper_2.yaml | 2 +- zipper.go | 6 +---- 8 files changed, 22 insertions(+), 37 deletions(-) diff --git a/cli/README.md b/cli/README.md index 0ac00b79f..9bb512607 100644 --- a/cli/README.md +++ b/cli/README.md @@ -54,7 +54,7 @@ go run main.go #### Write the serverless function -See [../example/9-cli/sfn/main.go](../example/9-cli/sfn/main.go) +See [../example/9-cli/sfn/app.go](../example/9-cli/sfn/app.go) #### Build diff --git a/core/client.go b/core/client.go index b95db4972..fb617af98 100644 --- a/core/client.go +++ b/core/client.go @@ -333,6 +333,7 @@ func (c *Client) Name() string { return c.name } // FrameWriterConnection represents a frame writer that can connect to an addr. type FrameWriterConnection interface { frame.Writer + ClientID() string Name() string Close() error Connect(context.Context, string) error diff --git a/core/client_test.go b/core/client_test.go index c856f8951..0815e6a6a 100644 --- a/core/client_test.go +++ b/core/client_test.go @@ -16,6 +16,7 @@ import ( "github.com/yomorun/yomo/core/router" "github.com/yomorun/yomo/core/ylog" "github.com/yomorun/yomo/pkg/frame-codec/y3codec" + "github.com/yomorun/yomo/pkg/id" ) const testaddr = "127.0.0.1:19999" @@ -224,6 +225,7 @@ func createTestStreamFunction(name string, observedTag frame.Tag) *Client { // frameWriterRecorder frames be writen. type frameWriterRecorder struct { + id string name string codec frame.Codec packetReader frame.PacketReadWriter @@ -233,6 +235,7 @@ type frameWriterRecorder struct { func newFrameWriterRecorder(name string) *frameWriterRecorder { return &frameWriterRecorder{ + id: id.New(), name: name, codec: y3codec.Codec(), packetReader: y3codec.PacketReadWriter(), @@ -240,6 +243,7 @@ func newFrameWriterRecorder(name string) *frameWriterRecorder { } } +func (w *frameWriterRecorder) ClientID() string { return w.id } func (w *frameWriterRecorder) Name() string { return w.name } func (w *frameWriterRecorder) Close() error { return nil } func (w *frameWriterRecorder) Connect(_ context.Context, _ string) error { return nil } diff --git a/core/context.go b/core/context.go index f98a46d6e..791e9ab55 100644 --- a/core/context.go +++ b/core/context.go @@ -98,12 +98,6 @@ func newContext(conn Connection, route router.Route, logger *slog.Logger) (c *Co c = v.(*Context) } - logger = logger.With( - "conn_id", conn.ID(), - "conn_name", conn.Name(), - "conn_type", conn.ClientType().String(), - ) - c.Connection = conn c.Route = route c.BaseLogger = logger @@ -130,8 +124,6 @@ func (c *Context) WithFrame(f frame.Frame) error { return err } - c.Logger = c.BaseLogger.With(MetadataSlogAttr(fmd)) - // merge connection metadata. c.Connection.Metadata().Range(func(k, v string) bool { fmd.Set(k, v) diff --git a/core/server.go b/core/server.go index fa2738f72..f5e04e475 100644 --- a/core/server.go +++ b/core/server.go @@ -95,8 +95,6 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) error { return err } - s.logger = s.logger.With("zipper_addr", addr) - // connect to all downstreams. for addr, client := range s.downstreams { go client.Connect(ctx, addr) @@ -145,12 +143,14 @@ func (s *Server) handshake(qconn quic.Connection, fs *FrameStream) (bool, router func (s *Server) handleConnection(qconn quic.Connection, fs *FrameStream, logger *slog.Logger) { ok, route, conn := s.handshake(qconn, fs) - if !ok { - logger.Info("handshake failed") + logger.Error("handshake failed") return } + logger = logger.With("conn_id", conn.ID()) + logger.Info("client connected", "remote_addr", qconn.RemoteAddr().String(), "client_type", conn.ClientType().String(), "name", conn.Name()) + c := newContext(conn, route, logger) s.handleContext(c) @@ -233,7 +233,6 @@ func (s *Server) handleHandshakeFrame(qconn quic.Connection, fs *FrameStream, hf s.logger.Warn("authentication failed", "credential", hf.AuthName) return nil, fmt.Errorf("authentication failed: client credential name is %s", hf.AuthName) } - s.logger.Info("authentication successful", "credential", hf.AuthName) conn := newConnection(hf.Name, hf.ID, ClientType(hf.ClientType), md, hf.ObserveDataTags, qconn, fs) @@ -265,7 +264,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { } s.listener = listener - s.logger.Info("zipper is up and running", "pid", os.Getpid(), "quic", s.opts.quicConfig.Versions, "auth_name", s.authNames()) + s.logger.Info("zipper is up and running", "zipper_addr", s.listener.Addr().String(), "pid", os.Getpid(), "quic", s.opts.quicConfig.Versions, "auth_name", s.authNames()) defer closeServer(s.downstreams, s.connector, s.listener, s.router) @@ -278,7 +277,6 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { s.logger.Error("accepted an error when accepting a connection", "err", err) return err } - logger := s.logger.With("remote_addr", qconn.RemoteAddr().String(), "local_addr", conn.LocalAddr().String()) stream, err := qconn.AcceptStream(ctx) if err != nil { @@ -287,7 +285,7 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error { fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter()) - go s.handleConnection(qconn, fs, logger) + go s.handleConnection(qconn, fs, s.logger) } } @@ -402,7 +400,7 @@ func (s *Server) handleDataFrame(c *Context) error { // find stream function ids from the route. streamIDs := route.GetForwardRoutes(dataFrame.Tag) - c.Logger.Debug("sfn routing", "data_tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot()) + c.Logger.Debug("sfn routing", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot()) for _, toID := range streamIDs { stream, ok, err := s.connector.Get(toID) @@ -414,13 +412,7 @@ func (s *Server) handleDataFrame(c *Context) error { continue } - c.Logger.Info( - "routing data frame", - "from_stream_name", from.Name(), - "from_stream_id", from.ID(), - "to_stream_name", stream.Name(), - "to_stream_id", toID, - ) + c.Logger.Info("routing data frame", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "to", toID) // write data frame to stream if err := stream.WriteFrame(dataFrame); err != nil { @@ -487,7 +479,7 @@ func (s *Server) Downstreams() map[string]string { snapshotOfDownstream := make(map[string]string, len(s.downstreams)) for addr, client := range s.downstreams { - snapshotOfDownstream[addr] = client.Name() + snapshotOfDownstream[addr] = client.ClientID() } return snapshotOfDownstream } @@ -512,7 +504,7 @@ func (s *Server) AddDownstreamServer(addr string, c FrameWriterConnection) { func (s *Server) dispatchToDownstreams(c *Context) { dataFrame := c.Frame.(*frame.DataFrame) if c.Connection.ClientType() == ClientTypeUpstreamZipper { - c.Logger.Warn("ignored client", "client_type", c.Connection.ClientType().String()) + c.Logger.Debug("ignored client", "client_type", c.Connection.ClientType().String()) // loop protection return } @@ -528,8 +520,8 @@ func (s *Server) dispatchToDownstreams(c *Context) { } dataFrame.Metadata = mdBytes - for streamID, ds := range s.downstreams { - c.Logger.Info("dispatching to downstream", "dispatch_stream_id", streamID, "tid", tid, "sid", sid) + for _, ds := range s.downstreams { + c.Logger.Info("dispatching to downstream", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "downstream_id", ds.ClientID()) _ = ds.WriteFrame(dataFrame) } } diff --git a/example/4-cascading-zipper/zipper_1.yaml b/example/4-cascading-zipper/zipper_1.yaml index f454c8cdf..f09ce0168 100644 --- a/example/4-cascading-zipper/zipper_1.yaml +++ b/example/4-cascading-zipper/zipper_1.yaml @@ -1,11 +1,11 @@ -name: Zipper-1 +name: zipper-1 host: 0.0.0.0 port: 9001 auth: type: token token: z1 downstreams: - Zipper-2: + zipper-2: host: 127.0.0.1 port: 9002 credential: "token:z2" diff --git a/example/4-cascading-zipper/zipper_2.yaml b/example/4-cascading-zipper/zipper_2.yaml index 7d3827b2b..0b89c587b 100644 --- a/example/4-cascading-zipper/zipper_2.yaml +++ b/example/4-cascading-zipper/zipper_2.yaml @@ -5,7 +5,7 @@ auth: type: token token: z2 downstreams: - Zipper-1: + zipper-1: host: 127.0.0.1 port: 9001 credential: "token:z1" diff --git a/zipper.go b/zipper.go index 5cedbf716..823fc85d1 100644 --- a/zipper.go +++ b/zipper.go @@ -73,11 +73,7 @@ func NewZipper(name string, meshConfig map[string]config.Downstream, options ... ) downstream := core.NewClient(name, core.ClientTypeUpstreamZipper, clientOptions...) - server.Logger().Debug("add downstream", - "downstream_name", downstreamName, - "downstream_addr", addr, - "client_id", downstream.ClientID(), - ) + server.Logger().Info("add downstream", "name", downstreamName, "addr", addr, "downstream_id", downstream.ClientID()) server.AddDownstreamServer(addr, downstream) } From cb07e40597ab1571fe481d1292a9eb76ff35d3c0 Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 22 Sep 2023 10:15:23 +0800 Subject: [PATCH 2/5] f --- core/server.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/server.go b/core/server.go index f5e04e475..b9fb25e43 100644 --- a/core/server.go +++ b/core/server.go @@ -148,8 +148,8 @@ func (s *Server) handleConnection(qconn quic.Connection, fs *FrameStream, logger return } - logger = logger.With("conn_id", conn.ID()) - logger.Info("client connected", "remote_addr", qconn.RemoteAddr().String(), "client_type", conn.ClientType().String(), "name", conn.Name()) + logger = logger.With("conn_id", conn.ID(), "conn_name", conn.Name()) + logger.Info("client connected", "remote_addr", qconn.RemoteAddr().String(), "client_type", conn.ClientType().String()) c := newContext(conn, route, logger) @@ -340,6 +340,7 @@ func (s *Server) mainFrameHandler(c *Context) error { func (s *Server) handleDataFrame(c *Context) error { dataFrame := c.Frame.(*frame.DataFrame) + data_length := len(dataFrame.Payload) // counter +1 atomic.AddInt64(&s.counterOfDataFrame, 1) @@ -399,8 +400,10 @@ func (s *Server) handleDataFrame(c *Context) error { // find stream function ids from the route. streamIDs := route.GetForwardRoutes(dataFrame.Tag) - - c.Logger.Debug("sfn routing", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot()) + if len(streamIDs) == 0 { + c.Logger.Info("no observed", "tag", dataFrame.Tag, "data_length", data_length) + } + c.Logger.Debug("connector snapshot", "tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot()) for _, toID := range streamIDs { stream, ok, err := s.connector.Get(toID) @@ -412,7 +415,7 @@ func (s *Server) handleDataFrame(c *Context) error { continue } - c.Logger.Info("routing data frame", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "to", toID) + c.Logger.Info("data routing", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", data_length, "to_id", toID, "to_name", stream.Name()) // write data frame to stream if err := stream.WriteFrame(dataFrame); err != nil { From 15d12801d9a15f12ca1fd6c7b9534e82470e624e Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 22 Sep 2023 10:25:23 +0800 Subject: [PATCH 3/5] unit test --- core/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/client_test.go b/core/client_test.go index 0815e6a6a..7eee0c4ee 100644 --- a/core/client_test.go +++ b/core/client_test.go @@ -63,7 +63,7 @@ func TestFrameRoundTrip(t *testing.T) { recorder := newFrameWriterRecorder("mockClient") server.AddDownstreamServer("mockAddr", recorder) - assert.Equal(t, map[string]string{"mockAddr": "mockClient"}, server.Downstreams()) + assert.Equal(t, server.Downstreams()["mockAddr"], recorder.ClientID()) go func() { err := server.ListenAndServe(ctx, testaddr) From f67a4e35e4adf0ba3f275e1cd3cd3bcb8f84192f Mon Sep 17 00:00:00 2001 From: wujunzhuo Date: Fri, 22 Sep 2023 10:53:00 +0800 Subject: [PATCH 4/5] rename stream_id to conn_id --- core/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/server.go b/core/server.go index b9fb25e43..9b28b57ba 100644 --- a/core/server.go +++ b/core/server.go @@ -399,13 +399,13 @@ func (s *Server) handleDataFrame(c *Context) error { } // find stream function ids from the route. - streamIDs := route.GetForwardRoutes(dataFrame.Tag) - if len(streamIDs) == 0 { + connIDs := route.GetForwardRoutes(dataFrame.Tag) + if len(connIDs) == 0 { c.Logger.Info("no observed", "tag", dataFrame.Tag, "data_length", data_length) } - c.Logger.Debug("connector snapshot", "tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot()) + c.Logger.Debug("connector snapshot", "tag", dataFrame.Tag, "sfn_conn_ids", connIDs, "connector", s.connector.Snapshot()) - for _, toID := range streamIDs { + for _, toID := range connIDs { stream, ok, err := s.connector.Get(toID) if err != nil { continue From 3179d450f87d97e3f164c6eab86d5710f2e03cfc Mon Sep 17 00:00:00 2001 From: woorui Date: Fri, 22 Sep 2023 11:31:28 +0800 Subject: [PATCH 5/5] test: stable test --- core/router/default_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/router/default_test.go b/core/router/default_test.go index 35bfcac32..e621e8b25 100644 --- a/core/router/default_test.go +++ b/core/router/default_test.go @@ -25,7 +25,7 @@ func TestRouter(t *testing.T) { assert.NoError(t, err) ids := route.GetForwardRoutes(frame.Tag(1)) - assert.Equal(t, []string{"conn-1", "conn-2", "conn-3"}, ids) + assert.ElementsMatch(t, []string{"conn-1", "conn-2", "conn-3"}, ids) err = route.Remove("conn-1") assert.NoError(t, err)