Skip to main content

timely/dataflow/operators/core/
map.rs

1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::{Scope, Stream};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: DrainContainer> : Sized {
11    /// Consumes each element of the stream and yields a new element.
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::{ToStream, Inspect};
16    /// use timely::dataflow::operators::core::Map;
17    ///
18    /// timely::example(|scope| {
19    ///     (0..10).to_stream(scope)
20    ///            .container::<Vec<_>>()
21    ///            .map(|x| x + 1)
22    ///            .container::<Vec<_>>()
23    ///            .inspect(|x| println!("seen: {:?}", x));
24    /// });
25    /// ```
26    fn map<C2, D2, L>(self, mut logic: L) -> Stream<S, C2>
27    where
28        C2: Container + SizableContainer + PushInto<D2>,
29        L: FnMut(C::Item<'_>)->D2 + 'static,
30    {
31        self.flat_map(move |x| std::iter::once(logic(x)))
32    }
33    /// Consumes each element of the stream and yields some number of new elements.
34    ///
35    /// # Examples
36    /// ```
37    /// use timely::dataflow::operators::{ToStream, Inspect};
38    /// use timely::dataflow::operators::core::Map;
39    ///
40    /// timely::example(|scope| {
41    ///     (0..10).to_stream(scope)
42    ///            .container::<Vec<_>>()
43    ///            .flat_map(|x| (0..x))
44    ///            .container::<Vec<_>>()
45    ///            .inspect(|x| println!("seen: {:?}", x));
46    /// });
47    /// ```
48    fn flat_map<C2, I, L>(self, logic: L) -> Stream<S, C2>
49    where
50        I: IntoIterator,
51        C2: Container + SizableContainer + PushInto<I::Item>,
52        L: FnMut(C::Item<'_>)->I + 'static,
53    ;
54
55    /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
56    ///
57    /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
58    /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
59    /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
60    /// of intermediate stages.
61    ///
62    /// # Examples
63    /// ```
64    /// use timely::dataflow::operators::{Capture, ToStream};
65    /// use timely::dataflow::operators::core::Map;
66    /// use timely::dataflow::operators::capture::Extract;
67    ///
68    /// let data = timely::example(|scope| {
69    ///     (0..10i32)
70    ///         .to_stream(scope)
71    ///         .container::<Vec<_>>()
72    ///         .flat_map_builder(|x| x + 1)
73    ///         .map(|x| x + 1)
74    ///         .map(|x| x + 1)
75    ///         .map(|x| x + 1)
76    ///         .map(Some)
77    ///         .into_stream::<_,Vec<i32>>()
78    ///         .capture()
79    /// });
80    ///
81    /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
82    /// ```
83    fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
84    where
85        L: for<'a> Fn(C::Item<'a>) -> I,
86    {
87        FlatMapBuilder::new(self, logic)
88    }
89}
90
91impl<S: Scope, C: Container + DrainContainer> Map<S, C> for Stream<S, C> {
92    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
93    // TODO : number of elements from the iterator. This would allow iterators that produce many
94    // TODO : records without taking arbitrarily long and arbitrarily much memory.
95    fn flat_map<C2, I, L>(self, mut logic: L) -> Stream<S, C2>
96    where
97        I: IntoIterator,
98        C2: Container + SizableContainer + PushInto<I::Item>,
99        L: FnMut(C::Item<'_>)->I + 'static,
100    {
101        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
102            input.for_each_time(|time, data| {
103                output.session(&time)
104                      .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic));
105            });
106        })
107    }
108}
109
110
111/// A stream wrapper that allows the accumulation of flatmap logic.
112pub struct FlatMapBuilder<T, C: DrainContainer, F: 'static, I>
113where
114    for<'a> F: Fn(C::Item<'a>) -> I,
115{
116    stream: T,
117    logic: F,
118    marker: std::marker::PhantomData<C>,
119}
120
121impl<T, C: DrainContainer, F, I> FlatMapBuilder<T, C, F, I>
122where
123    for<'a> F: Fn(C::Item<'a>) -> I,
124{
125    /// Create a new wrapper with no action on the stream.
126    pub fn new(stream: T, logic: F) -> Self {
127        FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
128    }
129
130    /// Transform a flatmapped stream through additional logic.
131    pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
132        let logic = self.logic;
133        FlatMapBuilder {
134            stream: self.stream,
135            logic: move |x| g(logic(x)),
136            marker: std::marker::PhantomData,
137        }
138    }
139    /// Convert the wrapper into a stream.
140    pub fn into_stream<S, C2>(self) -> Stream<S, C2>
141    where
142        I: IntoIterator,
143        S: Scope,
144        T: Map<S, C>,
145        C2: Container + SizableContainer + PushInto<I::Item>,
146    {
147        Map::flat_map(self.stream, self.logic)
148    }
149}
150
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{Capture, ToStream, core::Map};
    use crate::dataflow::operators::capture::Extract;

    /// Chains several `map` steps on a builder and checks the captured output:
    /// each input in `0..10` is incremented four times, then wrapped in `Some`
    /// so it can be flattened back into a single record.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .container::<Vec<_>>()
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(Some)
                .into_stream::<_,Vec<i32>>()
                .capture()
        });

        let expected: Vec<i32> = (4..14).collect();
        assert_eq!(expected, captured.extract()[0].1);
    }
}