Skip to main content

timely/dataflow/operators/vec/
map.rs

1//! Extension methods for `StreamVec` based on record-by-record transformation.
2
3use crate::dataflow::{StreamVec, Scope};
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::operators::generic::operator::Operator;
6use crate::dataflow::operators::core::{Map as MapCore};
7
8/// Extension trait for `StreamVec`.
9pub trait Map<S: Scope, D: 'static> : Sized {
10    /// Consumes each element of the stream and yields a new element.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
15    ///
16    /// timely::example(|scope| {
17    ///     (0..10).to_stream(scope)
18    ///            .map(|x| x + 1)
19    ///            .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn map<D2: 'static, L: FnMut(D)->D2+'static>(self, mut logic: L) -> StreamVec<S, D2> {
23        self.flat_map(move |x| std::iter::once(logic(x)))
24    }
25    /// Updates each element of the stream and yields the element, re-using memory where possible.
26    ///
27    /// # Examples
28    /// ```
29    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
30    ///
31    /// timely::example(|scope| {
32    ///     (0..10).to_stream(scope)
33    ///            .map_in_place(|x| *x += 1)
34    ///            .inspect(|x| println!("seen: {:?}", x));
35    /// });
36    /// ```
37    fn map_in_place<L: FnMut(&mut D)+'static>(self, logic: L) -> StreamVec<S, D>;
38    /// Consumes each element of the stream and yields some number of new elements.
39    ///
40    /// # Examples
41    /// ```
42    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map};
43    ///
44    /// timely::example(|scope| {
45    ///     (0..10).to_stream(scope)
46    ///            .flat_map(|x| (0..x))
47    ///            .inspect(|x| println!("seen: {:?}", x));
48    /// });
49    /// ```
50    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(self, logic: L) -> StreamVec<S, I::Item> where I::Item: 'static;
51}
52
53impl<S: Scope, D: 'static> Map<S, D> for StreamVec<S, D> {
54    fn map_in_place<L: FnMut(&mut D)+'static>(self, mut logic: L) -> StreamVec<S, D> {
55        self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| {
56            input.for_each_time(|time, data| {
57                let mut session = output.session(&time);
58                for data in data {
59                    for datum in data.iter_mut() { logic(datum); }
60                    session.give_container(data);
61                }
62            })
63        })
64    }
65    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
66    // TODO : number of elements from the iterator. This would allow iterators that produce many
67    // TODO : records without taking arbitrarily long and arbitrarily much memory.
68    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(self, logic: L) -> StreamVec<S, I::Item> where I::Item: 'static {
69        MapCore::flat_map(self, logic)
70    }
71}