timely/progress/
mod.rs

//! Progress tracking mechanisms to support notification in timely dataflow.
2
3use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5pub use self::operate::Operate;
6pub use self::subgraph::{Subgraph, SubgraphBuilder};
7pub use self::timestamp::{Timestamp, PathSummary};
8pub use self::change_batch::ChangeBatch;
9pub use self::frontier::Antichain;
10
11pub mod change_batch;
12pub mod frontier;
13pub mod timestamp;
14pub mod operate;
15pub mod broadcast;
16pub mod reachability;
17pub mod subgraph;
18
19/// A timely dataflow location.
20#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize, Columnar)]
21pub struct Location {
22    /// A scope-local operator identifier.
23    pub node: usize,
24    /// An operator port identifier.
25    pub port: Port,
26}
27
28impl Location {
29    /// Creates a new target location (operator input or scope output).
30    pub fn new_target(node: usize, port: usize) -> Location {
31        Location { node, port: Port::Target(port) }
32    }
33    /// Creates a new source location (operator output or scope input).
34    pub fn new_source(node: usize, port: usize) -> Location {
35        Location { node, port: Port::Source(port) }
36    }
37    /// If the location is a target.
38    pub fn is_target(&self) -> bool { matches!(self.port, Port::Target(_)) }
39    /// If the location is a source.
40    pub fn is_source(&self) -> bool { matches!(self.port, Port::Source(_)) }
41}
42
43impl From<Target> for Location {
44    fn from(target: Target) -> Self {
45        Location {
46            node: target.node,
47            port: Port::Target(target.port),
48        }
49    }
50}
51
52impl From<Source> for Location {
53    fn from(source: Source) -> Self {
54        Location {
55            node: source.node,
56            port: Port::Source(source.port),
57        }
58    }
59}
60
61/// An operator port.
62#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize, Columnar)]
63pub enum Port {
64    /// An operator input.
65    Target(usize),
66    /// An operator output.
67    Source(usize),
68}
69
/// Names a source of a data stream.
///
/// A source of data is either a child output, or an input from a parent.
/// Conventionally, `node` zero is used for parent input.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub struct Source {
    /// Index of the source operator.
    pub node: usize,
    /// Number of the output port from the operator.
    pub port: usize,
}
81
82impl Source {
83    /// Creates a new source from node and port identifiers.
84    pub fn new(node: usize, port: usize) -> Self {
85        Self { node, port }
86    }
87}
88
/// Names a target of a data stream.
///
/// A target of data is either a child input, or an output to a parent.
/// Conventionally, `node` zero is used for parent output.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub struct Target {
    /// Index of the target operator.
    pub node: usize,
    /// Number of the input port to the operator.
    pub port: usize,
}
100
101impl Target {
102    /// Creates a new target from node and port identifiers.
103    pub fn new(node: usize, port: usize) -> Self {
104        Self { node, port }
105    }
106}