timely/dataflow/operators/vec/map.rs
1//! Extension methods for `StreamVec` based on record-by-record transformation.
2
3use crate::dataflow::{StreamVec, Scope};
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::operators::generic::operator::Operator;
6use crate::dataflow::operators::core::{Map as MapCore};
7
8/// Extension trait for `StreamVec`.
9pub trait Map<S: Scope, D: 'static> : Sized {
10 /// Consumes each element of the stream and yields a new element.
11 ///
12 /// # Examples
13 /// ```
14 /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
15 ///
16 /// timely::example(|scope| {
17 /// (0..10).to_stream(scope)
18 /// .map(|x| x + 1)
19 /// .inspect(|x| println!("seen: {:?}", x));
20 /// });
21 /// ```
22 fn map<D2: 'static, L: FnMut(D)->D2+'static>(self, mut logic: L) -> StreamVec<S, D2> {
23 self.flat_map(move |x| std::iter::once(logic(x)))
24 }
25 /// Updates each element of the stream and yields the element, re-using memory where possible.
26 ///
27 /// # Examples
28 /// ```
29 /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
30 ///
31 /// timely::example(|scope| {
32 /// (0..10).to_stream(scope)
33 /// .map_in_place(|x| *x += 1)
34 /// .inspect(|x| println!("seen: {:?}", x));
35 /// });
36 /// ```
37 fn map_in_place<L: FnMut(&mut D)+'static>(self, logic: L) -> StreamVec<S, D>;
38 /// Consumes each element of the stream and yields some number of new elements.
39 ///
40 /// # Examples
41 /// ```
42 /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
43 ///
44 /// timely::example(|scope| {
45 /// (0..10).to_stream(scope)
46 /// .flat_map(|x| (0..x))
47 /// .inspect(|x| println!("seen: {:?}", x));
48 /// });
49 /// ```
50 fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(self, logic: L) -> StreamVec<S, I::Item> where I::Item: 'static;
51}
52
53impl<S: Scope, D: 'static> Map<S, D> for StreamVec<S, D> {
54 fn map_in_place<L: FnMut(&mut D)+'static>(self, mut logic: L) -> StreamVec<S, D> {
55 self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| {
56 input.for_each_time(|time, data| {
57 let mut session = output.session(&time);
58 for data in data {
59 for datum in data.iter_mut() { logic(datum); }
60 session.give_container(data);
61 }
62 })
63 })
64 }
65 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
66 // TODO : number of elements from the iterator. This would allow iterators that produce many
67 // TODO : records without taking arbitrarily long and arbitrarily much memory.
68 fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(self, logic: L) -> StreamVec<S, I::Item> where I::Item: 'static {
69 MapCore::flat_map(self, logic)
70 }
71}