Skip to main content

timely/dataflow/operators/core/
rc.rs

1//! Shared containers
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::Operator;
5use crate::dataflow::{Scope, Stream};
6use crate::Container;
7use std::rc::Rc;
8
9/// Convert a stream into a stream of shared containers
10pub trait SharedStream<S: Scope, C> {
11    /// Convert a stream into a stream of shared data
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::{ToStream, InspectCore};
16    /// use timely::dataflow::operators::rc::SharedStream;
17    ///
18    /// timely::example(|scope| {
19    ///     (0..10).to_stream(scope)
20    ///            .container::<Vec<_>>()
21    ///            .shared()
22    ///            .inspect_container(|x| println!("seen: {:?}", x));
23    /// });
24    /// ```
25    fn shared(self) -> Stream<S, Rc<C>>;
26}
27
28impl<S: Scope, C: Container> SharedStream<S, C> for Stream<S, C> {
29    fn shared(self) -> Stream<S, Rc<C>> {
30        self.unary(Pipeline, "Shared", move |_, _| {
31            move |input, output| {
32                input.for_each_time(|time, data| {
33                    let mut session = output.session(&time);
34                    for data in data {
35                        session.give_container(&mut Rc::new(std::mem::take(data)));
36                    }
37                });
38            }
39        })
40    }
41}
42
43#[cfg(test)]
44mod test {
45    use crate::dataflow::channels::pact::Pipeline;
46    use crate::dataflow::operators::capture::Extract;
47    use crate::dataflow::operators::rc::SharedStream;
48    use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
49
50    #[test]
51    fn test_shared() {
52        let output = crate::example(|scope| {
53            let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
54            scope
55                .concatenate([
56                    shared.clone().unary(Pipeline, "read shared 1", |_, _| {
57                        move |input, output| {
58                            input.for_each_time(|time, data| {
59                                output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
60                            });
61                        }
62                    }),
63                    shared.unary(Pipeline, "read shared 2", |_, _| {
64                        move |input, output| {
65                            input.for_each_time(|time, data| {
66                                output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
67                            });
68                        }
69                    }),
70                ])
71                .container::<Vec<_>>()
72                .capture()
73        });
74        let output = &mut output.extract()[0].1;
75        output.sort();
76        output.dedup();
77        assert_eq!(output.len(), 1);
78    }
79}