timely/dataflow/operators/core/map.rs
1//! Extension methods for `StreamCore` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: DrainContainer> {
11 /// Consumes each element of the stream and yields a new element.
12 ///
13 /// # Examples
14 /// ```
15 /// use timely::dataflow::operators::ToStream;
16 /// use timely::dataflow::operators::core::{Map, Inspect};
17 ///
18 /// timely::example(|scope| {
19 /// (0..10).to_stream(scope)
20 /// .map(|x| x + 1)
21 /// .container::<Vec<_>>()
22 /// .inspect(|x| println!("seen: {:?}", x));
23 /// });
24 /// ```
25 fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
26 where
27 C2: Container + SizableContainer + PushInto<D2>,
28 L: FnMut(C::Item<'_>)->D2 + 'static,
29 {
30 self.flat_map(move |x| std::iter::once(logic(x)))
31 }
32 /// Consumes each element of the stream and yields some number of new elements.
33 ///
34 /// # Examples
35 /// ```
36 /// use timely::dataflow::operators::ToStream;
37 /// use timely::dataflow::operators::core::{Map, Inspect};
38 ///
39 /// timely::example(|scope| {
40 /// (0..10).to_stream(scope)
41 /// .flat_map(|x| (0..x))
42 /// .container::<Vec<_>>()
43 /// .inspect(|x| println!("seen: {:?}", x));
44 /// });
45 /// ```
46 fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
47 where
48 I: IntoIterator,
49 C2: Container + SizableContainer + PushInto<I::Item>,
50 L: FnMut(C::Item<'_>)->I + 'static,
51 ;
52
53 /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
54 ///
55 /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
56 /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
57 /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
58 /// of intermediate stages.
59 ///
60 /// # Examples
61 /// ```
62 /// use timely::dataflow::operators::{Capture, ToStream, core::Map};
63 /// use timely::dataflow::operators::capture::Extract;
64 ///
65 /// let data = timely::example(|scope| {
66 /// (0..10i32)
67 /// .to_stream(scope)
68 /// .flat_map_builder(|x| x + 1)
69 /// .map(|x| x + 1)
70 /// .map(|x| x + 1)
71 /// .map(|x| x + 1)
72 /// .map(Some)
73 /// .into_stream::<_,Vec<i32>>()
74 /// .capture()
75 /// });
76 ///
77 /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
78 /// ```
79 fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I>
80 where
81 C: Clone + 'static,
82 L: for<'a> Fn(C::Item<'a>) -> I,
83 Self: Sized,
84 {
85 FlatMapBuilder::new(self, logic)
86 }
87}
88
89impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
90 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
91 // TODO : number of elements from the iterator. This would allow iterators that produce many
92 // TODO : records without taking arbitrarily long and arbitrarily much memory.
93 fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
94 where
95 I: IntoIterator,
96 C2: Container + SizableContainer + PushInto<I::Item>,
97 L: FnMut(C::Item<'_>)->I + 'static,
98 {
99 self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
100 input.for_each_time(|time, data| {
101 output.session(&time)
102 .give_iterator(data.flat_map(|d| d.drain()).flat_map(&mut logic));
103 });
104 })
105 }
106}
107
108
109/// A stream wrapper that allows the accumulation of flatmap logic.
110pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I>
111where
112 for<'a> F: Fn(C::Item<'a>) -> I,
113{
114 stream: &'t T,
115 logic: F,
116 marker: std::marker::PhantomData<C>,
117}
118
119impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I>
120where
121 for<'a> F: Fn(C::Item<'a>) -> I,
122{
123 /// Create a new wrapper with no action on the stream.
124 pub fn new(stream: &'t T, logic: F) -> Self {
125 FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
126 }
127
128 /// Transform a flatmapped stream through addiitonal logic.
129 pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
130 let logic = self.logic;
131 FlatMapBuilder {
132 stream: self.stream,
133 logic: move |x| g(logic(x)),
134 marker: std::marker::PhantomData,
135 }
136 }
137 /// Convert the wrapper into a stream.
138 pub fn into_stream<S, C2>(self) -> StreamCore<S, C2>
139 where
140 I: IntoIterator,
141 S: Scope,
142 T: Map<S, C>,
143 C2: Container + SizableContainer + PushInto<I::Item>,
144 {
145 Map::flat_map(self.stream, self.logic)
146 }
147}
148
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::capture::Extract;
    use crate::dataflow::operators::{core::Map, Capture, ToStream};

    /// Chains several `map` steps on a `FlatMapBuilder` and checks the captured output.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            let input = (0..10i32).to_stream(scope);
            input
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                // `Some` makes the final value iterable, as `into_stream` requires.
                .map(Some)
                .into_stream::<_, Vec<i32>>()
                .capture()
        });

        // Four increments shift the input range 0..10 to 4..14.
        let expected: Vec<i32> = (4..14).collect();
        let extracted = captured.extract();
        assert_eq!(expected, extracted[0].1);
    }
}
169}