timely/dataflow/operators/core/
rc.rs1use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::Operator;
5use crate::dataflow::{Scope, Stream};
6use crate::Container;
7use std::rc::Rc;
8
9pub trait SharedStream<S: Scope, C> {
11 fn shared(self) -> Stream<S, Rc<C>>;
26}
27
28impl<S: Scope, C: Container> SharedStream<S, C> for Stream<S, C> {
29 fn shared(self) -> Stream<S, Rc<C>> {
30 self.unary(Pipeline, "Shared", move |_, _| {
31 move |input, output| {
32 input.for_each_time(|time, data| {
33 let mut session = output.session(&time);
34 for data in data {
35 session.give_container(&mut Rc::new(std::mem::take(data)));
36 }
37 });
38 }
39 })
40 }
41}
42
#[cfg(test)]
mod test {
    use crate::dataflow::channels::pact::Pipeline;
    use crate::dataflow::operators::capture::Extract;
    use crate::dataflow::operators::rc::SharedStream;
    use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};

    /// Two independent readers of a cloned shared stream must observe the
    /// same pointer for the data they receive.
    #[test]
    fn test_shared() {
        let output = crate::example(|scope| {
            let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
            scope
                .concatenate([
                    // Each reader maps every received container to a pointer value.
                    // NOTE(review): `Rc::as_ptr` is an associated function, so
                    // `d.as_ptr()` presumably resolves through deref to the inner
                    // `Vec`'s buffer pointer — confirm.
                    shared.clone().unary(Pipeline, "read shared 1", |_, _| {
                        move |input, output| {
                            input.for_each_time(|time, data| {
                                output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
                            });
                        }
                    }),
                    shared.unary(Pipeline, "read shared 2", |_, _| {
                        move |input, output| {
                            input.for_each_time(|time, data| {
                                output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
                            });
                        }
                    }),
                ])
                .container::<Vec<_>>()
                .capture()
        });
        // Take the data captured for the first extracted entry.
        let output = &mut output.extract()[0].1;
        // After sorting and deduplicating, exactly one distinct address must
        // remain: both readers saw the same allocation.
        output.sort();
        output.dedup();
        assert_eq!(output.len(), 1);
    }
}
79}