timely/dataflow/operators/core/
map.rs

1//! Extension methods for `StreamCore` based on record-by-record transformation.
2
3use crate::container::{Container, SizableContainer, PushInto};
4use crate::Data;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: Container> {
11    /// Consumes each element of the stream and yields a new element.
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::ToStream;
16    /// use timely::dataflow::operators::core::{Map, Inspect};
17    ///
18    /// timely::example(|scope| {
19    ///     (0..10).to_stream(scope)
20    ///            .map(|x| x + 1)
21    ///            .container::<Vec<_>>()
22    ///            .inspect(|x| println!("seen: {:?}", x));
23    /// });
24    /// ```
25    fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
26    where
27        C2: SizableContainer + PushInto<D2> + Data,
28        L: FnMut(C::Item<'_>)->D2 + 'static,
29    {
30        self.flat_map(move |x| std::iter::once(logic(x)))
31    }
32    /// Consumes each element of the stream and yields some number of new elements.
33    ///
34    /// # Examples
35    /// ```
36    /// use timely::dataflow::operators::ToStream;
37    /// use timely::dataflow::operators::core::{Map, Inspect};
38    ///
39    /// timely::example(|scope| {
40    ///     (0..10).to_stream(scope)
41    ///            .flat_map(|x| (0..x))
42    ///            .container::<Vec<_>>()
43    ///            .inspect(|x| println!("seen: {:?}", x));
44    /// });
45    /// ```
46    fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
47    where
48        I: IntoIterator,
49        C2: SizableContainer + PushInto<I::Item> + Data,
50        L: FnMut(C::Item<'_>)->I + 'static,
51    ;
52}
53
54impl<S: Scope, C: Container + Data> Map<S, C> for StreamCore<S, C> {
55    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
56    // TODO : number of elements from the iterator. This would allow iterators that produce many
57    // TODO : records without taking arbitrarily long and arbitrarily much memory.
58    fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
59    where
60        I: IntoIterator,
61        C2: SizableContainer + PushInto<I::Item> + Data,
62        L: FnMut(C::Item<'_>)->I + 'static,
63    {
64        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
65            input.for_each(|time, data| {
66                output.session(&time).give_iterator(data.drain().flat_map(&mut logic));
67            });
68        })
69    }
70}