Skip to main content

timely/progress/
operate.rs

//! Methods which describe an operator's topology, and the progress it makes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::Schedule;
7use crate::progress::{Timestamp, ChangeBatch, Antichain};
8
9/// A dataflow operator that progress with a specific timestamp type.
10///
11/// This trait describes the methods necessary to present as a dataflow operator.
12/// This trait is a "builder" for operators, in that it reveals the structure of the operator
13/// and its requirements, but then (through `initialize`) consumes itself to produce a boxed
14/// schedulable object. At the moment of initialization, the values of the other methods are
15/// captured and frozen.
16pub trait Operate<T: Timestamp> {
17
18    /// Indicates if the operator is strictly local to this worker.
19    ///
20    /// A parent scope must understand whether the progress information returned by the worker
21    /// reflects only this worker's progress, so that it knows whether to send and receive the
22    /// corresponding progress messages to its peers. If the operator is strictly local, it must
23    /// exchange this information, whereas if the operator is itself implemented by the same set
24    /// of workers, the parent scope understands that progress information already reflects the
25    /// aggregate information among the workers.
26    ///
27    /// This is a coarse approximation to refined worker sets. In a future better world, operators
28    /// would explain how their implementations are partitioned, so that a parent scope knows what
29    /// progress information to exchange with which peers. Right now the two choices are either
30    /// "all" or "none", but it could be more detailed. In the more detailed case, this method
31    /// should / could return a pair (index, peers), indicating the group id of the worker out of
32    /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
33    /// multiple copies of the same progress messages (but aggregated variously) arriving at
34    /// arbitrary times.
35    fn local(&self) -> bool { true }
36
37    /// The number of inputs.
38    fn inputs(&self) -> usize;
39    /// The number of outputs.
40    fn outputs(&self) -> usize;
41
42    /// Initializes the operator, converting the operator builder to a schedulable object.
43    ///
44    /// In addition, initialization produces internal connectivity, and a shared progress conduit
45    /// which must contain any initial output capabilities the operator would like to hold.
46    ///
47    /// The internal connectivity summarizes the operator by a map from pairs `(input, output)`
48    /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
49    /// be transformed to timestamps on any of its outputs. The conservative and most common result
50    /// is full connectivity between all inputs and outputs, each with the identity summary.
51    ///
52    /// The shared progress object allows information to move between the host and the schedulable.
53    /// Importantly, it also indicates the initial internal capabilities for all of its outputs.
54    /// This must happen at this moment, as it is the only moment where an operator is allowed to
55    /// safely "create" capabilities without basing them on other, prior capabilities.
56    fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>);
57
58    /// Indicates for each input whether the operator should be invoked when that input's frontier changes.
59    ///
60    /// Returns a `Vec<FrontierInterest>` with one entry per input. Each entry describes whether
61    /// frontier changes on that input should cause the operator to be scheduled. The conservative
62    /// default is `Always` for each input.
63    fn notify_me(&self) -> &[FrontierInterest];// { &vec![FrontierInterest::Always; self.inputs()] }
64}
65
/// How an operator wants to be activated in response to changes in an input frontier.
///
/// Variants are declared from least to most interested, and the derived ordering follows
/// that declaration order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum FrontierInterest {
    /// No interest in frontier changes; `map()` and `filter()` are examples of such operators.
    Never,
    /// Interest in frontier changes only while the operator holds capabilities.
    IfCapability,
    /// Interest in every frontier change; `probe()` and `capture()` are examples of such operators.
    Always,
}
76
77/// Operator internal connectivity, from inputs to outputs.
78pub type Connectivity<TS> = Vec<PortConnectivity<TS>>;
79/// Internal connectivity from one port to any number of opposing ports.
80#[derive(serde::Serialize, serde::Deserialize, columnar::Columnar, Debug, Clone, Eq, PartialEq)]
81pub struct PortConnectivity<TS> {
82    tree: std::collections::BTreeMap<usize, Antichain<TS>>,
83}
84
85impl<TS> Default for PortConnectivity<TS> {
86    fn default() -> Self {
87        Self { tree: std::collections::BTreeMap::new() }
88    }
89}
90
91impl<TS> PortConnectivity<TS> {
92    /// Inserts an element by reference, ensuring that the index exists.
93    pub fn insert(&mut self, index: usize, element: TS) -> bool where TS : crate::PartialOrder {
94        self.tree.entry(index).or_default().insert(element)
95    }
96    /// Inserts an element by reference, ensuring that the index exists.
97    pub fn insert_ref(&mut self, index: usize, element: &TS) -> bool where TS : crate::PartialOrder + Clone {
98        self.tree.entry(index).or_default().insert_ref(element)
99    }
100    /// Introduces a summary for `port`. Panics if a summary already exists.
101    pub fn add_port(&mut self, port: usize, summary: Antichain<TS>) {
102        if !summary.is_empty() {
103            let prior = self.tree.insert(port, summary);
104            assert!(prior.is_none());
105        }
106        else {
107            assert!(self.tree.remove(&port).is_none());
108        }
109    }
110    /// Borrowing iterator of port identifiers and antichains.
111    pub fn iter_ports(&self) -> impl Iterator<Item = (usize, &Antichain<TS>)> {
112        self.tree.iter().map(|(o,p)| (*o, p))
113    }
114    /// Returns the associated path summary, if it exists.
115    pub fn get(&self, index: usize) -> Option<&Antichain<TS>> {
116        self.tree.get(&index)
117    }
118}
119
120impl<TS> FromIterator<(usize, Antichain<TS>)> for PortConnectivity<TS> {
121    fn from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = (usize, Antichain<TS>)> {
122        Self { tree: iter.into_iter().filter(|(_,p)| !p.is_empty()).collect() }
123    }
124}
125
126/// Progress information shared between parent and child.
127#[derive(Debug)]
128pub struct SharedProgress<T: Timestamp> {
129    /// Frontier capability changes reported by the parent scope.
130    pub frontiers: Vec<ChangeBatch<T>>,
131    /// Consumed message changes reported by the child operator.
132    pub consumeds: Vec<ChangeBatch<T>>,
133    /// Internal capability changes reported by the child operator.
134    pub internals: Vec<ChangeBatch<T>>,
135    /// Produced message changes reported by the child operator.
136    pub produceds: Vec<ChangeBatch<T>>,
137}
138
139impl<T: Timestamp> SharedProgress<T> {
140    /// Allocates a new shared progress structure.
141    pub fn new(inputs: usize, outputs: usize) -> Self {
142        SharedProgress {
143            frontiers: vec![ChangeBatch::new(); inputs],
144            consumeds: vec![ChangeBatch::new(); inputs],
145            internals: vec![ChangeBatch::new(); outputs],
146            produceds: vec![ChangeBatch::new(); outputs],
147        }
148    }
149}