timely/dataflow/operators/core/
feedback.rs

1//! Create cycles in a timely dataflow graph.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
6use crate::dataflow::scopes::child::Iterative;
7use crate::dataflow::{StreamCore, Scope};
8use crate::order::Product;
9use crate::progress::frontier::Antichain;
10use crate::progress::{Timestamp, PathSummary};
11
12/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
13pub trait Feedback<G: Scope> {
14
15    /// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
16    ///
17    /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
18    /// its `Handle` passed as an argument. Containers passed through the stream will have their
19    /// timestamps advanced by `summary`.
20    ///
21    /// # Examples
22    /// ```
23    /// use timely::dataflow::Scope;
24    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
25    ///
26    /// timely::example(|scope| {
27    ///     // circulate 0..10 for 100 iterations.
28    ///     let (handle, cycle) = scope.feedback(1);
29    ///     (0..10).to_stream(scope)
30    ///            .concat(&cycle)
31    ///            .inspect(|x| println!("seen: {:?}", x))
32    ///            .branch_when(|t| t < &100).1
33    ///            .connect_loop(handle);
34    /// });
35    /// ```
36    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
37}
38
39/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
40pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
41    /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
42    ///
43    /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
44    /// its `Handle` passed as an argument. Containers passed through the stream will have their
45    /// timestamps advanced by `summary`.
46    ///
47    /// # Examples
48    /// ```
49    /// use timely::dataflow::Scope;
50    /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
51    ///
52    /// timely::example(|scope| {
53    ///     // circulate 0..10 for 100 iterations.
54    ///     scope.iterative::<usize,_,_>(|inner| {
55    ///         let (handle, cycle) = inner.loop_variable(1);
56    ///         (0..10).to_stream(inner)
57    ///                .concat(&cycle)
58    ///                .inspect(|x| println!("seen: {:?}", x))
59    ///                .branch_when(|t| t.inner < 100).1
60    ///                .connect_loop(handle);
61    ///     });
62    /// });
63    /// ```
64    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
65}
66
67impl<G: Scope> Feedback<G> for G {
68
69    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {
70
71        let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
72        builder.set_notify(false);
73        let (output, stream) = builder.new_output();
74
75        (Handle { builder, summary, output }, stream)
76    }
77}
78
79impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
80    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
81        self.feedback(Product::new(Default::default(), summary))
82    }
83}
84
85/// Connect a `Stream` to the input of a loop variable.
86pub trait ConnectLoop<G: Scope, C: Container> {
87    /// Connect a `Stream` to be the input of a loop variable.
88    ///
89    /// # Examples
90    /// ```
91    /// use timely::dataflow::Scope;
92    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
93    ///
94    /// timely::example(|scope| {
95    ///     // circulate 0..10 for 100 iterations.
96    ///     let (handle, cycle) = scope.feedback(1);
97    ///     (0..10).to_stream(scope)
98    ///            .concat(&cycle)
99    ///            .inspect(|x| println!("seen: {:?}", x))
100    ///            .branch_when(|t| t < &100).1
101    ///            .connect_loop(handle);
102    /// });
103    /// ```
104    fn connect_loop(&self, handle: Handle<G, C>);
105}
106
107impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {
108    fn connect_loop(&self, handle: Handle<G, C>) {
109
110        let mut builder = handle.builder;
111        let summary = handle.summary;
112        let mut output = handle.output;
113
114        let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
115
116        builder.build(move |_capability| move |_frontier| {
117            let mut output = output.activate();
118            input.for_each(|cap, data| {
119                if let Some(new_time) = summary.results_in(cap.time()) {
120                    let new_cap = cap.delayed(&new_time);
121                    output.give(&new_cap, data);
122                }
123            });
124        });
125    }
126}
127
128/// A handle used to bind the source of a loop variable.
129#[derive(Debug)]
130pub struct Handle<G: Scope, C: Container> {
131    builder: OperatorBuilder<G>,
132    summary: <G::Timestamp as Timestamp>::Summary,
133    output: crate::dataflow::channels::pushers::Output<G::Timestamp, C>,
134}