Skip to main content

timely/dataflow/operators/core/
feedback.rs

1//! Create cycles in a timely dataflow graph.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
6use crate::dataflow::scopes::child::Iterative;
7use crate::dataflow::{Stream, Scope};
8use crate::order::Product;
9use crate::progress::frontier::Antichain;
10use crate::progress::{Timestamp, PathSummary};
11
12/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
13pub trait Feedback<G: Scope> {
14
15    /// Creates a [Stream] and a [Handle] to later bind the source of that `Stream`.
16    ///
17    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
18    /// its `Handle` passed as an argument. Containers passed through the stream will have their
19    /// timestamps advanced by `summary`.
20    ///
21    /// # Examples
22    /// ```
23    /// use timely::dataflow::Scope;
24    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect};
25    /// use timely::dataflow::operators::vec::BranchWhen;
26    ///
27    /// timely::example(|scope| {
28    ///     // circulate 0..10 for 100 iterations.
29    ///     let (handle, cycle) = scope.feedback(1);
30    ///     (0..10).to_stream(scope)
31    ///            .container::<Vec<_>>()
32    ///            .concat(cycle)
33    ///            .inspect(|x| println!("seen: {:?}", x))
34    ///            .branch_when(|t| t < &100).1
35    ///            .connect_loop(handle);
36    /// });
37    /// ```
38    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, Stream<G, C>);
39}
40
41/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
42pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
43    /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
44    ///
45    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
46    /// its `Handle` passed as an argument. Containers passed through the stream will have their
47    /// timestamps advanced by `summary`.
48    ///
49    /// # Examples
50    /// ```
51    /// use timely::dataflow::Scope;
52    /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect};
53    /// use timely::dataflow::operators::vec::BranchWhen;
54    ///
55    /// timely::example(|scope| {
56    ///     // circulate 0..10 for 100 iterations.
57    ///     scope.iterative::<usize,_,_>(|inner| {
58    ///         let (handle, cycle) = inner.loop_variable(1);
59    ///         (0..10).to_stream(inner)
60    ///                .container::<Vec<_>>()
61    ///                .concat(cycle)
62    ///                .inspect(|x| println!("seen: {:?}", x))
63    ///                .branch_when(|t| t.inner < 100).1
64    ///                .connect_loop(handle);
65    ///     });
66    /// });
67    /// ```
68    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, Stream<Iterative<'a, G, T>, C>);
69}
70
71impl<G: Scope> Feedback<G> for G {
72
73    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, Stream<G, C>) {
74
75        let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
76        builder.set_notify(false);
77        let (output, stream) = builder.new_output();
78
79        (Handle { builder, summary, output }, stream)
80    }
81}
82
83impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
84    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, Stream<Iterative<'a, G, T>, C>) {
85        self.feedback(Product::new(Default::default(), summary))
86    }
87}
88
89/// Connect a `Stream` to the input of a loop variable.
90pub trait ConnectLoop<G: Scope, C: Container> {
91    /// Connect a `Stream` to be the input of a loop variable.
92    ///
93    /// # Examples
94    /// ```
95    /// use timely::dataflow::Scope;
96    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect};
97    /// use timely::dataflow::operators::vec::BranchWhen;
98    ///
99    /// timely::example(|scope| {
100    ///     // circulate 0..10 for 100 iterations.
101    ///     let (handle, cycle) = scope.feedback(1);
102    ///     (0..10).to_stream(scope)
103    ///            .container::<Vec<_>>()
104    ///            .concat(cycle)
105    ///            .inspect(|x| println!("seen: {:?}", x))
106    ///            .branch_when(|t| t < &100).1
107    ///            .connect_loop(handle);
108    /// });
109    /// ```
110    fn connect_loop(self, handle: Handle<G, C>);
111}
112
113impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C> {
114    fn connect_loop(self, handle: Handle<G, C>) {
115
116        let mut builder = handle.builder;
117        let summary = handle.summary;
118        let mut output = handle.output;
119
120        let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
121
122        builder.build(move |_capability| move |_frontier| {
123            let mut output = output.activate();
124            input.for_each(|cap, data| {
125                if let Some(new_time) = summary.results_in(cap.time()) {
126                    let new_cap = cap.delayed(&new_time, output.output_index());
127                    output.give(&new_cap, data);
128                }
129            });
130        });
131    }
132}
133
134/// A handle used to bind the source of a loop variable.
135#[derive(Debug)]
136pub struct Handle<G: Scope, C: Container> {
137    builder: OperatorBuilder<G>,
138    summary: <G::Timestamp as Timestamp>::Summary,
139    output: crate::dataflow::channels::pushers::Output<G::Timestamp, C>,
140}