//! Methods which describe an operator's topology, and the progress it makes.
use std::rc::Rc;
use std::cell::RefCell;
use crate::scheduling::Schedule;
use crate::progress::{Timestamp, ChangeBatch, Antichain};
/// Methods for describing an operator's topology, and the progress it makes.
pub trait Operate<T: Timestamp> : Schedule {
    /// Indicates if the operator is strictly local to this worker.
    ///
    /// A parent scope must understand whether the progress information returned by the worker
    /// reflects only this worker's progress, so that it knows whether to exchange the
    /// corresponding progress messages with its peers. If the operator is strictly local, the
    /// parent scope must exchange this information, whereas if the operator is itself
    /// implemented by the same set of workers, the parent scope understands that progress
    /// information already reflects the aggregate information among the workers.
    ///
    /// This is a coarse approximation to refined worker sets. In a better future design, operators
    /// would explain how their implementations are partitioned, so that a parent scope knows what
    /// progress information to exchange with which peers. Right now the two choices are either
    /// "all" or "none", but it could be more detailed. In the more detailed case, this method
    /// could return a pair `(index, peers)`, indicating this worker's group index and the total
    /// number of groups. This becomes complicated, as a full all-to-all exchange would result in
    /// multiple copies of the same progress messages (but aggregated variously) arriving at
    /// arbitrary times.
    fn local(&self) -> bool { true }
    /// The number of inputs.
    fn inputs(&self) -> usize;
    /// The number of outputs.
    fn outputs(&self) -> usize;
    /// Fetches summary information about internal structure of the operator.
    ///
    /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
    /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
    /// be transformed to timestamps on any of its outputs.
    ///
    /// Each operator must also indicate whether it initially holds any capabilities on any of its
    /// outputs, so that the parent operator can properly initialize its progress information.
    ///
    /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
    /// any output, and no initial capabilities are held.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);
    /// Signals that external frontiers have been set.
    ///
    /// By default this method does nothing, and leaves all changes in the `frontiers` element
    /// of the shared progress state. An operator should be able to consult `frontiers` at any
    /// point and read out the current frontier information, or the changes from the last time
    /// that `frontiers` was drained.
    fn set_external_summary(&mut self) { }
    /// Indicates whether the operator requires `push_external_progress` information or not.
    fn notify_me(&self) -> bool { true }
}
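/*
The trait above mixes required methods (`inputs`, `outputs`, `get_internal_summary`) with
defaulted ones (`local`, `notify_me`). The sketch below illustrates that shape with a
simplified, hypothetical stand-in: `ToyOperate`, `ToyPassThrough`, and `earliest_output_time`
are not part of this crate, timestamps are plain `u64` values, and a path summary is assumed
to be a `u64` delay added to a timestamp. It shows a summary indexed by `(input, output)` in
which every input reaches every output with the timestamp unchanged.

```rust
// Hypothetical, simplified stand-in for `Operate`; it is not the real trait.
trait ToyOperate {
    // Defaulted, as in the trait above: the operator is local unless it says otherwise.
    fn local(&self) -> bool { true }
    fn inputs(&self) -> usize;
    fn outputs(&self) -> usize;
    // `summary[input][output]` lists the delays along paths from `input` to `output`.
    fn get_internal_summary(&mut self) -> Vec<Vec<Vec<u64>>>;
    // Defaulted, as in the trait above: the operator asks for frontier information.
    fn notify_me(&self) -> bool { true }
}

// A toy operator that forwards timestamps from every input to every output.
struct ToyPassThrough {
    inputs: usize,
    outputs: usize,
}

impl ToyOperate for ToyPassThrough {
    fn inputs(&self) -> usize { self.inputs }
    fn outputs(&self) -> usize { self.outputs }
    fn get_internal_summary(&mut self) -> Vec<Vec<Vec<u64>>> {
        // A single zero delay per pair: timestamps can emerge unchanged.
        vec![vec![vec![0]; self.outputs]; self.inputs]
    }
}

// Applies a summary to `time`: the earliest timestamp that may appear on `output`
// as a consequence of `time` on `input`, or `None` if the two are not connected.
fn earliest_output_time(
    summary: &[Vec<Vec<u64>>],
    input: usize,
    output: usize,
    time: u64,
) -> Option<u64> {
    summary[input][output].iter().map(|delay| time + delay).min()
}
```

A caller would build `ToyPassThrough { inputs: 2, outputs: 3 }`, fetch the summary once, and
consult `earliest_output_time` when reasoning about what may still appear on an output.
*/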
/// Progress information shared between parent and child.
#[derive(Debug)]
pub struct SharedProgress<T: Timestamp> {
    /// Frontier capability changes reported by the parent scope.
    pub frontiers: Vec<ChangeBatch<T>>,
    /// Consumed message changes reported by the child operator.
    pub consumeds: Vec<ChangeBatch<T>>,
    /// Internal capability changes reported by the child operator.
    pub internals: Vec<ChangeBatch<T>>,
    /// Produced message changes reported by the child operator.
    pub produceds: Vec<ChangeBatch<T>>,
}
impl<T: Timestamp> SharedProgress<T> {
    /// Allocates a new shared progress structure.
    pub fn new(inputs: usize, outputs: usize) -> Self {
        SharedProgress {
            frontiers: vec![ChangeBatch::new(); inputs],
            consumeds: vec![ChangeBatch::new(); inputs],
            internals: vec![ChangeBatch::new(); outputs],
            produceds: vec![ChangeBatch::new(); outputs],
        }
    }
}
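/*
`SharedProgress` is handed out behind `Rc<RefCell<..>>`, so parent and child read and write
the same four vectors of change batches. The sketch below walks through one round of that
exchange under stated assumptions: `ToyChangeBatch` is a hypothetical stand-in that only
records `(time, delta)` pairs, `ToySharedProgress` mirrors the field layout above, and
`exchange_once` is an illustrative helper. None of these names exist in this crate.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for `ChangeBatch<T>`: an unconsolidated list of
// `(time, delta)` updates. It is not the real type.
#[derive(Clone, Debug)]
struct ToyChangeBatch<T> {
    updates: Vec<(T, i64)>,
}

impl<T> ToyChangeBatch<T> {
    fn new() -> Self {
        ToyChangeBatch { updates: Vec::new() }
    }
    // Records that the count at `time` changed by `delta`.
    fn update(&mut self, time: T, delta: i64) {
        self.updates.push((time, delta));
    }
    // Removes and returns all pending updates.
    fn drain(&mut self) -> Vec<(T, i64)> {
        std::mem::take(&mut self.updates)
    }
}

// Mirrors the layout of `SharedProgress<T>` using the stand-in batch type.
#[derive(Debug)]
struct ToySharedProgress<T> {
    frontiers: Vec<ToyChangeBatch<T>>, // one per input, written by the parent
    consumeds: Vec<ToyChangeBatch<T>>, // one per input, written by the child
    internals: Vec<ToyChangeBatch<T>>, // one per output, written by the child
    produceds: Vec<ToyChangeBatch<T>>, // one per output, written by the child
}

impl<T: Clone> ToySharedProgress<T> {
    fn new(inputs: usize, outputs: usize) -> Self {
        ToySharedProgress {
            frontiers: vec![ToyChangeBatch::new(); inputs],
            consumeds: vec![ToyChangeBatch::new(); inputs],
            internals: vec![ToyChangeBatch::new(); outputs],
            produceds: vec![ToyChangeBatch::new(); outputs],
        }
    }
}

type Updates = Vec<(u64, i64)>;

// One round of communication between a parent and a child over a shared handle.
// Returns what the child saw in `frontiers`, followed by what the parent drained
// from `consumeds`, `produceds`, and `internals`.
fn exchange_once() -> (Updates, Updates, Updates, Updates) {
    let parent = Rc::new(RefCell::new(ToySharedProgress::<u64>::new(2, 1)));
    let child = Rc::clone(&parent);

    // Parent: a capability at time 0 appears upstream of input 1.
    parent.borrow_mut().frontiers[1].update(0, 1);

    // Child: read the frontier change, then report its own activity.
    let seen = child.borrow_mut().frontiers[1].drain();
    {
        let mut progress = child.borrow_mut();
        progress.consumeds[1].update(0, 1); // consumed one message at time 0
        progress.produceds[0].update(0, 1); // produced one message at time 0
        progress.internals[0].update(5, 1); // acquired a capability at time 5
    }

    // Parent: drain what the child reported.
    let mut progress = parent.borrow_mut();
    let consumed = progress.consumeds[1].drain();
    let produced = progress.produceds[0].drain();
    let internal = progress.internals[0].drain();
    (seen, consumed, produced, internal)
}
```

Each `borrow_mut` in the sketch is released before the other side borrows again, which is
what keeps the `RefCell` from panicking at run time when both sides share one handle.
*/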