timely/progress/operate.rs
1//! Methods which describe an operator's topology, and the progress it makes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::Schedule;
7use crate::progress::{Timestamp, ChangeBatch, Antichain};
8
9/// Methods for describing an operators topology, and the progress it makes.
10pub trait Operate<T: Timestamp> : Schedule {
11
12 /// Indicates if the operator is strictly local to this worker.
13 ///
14 /// A parent scope must understand whether the progress information returned by the worker
15 /// reflects only this worker's progress, so that it knows whether to send and receive the
16 /// corresponding progress messages to its peers. If the operator is strictly local, it must
17 /// exchange this information, whereas if the operator is itself implemented by the same set
18 /// of workers, the parent scope understands that progress information already reflects the
19 /// aggregate information among the workers.
20 ///
21 /// This is a coarse approximation to refined worker sets. In a future better world, operators
22 /// would explain how their implementations are partitioned, so that a parent scope knows what
23 /// progress information to exchange with which peers. Right now the two choices are either
24 /// "all" or "none", but it could be more detailed. In the more detailed case, this method
25 /// should / could return a pair (index, peers), indicating the group id of the worker out of
26 /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
27 /// multiple copies of the same progress messages (but aggregated variously) arriving at
28 /// arbitrary times.
29 fn local(&self) -> bool { true }
30
31 /// The number of inputs.
32 fn inputs(&self) -> usize;
33 /// The number of outputs.
34 fn outputs(&self) -> usize;
35
36 /// Fetches summary information about internal structure of the operator.
37 ///
38 /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
39 /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
40 /// be transformed to timestamps on any of its outputs.
41 ///
42 /// Each operator must also indicate whether it initially holds any capabilities on any of its
43 /// outputs, so that the parent operator can properly initialize its progress information.
44 ///
45 /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
46 /// any output, and no initial capabilities are held.
47 fn get_internal_summary(&mut self) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>);
48
49 /// Signals that external frontiers have been set.
50 ///
51 /// By default this method does nothing, and leaves all changes in the `frontiers` element
52 /// of the shared progress state. An operator should be able to consult `frontiers` at any
53 /// point and read out the current frontier information, or the changes from the last time
54 /// that `frontiers` was drained.
55 fn set_external_summary(&mut self) { }
56
57 /// Indicates of whether the operator requires `push_external_progress` information or not.
58 fn notify_me(&self) -> bool { true }
59}
60
61/// Operator internal connectivity, from inputs to outputs.
///
/// A vector of [`PortConnectivity`] values; this is the first element of the pair returned by
/// `Operate::get_internal_summary`.
/// NOTE(review): presumably the vector is indexed by input port and each entry is keyed by
/// output port -- confirm against implementors.
62pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
63/// Internal connectivity from one port to any number of opposing ports.
///
/// Backed by an ordered map from a port index to the antichain of summaries recorded for
/// that port. `add_port` and the `FromIterator` implementation do not store empty antichains;
/// `get` returns `None` for a port with no entry.
64#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
65pub struct PortConnectivity<TS> {
    /// Summaries keyed by port index; private, so all access goes through the methods below.
66 tree: std::collections::BTreeMap<usize, Antichain<TS>>,
67}
68
69impl<TS> Default for PortConnectivity<TS> {
70 fn default() -> Self {
71 Self { tree: std::collections::BTreeMap::new() }
72 }
73}
74
75impl<TS> PortConnectivity<TS> {
76 /// Inserts an element by reference, ensuring that the index exists.
77 pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder {
78 self.tree.entry(index).or_default().insert(element)
79 }
80 /// Inserts an element by reference, ensuring that the index exists.
81 pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
82 self.tree.entry(index).or_default().insert_ref(element)
83 }
84 /// Introduces a summary for `port`. Panics if a summary already exists.
85 pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
86 if !summary.is_empty() {
87 let prior = self.tree.insert(port, summary);
88 assert!(prior.is_none());
89 }
90 else {
91 assert!(self.tree.remove(&port).is_none());
92 }
93 }
94 /// Borrowing iterator of port identifiers and antichains.
95 pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
96 self.tree.iter().map(|(o,p)| (*o, p))
97 }
98 /// Returns the associated path summary, if it exists.
99 pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
100 self.tree.get(&index)
101 }
102}
103
104impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
105 fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
106 Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
107 }
108}
109
110/// Progress information shared between parent and child.
111#[derive(Debug)]
112pub struct SharedProgress<T: Timestamp> {
113 /// Frontier capability changes reported by the parent scope.
114 pub frontiers: Vec<ChangeBatch<T>>,
115 /// Consumed message changes reported by the child operator.
116 pub consumeds: Vec<ChangeBatch<T>>,
117 /// Internal capability changes reported by the child operator.
118 pub internals: Vec<ChangeBatch<T>>,
119 /// Produced message changes reported by the child operator.
120 pub produceds: Vec<ChangeBatch<T>>,
121}
122
123impl<T: Timestamp> SharedProgress<T> {
124 /// Allocates a new shared progress structure.
125 pub fn new(inputs: usize, outputs: usize) -> Self {
126 SharedProgress {
127 frontiers: vec![ChangeBatch::new(); inputs],
128 consumeds: vec![ChangeBatch::new(); inputs],
129 internals: vec![ChangeBatch::new(); outputs],
130 produceds: vec![ChangeBatch::new(); outputs],
131 }
132 }
133}