Source code

Revision control

Copy as Markdown

Other Tools

#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;
/// Ensure that UDP sockets have functional budgeting
///
/// # Design
/// Two sockets communicate by spamming packets from one to the other.
///
/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
/// send system call because we are using the loopback interface.
/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
/// entirety of the lifecycle of a packet within the kernel network stack.
///
/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
/// is through budgeting.
///
/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
/// and there are two budget events per packet, a send and a recv.
#[tokio::test]
async fn coop_budget_udp_send_recv() {
const BUDGET: usize = 128;
const N_ITERATIONS: usize = 1024;
const PACKET: &[u8] = b"Hello, world";
const PACKET_LEN: usize = 12;
assert_eq!(
PACKET_LEN,
PACKET.len(),
"Defect in test, programmer can't do math"
);
// bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
tx.connect(rx.local_addr().unwrap()).await.unwrap();
rx.connect(tx.local_addr().unwrap()).await.unwrap();
let tracker = Arc::new(AtomicUsize::default());
let tracker_clone = Arc::clone(&tracker);
tokio::task::yield_now().await;
tokio::spawn(async move {
loop {
tracker_clone.fetch_add(1, Ordering::SeqCst);
tokio::task::yield_now().await;
}
});
for _ in 0..N_ITERATIONS {
tx.send(PACKET).await.unwrap();
let mut tmp = [0; PACKET_LEN];
// ensure that we aren't somehow accumulating other
assert_eq!(
PACKET_LEN,
rx.recv(&mut tmp).await.unwrap(),
"Defect in test case, received unexpected result from socket"
);
assert_eq!(
PACKET, &tmp,
"Defect in test case, received unexpected result from socket"
);
}
assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}