diff --git a/daemon/src/config/validate.rs b/daemon/src/config/validate.rs index 1d04780..217dac4 100644 --- a/daemon/src/config/validate.rs +++ b/daemon/src/config/validate.rs @@ -379,7 +379,32 @@ impl TryFrom<&Statement> for api::Statement { med: None, as_prepend: None, ext_community: None, - nexthop: None, + nexthop: match a.bgp_actions.as_ref() { + Some(a) => { + if let Some(s) = a.set_next_hop.as_ref() { + match s.as_str() { + "self" => Some(api::NexthopAction { + self_: true, + unchanged: false, + address: String::new(), + }), + "unchanged" => Some(api::NexthopAction { + self_: false, + unchanged: true, + address: String::new(), + }), + _ => Some(api::NexthopAction { + self_: false, + unchanged: false, + address: s.to_string(), + }), + } + } else { + None + } + } + None => None, + }, local_pref: None, large_community: None, }); diff --git a/daemon/src/event.rs b/daemon/src/event.rs index 7f9b9df..e56d577 100644 --- a/daemon/src/event.rs +++ b/daemon/src/event.rs @@ -773,7 +773,7 @@ impl GrpcService { if attr.is_empty() { None } else { - Some(Arc::new(attr)) + Some(attr) } }, ), @@ -952,7 +952,7 @@ impl GobgpApi for GrpcService { request: tonic::Request, ) -> Result, tonic::Status> { if let Ok(peer_addr) = IpAddr::from_str(&request.into_inner().address) { - for (addr, mut p) in &mut GLOBAL.write().await.peers { + for (addr, p) in &mut GLOBAL.write().await.peers { if addr == &peer_addr { if p.admin_down { p.admin_down = false; @@ -981,7 +981,7 @@ impl GobgpApi for GrpcService { request: tonic::Request, ) -> Result, tonic::Status> { if let Ok(peer_addr) = IpAddr::from_str(&request.into_inner().address) { - for (addr, mut p) in &mut GLOBAL.write().await.peers { + for (addr, p) in &mut GLOBAL.write().await.peers { if addr == &peer_addr { if p.admin_down { return Err(tonic::Status::new( @@ -2413,7 +2413,7 @@ impl RpkiClient { .await { let (tx, rx) = mpsc::unbounded_channel(); - let state = if let Some(mut client) = + let state = if let Some(client) = GLOBAL.write().await.rpki_clients.get_mut(&sockaddr) { client.mgmt_tx = Some(tx); @@ -2425,7 +2425,7 @@ impl RpkiClient { } else { tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; } - if let Some(mut client) = GLOBAL.write().await.rpki_clients.get_mut(&sockaddr) { + if let Some(client) = GLOBAL.write().await.rpki_clients.get_mut(&sockaddr) { if client.configured_time != configured_time { break; } @@ -2977,7 +2977,7 @@ enum TableEvent { Arc, Family, Vec<(packet::Net, u32)>, - Option>>, + Option>, ), Disconnected(Arc), // RPKI events @@ -3011,7 +3011,7 @@ impl Table { if let Some(Some(msg)) = futures.next().await { match msg { TableEvent::PassUpdate(source, family, nets, attrs) => match attrs { - Some(attrs) => { + Some(mut attrs) => { let mut t = TABLE[idx].lock().await; for bmp_tx in t.bmp_event_tx.values() { let addpath = if let Some(e) = t.addpath.get(&source.remote_addr) { @@ -3063,7 +3063,7 @@ impl Table { for net in nets { let mut filtered = false; if let Some(a) = t.global_import_policy.as_ref() { - if t.rtable.apply_policy(a, &source, &net.0, &attrs) + if t.rtable.apply_policy(a, &source, &net.0, &mut attrs) == table::Disposition::Reject { filtered = true; @@ -3078,7 +3078,7 @@ impl Table { filtered, ) { if let Some(a) = t.global_export_policy.as_ref() { - if t.rtable.apply_policy(a, &source, &net.0, &attrs) + if t.rtable.apply_policy(a, &source, &net.0, &mut attrs) == table::Disposition::Reject { continue; @@ -3108,7 +3108,7 @@ impl Table { ), update: packet::bgp::Message::Update { reach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: Some((family, nets.to_owned())), }, addpath, @@ -3131,7 +3131,7 @@ impl Table { ), body: bgp::Message::Update { reach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: Some((family, nets.to_owned())), }, addpath, @@ -3148,7 +3148,7 @@ impl Table { a, &source, &net.0, - &Arc::new(Vec::new()), + &mut Vec::new(), ) == table::Disposition::Reject { continue; @@ -3320,7 +3320,7 @@ impl Handler { &mut self, reach: Option<(Family, Vec<(packet::Net, u32)>)>, unreach: Option<(Family, Vec<(packet::Net, u32)>)>, - attr: Arc>, + attr: Vec, ) { if let Some((family, reach)) = reach { for net in reach { @@ -3469,9 +3469,9 @@ impl Handler { for i in 0..*NUM_TABLES { let mut t = TABLE[i].lock().await; for f in codec.channel.keys() { - for c in t.rtable.best(f).into_iter() { + for mut c in t.rtable.best(f).into_iter() { if let Some(a) = t.global_export_policy.as_ref() { - if t.rtable.apply_policy(a, &c.source, &c.net, &c.attr) + if t.rtable.apply_policy(a, &c.source, &c.net, &mut c.attr) == table::Disposition::Reject { continue; @@ -3717,7 +3717,7 @@ impl Handler { txbuf = bytes::BytesMut::with_capacity(txbuf_size); let msg = bgp::Message::Update{ reach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: Some((*family, unreach)), }; let _ = codec.encode(&msg, &mut txbuf); @@ -3832,9 +3832,9 @@ impl Handler { #[derive(Default)] struct PendingTx { - reach: FnvHashMap>>, + reach: FnvHashMap>, unreach: FnvHashSet, - bucket: FnvHashMap>, FnvHashSet>, + bucket: FnvHashMap, FnvHashSet>, sync: bool, } @@ -3933,43 +3933,29 @@ fn bucket() { source: src.clone(), family, net: net1, - attr: Arc::new(attr1.clone()), + attr: attr1.clone(), }); pending.insert_change(table::Change { source: src.clone(), family: Family::IPV4, net: net2, - attr: Arc::new(vec![packet::Attribute::new_with_value( - packet::Attribute::ORIGIN, - 0, - ) - .unwrap()]), + attr: vec![packet::Attribute::new_with_value(packet::Attribute::ORIGIN, 0).unwrap()], }); // a-1) and a-2) properly marged? assert_eq!(1, pending.bucket.len()); - assert_eq!( - 2, - pending.bucket.get(&Arc::new(attr1.clone())).unwrap().len() - ); + assert_eq!(2, pending.bucket.get(&attr1.clone()).unwrap().len()); // b-1) pending.insert_change(table::Change { source: src.clone(), family, net: net2, - attr: Arc::new(vec![packet::Attribute::new_with_value( - packet::Attribute::ORIGIN, - 0, - ) - .unwrap()]), + attr: vec![packet::Attribute::new_with_value(packet::Attribute::ORIGIN, 0).unwrap()], }); assert_eq!(1, pending.bucket.len()); - assert_eq!( - 2, - pending.bucket.get(&Arc::new(attr1.clone())).unwrap().len() - ); + assert_eq!(2, pending.bucket.get(&attr1.clone()).unwrap().len()); // b-2-2) let attr2 = vec![packet::Attribute::new_with_value(packet::Attribute::ORIGIN, 1).unwrap()]; @@ -3977,16 +3963,9 @@ fn bucket() { source: src.clone(), family, net: net2, - attr: Arc::new(vec![packet::Attribute::new_with_value( - packet::Attribute::ORIGIN, - 1, - ) - .unwrap()]), + attr: vec![packet::Attribute::new_with_value(packet::Attribute::ORIGIN, 1).unwrap()], }); assert_eq!(2, pending.bucket.len()); - assert_eq!(&Arc::new(attr2), pending.reach.get(&net2).unwrap()); - assert_eq!( - 1, - pending.bucket.get(&Arc::new(attr1.clone())).unwrap().len() - ); + assert_eq!(&attr2, pending.reach.get(&net2).unwrap()); + assert_eq!(1, pending.bucket.get(&attr1.clone()).unwrap().len()); } diff --git a/daemon/src/packet/bgp.rs b/daemon/src/packet/bgp.rs index 9f27007..2ed4857 100644 --- a/daemon/src/packet/bgp.rs +++ b/daemon/src/packet/bgp.rs @@ -23,7 +23,6 @@ use std::convert::{Into, TryFrom}; use std::io::Cursor; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; -use std::sync::Arc; use std::{fmt, io}; use tokio_util::codec::{Decoder, Encoder}; @@ -1475,7 +1474,7 @@ pub(crate) enum Message { Update { reach: Option<(Family, Vec<(Net, u32)>)>, unreach: Option<(Family, Vec<(Net, u32)>)>, - attr: Arc>, + attr: Vec, }, Notification { code: u8, @@ -1504,13 +1503,13 @@ impl Message { if family == Family::IPV4 { Message::Update { reach: Some((Family::IPV4, Vec::new())), - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: None, } } else { Message::Update { reach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: Some((family, Vec::new())), } } @@ -1685,7 +1684,7 @@ impl Codec { fn mp_reach_encode( &self, buf_head: usize, - attrs: Arc>, + attrs: Vec, dst: &mut BytesMut, reach: &(Family, Vec<(Net, u32)>), reach_idx: &mut usize, @@ -1753,7 +1752,7 @@ impl Codec { fn mp_unreach_encode( &self, buf_head: usize, - _: Arc>, + _: Vec, dst: &mut BytesMut, unreach: &(Family, Vec<(Net, u32)>), unreach_idx: &mut usize, @@ -2263,7 +2262,7 @@ impl Decoder for Codec { return Ok(Some(Message::Update { reach: Some((Family::IPV4, Vec::new())), unreach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), })); } @@ -2424,7 +2423,7 @@ impl Decoder for Codec { } else { Some((reach_family, reach)) }, - attr: Arc::new(attr), + attr: attr, unreach: if unreach.is_empty() { None } else { @@ -2552,11 +2551,11 @@ fn build_many_v4_route() { let reach: Vec<(Net, u32)> = net.iter().cloned().map(|n| (n, 0)).collect(); let mut msg = Message::Update { reach: Some((Family::IPV4, reach)), - attr: Arc::new(vec![ + attr: vec![ Attribute::new_with_value(Attribute::ORIGIN, 0).unwrap(), Attribute::new_with_bin(Attribute::AS_PATH, vec![2, 1, 1, 0, 0, 0]).unwrap(), Attribute::new_with_bin(Attribute::NEXTHOP, vec![0, 0, 0, 0]).unwrap(), - ]), + ], unreach: None, }; let mut set = fnv::FnvHashSet::default(); @@ -2596,7 +2595,7 @@ fn build_many_v4_route() { let unreach = net.iter().cloned().map(|n| (n, 0)).collect(); msg = Message::Update { reach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: Some((Family::IPV4, unreach)), }; @@ -2647,11 +2646,11 @@ fn many_mp_reach() { let msg = Message::Update { reach: Some((Family::IPV6, reach)), - attr: Arc::new(vec![ + attr: vec![ Attribute::new_with_value(Attribute::ORIGIN, 0).unwrap(), Attribute::new_with_bin(Attribute::AS_PATH, vec![2, 1, 1, 0, 0, 0]).unwrap(), Attribute::new_with_bin(Attribute::NEXTHOP, (0..31).collect::>()).unwrap(), - ]), + ], unreach: None, }; @@ -2701,7 +2700,7 @@ fn many_mp_unreach() { let msg = Message::Update { reach: None, - attr: Arc::new(Vec::new()), + attr: Vec::new(), unreach: Some((Family::IPV6, unreach)), }; diff --git a/daemon/src/table.rs b/daemon/src/table.rs index 18acf86..ddfebdf 100644 --- a/daemon/src/table.rs +++ b/daemon/src/table.rs @@ -34,11 +34,11 @@ use crate::error::Error; use crate::packet::{self, bgp, Attribute, Family}; struct PathAttribute { - attr: Arc>, + attr: Vec, } impl PathAttribute { - fn new(attr: Arc>) -> Self { + fn new(attr: Vec) -> Self { PathAttribute { attr } } @@ -152,7 +152,7 @@ pub(crate) struct Reach { pub(crate) source: Arc, pub(crate) family: Family, pub(crate) net: (packet::Net, u32), - pub(crate) attr: Arc>, + pub(crate) attr: Vec, } impl From for bgp::Message { @@ -170,7 +170,7 @@ pub(crate) struct Change { pub(crate) source: Arc, pub(crate) family: Family, pub(crate) net: packet::Net, - pub(crate) attr: Arc>, + pub(crate) attr: Vec, } impl From for bgp::Message { @@ -357,7 +357,7 @@ impl RoutingTable { .keep_aspath(p.source.rs_client) .keep_nexthop(p.source.rs_client) .build(); - let attr = Arc::new( + let mut attr = p.pa.attr .iter() .cloned() @@ -369,10 +369,9 @@ impl RoutingTable { a } }) - .collect::>(), - ); + .collect::>(); if let Some(pa) = &export_policy { - if self.apply_policy(pa, &p.source, net, &attr) + if self.apply_policy(pa, &p.source, net, &mut attr) == Disposition::Reject { None @@ -410,7 +409,7 @@ impl RoutingTable { family: Family, net: packet::Net, remote_id: u32, - attr: Arc>, + attr: Vec, filtered: bool, ) -> Option { let mut replaced = false; @@ -544,7 +543,7 @@ impl RoutingTable { source: source.clone(), family, net, - attr: Arc::new(Vec::new()), + attr: Vec::new(), }); } if was_best { @@ -591,7 +590,7 @@ impl RoutingTable { source: source.clone(), family: *family, net: *net, - attr: Arc::new(Vec::new()), + attr: Vec::new(), }); } !dst.entry.is_empty() @@ -696,7 +695,7 @@ impl RoutingTable { assignment: &PolicyAssignment, source: &Arc, net: &packet::Net, - attr: &Arc>, + attr: &mut Vec, ) -> Disposition { assignment.apply(&self.rpki, source, net, attr) } @@ -738,7 +737,7 @@ fn drop() { let mut rt = RoutingTable::new(); let family = Family::IPV4; - let attrs = Arc::new(Vec::new()); + let attrs = Vec::new(); rt.insert(s1.clone(), family, n1, 0, attrs.clone(), false); rt.insert(s2, family, n1, 0, attrs.clone(), false); @@ -1076,7 +1075,7 @@ impl Condition { &self, source: &Arc, net: &packet::Net, - attr: &Arc>, + attr: &Vec, ) -> bool { match self { Condition::Prefix(_name, opt, set) => { @@ -1100,7 +1099,7 @@ impl Condition { Condition::AsPath(_name, opt, set) => { if let Some(a) = attr.iter().find(|a| a.code() == packet::Attribute::AS_PATH) { for set in &set.single_sets { - if set.is_match(a) { + if set.is_match(&a) { return *opt == MatchOption::Any; } } @@ -1163,12 +1162,47 @@ impl From for i32 { } } +pub trait Action { + //TODO(Kuroame): Should we return some kind of error? + fn apply(&self, source: &Arc, net: &packet::Net, attr: &mut Vec); +} + +#[derive(PartialEq, Clone, Copy, Debug)] +pub(crate) enum NextHopAction { + Address(IpAddr), + Self_(bool), + Unchanged(bool), +} + +impl Action for NextHopAction { + fn apply(&self, source: &Arc, _net: &packet::Net, attr: &mut Vec) { + match self { + NextHopAction::Address(addr) => { + attr.iter_mut().for_each(|a| { + if a.code() == packet::Attribute::NEXTHOP { + *a = a.nexthop_update(addr.clone()); + } + }); + } + NextHopAction::Self_(_) => { + attr.clone().iter_mut().for_each(|a| { + if a.code() == packet::Attribute::NEXTHOP { + *a = a.nexthop_update(source.local_addr.clone()); + } + }); + } + NextHopAction::Unchanged(_) => {} + } + } +} + #[derive(Clone)] pub(crate) struct Statement { name: Arc, // ALL the conditions are matched, the action will be executed. conditions: Vec, disposition: Option, + mod_actions: Vec>, // pub route_action: Action, } @@ -1177,7 +1211,7 @@ impl Statement { &self, source: &Arc, net: &packet::Net, - attr: &Arc>, + attr: &mut Vec, ) -> Disposition { let mut result = true; // if any in the conditions returns false, this statement becomes false. @@ -1187,11 +1221,16 @@ impl Statement { break; } } + if !result { + return Disposition::Pass; + } - if result { - if let Some(disposition) = &self.disposition { - return *disposition; - } + for action in &self.mod_actions { + action.apply(source, net, attr); + } + + if let Some(disposition) = &self.disposition { + return *disposition; } Disposition::Pass } @@ -1343,7 +1382,7 @@ impl Policy { &self, source: &Arc, net: &packet::Net, - attr: &Arc>, + attr: &mut Vec, ) -> Disposition { for statement in &self.statements { let r = statement.apply(source, net, attr); @@ -1376,7 +1415,7 @@ impl PolicyAssignment { _rpki: &RpkiTable, source: &Arc, net: &packet::Net, - attr: &Arc>, + attr: &mut Vec, ) -> Disposition { for policy in &self.policies { let r = policy.apply(source, net, attr); @@ -1759,21 +1798,46 @@ impl PolicyTable { } } } + let mut disposition = None; + let mut mod_actions: Vec< + std::sync::Arc<(dyn Action + std::marker::Send + Sync + 'static)>, + > = Vec::new(); + if let Some(actions) = actions { - match api::RouteAction::from_i32(actions.route_action) { - Some(a) => match a { + if let Some(a) = api::RouteAction::from_i32(actions.route_action) { + match a { api::RouteAction::Accept => disposition = Some(Disposition::Accept), api::RouteAction::Reject => disposition = Some(Disposition::Reject), _ => {} - }, - None => return Err(Error::InvalidArgument("invalid action".to_string())), + } + } else { + return Err(Error::InvalidArgument("invalid route action".to_string())); + } + + if let Some(a) = actions.nexthop { + if a.unchanged { + mod_actions.push(Arc::new(NextHopAction::Unchanged(true))); + } else if a.self_ { + mod_actions.push(Arc::new(NextHopAction::Self_(true))); + } else { + match IpAddr::from_str(&a.address) { + Ok(addr) => mod_actions.push(Arc::new(NextHopAction::Address(addr))), + Err(_) => { + return Err(Error::InvalidArgument( + "invalid nexthop action".to_string(), + )) + } + } + } } } + let s = Statement { name: Arc::from(name), conditions: v, disposition, + mod_actions, }; self.statements.insert(s.name.clone(), Arc::new(s)); Ok(()) @@ -1879,7 +1943,7 @@ impl RpkiTable { family: Family, source: &Arc, net: &packet::Net, - attr: &Arc>, + attr: &Vec, ) -> Option { match self.roas.get(&family) { None => None,