timely/dataflow/operators/core/
rc.rs1use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::Operator;
5use crate::dataflow::{Scope, StreamCore};
6use crate::Container;
7use std::rc::Rc;
8
9pub trait SharedStream<S: Scope, C> {
11 fn shared(&self) -> StreamCore<S, Rc<C>>;
25}
26
27impl<S: Scope, C: Container> SharedStream<S, C> for StreamCore<S, C> {
28 fn shared(&self) -> StreamCore<S, Rc<C>> {
29 self.unary(Pipeline, "Shared", move |_, _| {
30 move |input, output| {
31 input.for_each_time(|time, data| {
32 let mut session = output.session(&time);
33 for data in data {
34 session.give_container(&mut Rc::new(std::mem::take(data)));
35 }
36 });
37 }
38 })
39 }
40}
41
42#[cfg(test)]
43mod test {
44 use crate::dataflow::channels::pact::Pipeline;
45 use crate::dataflow::operators::capture::Extract;
46 use crate::dataflow::operators::rc::SharedStream;
47 use crate::dataflow::operators::{Capture, Concatenate, InspectCore, Operator, ToStream};
48
49 #[test]
50 fn test_shared() {
51 let output = crate::example(|scope| {
52 let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
53 let shared = shared.inspect_container(|x| println!("seen: {x:?}"));
54 scope
55 .concatenate([
56 shared.unary(Pipeline, "read shared 1", |_, _| {
57 move |input, output| {
58 input.for_each_time(|time, data| {
59 output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
60 });
61 }
62 }),
63 shared.unary(Pipeline, "read shared 2", |_, _| {
64 move |input, output| {
65 input.for_each_time(|time, data| {
66 output.session(&time).give_iterator(data.map(|d| d.as_ptr() as usize));
67 });
68 }
69 }),
70 ])
71 .container::<Vec<_>>()
72 .capture()
73 });
74 let output = &mut output.extract()[0].1;
75 output.sort();
76 output.dedup();
77 assert_eq!(output.len(), 1);
78 }
79}