timely/dataflow/operators/core/
rc.rs1use crate::dataflow::channels::pact::Pipeline;
4use crate::progress::Timestamp;
5use crate::dataflow::operators::Operator;
6use crate::dataflow::Stream;
7use crate::Container;
8use std::rc::Rc;
9
10pub trait SharedStream<'scope, T: Timestamp, C> {
12 fn shared(self) -> Stream<'scope, T, Rc<C>>;
27}
28
29impl<'scope, T: Timestamp, C: Container> SharedStream<'scope, T, C> for Stream<'scope, T, C> {
30 fn shared(self) -> Stream<'scope, T, Rc<C>> {
31 self.unary(Pipeline, "Shared", move |_, _| {
32 move |input, output| {
33 input.for_each_time(|time, data| {
34 let mut session = output.session(&time);
35 for data in data {
36 session.give_container(&mut Rc::new(std::mem::take(data)));
37 }
38 });
39 }
40 })
41 }
42}
43
44#[cfg(test)]
45mod test {
46 use crate::dataflow::channels::pact::Pipeline;
47 use crate::dataflow::operators::capture::Extract;
48 use crate::dataflow::operators::rc::SharedStream;
49 use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
50
51 #[test]
52 fn test_shared() {
53 let output = crate::example(|scope| {
54 let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
55 scope
56 .concatenate([
57 shared.clone().unary(Pipeline, "read shared 1", |_, _| {
58 move |input, output| {
59 input.for_each_time(|time, data| {
60 output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
61 });
62 }
63 }),
64 shared.unary(Pipeline, "read shared 2", |_, _| {
65 move |input, output| {
66 input.for_each_time(|time, data| {
67 output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
68 });
69 }
70 }),
71 ])
72 .container::<Vec<_>>()
73 .capture()
74 });
75 let output = &mut output.extract()[0].1;
76 output.sort();
77 output.dedup();
78 assert_eq!(output.len(), 1);
79 }
80}