Skip to content

Commit

Permalink
bmp: correctly encode path identifier in NLRI when needed
Browse files Browse the repository at this point in the history
The FSM has the right decoding options. First, we change the
`marshallingOptions` field to a slice instead of a single option for
consistency with the remaining of the code (`Serialize()` and
`Parse*()` accepts several options). This is not really needed as we
only initialize a single option or none, but I suppose this may matter
in the future.

Then, we pass the decoding options to the `watchUpdateEvent` struct.
From my understanding, this is the only one needing it. When
transmitting the local RIB, we don't need to encode paths with
AddPath. And when mirroring routes, the routes are already encoded.

I am however unable to correctly pass the decoding options in the
post-policy case. I don't see an obvious way to access the FSM in this
case.
  • Loading branch information
vincentbernat committed Sep 6, 2022
1 parent ed0b7f9 commit 0f273a9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 70 deletions.
12 changes: 6 additions & 6 deletions pkg/server/bmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ func (b *bmpClient) loop() {
tickerCh = t.C
}

write := func(msg *bmp.BMPMessage) error {
buf, _ := msg.Serialize()
write := func(msg *bmp.BMPMessage, options ...*bgp.MarshallingOption) error {
buf, _ := msg.Serialize(options...)
_, err := conn.Write(buf)
if err != nil {
b.s.logger.Warn("failed to write to bmp server",
Expand Down Expand Up @@ -197,14 +197,14 @@ func (b *bmpClient) loop() {
}
}
for _, path := range pathList {
for _, u := range table.CreateUpdateMsgFromPaths([]*table.Path{path}) {
payload, _ := u.Serialize()
if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, true, info, path.GetTimestamp().Unix(), payload)); err != nil {
for _, u := range table.CreateUpdateMsgFromPaths([]*table.Path{path}, msg.MarshallingOptions...) {
payload, _ := u.Serialize(msg.MarshallingOptions...)
if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, true, info, path.GetTimestamp().Unix(), payload), msg.MarshallingOptions...); err != nil {
return false
}
}
}
} else if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, msg.FourBytesAs, info, msg.Timestamp.Unix(), msg.Payload)); err != nil {
} else if err := write(bmpPeerRoute(bmp.BMP_PEER_TYPE_GLOBAL, msg.PostPolicy, 0, msg.FourBytesAs, info, msg.Timestamp.Unix(), msg.Payload), msg.MarshallingOptions...); err != nil {
return false
}
case *watchEventBestPath:
Expand Down
14 changes: 7 additions & 7 deletions pkg/server/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ type fsm struct {
peerInfo *table.PeerInfo
gracefulRestartTimer *time.Timer
twoByteAsTrans bool
marshallingOptions *bgp.MarshallingOption
marshallingOptions []*bgp.MarshallingOption
notification chan *bgp.BGPMessage
logger log.Logger
}
Expand Down Expand Up @@ -985,7 +985,7 @@ func (h *fsmHandler) recvMessageWithError() (*fsmMsg, error) {
options := h.fsm.marshallingOptions
h.fsm.lock.RUnlock()

m, err := bgp.ParseBGPBody(hd, bodyBuf, options)
m, err := bgp.ParseBGPBody(hd, bodyBuf, options...)
if err != nil {
handling = h.handlingError(m, err, useRevisedError)
h.fsm.bgpMessageStateUpdate(0, true)
Expand Down Expand Up @@ -1324,9 +1324,9 @@ func (h *fsmHandler) opensent(ctx context.Context) (bgp.FSMState, *fsmStateReaso
fsm.capMap, fsm.rfMap = open2Cap(body, fsm.pConf)

if _, y := fsm.capMap[bgp.BGP_CAP_ADD_PATH]; y {
fsm.marshallingOptions = &bgp.MarshallingOption{
fsm.marshallingOptions = []*bgp.MarshallingOption{{
AddPath: fsm.rfMap,
}
}}
} else {
fsm.marshallingOptions = nil
}
Expand Down Expand Up @@ -1616,7 +1616,7 @@ func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) er
table.UpdatePathAttrs2ByteAs(m.Body.(*bgp.BGPUpdate))
table.UpdatePathAggregator2ByteAs(m.Body.(*bgp.BGPUpdate))
}
b, err := m.Serialize(h.fsm.marshallingOptions)
b, err := m.Serialize(h.fsm.marshallingOptions...)
fsm.lock.RUnlock()
if err != nil {
fsm.lock.RLock()
Expand Down Expand Up @@ -1720,7 +1720,7 @@ func (h *fsmHandler) sendMessageloop(ctx context.Context, wg *sync.WaitGroup) er
h.fsm.lock.RLock()
options := h.fsm.marshallingOptions
h.fsm.lock.RUnlock()
for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options) {
for _, msg := range table.CreateUpdateMsgFromPaths(m.Paths, options...) {
if err := send(msg); err != nil {
return nil
}
Expand Down Expand Up @@ -1789,7 +1789,7 @@ func (h *fsmHandler) established(ctx context.Context) (bgp.FSMState, *fsmStateRe
case <-ctx.Done():
select {
case m := <-fsm.notification:
b, _ := m.Serialize(h.fsm.marshallingOptions)
b, _ := m.Serialize(h.fsm.marshallingOptions...)
h.conn.Write(b)
default:
// nothing to do
Expand Down
120 changes: 63 additions & 57 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,18 +866,19 @@ func (s *BgpServer) notifyPrePolicyUpdateWatcher(peer *peer, pathList []*table.P
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watchEventUpdate{
Message: msg,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Timestamp: timestamp,
Payload: payload,
PostPolicy: false,
PathList: cloned,
Neighbor: n,
Message: msg,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Timestamp: timestamp,
Payload: payload,
PostPolicy: false,
PathList: cloned,
Neighbor: n,
MarshallingOptions: peer.fsm.marshallingOptions,
}
peer.fsm.lock.RUnlock()
s.notifyWatcher(watchEventTypePreUpdate, ev)
Expand All @@ -897,16 +898,17 @@ func (s *BgpServer) notifyPostPolicyUpdateWatcher(peer *peer, pathList []*table.
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
ev := &watchEventUpdate{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Timestamp: cloned[0].GetTimestamp(),
PostPolicy: true,
PathList: cloned,
Neighbor: n,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Timestamp: cloned[0].GetTimestamp(),
PostPolicy: true,
PathList: cloned,
Neighbor: n,
MarshallingOptions: peer.fsm.marshallingOptions,
}
peer.fsm.lock.RUnlock()
s.notifyWatcher(watchEventTypePostUpdate, ev)
Expand Down Expand Up @@ -4143,19 +4145,20 @@ type watchEvent interface {
}

type watchEventUpdate struct {
Message *bgp.BGPMessage
PeerAS uint32
LocalAS uint32
PeerAddress net.IP
LocalAddress net.IP
PeerID net.IP
FourBytesAs bool
Timestamp time.Time
Payload []byte
PostPolicy bool
Init bool
PathList []*table.Path
Neighbor *config.Neighbor
Message *bgp.BGPMessage
PeerAS uint32
LocalAS uint32
PeerAddress net.IP
LocalAddress net.IP
PeerID net.IP
FourBytesAs bool
Timestamp time.Time
Payload []byte
PostPolicy bool
Init bool
PathList []*table.Path
Neighbor *config.Neighbor
MarshallingOptions []*bgp.MarshallingOption
}

type PeerEventType uint32
Expand Down Expand Up @@ -4442,16 +4445,17 @@ func (s *BgpServer) watch(opts ...watchOption) (w *watcher) {
_, y := peer.fsm.capMap[bgp.BGP_CAP_FOUR_OCTET_AS_NUMBER]
l, _ := peer.fsm.LocalHostPort()
update := &watchEventUpdate{
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Init: true,
PostPolicy: false,
Neighbor: configNeighbor,
PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false),
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Init: true,
PostPolicy: false,
Neighbor: configNeighbor,
PathList: peer.adjRibIn.PathList([]bgp.RouteFamily{rf}, false),
MarshallingOptions: peer.fsm.marshallingOptions,
}
peer.fsm.lock.RUnlock()
w.notify(update)
Expand All @@ -4460,18 +4464,19 @@ func (s *BgpServer) watch(opts ...watchOption) (w *watcher) {
eorBuf, _ := eor.Serialize()
peer.fsm.lock.RLock()
update = &watchEventUpdate{
Message: eor,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Timestamp: time.Now(),
Init: true,
Payload: eorBuf,
PostPolicy: false,
Neighbor: configNeighbor,
Message: eor,
PeerAS: peer.fsm.peerInfo.AS,
LocalAS: peer.fsm.peerInfo.LocalAS,
PeerAddress: peer.fsm.peerInfo.Address,
LocalAddress: net.ParseIP(l),
PeerID: peer.fsm.peerInfo.ID,
FourBytesAs: y,
Timestamp: time.Now(),
Init: true,
Payload: eorBuf,
PostPolicy: false,
Neighbor: configNeighbor,
MarshallingOptions: peer.fsm.marshallingOptions,
}
peer.fsm.lock.RUnlock()
w.notify(update)
Expand Down Expand Up @@ -4506,6 +4511,7 @@ func (s *BgpServer) watch(opts ...watchOption) (w *watcher) {
Neighbor: configNeighbor,
PathList: paths,
Init: true,
// TODO: MarshallingOptions: peer.fsm.marshallingOptions,
})

eor := bgp.NewEndOfRib(rf)
Expand Down

0 comments on commit 0f273a9

Please sign in to comment.