timely/dataflow/operators/core/
map.rs

1//! Extension methods for `StreamCore` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: DrainContainer> {
11    /// Consumes each element of the stream and yields a new element.
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::ToStream;
16    /// use timely::dataflow::operators::core::{Map, Inspect};
17    ///
18    /// timely::example(|scope| {
19    ///     (0..10).to_stream(scope)
20    ///            .map(|x| x + 1)
21    ///            .container::<Vec<_>>()
22    ///            .inspect(|x| println!("seen: {:?}", x));
23    /// });
24    /// ```
25    fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
26    where
27        C2: Container + SizableContainer + PushInto<D2>,
28        L: FnMut(C::Item<'_>)->D2 + 'static,
29    {
30        self.flat_map(move |x| std::iter::once(logic(x)))
31    }
32    /// Consumes each element of the stream and yields some number of new elements.
33    ///
34    /// # Examples
35    /// ```
36    /// use timely::dataflow::operators::ToStream;
37    /// use timely::dataflow::operators::core::{Map, Inspect};
38    ///
39    /// timely::example(|scope| {
40    ///     (0..10).to_stream(scope)
41    ///            .flat_map(|x| (0..x))
42    ///            .container::<Vec<_>>()
43    ///            .inspect(|x| println!("seen: {:?}", x));
44    /// });
45    /// ```
46    fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
47    where
48        I: IntoIterator,
49        C2: Container + SizableContainer + PushInto<I::Item>,
50        L: FnMut(C::Item<'_>)->I + 'static,
51    ;
52
53    /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
54    ///
55    /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
56    /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
57    /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
58    /// of intermediate stages.
59    ///
60    /// # Examples
61    /// ```
62    /// use timely::dataflow::operators::{Capture, ToStream, core::Map};
63    /// use timely::dataflow::operators::capture::Extract;
64    ///
65    /// let data = timely::example(|scope| {
66    ///     (0..10i32)
67    ///         .to_stream(scope)
68    ///         .flat_map_builder(|x| x + 1)
69    ///         .map(|x| x + 1)
70    ///         .map(|x| x + 1)
71    ///         .map(|x| x + 1)
72    ///         .map(Some)
73    ///         .into_stream::<_,Vec<i32>>()
74    ///         .capture()
75    /// });
76    ///
77    /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
78    /// ```
79    fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I>
80    where
81        C: Clone + 'static,
82        L: for<'a> Fn(C::Item<'a>) -> I,
83        Self: Sized,
84    {
85        FlatMapBuilder::new(self, logic)
86    }
87}
88
89impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
90    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
91    // TODO : number of elements from the iterator. This would allow iterators that produce many
92    // TODO : records without taking arbitrarily long and arbitrarily much memory.
93    fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
94    where
95        I: IntoIterator,
96        C2: Container + SizableContainer + PushInto<I::Item>,
97        L: FnMut(C::Item<'_>)->I + 'static,
98    {
99        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
100            input.for_each(|time, data| {
101                output.session(&time).give_iterator(data.drain().flat_map(&mut logic));
102            });
103        })
104    }
105}
106
107
108/// A stream wrapper that allows the accumulation of flatmap logic.
109pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I>
110where
111    for<'a> F: Fn(C::Item<'a>) -> I,
112{
113    stream: &'t T,
114    logic: F,
115    marker: std::marker::PhantomData<C>,
116}
117
118impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I>
119where
120    for<'a> F: Fn(C::Item<'a>) -> I,
121{
122    /// Create a new wrapper with no action on the stream.
123    pub fn new(stream: &'t T, logic: F) -> Self {
124        FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
125    }
126
127    /// Transform a flatmapped stream through addiitonal logic.
128    pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
129        let logic = self.logic;
130        FlatMapBuilder {
131            stream: self.stream,
132            logic: move |x| g(logic(x)),
133            marker: std::marker::PhantomData,
134        }
135    }
136    /// Convert the wrapper into a stream.
137    pub fn into_stream<S, C2>(self) -> StreamCore<S, C2>
138    where
139        I: IntoIterator,
140        S: Scope,
141        T: Map<S, C>,
142        C2: Container + SizableContainer + PushInto<I::Item>,
143    {
144        Map::flat_map(self.stream, self.logic)
145    }
146}
147
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{Capture, ToStream, core::Map};
    use crate::dataflow::operators::capture::Extract;

    /// Chained builder stages are fused into one operator and applied in order.
    #[test]
    fn test_builder() {
        let data = crate::example(|scope| {
            let stream = (0..10i32).to_stream(scope);
            stream.flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(Some)
                .into_stream::<_,Vec<i32>>()
                .capture()
        });

        assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
    }

    /// `map` emits exactly one output record per input record.
    #[test]
    fn test_map() {
        let data = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .map::<Vec<i32>, _, _>(|x| x + 1)
                .capture()
        });

        // Flatten every captured batch and sort, so the check does not depend on batching.
        let mut seen: Vec<i32> = data
            .extract()
            .into_iter()
            .flat_map(|(_, batch)| batch)
            .collect();
        seen.sort();
        assert_eq!((1..11).collect::<Vec<_>>(), seen);
    }

    /// `flat_map` emits every item of each returned iterator, and nothing for empty ones.
    #[test]
    fn test_flat_map() {
        let data = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .flat_map::<Vec<i32>, _, _>(|x| 0..x)
                .capture()
        });

        let mut seen: Vec<i32> = data
            .extract()
            .into_iter()
            .flat_map(|(_, batch)| batch)
            .collect();
        seen.sort();

        let mut expected: Vec<i32> = (0..10).flat_map(|x| 0..x).collect();
        expected.sort();
        assert_eq!(expected, seen);
    }
}
168}