timely/dataflow/operators/map.rs
1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::Data;
4use crate::dataflow::{Stream, Scope};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::operators::core::{Map as MapCore};
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, D: Data> {
11 /// Consumes each element of the stream and yields a new element.
12 ///
13 /// # Examples
14 /// ```
15 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
16 ///
17 /// timely::example(|scope| {
18 /// (0..10).to_stream(scope)
19 /// .map(|x| x + 1)
20 /// .inspect(|x| println!("seen: {:?}", x));
21 /// });
22 /// ```
23 fn map<D2: Data, L: FnMut(D)->D2+'static>(&self, mut logic: L) -> Stream<S, D2> {
24 self.flat_map(move |x| std::iter::once(logic(x)))
25 }
26 /// Updates each element of the stream and yields the element, re-using memory where possible.
27 ///
28 /// # Examples
29 /// ```
30 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
31 ///
32 /// timely::example(|scope| {
33 /// (0..10).to_stream(scope)
34 /// .map_in_place(|x| *x += 1)
35 /// .inspect(|x| println!("seen: {:?}", x));
36 /// });
37 /// ```
38 fn map_in_place<L: FnMut(&mut D)+'static>(&self, logic: L) -> Stream<S, D>;
39 /// Consumes each element of the stream and yields some number of new elements.
40 ///
41 /// # Examples
42 /// ```
43 /// use timely::dataflow::operators::{ToStream, Map, Inspect};
44 ///
45 /// timely::example(|scope| {
46 /// (0..10).to_stream(scope)
47 /// .flat_map(|x| (0..x))
48 /// .inspect(|x| println!("seen: {:?}", x));
49 /// });
50 /// ```
51 fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data;
52}
53
54impl<S: Scope, D: Data> Map<S, D> for Stream<S, D> {
55 fn map_in_place<L: FnMut(&mut D)+'static>(&self, mut logic: L) -> Stream<S, D> {
56 self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| {
57 input.for_each(|time, data| {
58 for datum in data.iter_mut() { logic(datum); }
59 output.session(&time).give_container(data);
60 })
61 })
62 }
63 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
64 // TODO : number of elements from the iterator. This would allow iterators that produce many
65 // TODO : records without taking arbitrarily long and arbitrarily much memory.
66 fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data {
67 MapCore::flat_map(self, logic)
68 }
69}