timely/progress/operate.rs
//! Methods which describe an operator's topology, and the progress it makes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::Schedule;
7use crate::progress::{Timestamp, ChangeBatch, Antichain};
8
9/// A dataflow operator that progress with a specific timestamp type.
10///
11/// This trait describes the methods necessary to present as a dataflow operator.
12/// This trait is a "builder" for operators, in that it reveals the structure of the operator
13/// and its requirements, but then (through `initialize`) consumes itself to produce a boxed
14/// schedulable object. At the moment of initialization, the values of the other methods are
15/// captured and frozen.
16pub trait Operate<T: Timestamp> {
17
18 /// Indicates if the operator is strictly local to this worker.
19 ///
20 /// A parent scope must understand whether the progress information returned by the worker
21 /// reflects only this worker's progress, so that it knows whether to send and receive the
22 /// corresponding progress messages to its peers. If the operator is strictly local, it must
23 /// exchange this information, whereas if the operator is itself implemented by the same set
24 /// of workers, the parent scope understands that progress information already reflects the
25 /// aggregate information among the workers.
26 ///
27 /// This is a coarse approximation to refined worker sets. In a future better world, operators
28 /// would explain how their implementations are partitioned, so that a parent scope knows what
29 /// progress information to exchange with which peers. Right now the two choices are either
30 /// "all" or "none", but it could be more detailed. In the more detailed case, this method
31 /// should / could return a pair (index, peers), indicating the group id of the worker out of
32 /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
33 /// multiple copies of the same progress messages (but aggregated variously) arriving at
34 /// arbitrary times.
35 fn local(&self) -> bool { true }
36
37 /// The number of inputs.
38 fn inputs(&self) -> usize;
39 /// The number of outputs.
40 fn outputs(&self) -> usize;
41
42 /// Initializes the operator, converting the operator builder to a schedulable object.
43 ///
44 /// In addition, initialization produces internal connectivity, and a shared progress conduit
45 /// which must contain any initial output capabilities the operator would like to hold.
46 ///
47 /// The internal connectivity summarizes the operator by a map from pairs `(input, output)`
48 /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
49 /// be transformed to timestamps on any of its outputs. The conservative and most common result
50 /// is full connectivity between all inputs and outputs, each with the identity summary.
51 ///
52 /// The shared progress object allows information to move between the host and the schedulable.
53 /// Importantly, it also indicates the initial internal capabilities for all of its outputs.
54 /// This must happen at this moment, as it is the only moment where an operator is allowed to
55 /// safely "create" capabilities without basing them on other, prior capabilities.
56 fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>);
57
58 /// Indicates for each input whether the operator should be invoked when that input's frontier changes.
59 ///
60 /// Returns a `Vec<FrontierInterest>` with one entry per input. Each entry describes whether
61 /// frontier changes on that input should cause the operator to be scheduled. The conservative
62 /// default is `Always` for each input.
63 fn notify_me(&self) -> &[FrontierInterest];// { &vec![FrontierInterest::Always; self.inputs()] }
64}
65
/// How an operator wants to be activated when the frontier of one of its inputs changes.
///
/// The derived ordering follows declaration order: `Never < IfCapability < Always`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum FrontierInterest {
    /// Frontier changes never warrant activation; `map()` and `filter()` are examples.
    Never,
    /// Frontier changes warrant activation only while the operator holds capabilities.
    IfCapability,
    /// Frontier changes always warrant activation; `probe()` and `capture()` are examples.
    Always,
}
76
77/// Operator internal connectivity, from inputs to outputs.
78pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
79/// Internal connectivity from one port to any number of opposing ports.
80#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
81pub struct PortConnectivity<TS> {
82 tree: std::collections::BTreeMap<usize, Antichain<TS>>,
83}
84
85impl<TS> Default for PortConnectivity<TS> {
86 fn default() -> Self {
87 Self { tree: std::collections::BTreeMap::new() }
88 }
89}
90
91impl<TS> PortConnectivity<TS> {
92 /// Inserts an element by reference, ensuring that the index exists.
93 pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder {
94 self.tree.entry(index).or_default().insert(element)
95 }
96 /// Inserts an element by reference, ensuring that the index exists.
97 pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
98 self.tree.entry(index).or_default().insert_ref(element)
99 }
100 /// Introduces a summary for `port`. Panics if a summary already exists.
101 pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
102 if !summary.is_empty() {
103 let prior = self.tree.insert(port, summary);
104 assert!(prior.is_none());
105 }
106 else {
107 assert!(self.tree.remove(&port).is_none());
108 }
109 }
110 /// Borrowing iterator of port identifiers and antichains.
111 pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
112 self.tree.iter().map(|(o,p)| (*o, p))
113 }
114 /// Returns the associated path summary, if it exists.
115 pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
116 self.tree.get(&index)
117 }
118}
119
120impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
121 fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
122 Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
123 }
124}
125
126/// Progress information shared between parent and child.
127#[derive(Debug)]
128pub struct SharedProgress<T: Timestamp> {
129 /// Frontier capability changes reported by the parent scope.
130 pub frontiers: Vec<ChangeBatch<T>>,
131 /// Consumed message changes reported by the child operator.
132 pub consumeds: Vec<ChangeBatch<T>>,
133 /// Internal capability changes reported by the child operator.
134 pub internals: Vec<ChangeBatch<T>>,
135 /// Produced message changes reported by the child operator.
136 pub produceds: Vec<ChangeBatch<T>>,
137}
138
139impl<T: Timestamp> SharedProgress<T> {
140 /// Allocates a new shared progress structure.
141 pub fn new(inputs: usize, outputs: usize) -> Self {
142 SharedProgress {
143 frontiers: vec![ChangeBatch::new(); inputs],
144 consumeds: vec![ChangeBatch::new(); inputs],
145 internals: vec![ChangeBatch::new(); outputs],
146 produceds: vec![ChangeBatch::new(); outputs],
147 }
148 }
149}