1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//! Shared containers

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::Operator;
use crate::dataflow::{Scope, StreamCore};
use crate::Container;
use std::rc::Rc;

/// Extension trait for turning a stream of containers into a stream of
/// reference-counted containers.
pub trait SharedStream<S, C>
where
    S: Scope,
    C: Container,
{
    /// Produces a stream whose containers are the input's containers, each
    /// wrapped in an [`Rc`].
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use timely::dataflow::operators::rc::SharedStream;
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .shared()
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn shared(&self) -> StreamCore<S, Rc<C>>;
}

impl<S, C> SharedStream<S, C> for StreamCore<S, C>
where
    S: Scope,
    C: Container,
{
    /// Builds a unary operator named `"Shared"` (using the `Pipeline` pact)
    /// that re-emits every incoming container wrapped in an `Rc`, at the time
    /// it was received.
    fn shared(&self) -> StreamCore<S, Rc<C>> {
        self.unary(Pipeline, "Shared", move |_capability, _info| {
            move |input, output| {
                input.for_each(|time, data| {
                    // Move the container out of the input slot, leaving a
                    // default value behind, so it can be wrapped without cloning.
                    let mut wrapped = Rc::new(std::mem::take(data));
                    let mut session = output.session(&time);
                    session.give_container(&mut wrapped);
                });
            }
        })
    }
}

#[cfg(test)]
mod test {
    use crate::dataflow::channels::pact::Pipeline;
    use crate::dataflow::operators::capture::Extract;
    use crate::dataflow::operators::rc::SharedStream;
    use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};

    /// Two independent readers of one shared stream must report the same
    /// address for the container they receive.
    #[test]
    fn test_shared() {
        let captured = crate::example(|scope| {
            let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();

            // First consumer: forwards the address reported by `as_ptr` for
            // each container it sees.
            let first_reader = shared.unary(Pipeline, "read shared 1", |_, _| {
                move |input, output| {
                    input.for_each(|time, data| {
                        let address = data.as_ptr() as usize;
                        output.session(&time).give(address);
                    });
                }
            });

            // Second consumer: same logic, attached to the same shared stream.
            let second_reader = shared.unary(Pipeline, "read shared 2", |_, _| {
                move |input, output| {
                    input.for_each(|time, data| {
                        let address = data.as_ptr() as usize;
                        output.session(&time).give(address);
                    });
                }
            });

            scope
                .concatenate([first_reader, second_reader])
                .container::<Vec<_>>()
                .capture()
        });

        // Collapse the addresses recorded in the first extracted entry; if
        // both readers saw the same allocation exactly one value remains.
        let mut extracted = captured.extract();
        let addresses = &mut extracted[0].1;
        addresses.sort();
        addresses.dedup();
        assert_eq!(addresses.len(), 1);
    }
}