diff --git a/pkg/server/fsm.go b/pkg/server/fsm.go index 9d8ffabe4..3181c08e6 100644 --- a/pkg/server/fsm.go +++ b/pkg/server/fsm.go @@ -38,6 +38,7 @@ import ( const ( minConnectRetryInterval = 5 + bufferSize = 16 ) type fsmStateReasonType uint8 @@ -177,7 +178,7 @@ type fsm struct { lock sync.RWMutex state bgp.FSMState outgoingCh *channels.InfiniteChannel - incomingCh *channels.InfiniteChannel + incomingCh chan interface{} reason *fsmStateReason conn net.Conn connCh chan net.Conn @@ -274,7 +275,7 @@ func newFSM(gConf *oc.Global, pConf *oc.Neighbor, logger log.Logger) *fsm { pConf: pConf, state: bgp.BGP_FSM_IDLE, outgoingCh: channels.NewInfiniteChannel(), - incomingCh: channels.NewInfiniteChannel(), + incomingCh: make(chan interface{}, bufferSize), connCh: make(chan net.Conn, 1), opensentHoldTime: float64(holdtimeOpensent), adminState: adminState, @@ -382,9 +383,9 @@ func (fsm *fsm) sendNotification(code, subType uint8, data []byte, msg string) ( type fsmHandler struct { fsm *fsm conn net.Conn - msgCh *channels.InfiniteChannel + msgCh chan interface{} stateReasonCh chan fsmStateReason - incoming *channels.InfiniteChannel + incoming chan interface{} outgoing *channels.InfiniteChannel holdTimerResetCh chan bool sentNotification *bgp.BGPMessage @@ -1184,12 +1185,12 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) { func (h *fsmHandler) recvMessage(ctx context.Context, wg *sync.WaitGroup) error { defer func() { - h.msgCh.Close() + close(h.msgCh) wg.Done() }() fmsg, _ := h.recvMessageWithError() if fmsg != nil { - h.msgCh.In() <- fmsg + h.msgCh <- fmsg } return nil } @@ -1262,7 +1263,7 @@ func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReaso fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - h.msgCh = channels.NewInfiniteChannel() + h.msgCh = make(chan interface{}, bufferSize) fsm.lock.RLock() h.conn = fsm.conn @@ -1313,7 +1314,7 @@ func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReaso h.conn.Close() return bgp.BGP_FSM_IDLE, newfsmStateReason(fsmRestartTimerExpired, nil, nil) } - case i, ok := <-h.msgCh.Out(): + case i, ok := <-h.msgCh: if !ok { continue } @@ -1527,7 +1528,7 @@ func keepaliveTicker(fsm *fsm) *time.Ticker { func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateReason) { fsm := h.fsm ticker := keepaliveTicker(fsm) - h.msgCh = channels.NewInfiniteChannel() + h.msgCh = make(chan interface{}, bufferSize) fsm.lock.RLock() h.conn = fsm.conn @@ -1584,7 +1585,7 @@ func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateRe // TODO: check error fsm.conn.Write(b) fsm.bgpMessageStateUpdate(m.Header.Type, false) - case i, ok := <-h.msgCh.Out(): + case i, ok := <-h.msgCh: if !ok { continue } @@ -1812,7 +1813,7 @@ func (h *fsmHandler) recvMessageloop(ctx context.Context, wg *sync.WaitGroup) er for { fmsg, err := h.recvMessageWithError() if fmsg != nil { - h.msgCh.In() <- fmsg + h.msgCh <- fmsg } if err != nil { return nil @@ -2005,7 +2006,7 @@ func (h *fsmHandler) loop(ctx context.Context, wg *sync.WaitGroup) error { fsm.lock.RUnlock() fsm.lock.RLock() - h.incoming.In() <- &fsmMsg{ + h.incoming <- &fsmMsg{ fsm: fsm, MsgType: fsmMsgStateChange, MsgSrc: fsm.pConf.State.NeighborAddress, diff --git a/pkg/server/fsm_test.go b/pkg/server/fsm_test.go index 1920935d7..eebc4db8b 100644 --- a/pkg/server/fsm_test.go +++ b/pkg/server/fsm_test.go @@ -376,7 +376,7 @@ func makePeerAndHandler() (*peer, *fsmHandler) { h := &fsmHandler{ fsm: p.fsm, stateReasonCh: make(chan fsmStateReason, 2), - incoming: channels.NewInfiniteChannel(), + incoming: make(chan interface{}, bufferSize), outgoing: channels.NewInfiniteChannel(), } diff --git a/pkg/server/server.go b/pkg/server/server.go index e54c56b8b..f980830d4 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -167,7 +167,7 @@ type BgpServer struct { apiServer *server bgpConfig oc.Bgp acceptCh chan *net.TCPConn - incomings []*channels.InfiniteChannel + incomings []chan interface{} mgmtCh chan *mgmtOp policy *table.RoutingPolicy listeners []*tcpListener @@ -232,11 +232,11 @@ func (s *BgpServer) Stop() { } } -func (s *BgpServer) addIncoming(ch *channels.InfiniteChannel) { +func (s *BgpServer) addIncoming(ch chan interface{}) { s.incomings = append(s.incomings, ch) } -func (s *BgpServer) delIncoming(ch *channels.InfiniteChannel) { +func (s *BgpServer) delIncoming(ch chan interface{}) { for i, c := range s.incomings { if c == ch { s.incomings = append(s.incomings[:i], s.incomings[i+1:]...) @@ -444,7 +444,7 @@ func (s *BgpServer) Serve() { } cleanInfiniteChannel(fsm.outgoingCh) - cleanInfiniteChannel(fsm.incomingCh) + cleanFiniteChannel(fsm.incomingCh) s.delIncoming(fsm.incomingCh) if s.shutdownWG != nil && len(s.incomings) == 0 { s.shutdownWG.Done() @@ -480,7 +480,7 @@ func (s *BgpServer) Serve() { for i := firstPeerCaseIndex; i < len(cases); i++ { cases[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex].Out()), + Chan: reflect.ValueOf(s.incomings[i-firstPeerCaseIndex]), } } @@ -1467,7 +1467,7 @@ func (s *BgpServer) deleteDynamicNeighbor(peer *peer, oldState bgp.FSMState, e * delete(s.neighborMap, peer.fsm.pConf.State.NeighborAddress) peer.fsm.lock.RUnlock() cleanInfiniteChannel(peer.fsm.outgoingCh) - cleanInfiniteChannel(peer.fsm.incomingCh) + cleanFiniteChannel(peer.fsm.incomingCh) s.delIncoming(peer.fsm.incomingCh) s.broadcastPeerState(peer, oldState, e) } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index bb6c5dbeb..2c8d6625e 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -1435,7 +1435,7 @@ func TestTcpConnectionClosedAfterPeerDel(t *testing.T) { assert.Nil(err) // Wait for the s1 to receive the tcp connection from s2. - ev := <-incoming.Out() + ev := <-incoming msg := ev.(*fsmMsg) nextState := msg.MsgData.(bgp.FSMState) assert.Equal(nextState, bgp.BGP_FSM_OPENSENT) @@ -1453,7 +1453,7 @@ func TestTcpConnectionClosedAfterPeerDel(t *testing.T) { assert.Nil(err) // Send the message OPENSENT transition message again to the server. - incoming.In() <- msg + incoming <- msg // Wait for peer connection channel to be closed and check that the open // tcp connection has also been closed. diff --git a/pkg/server/util.go b/pkg/server/util.go index 9b0b71818..88a47bc52 100644 --- a/pkg/server/util.go +++ b/pkg/server/util.go @@ -27,6 +27,12 @@ import ( "github.com/osrg/gobgp/v3/pkg/packet/bgp" ) +func cleanFiniteChannel(ch chan interface{}) { + close(ch) + for range ch { + } +} + func cleanInfiniteChannel(ch *channels.InfiniteChannel) { ch.Close() // drain all remaining items