summaryrefslogtreecommitdiffstats
path: root/src/netlink.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/netlink.rs')
-rw-r--r--src/netlink.rs46
1 files changed, 16 insertions, 30 deletions
diff --git a/src/netlink.rs b/src/netlink.rs
index 550f84e..feaac90 100644
--- a/src/netlink.rs
+++ b/src/netlink.rs
@@ -1,4 +1,6 @@
-use futures::{stream::TryStreamExt, FutureExt};
+use std::net::IpAddr;
+
+use futures::stream::TryStreamExt;
use netlink_packet_route::{
neighbour::{NeighbourAddress, NeighbourAttribute, NeighbourMessage},
route::RouteType,
@@ -6,11 +8,11 @@ use netlink_packet_route::{
use rtnetlink::{new_connection, IpVersion};
use tokio::sync::mpsc::Sender;
-use crate::Host;
+use crate::{Error, Result};
pub struct Netlink;
-pub async fn neighbours(tx: Sender<Host>) -> Result<(), rtnetlink::Error> {
+pub async fn neighbours(tx: Sender<IpAddr>) -> Result<()> {
let (connection, handle, _) = new_connection().unwrap();
tokio::spawn(connection);
@@ -19,39 +21,23 @@ pub async fn neighbours(tx: Sender<Host>) -> Result<(), rtnetlink::Error> {
.get()
.set_family(IpVersion::V4)
.execute()
- .try_filter_map(|r| async move { Ok(filter(r)) })
- .try_for_each(|host| tx.send(host).then(|_| async { Ok(()) }))
+ .or_else(|res| async { Err(Error::from(res)) })
+ .try_filter_map(|msg| async { Ok(filter(msg)) })
+ .try_for_each(|host| {
+ let tx = tx.clone();
+ async move { tx.send(host).await.map_err(Error::from) }
+ })
.await
}
-pub fn filter(route: NeighbourMessage) -> Option<Host> {
- if route.header.kind != RouteType::Unicast {
+pub fn filter(msg: NeighbourMessage) -> Option<IpAddr> {
+ if msg.header.kind != RouteType::Unicast {
return None;
}
- route.attributes.into_iter().find_map(|attr| match attr {
- NeighbourAttribute::Destination(NeighbourAddress::Inet(ip)) => Some(Host::from(ip)),
- NeighbourAttribute::Destination(NeighbourAddress::Inet6(ip)) => Some(Host::from(ip)),
+ msg.attributes.into_iter().find_map(|attr| match attr {
+ NeighbourAttribute::Destination(NeighbourAddress::Inet(ip)) => Some(ip.into()),
+ NeighbourAttribute::Destination(NeighbourAddress::Inet6(ip)) => Some(ip.into()),
_ => None,
})
}
-
-#[cfg(test)]
-mod tests {
- use tokio::sync::mpsc;
-
- use super::*;
-
- #[tokio::test]
- async fn test_dump_neighbours() -> Result<(), ()> {
- let (tx, mut rx) = mpsc::channel::<Host>(100);
-
- tokio::spawn(neighbours(tx));
-
- while let Some(res) = rx.recv().await {
- println!("{res}");
- }
-
- Ok(())
- }
-}