diff options
Diffstat (limited to 'src/netlink.rs')
-rw-r--r-- | src/netlink.rs | 46 |
1 files changed, 16 insertions, 30 deletions
diff --git a/src/netlink.rs b/src/netlink.rs index 550f84e..feaac90 100644 --- a/src/netlink.rs +++ b/src/netlink.rs @@ -1,4 +1,6 @@ -use futures::{stream::TryStreamExt, FutureExt}; +use std::net::IpAddr; + +use futures::stream::TryStreamExt; use netlink_packet_route::{ neighbour::{NeighbourAddress, NeighbourAttribute, NeighbourMessage}, route::RouteType, @@ -6,11 +8,11 @@ use netlink_packet_route::{ use rtnetlink::{new_connection, IpVersion}; use tokio::sync::mpsc::Sender; -use crate::Host; +use crate::{Error, Result}; pub struct Netlink; -pub async fn neighbours(tx: Sender<Host>) -> Result<(), rtnetlink::Error> { +pub async fn neighbours(tx: Sender<IpAddr>) -> Result<()> { let (connection, handle, _) = new_connection().unwrap(); tokio::spawn(connection); @@ -19,39 +21,23 @@ pub async fn neighbours(tx: Sender<Host>) -> Result<(), rtnetlink::Error> { .get() .set_family(IpVersion::V4) .execute() - .try_filter_map(|r| async move { Ok(filter(r)) }) - .try_for_each(|host| tx.send(host).then(|_| async { Ok(()) })) + .or_else(|res| async { Err(Error::from(res)) }) + .try_filter_map(|msg| async { Ok(filter(msg)) }) + .try_for_each(|host| { + let tx = tx.clone(); + async move { tx.send(host).await.map_err(Error::from) } + }) .await } -pub fn filter(route: NeighbourMessage) -> Option<Host> { - if route.header.kind != RouteType::Unicast { +pub fn filter(msg: NeighbourMessage) -> Option<IpAddr> { + if msg.header.kind != RouteType::Unicast { return None; } - route.attributes.into_iter().find_map(|attr| match attr { - NeighbourAttribute::Destination(NeighbourAddress::Inet(ip)) => Some(Host::from(ip)), - NeighbourAttribute::Destination(NeighbourAddress::Inet6(ip)) => Some(Host::from(ip)), + msg.attributes.into_iter().find_map(|attr| match attr { + NeighbourAttribute::Destination(NeighbourAddress::Inet(ip)) => Some(ip.into()), + NeighbourAttribute::Destination(NeighbourAddress::Inet6(ip)) => Some(ip.into()), _ => None, }) } - -#[cfg(test)] -mod tests { - use tokio::sync::mpsc; - - use super::*; - - #[tokio::test] - async fn test_dump_neighbours() -> Result<(), ()> { - let (tx, mut rx) = mpsc::channel::<Host>(100); - - tokio::spawn(neighbours(tx)); - - while let Some(res) = rx.recv().await { - println!("{res}"); - } - - Ok(()) - } -} |