timely/dataflow/operators/
map.rs

1//! Extension methods for `Stream` based on record-by-record transformation.
2
3use crate::Data;
4use crate::dataflow::{Stream, Scope};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::operators::core::{Map as MapCore};
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, D: Data> {
11    /// Consumes each element of the stream and yields a new element.
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
16    ///
17    /// timely::example(|scope| {
18    ///     (0..10).to_stream(scope)
19    ///            .map(|x| x + 1)
20    ///            .inspect(|x| println!("seen: {:?}", x));
21    /// });
22    /// ```
23    fn map<D2: Data, L: FnMut(D)->D2+'static>(&self, mut logic: L) -> Stream<S, D2> {
24        self.flat_map(move |x| std::iter::once(logic(x)))
25    }
26    /// Updates each element of the stream and yields the element, re-using memory where possible.
27    ///
28    /// # Examples
29    /// ```
30    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
31    ///
32    /// timely::example(|scope| {
33    ///     (0..10).to_stream(scope)
34    ///            .map_in_place(|x| *x += 1)
35    ///            .inspect(|x| println!("seen: {:?}", x));
36    /// });
37    /// ```
38    fn map_in_place<L: FnMut(&mut D)+'static>(&self, logic: L) -> Stream<S, D>;
39    /// Consumes each element of the stream and yields some number of new elements.
40    ///
41    /// # Examples
42    /// ```
43    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
44    ///
45    /// timely::example(|scope| {
46    ///     (0..10).to_stream(scope)
47    ///            .flat_map(|x| (0..x))
48    ///            .inspect(|x| println!("seen: {:?}", x));
49    /// });
50    /// ```
51    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data;
52}
53
54impl<S: Scope, D: Data> Map<S, D> for Stream<S, D> {
55    fn map_in_place<L: FnMut(&mut D)+'static>(&self, mut logic: L) -> Stream<S, D> {
56        self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| {
57            input.for_each(|time, data| {
58                for datum in data.iter_mut() { logic(datum); }
59                output.session(&time).give_container(data);
60            })
61        })
62    }
63    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
64    // TODO : number of elements from the iterator. This would allow iterators that produce many
65    // TODO : records without taking arbitrarily long and arbitrarily much memory.
66    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(&self, logic: L) -> Stream<S, I::Item> where I::Item: Data {
67        MapCore::flat_map(self, logic)
68    }
69}