timely/dataflow/operators/core/
feedback.rs

1//! Create cycles in a timely dataflow graph.
2
3use crate::{Container, Data};
4use crate::container::CapacityContainerBuilder;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::channels::pushers::Tee;
7use crate::dataflow::operators::generic::OutputWrapper;
8use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
9use crate::dataflow::scopes::child::Iterative;
10use crate::dataflow::{StreamCore, Scope};
11use crate::order::Product;
12use crate::progress::frontier::Antichain;
13use crate::progress::{Timestamp, PathSummary};
14
15/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
16pub trait Feedback<G: Scope> {
17
18    /// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
19    ///
20    /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
21    /// its `Handle` passed as an argument. Containers passed through the stream will have their
22    /// timestamps advanced by `summary`.
23    ///
24    /// # Examples
25    /// ```
26    /// use timely::dataflow::Scope;
27    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
28    ///
29    /// timely::example(|scope| {
30    ///     // circulate 0..10 for 100 iterations.
31    ///     let (handle, cycle) = scope.feedback(1);
32    ///     (0..10).to_stream(scope)
33    ///            .concat(&cycle)
34    ///            .inspect(|x| println!("seen: {:?}", x))
35    ///            .branch_when(|t| t < &100).1
36    ///            .connect_loop(handle);
37    /// });
38    /// ```
39    fn feedback<C: Container + Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
40}
41
42/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
43pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
44    /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
45    ///
46    /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
47    /// its `Handle` passed as an argument. Containers passed through the stream will have their
48    /// timestamps advanced by `summary`.
49    ///
50    /// # Examples
51    /// ```
52    /// use timely::dataflow::Scope;
53    /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
54    ///
55    /// timely::example(|scope| {
56    ///     // circulate 0..10 for 100 iterations.
57    ///     scope.iterative::<usize,_,_>(|inner| {
58    ///         let (handle, cycle) = inner.loop_variable(1);
59    ///         (0..10).to_stream(inner)
60    ///                .concat(&cycle)
61    ///                .inspect(|x| println!("seen: {:?}", x))
62    ///                .branch_when(|t| t.inner < 100).1
63    ///                .connect_loop(handle);
64    ///     });
65    /// });
66    /// ```
67    fn loop_variable<C: Container + Data>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
68}
69
70impl<G: Scope> Feedback<G> for G {
71
72    fn feedback<C: Container + Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {
73
74        let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
75        builder.set_notify(false);
76        let (output, stream) = builder.new_output();
77
78        (Handle { builder, summary, output }, stream)
79    }
80}
81
82impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
83    fn loop_variable<C: Container + Data>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
84        self.feedback(Product::new(Default::default(), summary))
85    }
86}
87
88/// Connect a `Stream` to the input of a loop variable.
89pub trait ConnectLoop<G: Scope, C: Container + Data> {
90    /// Connect a `Stream` to be the input of a loop variable.
91    ///
92    /// # Examples
93    /// ```
94    /// use timely::dataflow::Scope;
95    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
96    ///
97    /// timely::example(|scope| {
98    ///     // circulate 0..10 for 100 iterations.
99    ///     let (handle, cycle) = scope.feedback(1);
100    ///     (0..10).to_stream(scope)
101    ///            .concat(&cycle)
102    ///            .inspect(|x| println!("seen: {:?}", x))
103    ///            .branch_when(|t| t < &100).1
104    ///            .connect_loop(handle);
105    /// });
106    /// ```
107    fn connect_loop(&self, handle: Handle<G, C>);
108}
109
110impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C> {
111    fn connect_loop(&self, handle: Handle<G, C>) {
112
113        let mut builder = handle.builder;
114        let summary = handle.summary;
115        let mut output = handle.output;
116
117        let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
118
119        builder.build(move |_capability| move |_frontier| {
120            let mut output = output.activate();
121            input.for_each(|cap, data| {
122                if let Some(new_time) = summary.results_in(cap.time()) {
123                    let new_cap = cap.delayed(&new_time);
124                    output
125                        .session(&new_cap)
126                        .give_container(data);
127                }
128            });
129        });
130    }
131}
132
133/// A handle used to bind the source of a loop variable.
134#[derive(Debug)]
135pub struct Handle<G: Scope, C: Container + Data> {
136    builder: OperatorBuilder<G>,
137    summary: <G::Timestamp as Timestamp>::Summary,
138    output: OutputWrapper<G::Timestamp, CapacityContainerBuilder<C>, Tee<G::Timestamp, C>>,
139}