Revision control

Copy as Markdown

Other Tools

use super::plumbing::*;
use super::*;
struct BlocksCallback<S, C> {
sizes: S,
consumer: C,
len: usize,
}
impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
where
C: UnindexedConsumer<T>,
S: Iterator<Item = usize>,
{
type Output = C::Result;
fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
let mut remaining_len = self.len;
let mut consumer = self.consumer;
// we need a local variable for the accumulated results
// we call the reducer's identity by splitting at 0
let (left_consumer, right_consumer, _) = consumer.split_at(0);
let mut leftmost_res = left_consumer.into_folder().complete();
consumer = right_consumer;
// now we loop on each block size
while remaining_len > 0 && !consumer.full() {
// we compute the next block's size
let size = self.sizes.next().unwrap_or(std::usize::MAX);
let capped_size = remaining_len.min(size);
remaining_len -= capped_size;
// split the producer
let (left_producer, right_producer) = producer.split_at(capped_size);
producer = right_producer;
// split the consumer
let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
consumer = right_consumer;
leftmost_res = consumer.to_reducer().reduce(
leftmost_res,
bridge_producer_consumer(capped_size, left_producer, left_consumer),
);
}
leftmost_res
}
}
/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of increasing sizes (exponentially).
///
/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`]
///
/// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct ExponentialBlocks<I> {
base: I,
}
impl<I> ExponentialBlocks<I> {
pub(super) fn new(base: I) -> Self {
Self { base }
}
}
impl<I> ParallelIterator for ExponentialBlocks<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
let first = crate::current_num_threads();
let callback = BlocksCallback {
consumer,
sizes: std::iter::successors(Some(first), exponential_size),
len: self.base.len(),
};
self.base.with_producer(callback)
}
}
fn exponential_size(size: &usize) -> Option<usize> {
Some(size.saturating_mul(2))
}
/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
/// of parallel blocks of constant sizes.
///
/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`]
///
/// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks
/// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct UniformBlocks<I> {
base: I,
block_size: usize,
}
impl<I> UniformBlocks<I> {
pub(super) fn new(base: I, block_size: usize) -> Self {
Self { base, block_size }
}
}
impl<I> ParallelIterator for UniformBlocks<I>
where
I: IndexedParallelIterator,
{
type Item = I::Item;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where
C: UnindexedConsumer<Self::Item>,
{
let callback = BlocksCallback {
consumer,
sizes: std::iter::repeat(self.block_size),
len: self.base.len(),
};
self.base.with_producer(callback)
}
}