timely/progress/
operate.rs

//! Methods which describe an operator's topology, and the progress it makes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::Schedule;
7use crate::progress::{Timestamp, ChangeBatch, Antichain};
8
9/// Methods for describing an operators topology, and the progress it makes.
10pub trait Operate<T: Timestamp> : Schedule {
11
12    /// Indicates if the operator is strictly local to this worker.
13    ///
14    /// A parent scope must understand whether the progress information returned by the worker
15    /// reflects only this worker's progress, so that it knows whether to send and receive the
16    /// corresponding progress messages to its peers. If the operator is strictly local, it must
17    /// exchange this information, whereas if the operator is itself implemented by the same set
18    /// of workers, the parent scope understands that progress information already reflects the
19    /// aggregate information among the workers.
20    ///
21    /// This is a coarse approximation to refined worker sets. In a future better world, operators
22    /// would explain how their implementations are partitioned, so that a parent scope knows what
23    /// progress information to exchange with which peers. Right now the two choices are either
24    /// "all" or "none", but it could be more detailed. In the more detailed case, this method
25    /// should / could return a pair (index, peers), indicating the group id of the worker out of
26    /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
27    /// multiple copies of the same progress messages (but aggregated variously) arriving at
28    /// arbitrary times.
29    fn local(&self) -> bool { true }
30
31    /// The number of inputs.
32    fn inputs(&self) -> usize;
33    /// The number of outputs.
34    fn outputs(&self) -> usize;
35
36    /// Fetches summary information about internal structure of the operator.
37    ///
38    /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
39    /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
40    /// be transformed to timestamps on any of its outputs.
41    ///
42    /// Each operator must also indicate whether it initially holds any capabilities on any of its
43    /// outputs, so that the parent operator can properly initialize its progress information.
44    ///
45    /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
46    /// any output, and no initial capabilities are held.
47    fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>);
48
49    /// Signals that external frontiers have been set.
50    ///
51    /// By default this method does nothing, and leaves all changes in the `frontiers` element
52    /// of the shared progress state. An operator should be able to consult `frontiers` at any
53    /// point and read out the current frontier information, or the changes from the last time
54    /// that `frontiers` was drained.
55    fn set_external_summary(&mut self) { }
56
57    /// Indicates of whether the operator requires `push_external_progress` information or not.
58    fn notify_me(&self) -> bool { true }
59}
60
61/// Operator internal connectivity, from inputs to outputs.
62pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
63/// Internal connectivity from one port to any number of opposing ports.
64#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
65pub struct PortConnectivity<TS> {
66    tree: std::collections::BTreeMap<usize, Antichain<TS>>,
67}
68
69impl<TS> Default for PortConnectivity<TS> {
70    fn default() -> Self {
71        Self { tree: std::collections::BTreeMap::new() }
72    }
73}
74
75impl<TS> PortConnectivity<TS> {
76    /// Inserts an element by reference, ensuring that the index exists.
77    pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder {
78        self.tree.entry(index).or_default().insert(element)
79    }
80    /// Inserts an element by reference, ensuring that the index exists.
81    pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
82        self.tree.entry(index).or_default().insert_ref(element)
83    }
84    /// Introduces a summary for `port`. Panics if a summary already exists.
85    pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
86        if !summary.is_empty() {
87            let prior = self.tree.insert(port, summary);
88            assert!(prior.is_none());
89        }
90        else {
91            assert!(self.tree.remove(&port).is_none());
92        }
93    }
94    /// Borrowing iterator of port identifiers and antichains.
95    pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
96        self.tree.iter().map(|(o,p)| (*o, p))
97    }
98    /// Returns the associated path summary, if it exists.
99    pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
100        self.tree.get(&index)
101    }
102}
103
104impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
105    fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
106        Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
107    }
108}
109
110/// Progress information shared between parent and child.
111#[derive(Debug)]
112pub struct SharedProgress<T: Timestamp> {
113    /// Frontier capability changes reported by the parent scope.
114    pub frontiers: Vec<ChangeBatch<T>>,
115    /// Consumed message changes reported by the child operator.
116    pub consumeds: Vec<ChangeBatch<T>>,
117    /// Internal capability changes reported by the child operator.
118    pub internals: Vec<ChangeBatch<T>>,
119    /// Produced message changes reported by the child operator.
120    pub produceds: Vec<ChangeBatch<T>>,
121}
122
123impl<T: Timestamp> SharedProgress<T> {
124    /// Allocates a new shared progress structure.
125    pub fn new(inputs: usize, outputs: usize) -> Self {
126        SharedProgress {
127            frontiers: vec![ChangeBatch::new(); inputs],
128            consumeds: vec![ChangeBatch::new(); inputs],
129            internals: vec![ChangeBatch::new(); outputs],
130            produceds: vec![ChangeBatch::new(); outputs],
131        }
132    }
133}