timely/dataflow/operators/core/
map.rs

1//! Extension methods for `StreamCore` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: DrainContainer> {
11    /// Consumes each element of the stream and yields a new element.
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::ToStream;
16    /// use timely::dataflow::operators::core::{Map, Inspect};
17    ///
18    /// timely::example(|scope| {
19    ///     (0..10).to_stream(scope)
20    ///            .map(|x| x + 1)
21    ///            .container::<Vec<_>>()
22    ///            .inspect(|x| println!("seen: {:?}", x));
23    /// });
24    /// ```
25    fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
26    where
27        C2: Container + SizableContainer + PushInto<D2>,
28        L: FnMut(C::Item<'_>)->D2 + 'static,
29    {
30        self.flat_map(move |x| std::iter::once(logic(x)))
31    }
32    /// Consumes each element of the stream and yields some number of new elements.
33    ///
34    /// # Examples
35    /// ```
36    /// use timely::dataflow::operators::ToStream;
37    /// use timely::dataflow::operators::core::{Map, Inspect};
38    ///
39    /// timely::example(|scope| {
40    ///     (0..10).to_stream(scope)
41    ///            .flat_map(|x| (0..x))
42    ///            .container::<Vec<_>>()
43    ///            .inspect(|x| println!("seen: {:?}", x));
44    /// });
45    /// ```
46    fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
47    where
48        I: IntoIterator,
49        C2: Container + SizableContainer + PushInto<I::Item>,
50        L: FnMut(C::Item<'_>)->I + 'static,
51    ;
52
53    /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
54    ///
55    /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
56    /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
57    /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
58    /// of intermediate stages.
59    ///
60    /// # Examples
61    /// ```
62    /// use timely::dataflow::operators::{Capture, ToStream, core::Map};
63    /// use timely::dataflow::operators::capture::Extract;
64    ///
65    /// let data = timely::example(|scope| {
66    ///     (0..10i32)
67    ///         .to_stream(scope)
68    ///         .flat_map_builder(|x| x + 1)
69    ///         .map(|x| x + 1)
70    ///         .map(|x| x + 1)
71    ///         .map(|x| x + 1)
72    ///         .map(Some)
73    ///         .into_stream::<_,Vec<i32>>()
74    ///         .capture()
75    /// });
76    ///
77    /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
78    /// ```
79    fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I>
80    where
81        C: Clone + 'static,
82        L: for<'a> Fn(C::Item<'a>) -> I,
83        Self: Sized,
84    {
85        FlatMapBuilder::new(self, logic)
86    }
87}
88
89impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
90    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
91    // TODO : number of elements from the iterator. This would allow iterators that produce many
92    // TODO : records without taking arbitrarily long and arbitrarily much memory.
93    fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
94    where
95        I: IntoIterator,
96        C2: Container + SizableContainer + PushInto<I::Item>,
97        L: FnMut(C::Item<'_>)->I + 'static,
98    {
99        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
100            input.for_each_time(|time, data| {
101                output.session(&time)
102                      .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic));
103            });
104        })
105    }
106}
107
108
109/// A stream wrapper that allows the accumulation of flatmap logic.
110pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I>
111where
112    for<'a> F: Fn(C::Item<'a>) -> I,
113{
114    stream: &'t T,
115    logic: F,
116    marker: std::marker::PhantomData<C>,
117}
118
119impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I>
120where
121    for<'a> F: Fn(C::Item<'a>) -> I,
122{
123    /// Create a new wrapper with no action on the stream.
124    pub fn new(stream: &'t T, logic: F) -> Self {
125        FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
126    }
127
128    /// Transform a flatmapped stream through addiitonal logic.
129    pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
130        let logic = self.logic;
131        FlatMapBuilder {
132            stream: self.stream,
133            logic: move |x| g(logic(x)),
134            marker: std::marker::PhantomData,
135        }
136    }
137    /// Convert the wrapper into a stream.
138    pub fn into_stream<S, C2>(self) -> StreamCore<S, C2>
139    where
140        I: IntoIterator,
141        S: Scope,
142        T: Map<S, C>,
143        C2: Container + SizableContainer + PushInto<I::Item>,
144    {
145        Map::flat_map(self.stream, self.logic)
146    }
147}
148
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::capture::Extract;
    use crate::dataflow::operators::{core::Map, Capture, ToStream};

    /// Four `+ 1` steps fused through the builder should shift `0..10` to `4..14`.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(Some)
                .into_stream::<_, Vec<i32>>()
                .capture()
        });

        let expected: Vec<i32> = (4..14).collect();
        assert_eq!(expected, captured.extract()[0].1);
    }
}