Source code

Revision control

Copy as Markdown

Other Tools

use crate::Stream;
use std::borrow::Borrow;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Combine many streams into one, indexing each source stream with a unique
/// key.
///
/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
/// streams into a single merged stream that yields values in the order that
/// they arrive from the source streams. However, `StreamMap` has a lot more
/// flexibility in usage patterns.
///
/// `StreamMap` can:
///
/// * Merge an arbitrary number of streams.
/// * Track which source stream the value was received from.
/// * Handle inserting and removing streams from the set of managed streams at
/// any point during iteration.
///
/// All source streams held by `StreamMap` are indexed using a key. This key is
/// included with the value when a source stream yields a value. The key is also
/// used to remove the stream from the `StreamMap` before the stream has
/// completed streaming.
///
/// # `Unpin`
///
/// Because the `StreamMap` API moves streams during runtime, both streams and
/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
/// pin the stream in the heap.
///
/// # Implementation
///
/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
/// internal implementation detail will persist in future versions, but it is
/// important to know the runtime implications. In general, `StreamMap` works
/// best with a "smallish" number of streams as all entries are scanned on
/// insert, remove, and polling. In cases where a large number of streams need
/// to be merged, it may be advisable to use tasks sending values on a shared
/// [`mpsc`] channel.
///
/// [`StreamExt::merge`]: crate::StreamExt::merge
/// [`Box::pin`]: std::boxed::Box::pin
///
/// # Examples
///
/// Merging two streams, then remove them after receiving the first value
///
/// ```
/// use tokio_stream::{StreamExt, StreamMap, Stream};
/// use tokio::sync::mpsc;
/// use std::pin::Pin;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
/// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
///
/// // Convert the channels to a `Stream`.
/// let rx1 = Box::pin(async_stream::stream! {
/// while let Some(item) = rx1.recv().await {
/// yield item;
/// }
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
///
/// let rx2 = Box::pin(async_stream::stream! {
/// while let Some(item) = rx2.recv().await {
/// yield item;
/// }
/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
///
/// tokio::spawn(async move {
/// tx1.send(1).await.unwrap();
///
/// // This value will never be received. The send may or may not return
/// // `Err` depending on if the remote end closed first or not.
/// let _ = tx1.send(2).await;
/// });
///
/// tokio::spawn(async move {
/// tx2.send(3).await.unwrap();
/// let _ = tx2.send(4).await;
/// });
///
/// let mut map = StreamMap::new();
///
/// // Insert both streams
/// map.insert("one", rx1);
/// map.insert("two", rx2);
///
/// // Read twice
/// for _ in 0..2 {
/// let (key, val) = map.next().await.unwrap();
///
/// if key == "one" {
/// assert_eq!(val, 1);
/// } else {
/// assert_eq!(val, 3);
/// }
///
/// // Remove the stream to prevent reading the next value
/// map.remove(key);
/// }
/// }
/// ```
///
/// This example models a read-only client to a chat system with channels. The
/// client sends commands to join and leave channels. `StreamMap` is used to
/// manage active channel subscriptions.
///
/// For simplicity, messages are displayed with `println!`, but they could be
/// sent to the client over a socket.
///
/// ```no_run
/// use tokio_stream::{Stream, StreamExt, StreamMap};
///
/// enum Command {
/// Join(String),
/// Leave(String),
/// }
///
/// fn commands() -> impl Stream<Item = Command> {
/// // Streams in user commands by parsing `stdin`.
/// # tokio_stream::pending()
/// }
///
/// // Join a channel, returns a stream of messages received on the channel.
/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
/// // left as an exercise to the reader
/// # tokio_stream::pending()
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let mut channels = StreamMap::new();
///
/// // Input commands (join / leave channels).
/// let cmds = commands();
/// tokio::pin!(cmds);
///
/// loop {
/// tokio::select! {
/// Some(cmd) = cmds.next() => {
/// match cmd {
/// Command::Join(chan) => {
/// // Join the channel and add it to the `channels`
/// // stream map
/// let msgs = join(&chan);
/// channels.insert(chan, msgs);
/// }
/// Command::Leave(chan) => {
/// channels.remove(&chan);
/// }
/// }
/// }
/// Some((chan, msg)) = channels.next() => {
/// // Received a message, display it on stdout with the channel
/// // it originated from.
/// println!("{}: {}", chan, msg);
/// }
/// // Both the `commands` stream and the `channels` stream are
/// // complete. There is no more work to do, so leave the loop.
/// else => break,
/// }
/// }
/// }
/// ```
#[derive(Debug)]
pub struct StreamMap<K, V> {
/// Streams stored in the map
entries: Vec<(K, V)>,
}
impl<K, V> StreamMap<K, V> {
/// An iterator visiting all key-value pairs in arbitrary order.
///
/// The iterator element type is &'a (K, V).
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
///
/// map.insert("a", pending::<i32>());
/// map.insert("b", pending());
/// map.insert("c", pending());
///
/// for (key, stream) in map.iter() {
/// println!("({}, {:?})", key, stream);
/// }
/// ```
pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
self.entries.iter()
}
/// An iterator visiting all key-value pairs mutably in arbitrary order.
///
/// The iterator element type is &'a mut (K, V).
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
///
/// map.insert("a", pending::<i32>());
/// map.insert("b", pending());
/// map.insert("c", pending());
///
/// for (key, stream) in map.iter_mut() {
/// println!("({}, {:?})", key, stream);
/// }
/// ```
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
self.entries.iter_mut()
}
/// Creates an empty `StreamMap`.
///
/// The stream map is initially created with a capacity of `0`, so it will
/// not allocate until it is first inserted into.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, Pending};
///
/// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
/// ```
pub fn new() -> StreamMap<K, V> {
StreamMap { entries: vec![] }
}
/// Creates an empty `StreamMap` with the specified capacity.
///
/// The stream map will be able to hold at least `capacity` elements without
/// reallocating. If `capacity` is 0, the stream map will not allocate.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, Pending};
///
/// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
/// ```
pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
StreamMap {
entries: Vec::with_capacity(capacity),
}
}
/// Returns an iterator visiting all keys in arbitrary order.
///
/// The iterator element type is &'a K.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
///
/// map.insert("a", pending::<i32>());
/// map.insert("b", pending());
/// map.insert("c", pending());
///
/// for key in map.keys() {
/// println!("{}", key);
/// }
/// ```
pub fn keys(&self) -> impl Iterator<Item = &K> {
self.iter().map(|(k, _)| k)
}
/// An iterator visiting all values in arbitrary order.
///
/// The iterator element type is &'a V.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
///
/// map.insert("a", pending::<i32>());
/// map.insert("b", pending());
/// map.insert("c", pending());
///
/// for stream in map.values() {
/// println!("{:?}", stream);
/// }
/// ```
pub fn values(&self) -> impl Iterator<Item = &V> {
self.iter().map(|(_, v)| v)
}
/// An iterator visiting all values mutably in arbitrary order.
///
/// The iterator element type is &'a mut V.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
///
/// map.insert("a", pending::<i32>());
/// map.insert("b", pending());
/// map.insert("c", pending());
///
/// for stream in map.values_mut() {
/// println!("{:?}", stream);
/// }
/// ```
pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
self.iter_mut().map(|(_, v)| v)
}
/// Returns the number of streams the map can hold without reallocating.
///
/// This number is a lower bound; the `StreamMap` might be able to hold
/// more, but is guaranteed to be able to hold at least this many.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, Pending};
///
/// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
/// assert!(map.capacity() >= 100);
/// ```
pub fn capacity(&self) -> usize {
self.entries.capacity()
}
/// Returns the number of streams in the map.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut a = StreamMap::new();
/// assert_eq!(a.len(), 0);
/// a.insert(1, pending::<i32>());
/// assert_eq!(a.len(), 1);
/// ```
pub fn len(&self) -> usize {
self.entries.len()
}
/// Returns `true` if the map contains no elements.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut a = StreamMap::new();
/// assert!(a.is_empty());
/// a.insert(1, pending::<i32>());
/// assert!(!a.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Clears the map, removing all key-stream pairs. Keeps the allocated
/// memory for reuse.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut a = StreamMap::new();
/// a.insert(1, pending::<i32>());
/// a.clear();
/// assert!(a.is_empty());
/// ```
pub fn clear(&mut self) {
self.entries.clear();
}
/// Insert a key-stream pair into the map.
///
/// If the map did not have this key present, `None` is returned.
///
/// If the map did have this key present, the new `stream` replaces the old
/// one and the old stream is returned.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
///
/// assert!(map.insert(37, pending::<i32>()).is_none());
/// assert!(!map.is_empty());
///
/// map.insert(37, pending());
/// assert!(map.insert(37, pending()).is_some());
/// ```
pub fn insert(&mut self, k: K, stream: V) -> Option<V>
where
K: Hash + Eq,
{
let ret = self.remove(&k);
self.entries.push((k, stream));
ret
}
/// Removes a key from the map, returning the stream at the key if the key was previously in the map.
///
/// The key may be any borrowed form of the map's key type, but `Hash` and
/// `Eq` on the borrowed form must match those for the key type.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
/// map.insert(1, pending::<i32>());
/// assert!(map.remove(&1).is_some());
/// assert!(map.remove(&1).is_none());
/// ```
pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
for i in 0..self.entries.len() {
if self.entries[i].0.borrow() == k {
return Some(self.entries.swap_remove(i).1);
}
}
None
}
/// Returns `true` if the map contains a stream for the specified key.
///
/// The key may be any borrowed form of the map's key type, but `Hash` and
/// `Eq` on the borrowed form must match those for the key type.
///
/// # Examples
///
/// ```
/// use tokio_stream::{StreamMap, pending};
///
/// let mut map = StreamMap::new();
/// map.insert(1, pending::<i32>());
/// assert_eq!(map.contains_key(&1), true);
/// assert_eq!(map.contains_key(&2), false);
/// ```
pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq,
{
for i in 0..self.entries.len() {
if self.entries[i].0.borrow() == k {
return true;
}
}
false
}
}
impl<K, V> StreamMap<K, V>
where
K: Unpin,
V: Stream + Unpin,
{
/// Polls the next value, includes the vec entry index
fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
use Poll::*;
let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
let mut idx = start;
for _ in 0..self.entries.len() {
let (_, stream) = &mut self.entries[idx];
match Pin::new(stream).poll_next(cx) {
Ready(Some(val)) => return Ready(Some((idx, val))),
Ready(None) => {
// Remove the entry
self.entries.swap_remove(idx);
// Check if this was the last entry, if so the cursor needs
// to wrap
if idx == self.entries.len() {
idx = 0;
} else if idx < start && start <= self.entries.len() {
// The stream being swapped into the current index has
// already been polled, so skip it.
idx = idx.wrapping_add(1) % self.entries.len();
}
}
Pending => {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
}
// If the map is empty, then the stream is complete.
if self.entries.is_empty() {
Ready(None)
} else {
Pending
}
}
}
impl<K, V> Default for StreamMap<K, V> {
fn default() -> Self {
Self::new()
}
}
impl<K, V> Stream for StreamMap<K, V>
where
K: Clone + Unpin,
V: Stream + Unpin,
{
type Item = (K, V::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
let key = self.entries[idx].0.clone();
Poll::Ready(Some((key, val)))
} else {
Poll::Ready(None)
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let mut ret = (0, Some(0));
for (_, stream) in &self.entries {
let hint = stream.size_hint();
ret.0 += hint.0;
match (ret.1, hint.1) {
(Some(a), Some(b)) => ret.1 = Some(a + b),
(Some(_), None) => ret.1 = None,
_ => {}
}
}
ret
}
}
impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
where
K: Hash + Eq,
{
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
let iterator = iter.into_iter();
let (lower_bound, _) = iterator.size_hint();
let mut stream_map = Self::with_capacity(lower_bound);
for (key, value) in iterator {
stream_map.insert(key, value);
}
stream_map
}
}
impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
fn extend<T>(&mut self, iter: T)
where
T: IntoIterator<Item = (K, V)>,
{
self.entries.extend(iter);
}
}
mod rand {
use std::cell::Cell;
mod loom {
#[cfg(not(loom))]
pub(crate) mod rand {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::Relaxed;
static COUNTER: AtomicU32 = AtomicU32::new(1);
pub(crate) fn seed() -> u64 {
let rand_state = RandomState::new();
let mut hasher = rand_state.build_hasher();
// Hash some unique-ish data to generate some new state
COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
// Get the seed
hasher.finish()
}
}
#[cfg(loom)]
pub(crate) mod rand {
pub(crate) fn seed() -> u64 {
1
}
}
}
/// Fast random number generate
///
/// Implement xorshift64+: 2 32-bit xorshift sequences added together.
/// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
/// This generator passes the SmallCrush suite, part of TestU01 framework:
#[derive(Debug)]
pub(crate) struct FastRand {
one: Cell<u32>,
two: Cell<u32>,
}
impl FastRand {
/// Initialize a new, thread-local, fast random number generator.
pub(crate) fn new(seed: u64) -> FastRand {
let one = (seed >> 32) as u32;
let mut two = seed as u32;
if two == 0 {
// This value cannot be zero
two = 1;
}
FastRand {
one: Cell::new(one),
two: Cell::new(two),
}
}
pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
// This is similar to fastrand() % n, but faster.
let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
(mul >> 32) as u32
}
fn fastrand(&self) -> u32 {
let mut s1 = self.one.get();
let s0 = self.two.get();
s1 ^= s1 << 17;
s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
self.one.set(s0);
self.two.set(s1);
s0.wrapping_add(s1)
}
}
// Used by `StreamMap`
pub(crate) fn thread_rng_n(n: u32) -> u32 {
thread_local! {
static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
}
THREAD_RNG.with(|rng| rng.fastrand_n(n))
}
}