Skip to main content

timely/dataflow/operators/core/
rc.rs

1//! Shared containers
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::progress::Timestamp;
5use crate::dataflow::operators::Operator;
6use crate::dataflow::Stream;
7use crate::Container;
8use std::rc::Rc;
9
10/// Convert a stream into a stream of shared containers
11pub trait SharedStream<'scope, T: Timestamp, C> {
12    /// Convert a stream into a stream of shared data
13    ///
14    /// # Examples
15    /// ```
16    /// use timely::dataflow::operators::{ToStream, InspectCore};
17    /// use timely::dataflow::operators::rc::SharedStream;
18    ///
19    /// timely::example(|scope| {
20    ///     (0..10).to_stream(scope)
21    ///            .container::<Vec<_>>()
22    ///            .shared()
23    ///            .inspect_container(|x| println!("seen: {:?}", x));
24    /// });
25    /// ```
26    fn shared(self) -> Stream<'scope, T, Rc<C>>;
27}
28
29impl<'scope, T: Timestamp, C: Container> SharedStream<'scope, T, C> for Stream<'scope, T, C> {
30    fn shared(self) -> Stream<'scope, T, Rc<C>> {
31        self.unary(Pipeline, "Shared", move |_, _| {
32            move |input, output| {
33                input.for_each_time(|time, data| {
34                    let mut session = output.session(&time);
35                    for data in data {
36                        session.give_container(&mut Rc::new(std::mem::take(data)));
37                    }
38                });
39            }
40        })
41    }
42}
43
44#[cfg(test)]
45mod test {
46    use crate::dataflow::channels::pact::Pipeline;
47    use crate::dataflow::operators::capture::Extract;
48    use crate::dataflow::operators::rc::SharedStream;
49    use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
50
51    #[test]
52    fn test_shared() {
53        let output = crate::example(|scope| {
54            let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
55            scope
56                .concatenate([
57                    shared.clone().unary(Pipeline, "read shared 1", |_, _| {
58                        move |input, output| {
59                            input.for_each_time(|time, data| {
60                                output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
61                            });
62                        }
63                    }),
64                    shared.unary(Pipeline, "read shared 2", |_, _| {
65                        move |input, output| {
66                            input.for_each_time(|time, data| {
67                                output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
68                            });
69                        }
70                    }),
71                ])
72                .container::<Vec<_>>()
73                .capture()
74        });
75        let output = &mut output.extract()[0].1;
76        output.sort();
77        output.dedup();
78        assert_eq!(output.len(), 1);
79    }
80}