Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: improve zipper log readability #626

Merged
merged 5 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ go run main.go

#### Write the serverless function

See [../example/9-cli/sfn/main.go](../example/9-cli/sfn/main.go)
See [../example/9-cli/sfn/app.go](../example/9-cli/sfn/app.go)

#### Build

Expand Down
1 change: 1 addition & 0 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func (c *Client) Name() string { return c.name }
// FrameWriterConnection represents a frame writer that can connect to an addr.
type FrameWriterConnection interface {
frame.Writer
ClientID() string
Name() string
Close() error
Connect(context.Context, string) error
Expand Down
6 changes: 5 additions & 1 deletion core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/yomorun/yomo/core/router"
"github.com/yomorun/yomo/core/ylog"
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
)

const testaddr = "127.0.0.1:19999"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestFrameRoundTrip(t *testing.T) {
recorder := newFrameWriterRecorder("mockClient")
server.AddDownstreamServer("mockAddr", recorder)

assert.Equal(t, map[string]string{"mockAddr": "mockClient"}, server.Downstreams())
assert.Equal(t, server.Downstreams()["mockAddr"], recorder.ClientID())

go func() {
err := server.ListenAndServe(ctx, testaddr)
Expand Down Expand Up @@ -224,6 +225,7 @@ func createTestStreamFunction(name string, observedTag frame.Tag) *Client {

// frameWriterRecorder frames be writen.
type frameWriterRecorder struct {
id string
name string
codec frame.Codec
packetReader frame.PacketReadWriter
Expand All @@ -233,13 +235,15 @@ type frameWriterRecorder struct {

func newFrameWriterRecorder(name string) *frameWriterRecorder {
return &frameWriterRecorder{
id: id.New(),
name: name,
codec: y3codec.Codec(),
packetReader: y3codec.PacketReadWriter(),
buf: new(bytes.Buffer),
}
}

func (w *frameWriterRecorder) ClientID() string { return w.id }
func (w *frameWriterRecorder) Name() string { return w.name }
func (w *frameWriterRecorder) Close() error { return nil }
func (w *frameWriterRecorder) Connect(_ context.Context, _ string) error { return nil }
Expand Down
8 changes: 0 additions & 8 deletions core/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,6 @@ func newContext(conn Connection, route router.Route, logger *slog.Logger) (c *Co
c = v.(*Context)
}

logger = logger.With(
"conn_id", conn.ID(),
"conn_name", conn.Name(),
"conn_type", conn.ClientType().String(),
)

c.Connection = conn
c.Route = route
c.BaseLogger = logger
Expand All @@ -130,8 +124,6 @@ func (c *Context) WithFrame(f frame.Frame) error {
return err
}

c.Logger = c.BaseLogger.With(MetadataSlogAttr(fmd))

// merge connection metadata.
c.Connection.Metadata().Range(func(k, v string) bool {
fmd.Set(k, v)
Expand Down
37 changes: 16 additions & 21 deletions core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@
return err
}

s.logger = s.logger.With("zipper_addr", addr)

// connect to all downstreams.
for addr, client := range s.downstreams {
go client.Connect(ctx, addr)
Expand Down Expand Up @@ -145,12 +143,14 @@

func (s *Server) handleConnection(qconn quic.Connection, fs *FrameStream, logger *slog.Logger) {
ok, route, conn := s.handshake(qconn, fs)

if !ok {
logger.Info("handshake failed")
logger.Error("handshake failed")
return
}

logger = logger.With("conn_id", conn.ID(), "conn_name", conn.Name())
logger.Info("client connected", "remote_addr", qconn.RemoteAddr().String(), "client_type", conn.ClientType().String())

c := newContext(conn, route, logger)

s.handleContext(c)
Expand Down Expand Up @@ -233,7 +233,6 @@
s.logger.Warn("authentication failed", "credential", hf.AuthName)
return nil, fmt.Errorf("authentication failed: client credential name is %s", hf.AuthName)
}
s.logger.Info("authentication successful", "credential", hf.AuthName)

conn := newConnection(hf.Name, hf.ID, ClientType(hf.ClientType), md, hf.ObserveDataTags, qconn, fs)

Expand Down Expand Up @@ -265,7 +264,7 @@
}
s.listener = listener

s.logger.Info("zipper is up and running", "pid", os.Getpid(), "quic", s.opts.quicConfig.Versions, "auth_name", s.authNames())
s.logger.Info("zipper is up and running", "zipper_addr", s.listener.Addr().String(), "pid", os.Getpid(), "quic", s.opts.quicConfig.Versions, "auth_name", s.authNames())

defer closeServer(s.downstreams, s.connector, s.listener, s.router)

Expand All @@ -278,7 +277,6 @@
s.logger.Error("accepted an error when accepting a connection", "err", err)
return err
}
logger := s.logger.With("remote_addr", qconn.RemoteAddr().String(), "local_addr", conn.LocalAddr().String())

stream, err := qconn.AcceptStream(ctx)
if err != nil {
Expand All @@ -287,7 +285,7 @@

fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter())

go s.handleConnection(qconn, fs, logger)
go s.handleConnection(qconn, fs, s.logger)
}
}

Expand Down Expand Up @@ -342,6 +340,7 @@

func (s *Server) handleDataFrame(c *Context) error {
dataFrame := c.Frame.(*frame.DataFrame)
data_length := len(dataFrame.Payload)

// counter +1
atomic.AddInt64(&s.counterOfDataFrame, 1)
Expand Down Expand Up @@ -401,8 +400,10 @@

// find stream function ids from the route.
streamIDs := route.GetForwardRoutes(dataFrame.Tag)

c.Logger.Debug("sfn routing", "data_tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot())
if len(streamIDs) == 0 {
c.Logger.Info("no observed", "tag", dataFrame.Tag, "data_length", data_length)
}
c.Logger.Debug("connector snapshot", "tag", dataFrame.Tag, "sfn_stream_ids", streamIDs, "connector", s.connector.Snapshot())
wujunzhuo marked this conversation as resolved.
Show resolved Hide resolved

for _, toID := range streamIDs {
stream, ok, err := s.connector.Get(toID)
Expand All @@ -414,13 +415,7 @@
continue
}

c.Logger.Info(
"routing data frame",
"from_stream_name", from.Name(),
"from_stream_id", from.ID(),
"to_stream_name", stream.Name(),
"to_stream_id", toID,
)
c.Logger.Info("data routing", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", data_length, "to_id", toID, "to_name", stream.Name())

// write data frame to stream
if err := stream.WriteFrame(dataFrame); err != nil {
Expand Down Expand Up @@ -487,7 +482,7 @@

snapshotOfDownstream := make(map[string]string, len(s.downstreams))
for addr, client := range s.downstreams {
snapshotOfDownstream[addr] = client.Name()
snapshotOfDownstream[addr] = client.ClientID()
}
return snapshotOfDownstream
}
Expand All @@ -512,7 +507,7 @@
func (s *Server) dispatchToDownstreams(c *Context) {
dataFrame := c.Frame.(*frame.DataFrame)
if c.Connection.ClientType() == ClientTypeUpstreamZipper {
c.Logger.Warn("ignored client", "client_type", c.Connection.ClientType().String())
c.Logger.Debug("ignored client", "client_type", c.Connection.ClientType().String())

Check warning on line 510 in core/server.go

View check run for this annotation

Codecov / codecov/patch

core/server.go#L510

Added line #L510 was not covered by tests
// loop protection
return
}
Expand All @@ -528,8 +523,8 @@
}
dataFrame.Metadata = mdBytes

for streamID, ds := range s.downstreams {
c.Logger.Info("dispatching to downstream", "dispatch_stream_id", streamID, "tid", tid, "sid", sid)
for _, ds := range s.downstreams {
c.Logger.Info("dispatching to downstream", "tid", tid, "sid", sid, "tag", dataFrame.Tag, "data_length", len(dataFrame.Payload), "downstream_id", ds.ClientID())
_ = ds.WriteFrame(dataFrame)
}
}
Expand Down
4 changes: 2 additions & 2 deletions example/4-cascading-zipper/zipper_1.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name: Zipper-1
name: zipper-1
host: 0.0.0.0
port: 9001
auth:
type: token
token: z1
downstreams:
Zipper-2:
zipper-2:
host: 127.0.0.1
port: 9002
credential: "token:z2"
2 changes: 1 addition & 1 deletion example/4-cascading-zipper/zipper_2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ auth:
type: token
token: z2
downstreams:
Zipper-1:
zipper-1:
host: 127.0.0.1
port: 9001
credential: "token:z1"
6 changes: 1 addition & 5 deletions zipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ func NewZipper(name string, meshConfig map[string]config.Downstream, options ...
)
downstream := core.NewClient(name, core.ClientTypeUpstreamZipper, clientOptions...)

server.Logger().Debug("add downstream",
"downstream_name", downstreamName,
"downstream_addr", addr,
"client_id", downstream.ClientID(),
)
server.Logger().Info("add downstream", "name", downstreamName, "addr", addr, "downstream_id", downstream.ClientID())
server.AddDownstreamServer(addr, downstream)
}

Expand Down