Struct timely::progress::reachability::Builder

pub struct Builder<T: Timestamp> {
    pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
    pub edges: Vec<Vec<Vec<Target>>>,
    pub shape: Vec<(usize, usize)>,
}

A topology builder, which can summarize reachability along paths.

A Builder takes descriptions of the nodes and edges in a graph, and compiles a static summary of the minimal actions a timestamp must endure going from any input or output port to a destination input port.

A graph is provided as (i) several indexed nodes, each with some number of input and output ports, and each with a summary of the internal paths connecting each input to each output, and (ii) a set of edges connecting output ports to input ports. Edges do not adjust timestamps; only nodes do this.

The resulting summary describes, for each origin port in the graph and destination input port, a set of incomparable path summaries, each describing what happens to a timestamp as it moves along the path. There may be multiple summaries for each pair of origin and destination because the actions on timestamps may not be totally ordered (e.g., “increment the timestamp” and “take the maximum of the timestamp and seven”).
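To see why two actions can be incomparable, here is a minimal sketch that does not use the timely crate. The `Action` enum and the helper functions are hypothetical names introduced only for illustration, and the comparison is checked on a finite sample of timestamps rather than proven for all of them.

```rust
/// Hypothetical actions on a `u64` timestamp (illustrative only, not timely types).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Action {
    /// Increment the timestamp by the given amount.
    Add(u64),
    /// Take the maximum of the timestamp and the given value.
    Max(u64),
}

impl Action {
    /// Applies the action to a timestamp.
    pub fn apply(self, time: u64) -> u64 {
        match self {
            Action::Add(k) => time + k,
            Action::Max(k) => time.max(k),
        }
    }
}

/// True if `a` never produces a later time than `b` on the sampled timestamps.
pub fn dominated_on(a: Action, b: Action, samples: &[u64]) -> bool {
    samples.iter().all(|&t| a.apply(t) <= b.apply(t))
}

/// True if one of the two actions dominates the other on the samples.
pub fn comparable_on(a: Action, b: Action, samples: &[u64]) -> bool {
    dominated_on(a, b, samples) || dominated_on(b, a, samples)
}
```

With samples `0..20`, `Action::Add(1)` and `Action::Max(7)` are not comparable: at time 0 the increment yields the earlier result (1 versus 7), while at time 10 the maximum does (10 versus 11). A summary that kept only one of them would lose information, which is why a set of incomparable summaries is retained.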

§Examples

use timely::progress::frontier::Antichain;
use timely::progress::{Source, Target};
use timely::progress::reachability::Builder;

// allocate a new empty topology builder.
let mut builder = Builder::<usize>::new();

// Each node with one input connected to one output.
builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(1)]]);

// Connect nodes in sequence, looping around to the first from the last.
builder.add_edge(Source::new(0, 0), Target::new(1, 0));
builder.add_edge(Source::new(1, 0), Target::new(2, 0));
builder.add_edge(Source::new(2, 0), Target::new(0, 0));

// Summarize reachability information.
let (tracker, _) = builder.build(None);

Fields§

§nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>

Internal connections within hosted operators.

Indexed by operator index, then input port, then output port. This is the same format returned by get_internal_summary, as if we simply appended all of the summaries for the hosted nodes.

§edges: Vec<Vec<Vec<Target>>>

Direct connections from sources to targets.

Edges do not affect timestamps, so we only need to know the connectivity. Indexed by operator index then output port.

§shape: Vec<(usize, usize)>

Numbers of inputs and outputs for each node.
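The indexing of the three fields can be pictured with plain vectors. The sketch below is an assumption-laden mirror of the layout described above, not the library's types: `Layout` and `Port` are hypothetical names, `Vec<u64>` stands in for `Antichain<T::Summary>`, and a `(node, input port)` pair stands in for `Target`. It lays out the three-node ring from the example at the top of this page.

```rust
/// Hypothetical stand-in for `Target`: (node index, input port).
pub type Port = (usize, usize);

/// Plain-data mirror of the three fields (illustrative only).
pub struct Layout {
    /// nodes[node][input][output]: summaries from that input to that output.
    pub nodes: Vec<Vec<Vec<Vec<u64>>>>,
    /// edges[node][output]: the input ports that output is connected to.
    pub edges: Vec<Vec<Vec<Port>>>,
    /// shape[node]: (number of inputs, number of outputs).
    pub shape: Vec<(usize, usize)>,
}

/// Three nodes with one input and one output each, connected in a ring.
/// Only node 2 advances the timestamp (summary 1); the others use 0.
pub fn ring() -> Layout {
    Layout {
        nodes: vec![
            vec![vec![vec![0]]],
            vec![vec![vec![0]]],
            vec![vec![vec![1]]],
        ],
        edges: vec![
            vec![vec![(1, 0)]],
            vec![vec![(2, 0)]],
            vec![vec![(0, 0)]],
        ],
        shape: vec![(1, 1), (1, 1), (1, 1)],
    }
}
```

Reading `ring().nodes[2][0][0]` gives the summaries from input 0 to output 0 of node 2, and `ring().edges[2][0]` lists the targets of node 2's output 0.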

Implementations§

impl<T: Timestamp> Builder<T>

pub fn new() -> Self

Create a new empty topology builder.

pub fn add_node( &mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>, )

Add links internal to operators.

This method overwrites any existing summary, instead of anything more sophisticated.

pub fn add_edge(&mut self, source: Source, target: Target)

Add links between operators.

This method does not check that the associated nodes and ports exist. References to missing nodes or ports are discovered in build.

pub fn build( self, logger: Option<TrackerLogger>, ) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>)

Compiles the current nodes and edges into immutable path summaries.

This method has the opportunity to check that the path summaries are valid, detecting references to undefined nodes and ports as well as self-loops with default summaries (a serious liveness issue).

The optional logger information is baked into the resulting tracker.
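As an illustration of the kind of check described above, the following sketch validates edge endpoints against a `shape` vector. It is not the library's implementation: `check_edges` is a hypothetical helper, edges are modeled as plain `((node, output port), (node, input port))` pairs, and returning a `Result` is a choice made for this sketch; how `build` itself reports such problems is not specified here.

```rust
/// Hypothetical helper: checks that every edge endpoint names an existing
/// node and port. `shape[n]` is (inputs, outputs) for node `n`; each edge is
/// ((source node, output port), (target node, input port)).
pub fn check_edges(
    shape: &[(usize, usize)],
    edges: &[((usize, usize), (usize, usize))],
) -> Result<(), String> {
    for &((src, out), (dst, inp)) in edges {
        // The source must be an existing output port.
        match shape.get(src) {
            Some(&(_, outputs)) if out < outputs => {}
            _ => return Err(format!("edge source ({}, {}) does not exist", src, out)),
        }
        // The target must be an existing input port.
        match shape.get(dst) {
            Some(&(inputs, _)) if inp < inputs => {}
            _ => return Err(format!("edge target ({}, {}) does not exist", dst, inp)),
        }
    }
    Ok(())
}
```

Deferring this check to a single pass at build time matches the behavior of add_edge, which accepts edges before the nodes they refer to have been added.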

pub fn is_acyclic(&self) -> bool

Tests whether the graph is free of cycles of default path summaries.

Graphs containing cycles of default path summaries will most likely not work well with progress tracking, as a timestamp can result in itself. Such computations can still run, but one should not block on frontier information before yielding results, as you may never unblock.

§Examples

use timely::progress::frontier::Antichain;
use timely::progress::{Source, Target};
use timely::progress::reachability::Builder;

// allocate a new empty topology builder.
let mut builder = Builder::<usize>::new();

// Each node with one input connected to one output.
builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(1, 1, 1, vec![vec![Antichain::from_elem(0)]]);
builder.add_node(2, 1, 1, vec![vec![Antichain::from_elem(0)]]);

// Connect nodes in sequence, without yet looping back to the first.
builder.add_edge(Source::new(0, 0), Target::new(1, 0));
builder.add_edge(Source::new(1, 0), Target::new(2, 0));

assert!(builder.is_acyclic());

// Close the loop from the last node to the first.
builder.add_edge(Source::new(2, 0), Target::new(0, 0));

assert!(!builder.is_acyclic());

This test exists because a dataflow graph can be free of non-incrementing cycles even when it has no feedback nodes that strictly increment timestamps. For example,

use timely::progress::frontier::Antichain;
use timely::progress::{Source, Target};
use timely::progress::reachability::Builder;

// allocate a new empty topology builder.
let mut builder = Builder::<usize>::new();

// Two inputs and outputs, only one of which advances.
builder.add_node(0, 2, 2, vec![
    vec![Antichain::from_elem(0),Antichain::new(),],
    vec![Antichain::new(),Antichain::from_elem(1),],
]);

// Connect each output to the opposite input.
builder.add_edge(Source::new(0, 0), Target::new(0, 1));
builder.add_edge(Source::new(0, 1), Target::new(0, 0));

assert!(builder.is_acyclic());
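One way to picture such a test is as a topological sort over ports that follows only links unable to advance a timestamp. The sketch below uses Kahn's algorithm on plain vectors; it is an illustration under stated assumptions and not the library's implementation. The function name `no_default_cycle` is hypothetical, `Vec<u64>` stands in for an antichain of summaries, and the summary `0` is assumed to play the role of the default summary.

```rust
/// Hypothetical sketch: true if no cycle consists solely of edges and
/// internal connections whose summary set contains the default summary 0.
/// `shape[n]` is (inputs, outputs); `nodes[n][i][o]` lists summaries;
/// each edge is ((source node, output port), (target node, input port)).
pub fn no_default_cycle(
    shape: &[(usize, usize)],
    nodes: &[Vec<Vec<Vec<u64>>>],
    edges: &[((usize, usize), (usize, usize))],
) -> bool {
    // Give each port a dense id: a node's inputs first, then its outputs.
    let mut base = Vec::with_capacity(shape.len());
    let mut total = 0;
    for &(inputs, outputs) in shape {
        base.push(total);
        total += inputs + outputs;
    }
    let input_id = |n: usize, i: usize| base[n] + i;
    let output_id = |n: usize, o: usize| base[n] + shape[n].0 + o;

    // Keep only the links that cannot advance a timestamp.
    let mut succ: Vec<Vec<usize>> = vec![Vec::new(); total];
    for &((src, out), (dst, inp)) in edges {
        succ[output_id(src, out)].push(input_id(dst, inp));
    }
    for (n, per_input) in nodes.iter().enumerate() {
        for (i, per_output) in per_input.iter().enumerate() {
            for (o, summaries) in per_output.iter().enumerate() {
                if summaries.contains(&0) {
                    succ[input_id(n, i)].push(output_id(n, o));
                }
            }
        }
    }

    // Kahn's algorithm: repeatedly remove ports with no remaining predecessors.
    let mut in_degree = vec![0usize; total];
    for targets in &succ {
        for &t in targets {
            in_degree[t] += 1;
        }
    }
    let mut ready: Vec<usize> = (0..total).filter(|&p| in_degree[p] == 0).collect();
    let mut visited = 0;
    while let Some(p) = ready.pop() {
        visited += 1;
        for &t in &succ[p] {
            in_degree[t] -= 1;
            if in_degree[t] == 0 {
                ready.push(t);
            }
        }
    }
    // Every port is removed exactly when no such cycle exists.
    visited == total
}
```

Applied to the two-port node above, the connection from input 1 to output 1 carries summary 1 and is skipped, so the remaining links form a chain and the function returns true. Applied to the closed three-node ring with all summaries 0, no port ever becomes ready and it returns false.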

Trait Implementations§

impl<T: Clone + Timestamp> Clone for Builder<T>
where T::Summary: Clone,

fn clone(&self) -> Builder<T>

Returns a copy of the value.

1.0.0

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<T: Debug + Timestamp> Debug for Builder<T>
where T::Summary: Debug,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T: Timestamp> Default for Builder<T>

fn default() -> Self

Returns the “default value” for a type.

Auto Trait Implementations§

impl<T> Freeze for Builder<T>

impl<T> RefUnwindSafe for Builder<T>

impl<T> Send for Builder<T>
where <T as Timestamp>::Summary: Send,

impl<T> Sync for Builder<T>
where <T as Timestamp>::Summary: Sync,

impl<T> Unpin for Builder<T>
where <T as Timestamp>::Summary: Unpin,

impl<T> UnwindSafe for Builder<T>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

default unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst.

impl<T> CopyAs<T> for T

fn copy_as(self) -> T

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ProgressEventTimestamp for T
where T: Data + Debug + Any,

fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any.

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> Data for T
where T: Clone + 'static,