// Source: timely/dataflow/operators/core/map.rs

//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::progress::Timestamp;
5use crate::Container;
6use crate::dataflow::Stream;
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::operators::generic::operator::Operator;
9
10/// Extension trait for `Stream`.
11pub trait Map<'scope, T: Timestamp, C: DrainContainer> : Sized {
12    /// Consumes each element of the stream and yields a new element.
13    ///
14    /// # Examples
15    /// ```
16    /// use timely::dataflow::operators::{ToStream, Inspect};
17    /// use timely::dataflow::operators::core::Map;
18    ///
19    /// timely::example(|scope| {
20    ///     (0..10).to_stream(scope)
21    ///            .container::<Vec<_>>()
22    ///            .map(|x| x + 1)
23    ///            .container::<Vec<_>>()
24    ///            .inspect(|x| println!("seen: {:?}", x));
25    /// });
26    /// ```
27    fn map<C2, D2, L>(self, mut logic: L) -> Stream<'scope, T, C2>
28    where
29        C2: Container + SizableContainer + PushInto<D2>,
30        L: FnMut(C::Item<'_>)->D2 + 'static,
31    {
32        self.flat_map(move |x| std::iter::once(logic(x)))
33    }
34    /// Consumes each element of the stream and yields some number of new elements.
35    ///
36    /// # Examples
37    /// ```
38    /// use timely::dataflow::operators::{ToStream, Inspect};
39    /// use timely::dataflow::operators::core::Map;
40    ///
41    /// timely::example(|scope| {
42    ///     (0..10).to_stream(scope)
43    ///            .container::<Vec<_>>()
44    ///            .flat_map(|x| (0..x))
45    ///            .container::<Vec<_>>()
46    ///            .inspect(|x| println!("seen: {:?}", x));
47    /// });
48    /// ```
49    fn flat_map<C2, I, L>(self, logic: L) -> Stream<'scope, T, C2>
50    where
51        I: IntoIterator,
52        C2: Container + SizableContainer + PushInto<I::Item>,
53        L: FnMut(C::Item<'_>)->I + 'static,
54    ;
55
56    /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
57    ///
58    /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
59    /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
60    /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
61    /// of intermediate stages.
62    ///
63    /// # Examples
64    /// ```
65    /// use timely::dataflow::operators::{Capture, ToStream};
66    /// use timely::dataflow::operators::core::Map;
67    /// use timely::dataflow::operators::capture::Extract;
68    ///
69    /// let data = timely::example(|scope| {
70    ///     (0..10i32)
71    ///         .to_stream(scope)
72    ///         .container::<Vec<_>>()
73    ///         .flat_map_builder(|x| x + 1)
74    ///         .map(|x| x + 1)
75    ///         .map(|x| x + 1)
76    ///         .map(|x| x + 1)
77    ///         .map(Some)
78    ///         .into_stream::<_,Vec<i32>>()
79    ///         .capture()
80    /// });
81    ///
82    /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
83    /// ```
84    fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
85    where
86        L: for<'a> Fn(C::Item<'a>) -> I,
87    {
88        FlatMapBuilder::new(self, logic)
89    }
90}
91
92impl<'scope, T: Timestamp, C: Container + DrainContainer> Map<'scope, T, C> for Stream<'scope, T, C> {
93    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
94    // TODO : number of elements from the iterator. This would allow iterators that produce many
95    // TODO : records without taking arbitrarily long and arbitrarily much memory.
96    fn flat_map<C2, I, L>(self, mut logic: L) -> Stream<'scope, T, C2>
97    where
98        I: IntoIterator,
99        C2: Container + SizableContainer + PushInto<I::Item>,
100        L: FnMut(C::Item<'_>)->I + 'static,
101    {
102        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
103            input.for_each_time(|time, data| {
104                output.session(&time)
105                      .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic));
106            });
107        })
108    }
109}
110
111
112/// A stream wrapper that allows the accumulation of flatmap logic.
113pub struct FlatMapBuilder<S, C: DrainContainer, F: 'static, I>
114where
115    for<'a> F: Fn(C::Item<'a>) -> I,
116{
117    stream: S,
118    logic: F,
119    marker: std::marker::PhantomData<C>,
120}
121
122impl<S, C: DrainContainer, F, I> FlatMapBuilder<S, C, F, I>
123where
124    for<'a> F: Fn(C::Item<'a>) -> I,
125{
126    /// Create a new wrapper with no action on the stream.
127    pub fn new(stream: S, logic: F) -> Self {
128        FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
129    }
130
131    /// Transform a flatmapped stream through additional logic.
132    pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<S, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
133        let logic = self.logic;
134        FlatMapBuilder {
135            stream: self.stream,
136            logic: move |x| g(logic(x)),
137            marker: std::marker::PhantomData,
138        }
139    }
140    /// Convert the wrapper into a stream.
141    pub fn into_stream<'scope, T, C2>(self) -> Stream<'scope, T, C2>
142    where
143        I: IntoIterator,
144        T: Timestamp,
145        S: Map<'scope, T, C>,
146        C2: Container + SizableContainer + PushInto<I::Item>,
147    {
148        Map::flat_map(self.stream, self.logic)
149    }
150}
151
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::capture::Extract;
    use crate::dataflow::operators::{Capture, ToStream, core::Map};

    /// Chains four increments through the builder and checks that the captured
    /// output is the input range shifted by four.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .container::<Vec<_>>()
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                // Wrap in `Some` so the final logic yields something iterable.
                .map(Some)
                .into_stream::<_, Vec<i32>>()
                .capture()
        });

        let expected: Vec<i32> = (4..14).collect();
        assert_eq!(expected, captured.extract()[0].1);
    }
}