From a3f7bac07fbb3ac9840ec0077ae455d84aa29d54 Mon Sep 17 00:00:00 2001 From: wurui Date: Thu, 21 Sep 2023 22:27:38 +0800 Subject: [PATCH] feat: router allows duplicate name sfn (#623) --- core/router/default.go | 29 +++----- core/router/default_test.go | 12 ++- core/router/router.go | 2 +- core/server.go | 2 +- core/yerr/errors.go | 141 ------------------------------------ core/yerr/errors_test.go | 55 -------------- 6 files changed, 21 insertions(+), 220 deletions(-) delete mode 100644 core/yerr/errors.go delete mode 100644 core/yerr/errors_test.go diff --git a/core/router/default.go b/core/router/default.go index 1b19cd502..1e8dbbe41 100644 --- a/core/router/default.go +++ b/core/router/default.go @@ -2,12 +2,10 @@ package router import ( - "fmt" "sync" "github.com/yomorun/yomo/core/frame" "github.com/yomorun/yomo/core/metadata" - "github.com/yomorun/yomo/core/yerr" ) // DefaultRouter providers a default implement of `router`, @@ -37,39 +35,32 @@ func (r *DefaultRouter) Clean() { } type defaultRoute struct { - data map[frame.Tag]map[string]string - mu sync.RWMutex + // mu protects data. + mu sync.RWMutex + + // data stores tag and connID connection. + // The key is frame tag, The value is connID connection. + data map[frame.Tag]map[string]struct{} } func newRoute() *defaultRoute { return &defaultRoute{ - data: make(map[frame.Tag]map[string]string), + data: make(map[frame.Tag]map[string]struct{}), mu: sync.RWMutex{}, } } -func (r *defaultRoute) Add(connID string, name string, observeDataTags []frame.Tag) (err error) { +func (r *defaultRoute) Add(connID string, observeDataTags []frame.Tag) (err error) { r.mu.Lock() defer r.mu.Unlock() -LOOP: - for _, conns := range r.data { - for connID, n := range conns { - if n == name { - err = yerr.NewDuplicateNameError(connID, fmt.Errorf("SFN[%s] is already linked to another stream", name)) - delete(conns, connID) - break LOOP - } - } - } - for _, tag := range observeDataTags { conns := r.data[tag] if conns == nil { - conns = make(map[string]string) + conns = map[string]struct{}{} r.data[tag] = conns } - r.data[tag][connID] = name + r.data[tag][connID] = struct{}{} } return err diff --git a/core/router/default_test.go b/core/router/default_test.go index c6e5ff959..35bfcac32 100644 --- a/core/router/default_test.go +++ b/core/router/default_test.go @@ -15,17 +15,23 @@ func TestRouter(t *testing.T) { route := router.Route(m) - err := route.Add("conn-1", "sfn-1", []frame.Tag{frame.Tag(1)}) + err := route.Add("conn-1", []frame.Tag{frame.Tag(1)}) + assert.NoError(t, err) + + err = route.Add("conn-2", []frame.Tag{frame.Tag(1)}) + assert.NoError(t, err) + + err = route.Add("conn-3", []frame.Tag{frame.Tag(1)}) assert.NoError(t, err) ids := route.GetForwardRoutes(frame.Tag(1)) - assert.Equal(t, []string{"conn-1"}, ids) + assert.Equal(t, []string{"conn-1", "conn-2", "conn-3"}, ids) err = route.Remove("conn-1") assert.NoError(t, err) ids = route.GetForwardRoutes(frame.Tag(1)) - assert.Equal(t, []string(nil), ids) + assert.Equal(t, []string{"conn-2", "conn-3"}, ids) router.Clean() diff --git a/core/router/router.go b/core/router/router.go index 45bc7adb7..cf09b70e4 100644 --- a/core/router/router.go +++ b/core/router/router.go @@ -17,7 +17,7 @@ type Router interface { // Route manages data subscribers according to their observed data tags. type Route interface { // Add a route. - Add(connID string, name string, observeDataTags []frame.Tag) error + Add(connID string, observeDataTags []frame.Tag) error // Remove a route. Remove(connID string) error // GetForwardRoutes returns all the subscribers by the given data tag. diff --git a/core/server.go b/core/server.go index 9f7207d87..fa2738f72 100644 --- a/core/server.go +++ b/core/server.go @@ -219,7 +219,7 @@ func (s *Server) handleRoute(hf *frame.HandshakeFrame, md metadata.M) (router.Ro if route == nil { return nil, errors.New("yomo: can't find route in handshake metadata") } - err := route.Add(hf.ID, hf.Name, hf.ObserveDataTags) + err := route.Add(hf.ID, hf.ObserveDataTags) if err != nil { return nil, err } diff --git a/core/yerr/errors.go b/core/yerr/errors.go deleted file mode 100644 index 862bfb36d..000000000 --- a/core/yerr/errors.go +++ /dev/null @@ -1,141 +0,0 @@ -// Package yerr describes yomo errors -package yerr - -import ( - "fmt" - - "github.com/quic-go/quic-go" -) - -// YomoError yomo error -type YomoError interface { - error - // ErrorCode getter method - ErrorCode() ErrorCode -} - -type yomoError struct { - errorCode ErrorCode - err error -} - -// New create yomo error -func New(code ErrorCode, err error) YomoError { - return &yomoError{ - errorCode: code, - err: err, - } -} - -// Error is the built-in error interface -func (e *yomoError) Error() string { - return fmt.Sprintf("%s error: message=%s", e.errorCode, e.err.Error()) -} - -// ErrorCode getter method -func (e *yomoError) ErrorCode() ErrorCode { - return e.errorCode -} - -// ErrorCode error code -type ErrorCode uint64 - -const ( - // ErrorCodeAuthenticateFailed client auth failed. - ErrorCodeAuthenticateFailed ErrorCode = 0xC9 - // ErrorCodeClientAbort client abort - ErrorCodeClientAbort ErrorCode = 0xC7 - // ErrorCodeUnknown unknown error - ErrorCodeUnknown ErrorCode = 0xC0 - // ErrorCodeClosed net closed - ErrorCodeClosed ErrorCode = 0xC1 - // ErrorCodeBeforeHandler befor handler - ErrorCodeBeforeHandler ErrorCode = 0xC2 - // ErrorCodeMainHandler main handler - ErrorCodeMainHandler ErrorCode = 0xC3 - // ErrorCodeAfterHandler after handler - ErrorCodeAfterHandler ErrorCode = 0xC4 - // ErrorCodeHandshake handshake frame - ErrorCodeHandshake ErrorCode = 0xC5 - // ErrorCodeRejected server rejected - ErrorCodeRejected ErrorCode = 0xCC - // ErrorCodeGoaway goaway frame - ErrorCodeGoaway ErrorCode = 0xCF - // ErrorCodeData data frame - ErrorCodeData ErrorCode = 0xCE - // ErrorCodeUnknownClient unknown client error - ErrorCodeUnknownClient ErrorCode = 0xCD - // ErrorCodeDuplicateName unknown client error - ErrorCodeDuplicateName ErrorCode = 0xC6 - // ErrorCodeStartHandler start handler - ErrorCodeStartHandler ErrorCode = 0xC8 -) - -var errCodeStringMap = map[ErrorCode]string{ - ErrorCodeAuthenticateFailed: "AuthenticateFailed", - ErrorCodeClientAbort: "ClientAbort", - ErrorCodeUnknown: "UnknownError", - ErrorCodeClosed: "NetClosed", - ErrorCodeBeforeHandler: "BeforeHandler", - ErrorCodeMainHandler: "MainHandler", - ErrorCodeAfterHandler: "AfterHandler", - ErrorCodeHandshake: "Handshake", - ErrorCodeRejected: "Rejected", - ErrorCodeGoaway: "Goaway", - ErrorCodeData: "DataFrame", - ErrorCodeUnknownClient: "UnknownClient", - ErrorCodeDuplicateName: "DuplicateName", - ErrorCodeStartHandler: "StartHandler", -} - -func (e ErrorCode) String() string { - msg, ok := errCodeStringMap[e] - if !ok { - return "XXX" - } - return msg -} - -// Is parse quic ApplicationErrorCode to yomo ErrorCode -func Is(qerr quic.ApplicationErrorCode, yerr ErrorCode) bool { - return uint64(qerr) == uint64(yerr) -} - -// Parse parse quic ApplicationErrorCode -func Parse(qerr quic.ApplicationErrorCode) ErrorCode { - return ErrorCode(qerr) -} - -// To convert yomo ErrorCode to quic ApplicationErrorCode -func (e ErrorCode) To() quic.ApplicationErrorCode { - return quic.ApplicationErrorCode(e) -} - -// DuplicateNameError duplicate name(sfn) -type DuplicateNameError struct { - streamID string - err error -} - -// NewDuplicateNameError create a duplicate name error -func NewDuplicateNameError(streamID string, err error) DuplicateNameError { - return DuplicateNameError{ - streamID: streamID, - err: err, - } -} - -// Error raw error -func (e DuplicateNameError) Error() string { - return e.err.Error() -} - -// ErrorCode getter method -func (e DuplicateNameError) ErrorCode() ErrorCode { - return ErrorCodeDuplicateName -} - -// StreamID duplicate stream ID -func (e DuplicateNameError) StreamID() string { - return e.streamID -} diff --git a/core/yerr/errors_test.go b/core/yerr/errors_test.go deleted file mode 100644 index bb9d03d9d..000000000 --- a/core/yerr/errors_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package yerr - -import ( - "errors" - "testing" - - "github.com/quic-go/quic-go" - "github.com/stretchr/testify/assert" -) - -func TestError(t *testing.T) { - var ( - err = errors.New("closed") - code = ErrorCodeClosed - ) - - se := New(code, err) - - assert.Equal(t, "NetClosed error: message=closed", se.Error()) - assert.Equal(t, code, se.ErrorCode()) -} - -func TestErrorCode(t *testing.T) { - code := ErrorCodeData - - assert.Equal(t, "DataFrame", code.String()) - - unknown := ErrorCode(123321) - assert.Equal(t, "XXX", unknown.String()) - - qcode := quic.ApplicationErrorCode(206) - - yes := Is(qcode, code) - - assert.True(t, yes) - - parsed := Parse(qcode) - assert.Equal(t, code, parsed) - - to := code.To() - assert.Equal(t, to, qcode) -} - -func TestDuplicateName(t *testing.T) { - var ( - err = errors.New("errmsg") - connID = "mock-id" - ) - - se := NewDuplicateNameError(connID, err) - - assert.Equal(t, err.Error(), se.Error()) - assert.Equal(t, ErrorCodeDuplicateName, se.ErrorCode()) - assert.Equal(t, connID, se.StreamID()) -}