Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make fsm ingoing channel finite #2862

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions pkg/server/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

const (
minConnectRetryInterval = 5
bufferSize = 16
)

type fsmStateReasonType uint8
Expand Down Expand Up @@ -177,7 +178,7 @@ type fsm struct {
lock sync.RWMutex
state bgp.FSMState
outgoingCh *channels.InfiniteChannel
incomingCh *channels.InfiniteChannel
incomingCh chan interface{}
reason *fsmStateReason
conn net.Conn
connCh chan net.Conn
Expand Down Expand Up @@ -274,7 +275,7 @@ func newFSM(gConf *oc.Global, pConf *oc.Neighbor, logger log.Logger) *fsm {
pConf: pConf,
state: bgp.BGP_FSM_IDLE,
outgoingCh: channels.NewInfiniteChannel(),
incomingCh: channels.NewInfiniteChannel(),
incomingCh: make(chan interface{}, bufferSize),
connCh: make(chan net.Conn, 1),
opensentHoldTime: float64(holdtimeOpensent),
adminState: adminState,
Expand Down Expand Up @@ -382,9 +383,9 @@ func (fsm *fsm) sendNotification(code, subType uint8, data []byte, msg string) (
type fsmHandler struct {
fsm *fsm
conn net.Conn
msgCh *channels.InfiniteChannel
msgCh chan interface{}
stateReasonCh chan fsmStateReason
incoming *channels.InfiniteChannel
incoming chan interface{}
outgoing *channels.InfiniteChannel
holdTimerResetCh chan bool
sentNotification *bgp.BGPMessage
Expand Down Expand Up @@ -1184,12 +1185,12 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {

func (h *fsmHandler) recvMessage(ctx context.Context, wg *sync.WaitGroup) error {
defer func() {
h.msgCh.Close()
close(h.msgCh)
wg.Done()
}()
fmsg, _ := h.recvMessageWithError()
if fmsg != nil {
h.msgCh.In() <- fmsg
h.msgCh <- fmsg
}
return nil
}
Expand Down Expand Up @@ -1262,7 +1263,7 @@ func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReaso
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)

h.msgCh = channels.NewInfiniteChannel()
h.msgCh = make(chan interface{}, bufferSize)

fsm.lock.RLock()
h.conn = fsm.conn
Expand Down Expand Up @@ -1313,7 +1314,7 @@ func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReaso
h.conn.Close()
return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmRestartTimerExpired, nil, nil)
}
case i, ok := <-h.msgCh.Out():
case i, ok := <-h.msgCh:
if !ok {
continue
}
Expand Down Expand Up @@ -1527,7 +1528,7 @@ func keepaliveTicker(fsm *fsm) *time.Ticker {
func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateReason) {
fsm := h.fsm
ticker := keepaliveTicker(fsm)
h.msgCh = channels.NewInfiniteChannel()
h.msgCh = make(chan interface{}, bufferSize)
fsm.lock.RLock()
h.conn = fsm.conn

Expand Down Expand Up @@ -1584,7 +1585,7 @@ func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateRe
// TODO: check error
fsm.conn.Write(b)
fsm.bgpMessageStateUpdate(m.Header.Type, false)
case i, ok := <-h.msgCh.Out():
case i, ok := <-h.msgCh:
if !ok {
continue
}
Expand Down Expand Up @@ -1812,7 +1813,7 @@ func (h *fsmHandler) recvMessageloop(ctx context.Context, wg *sync.WaitGroup) er
for {
fmsg, err := h.recvMessageWithError()
if fmsg != nil {
h.msgCh.In() <- fmsg
h.msgCh <- fmsg
}
if err != nil {
return nil
Expand Down Expand Up @@ -2005,7 +2006,7 @@ func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error {
fsm.lock.RUnlock()

fsm.lock.RLock()
h.incoming.In() <- &fsmMsg{
h.incoming <- &fsmMsg{
fsm: fsm,
MsgType: fsmMsgStateChange,
MsgSrc: fsm.pConf.State.NeighborAddress,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func makePeerAndHandler() (*peer, *fsmHandler) {
h := &fsmHandler{
fsm: p.fsm,
stateReasonCh: make(chan fsmStateReason, 2),
incoming: channels.NewInfiniteChannel(),
incoming: make(chan interface{}, bufferSize),
outgoing: channels.NewInfiniteChannel(),
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type BgpServer struct {
apiServer *server
bgpConfig oc.Bgp
acceptCh chan *net.TCPConn
incomings []*channels.InfiniteChannel
incomings []chan interface{}
mgmtCh chan *mgmtOp
policy *table.RoutingPolicy
listeners []*tcpListener
Expand Down Expand Up @@ -232,11 +232,11 @@ func (s *BgpServer) Stop() {
}
}

func (s *BgpServer) addIncoming(ch *channels.InfiniteChannel) {
func (s *BgpServer) addIncoming(ch chan interface{}) {
s.incomings = append(s.incomings, ch)
}

func (s *BgpServer) delIncoming(ch *channels.InfiniteChannel) {
func (s *BgpServer) delIncoming(ch chan interface{}) {
for i, c := range s.incomings {
if c == ch {
s.incomings = append(s.incomings[:i], s.incomings[i+1:]...)
Expand Down Expand Up @@ -444,7 +444,7 @@ func (s *BgpServer) Serve() {
}

cleanInfiniteChannel(fsm.outgoingCh)
cleanInfiniteChannel(fsm.incomingCh)
cleanFiniteChannel(fsm.incomingCh)
s.delIncoming(fsm.incomingCh)
if s.shutdownWG != nil && len(s.incomings) == 0 {
s.shutdownWG.Done()
Expand Down Expand Up @@ -480,7 +480,7 @@ func (s *BgpServer) Serve() {
for i := firstPeerCaseIndex; i < len(cases); i++ {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex].Out()),
Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex]),
}
}

Expand Down Expand Up @@ -1467,7 +1467,7 @@ func (s *BgpServer) deleteDynamicNeighbor(peer *peer, oldState bgp.FSMState, e *
delete(s.neighborMap, peer.fsm.pConf.State.NeighborAddress)
peer.fsm.lock.RUnlock()
cleanInfiniteChannel(peer.fsm.outgoingCh)
cleanInfiniteChannel(peer.fsm.incomingCh)
cleanFiniteChannel(peer.fsm.incomingCh)
s.delIncoming(peer.fsm.incomingCh)
s.broadcastPeerState(peer, oldState, e)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ func TestTcpConnectionClosedAfterPeerDel(t *testing.T) {
assert.Nil(err)

// Wait for the s1 to receive the tcp connection from s2.
ev := <-incoming.Out()
ev := <-incoming
msg := ev.(*fsmMsg)
nextState := msg.MsgData.(bgp.FSMState)
assert.Equal(nextState, bgp.BGP_FSM_OPENSENT)
Expand All @@ -1453,7 +1453,7 @@ func TestTcpConnectionClosedAfterPeerDel(t *testing.T) {
assert.Nil(err)

// Send the message OPENSENT transition message again to the server.
incoming.In() <- msg
incoming <- msg

// Wait for peer connection channel to be closed and check that the open
// tcp connection has also been closed.
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
"github.com/osrg/gobgp/v3/pkg/packet/bgp"
)

func cleanFiniteChannel(ch chan interface{}) {
close(ch)
for range ch {
}
}

func cleanInfiniteChannel(ch *channels.InfiniteChannel) {
ch.Close()
// drain all remaining items
Expand Down