1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//! Extension methods for `StreamCore` based on record-by-record transformation.

use crate::container::{Container, PushContainer, PushInto};
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::operator::Operator;

/// Extension trait for `Stream`.
pub trait Map<S: Scope, C: Container> {
    /// Consumes each element of the stream and yields a new element.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::ToStream;
    /// use timely::dataflow::operators::core::{Map, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .map(|x| x + 1)
    ///            .container::<Vec<_>>()
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn map<C2, D2, L>(&self, mut logic: L) -> StreamCore<S, C2>
    where 
        C2: PushContainer, 
        D2: PushInto<C2>,
        L: FnMut(C::Item<'_>)->D2 + 'static,
    {
        self.flat_map(move |x| std::iter::once(logic(x)))
    }
    /// Consumes each element of the stream and yields some number of new elements.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::ToStream;
    /// use timely::dataflow::operators::core::{Map, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .flat_map(|x| (0..x))
    ///            .container::<Vec<_>>()
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2> 
    where 
        C2: PushContainer,
        I: IntoIterator,
        I::Item: PushInto<C2>,
        L: FnMut(C::Item<'_>)->I + 'static,
    ;
}

impl<S: Scope, C: Container> Map<S, C> for StreamCore<S, C> {
    // TODO : This would be more robust if it captured an iterator and then pulled an appropriate
    // TODO : number of elements from the iterator. This would allow iterators that produce many
    // TODO : records without taking arbitrarily long and arbitrarily much memory.
    fn flat_map<C2, I, L>(&self, mut logic: L) -> StreamCore<S, C2> 
    where 
        C2: PushContainer,
        I: IntoIterator,
        I::Item: PushInto<C2>,
        L: FnMut(C::Item<'_>)->I + 'static,
    {
        let mut container = Default::default();
        self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| {
            input.for_each(|time, data| {
                data.swap(&mut container);
                output.session(&time).give_iterator(container.drain().flat_map(&mut logic));
            });
        })
    }
}