timely/dataflow/operators/core/map.rs
1//! Extension methods for `StreamCore` based on record-by-record transformation.
2
3use crate::container::{Container, SizableContainer, PushInto};
4use crate::Data;
5use crate::dataflow::{Scope, StreamCore};
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::operator::Operator;
8
9/// Extension trait for `Stream`.
10pub trait Map<S: Scope, C: Container> {
11 /// Consumes each element of the stream and yields a new element.
12 ///
13 /// # Examples
14 /// ```
15 /// use timely::dataflow::operators::ToStream;
16 /// use timely::dataflow::operators::core::{Map, Inspect};
17 ///
18 /// timely::example(|scope| {
19 /// (0..10).to_stream(scope)
20 /// .map(|x| x + 1)
21 /// .container::<Vec<_>>()
22 /// .inspect(|x| println!("seen: {:?}", x));
23 /// });
24 /// ```
25 fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
26 where
27 C2: SizableContainer + PushInto<D2> + Data,
28 L: FnMut(C::Item<'_>)->D2 + 'static,
29 {
30 self.flat_map(move |x| std::iter::once(logic(x)))
31 }
32 /// Consumes each element of the stream and yields some number of new elements.
33 ///
34 /// # Examples
35 /// ```
36 /// use timely::dataflow::operators::ToStream;
37 /// use timely::dataflow::operators::core::{Map, Inspect};
38 ///
39 /// timely::example(|scope| {
40 /// (0..10).to_stream(scope)
41 /// .flat_map(|x| (0..x))
42 /// .container::<Vec<_>>()
43 /// .inspect(|x| println!("seen: {:?}", x));
44 /// });
45 /// ```
46 fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
47 where
48 I: IntoIterator,
49 C2: SizableContainer + PushInto<I::Item> + Data,
50 L: FnMut(C::Item<'_>)->I + 'static,
51 ;
52}
53
54impl<S: Scope, C: Container + Data> Map<S, C> for StreamCore<S, C> {
55 // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
56 // TODO : number of elements from the iterator. This would allow iterators that produce many
57 // TODO : records without taking arbitrarily long and arbitrarily much memory.
58 fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2>
59 where
60 I: IntoIterator,
61 C2: SizableContainer + PushInto<I::Item> + Data,
62 L: FnMut(C::Item<'_>)->I + 'static,
63 {
64 self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
65 input.for_each(|time, data| {
66 output.session(&time).give_iterator(data.drain().flat_map(&mut logic));
67 });
68 })
69 }
70}