1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
//! Extension methods for `StreamCore` based on record-by-record transformation.
use crate::container::{Container, SizableContainer, PushInto};
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::operator::Operator;
/// Extension trait for `Stream`.
pub trait Map<S: Scope, C: Container> {
/// Consumes each element of the stream and yields a new element.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::ToStream;
/// use timely::dataflow::operators::core::{Map, Inspect};
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .map(|x| x + 1)
/// .container::<Vec<_>>()
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
where
C2: SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>)->D2 + 'static,
{
self.flat_map(move |x| std::iter::once(logic(x)))
}
/// Consumes each element of the stream and yields some number of new elements.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::ToStream;
/// use timely::dataflow::operators::core::{Map, Inspect};
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .flat_map(|x| (0..x))
/// .container::<Vec<_>>()
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
where
I: IntoIterator,
C2: SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>)->I + 'static,
;
}
impl<S: Scope, C: Container> Map<S, C> for StreamCore<S, C> {
// TODO : This would be more robust if it captured an iterator and then pulled an appropriate
// TODO : number of elements from the iterator. This would allow iterators that produce many
// TODO : records without taking arbitrarily long and arbitrarily much memory.
fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
where
I: IntoIterator,
C2: SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>)->I + 'static,
{
self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
input.for_each(|time, data| {
output.session(&time).give_iterator(data.drain().flat_map(&mut logic));
});
})
}
}