timely/dataflow/operators/core/map.rs
1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::{Scope, Stream};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: DrainContainer> : Sized {
11 /// Consumes each element of the stream and yields a new element.
12 ///
13 /// # Examples
14 /// ```
15 /// use timely::dataflow::operators::{ToStream, Inspect};
16 /// use timely::dataflow::operators::core::Map;
17 ///
18 /// timely::example(|scope| {
19 /// (0..10).to_stream(scope)
20 /// .container::<Vec<_>>()
21 /// .map(|x| x + 1)
22 /// .container::<Vec<_>>()
23 /// .inspect(|x| println!("seen: {:?}", x));
24 /// });
25 /// ```
26 fn map<C2, D2, L>(self, mut logic: L) -> Stream<S, C2>
27 where
28 C2: Container + SizableContainer + PushInto<D2>,
29 L: FnMut(C::Item<'_>)->D2 + 'static,
30 {
31 self.flat_map(move |x| std::iter::once(logic(x)))
32 }
33 /// Consumes each element of the stream and yields some number of new elements.
34 ///
35 /// # Examples
36 /// ```
37 /// use timely::dataflow::operators::{ToStream, Inspect};
38 /// use timely::dataflow::operators::core::Map;
39 ///
40 /// timely::example(|scope| {
41 /// (0..10).to_stream(scope)
42 /// .container::<Vec<_>>()
43 /// .flat_map(|x| (0..x))
44 /// .container::<Vec<_>>()
45 /// .inspect(|x| println!("seen: {:?}", x));
46 /// });
47 /// ```
48 fn flat_map<C2, I, L>(self, logic: L) -> Stream<S, C2>
49 where
50 I: IntoIterator,
51 C2: Container + SizableContainer + PushInto<I::Item>,
52 L: FnMut(C::Item<'_>)->I + 'static,
53 ;
54
55 /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
56 ///
57 /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
58 /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
59 /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
60 /// of intermediate stages.
61 ///
62 /// # Examples
63 /// ```
64 /// use timely::dataflow::operators::{Capture, ToStream};
65 /// use timely::dataflow::operators::core::Map;
66 /// use timely::dataflow::operators::capture::Extract;
67 ///
68 /// let data = timely::example(|scope| {
69 /// (0..10i32)
70 /// .to_stream(scope)
71 /// .container::<Vec<_>>()
72 /// .flat_map_builder(|x| x + 1)
73 /// .map(|x| x + 1)
74 /// .map(|x| x + 1)
75 /// .map(|x| x + 1)
76 /// .map(Some)
77 /// .into_stream::<_,Vec<i32>>()
78 /// .capture()
79 /// });
80 ///
81 /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
82 /// ```
83 fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
84 where
85 L: for<'a> Fn(C::Item<'a>) -> I,
86 {
87 FlatMapBuilder::new(self, logic)
88 }
89}
90
91impl<S: Scope, C: Container + DrainContainer> Map<S, C> for Stream<S, C> {
92 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
93 // TODO : number of elements from the iterator. This would allow iterators that produce many
94 // TODO : records without taking arbitrarily long and arbitrarily much memory.
95 fn flat_map<C2, I, L>(self, mut logic: L) -> Stream<S, C2>
96 where
97 I: IntoIterator,
98 C2: Container + SizableContainer + PushInto<I::Item>,
99 L: FnMut(C::Item<'_>)->I + 'static,
100 {
101 self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
102 input.for_each_time(|time, data| {
103 output.session(&time)
104 .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic));
105 });
106 })
107 }
108}
109
110
111/// A stream wrapper that allows the accumulation of flatmap logic.
112pub struct FlatMapBuilder<T, C: DrainContainer, F: 'static, I>
113where
114 for<'a> F: Fn(C::Item<'a>) -> I,
115{
116 stream: T,
117 logic: F,
118 marker: std::marker::PhantomData<C>,
119}
120
121impl<T, C: DrainContainer, F, I> FlatMapBuilder<T, C, F, I>
122where
123 for<'a> F: Fn(C::Item<'a>) -> I,
124{
125 /// Create a new wrapper with no action on the stream.
126 pub fn new(stream: T, logic: F) -> Self {
127 FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
128 }
129
130 /// Transform a flatmapped stream through additional logic.
131 pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
132 let logic = self.logic;
133 FlatMapBuilder {
134 stream: self.stream,
135 logic: move |x| g(logic(x)),
136 marker: std::marker::PhantomData,
137 }
138 }
139 /// Convert the wrapper into a stream.
140 pub fn into_stream<S, C2>(self) -> Stream<S, C2>
141 where
142 I: IntoIterator,
143 S: Scope,
144 T: Map<S, C>,
145 C2: Container + SizableContainer + PushInto<I::Item>,
146 {
147 Map::flat_map(self.stream, self.logic)
148 }
149}
150
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{Capture, ToStream, core::Map};
    use crate::dataflow::operators::capture::Extract;

    /// Chains several `map` stages on a `FlatMapBuilder` and checks the
    /// composed result captured from the finalized stream.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            (0..10i32)
                .to_stream(scope)
                .container::<Vec<_>>()
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(Some)
                .into_stream::<_, Vec<i32>>()
                .capture()
        });

        // Four `+1` stages shift every input by four: 0..10 becomes 4..14.
        let expected: Vec<i32> = (0..10).map(|x| x + 4).collect();
        assert_eq!(expected, captured.extract()[0].1);
    }
}
171}