use futures::{stream::TryStreamExt, FutureExt}; use netlink_packet_route::{ neighbour::{NeighbourAddress, NeighbourAttribute, NeighbourMessage}, route::RouteType, }; use rtnetlink::{new_connection, IpVersion}; use tokio::sync::mpsc::Sender; use crate::Host; pub struct Netlink; pub async fn neighbours(tx: Sender) -> Result<(), rtnetlink::Error> { let (connection, handle, _) = new_connection().unwrap(); tokio::spawn(connection); handle .neighbours() .get() .set_family(IpVersion::V4) .execute() .try_filter_map(|r| async move { Ok(filter(r)) }) .try_for_each(|host| tx.send(host).then(|_| async { Ok(()) })) .await } pub fn filter(route: NeighbourMessage) -> Option { if route.header.kind != RouteType::Unicast { return None; } route.attributes.into_iter().find_map(|attr| match attr { NeighbourAttribute::Destination(NeighbourAddress::Inet(ip)) => Some(Host::from(ip)), NeighbourAttribute::Destination(NeighbourAddress::Inet6(ip)) => Some(Host::from(ip)), _ => None, }) } #[cfg(test)] mod tests { use tokio::sync::mpsc; use super::*; #[tokio::test] async fn test_dump_neighbours() -> Result<(), ()> { let (tx, mut rx) = mpsc::channel::(100); tokio::spawn(neighbours(tx)); while let Some(res) = rx.recv().await { println!("{res}"); } Ok(()) } }