timely/dataflow/operators/core/map.rs
1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::progress::Timestamp;
5use crate::Container;
6use crate::dataflow::Stream;
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::operators::generic::operator::Operator;
9
10/// Extension trait for `Stream`.
11pub trait Map<'scope, T: Timestamp, C: DrainContainer> : Sized {
12 /// Consumes each element of the stream and yields a new element.
13 ///
14 /// # Examples
15 /// ```
16 /// use timely::dataflow::operators::{ToStream, Inspect};
17 /// use timely::dataflow::operators::core::Map;
18 ///
19 /// timely::example(|scope| {
20 /// (0..10).to_stream(scope)
21 /// .container::<Vec<_>>()
22 /// .map(|x| x + 1)
23 /// .container::<Vec<_>>()
24 /// .inspect(|x| println!("seen: {:?}", x));
25 /// });
26 /// ```
27 fn map<C2, D2, L>(self, mut logic: L) -> Stream<'scope, T, C2>
28 where
29 C2: Container + SizableContainer + PushInto<D2>,
30 L: FnMut(C::Item<'_>)->D2 + 'static,
31 {
32 self.flat_map(move |x| std::iter::once(logic(x)))
33 }
34 /// Consumes each element of the stream and yields some number of new elements.
35 ///
36 /// # Examples
37 /// ```
38 /// use timely::dataflow::operators::{ToStream, Inspect};
39 /// use timely::dataflow::operators::core::Map;
40 ///
41 /// timely::example(|scope| {
42 /// (0..10).to_stream(scope)
43 /// .container::<Vec<_>>()
44 /// .flat_map(|x| (0..x))
45 /// .container::<Vec<_>>()
46 /// .inspect(|x| println!("seen: {:?}", x));
47 /// });
48 /// ```
49 fn flat_map<C2, I, L>(self, logic: L) -> Stream<'scope, T, C2>
50 where
51 I: IntoIterator,
52 C2: Container + SizableContainer + PushInto<I::Item>,
53 L: FnMut(C::Item<'_>)->I + 'static,
54 ;
55
56 /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
57 ///
58 /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
59 /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
60 /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
61 /// of intermediate stages.
62 ///
63 /// # Examples
64 /// ```
65 /// use timely::dataflow::operators::{Capture, ToStream};
66 /// use timely::dataflow::operators::core::Map;
67 /// use timely::dataflow::operators::capture::Extract;
68 ///
69 /// let data = timely::example(|scope| {
70 /// (0..10i32)
71 /// .to_stream(scope)
72 /// .container::<Vec<_>>()
73 /// .flat_map_builder(|x| x + 1)
74 /// .map(|x| x + 1)
75 /// .map(|x| x + 1)
76 /// .map(|x| x + 1)
77 /// .map(Some)
78 /// .into_stream::<_,Vec<i32>>()
79 /// .capture()
80 /// });
81 ///
82 /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
83 /// ```
84 fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
85 where
86 L: for<'a> Fn(C::Item<'a>) -> I,
87 {
88 FlatMapBuilder::new(self, logic)
89 }
90}
91
92impl<'scope, T: Timestamp, C: Container + DrainContainer> Map<'scope, T, C> for Stream<'scope, T, C> {
93 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
94 // TODO : number of elements from the iterator. This would allow iterators that produce many
95 // TODO : records without taking arbitrarily long and arbitrarily much memory.
96 fn flat_map<C2, I, L>(self, mut logic: L) -> Stream<'scope, T, C2>
97 where
98 I: IntoIterator,
99 C2: Container + SizableContainer + PushInto<I::Item>,
100 L: FnMut(C::Item<'_>)->I + 'static,
101 {
102 self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
103 input.for_each_time(|time, data| {
104 output.session(&time)
105 .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic));
106 });
107 })
108 }
109}
110
111
112/// A stream wrapper that allows the accumulation of flatmap logic.
113pub struct FlatMapBuilder<S, C: DrainContainer, F: 'static, I>
114where
115 for<'a> F: Fn(C::Item<'a>) -> I,
116{
117 stream: S,
118 logic: F,
119 marker: std::marker::PhantomData<C>,
120}
121
122impl<S, C: DrainContainer, F, I> FlatMapBuilder<S, C, F, I>
123where
124 for<'a> F: Fn(C::Item<'a>) -> I,
125{
126 /// Create a new wrapper with no action on the stream.
127 pub fn new(stream: S, logic: F) -> Self {
128 FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
129 }
130
131 /// Transform a flatmapped stream through additional logic.
132 pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<S, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
133 let logic = self.logic;
134 FlatMapBuilder {
135 stream: self.stream,
136 logic: move |x| g(logic(x)),
137 marker: std::marker::PhantomData,
138 }
139 }
140 /// Convert the wrapper into a stream.
141 pub fn into_stream<'scope, T, C2>(self) -> Stream<'scope, T, C2>
142 where
143 I: IntoIterator,
144 T: Timestamp,
145 S: Map<'scope, T, C>,
146 C2: Container + SizableContainer + PushInto<I::Item>,
147 {
148 Map::flat_map(self.stream, self.logic)
149 }
150}
151
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{Capture, ToStream, core::Map};
    use crate::dataflow::operators::capture::Extract;

    /// Four chained `+ 1` steps through a `FlatMapBuilder` should shift `0..10` to `4..14`.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .container::<Vec<_>>()
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(Some)
                .into_stream::<_,Vec<i32>>()
                .capture()
        });

        let expected: Vec<i32> = (4..14).collect();
        assert_eq!(expected, captured.extract()[0].1);
    }
}
172}