timely/dataflow/operators/core/
rc.rs

1//! Shared containers
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::Operator;
5use crate::dataflow::{Scope, StreamCore};
6use crate::{Container, Data};
7use std::rc::Rc;
8
9/// Convert a stream into a stream of shared containers
10pub trait SharedStream<S: Scope, C: Container> {
11    /// Convert a stream into a stream of shared data
12    ///
13    /// # Examples
14    /// ```
15    /// use timely::dataflow::operators::{ToStream, Inspect};
16    /// use timely::dataflow::operators::rc::SharedStream;
17    ///
18    /// timely::example(|scope| {
19    ///     (0..10).to_stream(scope)
20    ///            .shared()
21    ///            .inspect(|x| println!("seen: {:?}", x));
22    /// });
23    /// ```
24    fn shared(&self) -> StreamCore<S, Rc<C>>;
25}
26
27impl<S: Scope, C: Container + Data> SharedStream<S, C> for StreamCore<S, C> {
28    fn shared(&self) -> StreamCore<S, Rc<C>> {
29        self.unary(Pipeline, "Shared", move |_, _| {
30            move |input, output| {
31                input.for_each(|time, data| {
32                    output
33                        .session(&time)
34                        .give_container(&mut Rc::new(std::mem::take(data)));
35                });
36            }
37        })
38    }
39}
40
41#[cfg(test)]
42mod test {
43    use crate::dataflow::channels::pact::Pipeline;
44    use crate::dataflow::operators::capture::Extract;
45    use crate::dataflow::operators::rc::SharedStream;
46    use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
47
48    #[test]
49    fn test_shared() {
50        let output = crate::example(|scope| {
51            let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
52            scope
53                .concatenate([
54                    shared.unary(Pipeline, "read shared 1", |_, _| {
55                        move |input, output| {
56                            input.for_each(|time, data| {
57                                output.session(&time).give(data.as_ptr() as usize);
58                            });
59                        }
60                    }),
61                    shared.unary(Pipeline, "read shared 2", |_, _| {
62                        move |input, output| {
63                            input.for_each(|time, data| {
64                                output.session(&time).give(data.as_ptr() as usize);
65                            });
66                        }
67                    }),
68                ])
69                .container::<Vec<_>>()
70                .capture()
71        });
72        let output = &mut output.extract()[0].1;
73        output.sort();
74        output.dedup();
75        assert_eq!(output.len(), 1);
76    }
77}