timely/dataflow/operators/core/feedback.rs
1//! Create cycles in a timely dataflow graph.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
6use crate::dataflow::scopes::child::Iterative;
7use crate::dataflow::{StreamCore, Scope};
8use crate::order::Product;
9use crate::progress::frontier::Antichain;
10use crate::progress::{Timestamp, PathSummary};
11
12/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
13pub trait Feedback<G: Scope> {
14
15 /// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
16 ///
17 /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
18 /// its `Handle` passed as an argument. Containers passed through the stream will have their
19 /// timestamps advanced by `summary`.
20 ///
21 /// # Examples
22 /// ```
23 /// use timely::dataflow::Scope;
24 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
25 ///
26 /// timely::example(|scope| {
27 /// // circulate 0..10 for 100 iterations.
28 /// let (handle, cycle) = scope.feedback(1);
29 /// (0..10).to_stream(scope)
30 /// .concat(&cycle)
31 /// .inspect(|x| println!("seen: {:?}", x))
32 /// .branch_when(|t| t < &100).1
33 /// .connect_loop(handle);
34 /// });
35 /// ```
36 fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
37}
38
39/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
40pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
41 /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
42 ///
43 /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
44 /// its `Handle` passed as an argument. Containers passed through the stream will have their
45 /// timestamps advanced by `summary`.
46 ///
47 /// # Examples
48 /// ```
49 /// use timely::dataflow::Scope;
50 /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
51 ///
52 /// timely::example(|scope| {
53 /// // circulate 0..10 for 100 iterations.
54 /// scope.iterative::<usize,_,_>(|inner| {
55 /// let (handle, cycle) = inner.loop_variable(1);
56 /// (0..10).to_stream(inner)
57 /// .concat(&cycle)
58 /// .inspect(|x| println!("seen: {:?}", x))
59 /// .branch_when(|t| t.inner < 100).1
60 /// .connect_loop(handle);
61 /// });
62 /// });
63 /// ```
64 fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
65}
66
67impl<G: Scope> Feedback<G> for G {
68
69 fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {
70
71 let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
72 builder.set_notify(false);
73 let (output, stream) = builder.new_output();
74
75 (Handle { builder, summary, output }, stream)
76 }
77}
78
79impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
80 fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
81 self.feedback(Product::new(Default::default(), summary))
82 }
83}
84
85/// Connect a `Stream` to the input of a loop variable.
86pub trait ConnectLoop<G: Scope, C: Container> {
87 /// Connect a `Stream` to be the input of a loop variable.
88 ///
89 /// # Examples
90 /// ```
91 /// use timely::dataflow::Scope;
92 /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
93 ///
94 /// timely::example(|scope| {
95 /// // circulate 0..10 for 100 iterations.
96 /// let (handle, cycle) = scope.feedback(1);
97 /// (0..10).to_stream(scope)
98 /// .concat(&cycle)
99 /// .inspect(|x| println!("seen: {:?}", x))
100 /// .branch_when(|t| t < &100).1
101 /// .connect_loop(handle);
102 /// });
103 /// ```
104 fn connect_loop(&self, handle: Handle<G, C>);
105}
106
107impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {
108 fn connect_loop(&self, handle: Handle<G, C>) {
109
110 let mut builder = handle.builder;
111 let summary = handle.summary;
112 let mut output = handle.output;
113
114 let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
115
116 builder.build(move |_capability| move |_frontier| {
117 let mut output = output.activate();
118 input.for_each(|cap, data| {
119 if let Some(new_time) = summary.results_in(cap.time()) {
120 let new_cap = cap.delayed(&new_time);
121 output.give(&new_cap, data);
122 }
123 });
124 });
125 }
126}
127
128/// A handle used to bind the source of a loop variable.
129#[derive(Debug)]
130pub struct Handle<G: Scope, C: Container> {
131 builder: OperatorBuilder<G>,
132 summary: <G::Timestamp as Timestamp>::Summary,
133 output: crate::dataflow::channels::pushers::Output<G::Timestamp, C>,
134}