timely/dataflow/operators/core/
rc.rs
1use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::Operator;
5use crate::dataflow::{Scope, StreamCore};
6use crate::{Container, Data};
7use std::rc::Rc;
8
9pub trait SharedStream<S: Scope, C: Container> {
11 fn shared(&self) -> StreamCore<S, Rc<C>>;
25}
26
27impl<S: Scope, C: Container + Data> SharedStream<S, C> for StreamCore<S, C> {
28 fn shared(&self) -> StreamCore<S, Rc<C>> {
29 self.unary(Pipeline, "Shared", move |_, _| {
30 move |input, output| {
31 input.for_each(|time, data| {
32 output
33 .session(&time)
34 .give_container(&mut Rc::new(std::mem::take(data)));
35 });
36 }
37 })
38 }
39}
40
41#[cfg(test)]
42mod test {
43 use crate::dataflow::channels::pact::Pipeline;
44 use crate::dataflow::operators::capture::Extract;
45 use crate::dataflow::operators::rc::SharedStream;
46 use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
47
48 #[test]
49 fn test_shared() {
50 let output = crate::example(|scope| {
51 let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
52 scope
53 .concatenate([
54 shared.unary(Pipeline, "read shared 1", |_, _| {
55 move |input, output| {
56 input.for_each(|time, data| {
57 output.session(&time).give(data.as_ptr() as usize);
58 });
59 }
60 }),
61 shared.unary(Pipeline, "read shared 2", |_, _| {
62 move |input, output| {
63 input.for_each(|time, data| {
64 output.session(&time).give(data.as_ptr() as usize);
65 });
66 }
67 }),
68 ])
69 .container::<Vec<_>>()
70 .capture()
71 });
72 let output = &mut output.extract()[0].1;
73 output.sort();
74 output.dedup();
75 assert_eq!(output.len(), 1);
76 }
77}