timely/dataflow/operators/core/feedback.rs
1//! Create cycles in a timely dataflow graph.
2
3use crate::{Container, Data};
4use crate::container::CapacityContainerBuilder;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::channels::pushers::Tee;
7use crate::dataflow::operators::generic::OutputWrapper;
8use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
9use crate::dataflow::scopes::child::Iterative;
10use crate::dataflow::{StreamCore, Scope};
11use crate::order::Product;
12use crate::progress::frontier::Antichain;
13use crate::progress::{Timestamp, PathSummary};
14
15/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
16pub trait Feedback<G: Scope> {
17
18 /// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
19 ///
20 /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
21 /// its `Handle` passed as an argument. Containers passed through the stream will have their
22 /// timestamps advanced by `summary`.
23 ///
24 /// # Examples
25 /// ```
26 /// use timely::dataflow::Scope;
27 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
28 ///
29 /// timely::example(|scope| {
30 /// // circulate 0..10 for 100 iterations.
31 /// let (handle, cycle) = scope.feedback(1);
32 /// (0..10).to_stream(scope)
33 /// .concat(&cycle)
34 /// .inspect(|x| println!("seen: {:?}", x))
35 /// .branch_when(|t| t < &100).1
36 /// .connect_loop(handle);
37 /// });
38 /// ```
39 fn feedback<C: Container + Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
40}
41
42/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
43pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
44 /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
45 ///
46 /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
47 /// its `Handle` passed as an argument. Containers passed through the stream will have their
48 /// timestamps advanced by `summary`.
49 ///
50 /// # Examples
51 /// ```
52 /// use timely::dataflow::Scope;
53 /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
54 ///
55 /// timely::example(|scope| {
56 /// // circulate 0..10 for 100 iterations.
57 /// scope.iterative::<usize,_,_>(|inner| {
58 /// let (handle, cycle) = inner.loop_variable(1);
59 /// (0..10).to_stream(inner)
60 /// .concat(&cycle)
61 /// .inspect(|x| println!("seen: {:?}", x))
62 /// .branch_when(|t| t.inner < 100).1
63 /// .connect_loop(handle);
64 /// });
65 /// });
66 /// ```
67 fn loop_variable<C: Container + Data>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
68}
69
70impl<G: Scope> Feedback<G> for G {
71
72 fn feedback<C: Container + Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {
73
74 let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
75 let (output, stream) = builder.new_output();
76
77 (Handle { builder, summary, output }, stream)
78 }
79}
80
81impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
82 fn loop_variable<C: Container + Data>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
83 self.feedback(Product::new(Default::default(), summary))
84 }
85}
86
87/// Connect a `Stream` to the input of a loop variable.
88pub trait ConnectLoop<G: Scope, C: Container + Data> {
89 /// Connect a `Stream` to be the input of a loop variable.
90 ///
91 /// # Examples
92 /// ```
93 /// use timely::dataflow::Scope;
94 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
95 ///
96 /// timely::example(|scope| {
97 /// // circulate 0..10 for 100 iterations.
98 /// let (handle, cycle) = scope.feedback(1);
99 /// (0..10).to_stream(scope)
100 /// .concat(&cycle)
101 /// .inspect(|x| println!("seen: {:?}", x))
102 /// .branch_when(|t| t < &100).1
103 /// .connect_loop(handle);
104 /// });
105 /// ```
106 fn connect_loop(&self, handle: Handle<G, C>);
107}
108
109impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
110 fn connect_loop(&self, handle: Handle<G, C>) {
111
112 let mut builder = handle.builder;
113 let summary = handle.summary;
114 let mut output = handle.output;
115
116 let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
117
118 builder.build(move |_capability| move |_frontier| {
119 let mut output = output.activate();
120 input.for_each(|cap, data| {
121 if let Some(new_time) = summary.results_in(cap.time()) {
122 let new_cap = cap.delayed(&new_time);
123 output
124 .session(&new_cap)
125 .give_container(data);
126 }
127 });
128 });
129 }
130}
131
132/// A handle used to bind the source of a loop variable.
133#[derive(Debug)]
134pub struct Handle<G: Scope, C: Container + Data> {
135 builder: OperatorBuilder<G>,
136 summary: <G::Timestamp as Timestamp>::Summary,
137 output: OutputWrapper<G::Timestamp, CapacityContainerBuilder<C>, Tee<G::Timestamp, C>>,
138}