Struct timely::progress::reachability::Builder
source · pub struct Builder<T: Timestamp> {
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
pub edges: Vec<Vec<Vec<Target>>>,
pub shape: Vec<(usize, usize)>,
}
Expand description
A topology builder, which can summarize reachability along paths.
A Builder
takes descriptions of the nodes and edges in a graph, and compiles
a static summary of the minimal actions a timestamp must endure going from any
input or output port to a destination input port.
A graph is provides as (i) several indexed nodes, each with some number of input and output ports, and each with a summary of the internal paths connecting each input to each output, and (ii) a set of edges connecting output ports to input ports. Edges do not adjust timestamps; only nodes do this.
The resulting summary describes, for each origin port in the graph and destination input port, a set of incomparable path summaries, each describing what happens to a timestamp as it moves along the path. There may be multiple summaries for each part of origin and destination due to the fact that the actions on timestamps may not be totally ordered (e.g., “increment the timestamp” and “take the maximum of the timestamp and seven”).
§Examples
use timely::progress::frontier::Antichain;
use timely::progress::{Source, Target};
use timely::progress::reachability::Builder;
// allocate a new empty topology builder.
let mut builder = Builder::<usize>::new();
// Each node with one input connected to one output.
builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);
// Connect nodes in sequence, looping around to the first from the last.
builder.add_edge(Source::new(0, 0), Target::new(1, 0));
builder.add_edge(Source::new(1, 0), Target::new(2, 0));
builder.add_edge(Source::new(2, 0), Target::new(0, 0));
// Summarize reachability information.
let (tracker, _) = builder.build(None);
Fields§
§nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>
Internal connections within hosted operators.
Indexed by operator index, then input port, then output port. This is the
same format returned by get_internal_summary
, as if we simply appended
all of the summaries for the hosted nodes.
edges: Vec<Vec<Vec<Target>>>
Direct connections from sources to targets.
Edges do not affect timestamps, so we only need to know the connectivity. Indexed by operator index then output port.
shape: Vec<(usize, usize)>
Numbers of inputs and outputs for each node.
Implementations§
source§impl<T: Timestamp> Builder<T>
impl<T: Timestamp> Builder<T>
sourcepub fn add_node(
&mut self,
index: usize,
inputs: usize,
outputs: usize,
summary: Vec<Vec<Antichain<T::Summary>>>,
)
pub fn add_node( &mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>, )
Add links internal to operators.
This method overwrites any existing summary, instead of anything more sophisticated.
sourcepub fn add_edge(&mut self, source: Source, target: Target)
pub fn add_edge(&mut self, source: Source, target: Target)
Add links between operators.
This method does not check that the associated nodes and ports exist. References to
missing nodes or ports are discovered in build
.
sourcepub fn build(
self,
logger: Option<TrackerLogger>,
) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>)
pub fn build( self, logger: Option<TrackerLogger>, ) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>)
Compiles the current nodes and edges into immutable path summaries.
This method has the opportunity to perform some error checking that the path summaries are valid, including references to undefined nodes and ports, as well as self-loops with default summaries (a serious liveness issue).
The optional logger information is baked into the resulting tracker.
sourcepub fn is_acyclic(&self) -> bool
pub fn is_acyclic(&self) -> bool
Tests whether the graph a cycle of default path summaries.
Graphs containing cycles of default path summaries will most likely not work well with progress tracking, as a timestamp can result in itself. Such computations can still run, but one should not block on frontier information before yielding results, as you many never unblock.
§Examples
use timely::progress::frontier::Antichain;
use timely::progress::{Source, Target};
use timely::progress::reachability::Builder;
// allocate a new empty topology builder.
let mut builder = Builder::<usize>::new();
// Each node with one input connected to one output.
builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]);
// Connect nodes in sequence, looping around to the first from the last.
builder.add_edge(Source::new(0, 0), Target::new(1, 0));
builder.add_edge(Source::new(1, 0), Target::new(2, 0));
assert!(builder.is_acyclic());
builder.add_edge(Source::new(2, 0), Target::new(0, 0));
assert!(!builder.is_acyclic());
This test exists because it is possible to describe dataflow graphs that do not contain non-incrementing cycles, but without feedback nodes that strictly increment timestamps. For example,
use timely::progress::frontier::Antichain;
use timely::progress::{Source, Target};
use timely::progress::reachability::Builder;
// allocate a new empty topology builder.
let mut builder = Builder::<usize>::new();
// Two inputs and outputs, only one of which advances.
builder.add_node(0, 2, 2, vec![
vec![Antichain::from_elem(0),Antichain::new(),],
vec![Antichain::new(),Antichain::from_elem(1),],
]);
// Connect each output to the opposite input.
builder.add_edge(Source::new(0, 0), Target::new(0, 1));
builder.add_edge(Source::new(0, 1), Target::new(0, 0));
assert!(builder.is_acyclic());
Trait Implementations§
Auto Trait Implementations§
impl<T> Freeze for Builder<T>
impl<T> RefUnwindSafe for Builder<T>
impl<T> Send for Builder<T>
impl<T> Sync for Builder<T>
impl<T> Unpin for Builder<T>
impl<T> UnwindSafe for Builder<T>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
source§default unsafe fn clone_to_uninit(&self, dst: *mut T)
default unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)