Skip to content

Commit

Permalink
feat: router allows duplicate name sfn (#623)
Browse files Browse the repository at this point in the history
  • Loading branch information
woorui authored Sep 21, 2023
1 parent bf4d4bc commit a3f7bac
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 220 deletions.
29 changes: 10 additions & 19 deletions core/router/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
package router

import (
"fmt"
"sync"

"github.com/yomorun/yomo/core/frame"
"github.com/yomorun/yomo/core/metadata"
"github.com/yomorun/yomo/core/yerr"
)

// DefaultRouter providers a default implement of `router`,
Expand Down Expand Up @@ -37,39 +35,32 @@ func (r *DefaultRouter) Clean() {
}

type defaultRoute struct {
data map[frame.Tag]map[string]string
mu sync.RWMutex
// mu protects data.
mu sync.RWMutex

// data stores tag and connID connection.
// The key is frame tag, The value is connID connection.
data map[frame.Tag]map[string]struct{}
}

func newRoute() *defaultRoute {
return &defaultRoute{
data: make(map[frame.Tag]map[string]string),
data: make(map[frame.Tag]map[string]struct{}),
mu: sync.RWMutex{},
}
}

func (r *defaultRoute) Add(connID string, name string, observeDataTags []frame.Tag) (err error) {
func (r *defaultRoute) Add(connID string, observeDataTags []frame.Tag) (err error) {
r.mu.Lock()
defer r.mu.Unlock()

LOOP:
for _, conns := range r.data {
for connID, n := range conns {
if n == name {
err = yerr.NewDuplicateNameError(connID, fmt.Errorf("SFN[%s] is already linked to another stream", name))
delete(conns, connID)
break LOOP
}
}
}

for _, tag := range observeDataTags {
conns := r.data[tag]
if conns == nil {
conns = make(map[string]string)
conns = map[string]struct{}{}
r.data[tag] = conns
}
r.data[tag][connID] = name
r.data[tag][connID] = struct{}{}
}

return err
Expand Down
12 changes: 9 additions & 3 deletions core/router/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@ func TestRouter(t *testing.T) {

route := router.Route(m)

err := route.Add("conn-1", "sfn-1", []frame.Tag{frame.Tag(1)})
err := route.Add("conn-1", []frame.Tag{frame.Tag(1)})
assert.NoError(t, err)

err = route.Add("conn-2", []frame.Tag{frame.Tag(1)})
assert.NoError(t, err)

err = route.Add("conn-3", []frame.Tag{frame.Tag(1)})
assert.NoError(t, err)

ids := route.GetForwardRoutes(frame.Tag(1))
assert.Equal(t, []string{"conn-1"}, ids)
assert.Equal(t, []string{"conn-1", "conn-2", "conn-3"}, ids)

err = route.Remove("conn-1")
assert.NoError(t, err)

ids = route.GetForwardRoutes(frame.Tag(1))
assert.Equal(t, []string(nil), ids)
assert.Equal(t, []string{"conn-2", "conn-3"}, ids)

router.Clean()

Expand Down
2 changes: 1 addition & 1 deletion core/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Router interface {
// Route manages data subscribers according to their observed data tags.
type Route interface {
// Add a route.
Add(connID string, name string, observeDataTags []frame.Tag) error
Add(connID string, observeDataTags []frame.Tag) error
// Remove a route.
Remove(connID string) error
// GetForwardRoutes returns all the subscribers by the given data tag.
Expand Down
2 changes: 1 addition & 1 deletion core/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *Server) handleRoute(hf *frame.HandshakeFrame, md metadata.M) (router.Ro
if route == nil {
return nil, errors.New("yomo: can't find route in handshake metadata")
}
err := route.Add(hf.ID, hf.Name, hf.ObserveDataTags)
err := route.Add(hf.ID, hf.ObserveDataTags)
if err != nil {
return nil, err
}
Expand Down
141 changes: 0 additions & 141 deletions core/yerr/errors.go

This file was deleted.

55 changes: 0 additions & 55 deletions core/yerr/errors_test.go

This file was deleted.

0 comments on commit a3f7bac

Please sign in to comment.