timely/dataflow/operators/core/map.rs
1//! Extension methods for `StreamCore` based on record-by-record transformation.
2
3use crate::container::{DrainContainer, SizableContainer, PushInto};
4use crate::Container;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: DrainContainer> {
11 /// Consumes each element of the stream and yields a new element.
12 ///
13 /// # Examples
14 /// ```
15 /// use timely::dataflow::operators::ToStream;
16 /// use timely::dataflow::operators::core::{Map, Inspect};
17 ///
18 /// timely::example(|scope| {
19 /// (0..10).to_stream(scope)
20 /// .map(|x| x + 1)
21 /// .container::<Vec<_>>()
22 /// .inspect(|x| println!("seen: {:?}", x));
23 /// });
24 /// ```
25 fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
26 where
27 C2: Container + SizableContainer + PushInto<D2>,
28 L: FnMut(C::Item<'_>)->D2 + 'static,
29 {
30 self.flat_map(move |x| std::iter::once(logic(x)))
31 }
32 /// Consumes each element of the stream and yields some number of new elements.
33 ///
34 /// # Examples
35 /// ```
36 /// use timely::dataflow::operators::ToStream;
37 /// use timely::dataflow::operators::core::{Map, Inspect};
38 ///
39 /// timely::example(|scope| {
40 /// (0..10).to_stream(scope)
41 /// .flat_map(|x| (0..x))
42 /// .container::<Vec<_>>()
43 /// .inspect(|x| println!("seen: {:?}", x));
44 /// });
45 /// ```
46 fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
47 where
48 I: IntoIterator,
49 C2: Container + SizableContainer + PushInto<I::Item>,
50 L: FnMut(C::Item<'_>)->I + 'static,
51 ;
52
53 /// Creates a `FlatMapBuilder`, which allows chaining of iterator logic before finalization into a stream.
54 ///
55 /// This pattern exists to make it easier to provide the ergonomics of iterator combinators without the
56 /// overhead of multiple dataflow operators. The resulting single operator will internally use compiled
57 /// iterators to go record-by-record, and unlike a chain of operators will not need to stage the records
58 /// of intermediate stages.
59 ///
60 /// # Examples
61 /// ```
62 /// use timely::dataflow::operators::{Capture, ToStream, core::Map};
63 /// use timely::dataflow::operators::capture::Extract;
64 ///
65 /// let data = timely::example(|scope| {
66 /// (0..10i32)
67 /// .to_stream(scope)
68 /// .flat_map_builder(|x| x + 1)
69 /// .map(|x| x + 1)
70 /// .map(|x| x + 1)
71 /// .map(|x| x + 1)
72 /// .map(Some)
73 /// .into_stream::<_,Vec<i32>>()
74 /// .capture()
75 /// });
76 ///
77 /// assert_eq!((4..14).collect::<Vec<_>>(), data.extract()[0].1);
78 /// ```
79 fn flat_map_builder<'t, I, L>(&'t self, logic: L) -> FlatMapBuilder<'t, Self, C, L, I>
80 where
81 C: Clone + 'static,
82 L: for<'a> Fn(C::Item<'a>) -> I,
83 Self: Sized,
84 {
85 FlatMapBuilder::new(self, logic)
86 }
87}
88
89impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C> {
90 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
91 // TODO : number of elements from the iterator. This would allow iterators that produce many
92 // TODO : records without taking arbitrarily long and arbitrarily much memory.
93 fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
94 where
95 I: IntoIterator,
96 C2: Container + SizableContainer + PushInto<I::Item>,
97 L: FnMut(C::Item<'_>)->I + 'static,
98 {
99 self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
100 input.for_each(|time, data| {
101 output.session(&time).give_iterator(data.drain().flat_map(&mut logic));
102 });
103 })
104 }
105}
106
107
108/// A stream wrapper that allows the accumulation of flatmap logic.
109pub struct FlatMapBuilder<'t, T, C: DrainContainer, F: 'static, I>
110where
111 for<'a> F: Fn(C::Item<'a>) -> I,
112{
113 stream: &'t T,
114 logic: F,
115 marker: std::marker::PhantomData<C>,
116}
117
118impl<'t, T, C: DrainContainer + Clone + 'static, F, I> FlatMapBuilder<'t, T, C, F, I>
119where
120 for<'a> F: Fn(C::Item<'a>) -> I,
121{
122 /// Create a new wrapper with no action on the stream.
123 pub fn new(stream: &'t T, logic: F) -> Self {
124 FlatMapBuilder { stream, logic, marker: std::marker::PhantomData }
125 }
126
127 /// Transform a flatmapped stream through addiitonal logic.
128 pub fn map<G: Fn(I) -> I2 + 'static, I2>(self, g: G) -> FlatMapBuilder<'t, T, C, impl Fn(C::Item<'_>) -> I2 + 'static, I2> {
129 let logic = self.logic;
130 FlatMapBuilder {
131 stream: self.stream,
132 logic: move |x| g(logic(x)),
133 marker: std::marker::PhantomData,
134 }
135 }
136 /// Convert the wrapper into a stream.
137 pub fn into_stream<S, C2>(self) -> StreamCore<S, C2>
138 where
139 I: IntoIterator,
140 S: Scope,
141 T: Map<S, C>,
142 C2: Container + SizableContainer + PushInto<I::Item>,
143 {
144 Map::flat_map(self.stream, self.logic)
145 }
146}
147
#[cfg(test)]
mod tests {
    use crate::dataflow::operators::{Capture, ToStream, core::Map};
    use crate::dataflow::operators::capture::Extract;

    /// Chains four `+ 1` steps and a final `Some` through the builder and checks that the
    /// inputs `0..10` come out as `4..14`.
    #[test]
    fn test_builder() {
        let captured = crate::example(|scope| {
            let source = (0..10i32).to_stream(scope);
            source
                .flat_map_builder(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(|x| x + 1)
                .map(Some)
                .into_stream::<_,Vec<i32>>()
                .capture()
        });

        let expected: Vec<i32> = (4..14).collect();
        let extracted = captured.extract();
        assert_eq!(expected, extracted[0].1);
    }
}