Revision control
Copy as Markdown
Other Tools
use std::{
cmp::min,
io::{ErrorKind, IoSliceMut},
net::{Ipv4Addr, Ipv6Addr, UdpSocket},
};
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::{io::Interest, runtime::Runtime};
use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE};
/// Benchmarks UDP throughput between two localhost sockets created by `new_socket`.
///
/// One Criterion group named `gso_{}_gro_{}_recvmmsg_{}` is registered per permutation of:
///
/// * `gso_enabled` – when set, each transmit carries `SEGMENT_SIZE * max_gso_segments()` bytes
///   (capped at `MAX_DATAGRAM_SIZE`) with `segment_size` set to `SEGMENT_SIZE`; otherwise a
///   single `SEGMENT_SIZE` message is sent without a segment size.
/// * `gro_enabled` – when set, each receive buffer is sized for `gro_segments()` segments;
///   otherwise for exactly one segment.
/// * `recvmmsg_enabled` – when set, `BATCH_SIZE` receive buffers are passed to every `recv`
///   call; otherwise just one.
///
/// Every benchmark iteration transfers `TOTAL_BYTES`: after each send, the loop drains the
/// receiving socket until the received byte count has caught up with the sent byte count.
///
/// # Panics
///
/// Panics if the runtime or sockets cannot be set up, or if a send/receive fails with any
/// error other than `ErrorKind::WouldBlock`.
pub fn criterion_benchmark(c: &mut Criterion) {
    const TOTAL_BYTES: usize = 10 * 1024 * 1024;
    const SEGMENT_SIZE: usize = 1280;

    let rt = Runtime::new().unwrap();
    // Keep the runtime entered for the rest of the function, i.e. while the Tokio sockets are
    // created and used.
    let _guard = rt.enter();

    let (send_state, send_socket) = new_socket();
    let (recv_state, recv_socket) = new_socket();
    let dst_addr = recv_socket.local_addr().unwrap();

    let mut permutations = vec![];
    for gso_enabled in [
        false,
        #[cfg(any(target_os = "linux", target_os = "windows", apple))]
        true,
    ] {
        for gro_enabled in [false, true] {
            #[cfg(target_os = "windows")]
            if gso_enabled && !gro_enabled {
                // Windows requires receive buffer to fit entire datagram on GRO
                // enabled socket.
                //
                // OS error: "A message sent on a datagram socket was larger
                // than the internal message buffer or some other network limit,
                // or the buffer used to receive a datagram into was smaller
                // than the datagram itself."
                continue;
            }

            for recvmmsg_enabled in [false, true] {
                permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled));
            }
        }
    }

    for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations {
        let mut group = c.benchmark_group(format!(
            "gso_{}_gro_{}_recvmmsg_{}",
            gso_enabled, gro_enabled, recvmmsg_enabled
        ));
        group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));

        let gso_segments = if gso_enabled {
            send_state.max_gso_segments()
        } else {
            1
        };
        // The payload never exceeds what fits into a single UDP datagram.
        let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)];
        let transmit = Transmit {
            destination: dst_addr,
            ecn: None,
            contents: &msg,
            segment_size: gso_enabled.then_some(SEGMENT_SIZE),
            src_ip: None,
        };

        let gro_segments = if gro_enabled {
            recv_state.gro_segments()
        } else {
            1
        };
        let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 };

        group.bench_function("throughput", |b| {
            b.to_async(&rt).iter(|| async {
                let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size];
                let mut receive_slices = receive_buffers
                    .iter_mut()
                    .map(|buf| IoSliceMut::new(buf))
                    .collect::<Vec<_>>();
                let mut meta = vec![RecvMeta::default(); batch_size];

                let mut sent: usize = 0;
                let mut received: usize = 0;
                while sent < TOTAL_BYTES {
                    send_socket.writable().await.unwrap();
                    match send_socket.try_io(Interest::WRITABLE, || {
                        send_state.send((&send_socket).into(), &transmit)
                    }) {
                        Ok(_) => {}
                        // send_socket.writable() can lead to false positives, just like
                        // readable() below. Nothing was sent, so wait for writability again
                        // without counting the bytes.
                        Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
                        Err(e) => panic!("failed to send datagram: {e:?}"),
                    }
                    sent += transmit.contents.len();

                    // Drain the receiver before sending more, so at most one transmit is
                    // ever in flight.
                    while received < sent {
                        recv_socket.readable().await.unwrap();
                        let n = match recv_socket.try_io(Interest::READABLE, || {
                            recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta)
                        }) {
                            Ok(n) => n,
                            // recv.readable() can lead to false positives. Try again.
                            Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
                            e => e.unwrap(),
                        };
                        // Only the first `n` metadata entries describe this call's messages.
                        received += meta.iter().map(|m| m.len).take(n).sum::<usize>();
                    }
                }
            })
        });
    }
}
/// Creates a UDP socket bound to an OS-assigned localhost port, together with its
/// `UdpSocketState`.
///
/// The IPv6 loopback address is tried first; if that bind fails, the IPv4 loopback address is
/// used instead.
///
/// # Panics
///
/// Panics if neither bind succeeds, or if building the `UdpSocketState` or converting the
/// socket into a Tokio socket fails.
fn new_socket() -> (UdpSocketState, tokio::net::UdpSocket) {
    let std_socket = match UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) {
        Ok(bound) => bound,
        // The IPv6 bind failed; fall back to IPv4.
        Err(_) => UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap(),
    };

    // The state is built from the std socket before ownership moves into the Tokio wrapper.
    let state = UdpSocketState::new((&std_socket).into()).unwrap();
    let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap();

    (state, tokio_socket)
}
// Register `criterion_benchmark` under the `benches` group and pass that group to
// `criterion_main!`, Criterion's entry-point macro.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
/// Number of bytes reserved for IP and UDP headers when computing `MAX_DATAGRAM_SIZE`.
// NOTE(review): 48 presumably is a 40-byte IPv6 header plus an 8-byte UDP header — confirm.
const MAX_IP_UDP_HEADER_SIZE: usize = 48;
/// Upper bound on the payload of a single transmit in the benchmark: `u16::MAX` minus the
/// header allowance above.
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;