Skip to main content

timely/progress/
reachability.rs

1//! Manages pointstamp reachability within a timely dataflow graph.
2//!
3//! Timely dataflow is concerned with understanding and communicating the potential
4//! for capabilities to reach nodes in a directed graph, by following paths through
5//! the graph (along edges and through nodes). This module contains one abstraction
6//! for managing this information.
7//!
8//! # Examples
9//!
10//! ```rust
11//! use timely::progress::{Location, Port};
12//! use timely::progress::frontier::Antichain;
13//! use timely::progress::{Source, Target};
14//! use timely::progress::reachability::{Builder, Tracker};
15//!
16//! // allocate a new empty topology builder.
17//! let mut builder = Builder::<usize>::new();
18//!
19//! // Each node with one input connected to one output.
20//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
21//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
22//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
23//!
24//! // Connect nodes in sequence, looping around to the first from the last.
25//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
26//! builder.add_edge(Source::new(1, 0), Target::new(2, 0));
27//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
28//!
29//! // Construct a reachability tracker.
30//! let (mut tracker, _) = builder.build(None);
31//!
32//! // Introduce a pointstamp at the output of the first node.
33//! tracker.update_source(Source::new(0, 0), 17, 1);
34//!
35//! // Propagate changes; until this call updates are simply buffered.
36//! tracker.propagate_all();
37//!
38//! let mut results =
39//! tracker
40//!     .pushed()
41//!     .0
42//!     .drain()
43//!     .filter(|((location, time), delta)| location.is_target())
44//!     .collect::<Vec<_>>();
45//!
46//! results.sort();
47//!
48//! println!("{:?}", results);
49//!
50//! assert_eq!(results.len(), 3);
51//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), 1));
52//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), 1));
53//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1));
54//!
//! // Retract the pointstamp at the output of the first node.
56//! tracker.update_source(Source::new(0, 0), 17, -1);
57//!
58//! // Propagate changes; until this call updates are simply buffered.
59//! tracker.propagate_all();
60//!
61//! let mut results =
62//! tracker
63//!     .pushed()
64//!     .0
65//!     .drain()
66//!     .filter(|((location, time), delta)| location.is_target())
67//!     .collect::<Vec<_>>();
68//!
69//! results.sort();
70//!
71//! assert_eq!(results.len(), 3);
72//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), -1));
73//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), -1));
74//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), -1));
75//! ```
76
77use std::collections::{BinaryHeap, HashMap, VecDeque};
78use std::cmp::Reverse;
79
80use columnar::{Vecs, Index as ColumnarIndex};
81
82use crate::progress::Timestamp;
83use crate::progress::{Source, Target};
84use crate::progress::ChangeBatch;
85use crate::progress::{Location, Port};
86use crate::progress::operate::{Connectivity, PortConnectivity};
87use crate::progress::frontier::MutableAntichain;
88use crate::progress::timestamp::PathSummary;
89
90/// Build a `Vecs<Vecs<Vec<S>>>` from nested iterators.
91///
92/// The outer iterator yields nodes, each node yields ports, each port yields data items.
93fn build_nested_vecs<S>(nodes: impl Iterator<Item = impl Iterator<Item = impl Iterator<Item = S>>>) -> Vecs<Vecs<Vec<S>>> {
94    let mut result: Vecs<Vecs<Vec<S>>> = Default::default();
95    for node in nodes {
96        for port in node {
97            result.values.push_iter(port);
98        }
99        result.bounds.push(result.values.bounds.len() as u64);
100    }
101    result
102}
103
104/// A topology builder, which can summarize reachability along paths.
105///
106/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
107/// a static summary of the minimal actions a timestamp must endure going from any
108/// input or output port to a destination input port.
109///
110/// A graph is provides as (i) several indexed nodes, each with some number of input
111/// and output ports, and each with a summary of the internal paths connecting each
112/// input to each output, and (ii) a set of edges connecting output ports to input
113/// ports. Edges do not adjust timestamps; only nodes do this.
114///
115/// The resulting summary describes, for each origin port in the graph and destination
116/// input port, a set of incomparable path summaries, each describing what happens to
117/// a timestamp as it moves along the path. There may be multiple summaries for each
118/// part of origin and destination due to the fact that the actions on timestamps may
119/// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of
120/// the timestamp and seven").
121///
122/// # Examples
123///
124/// ```rust
125/// use timely::progress::frontier::Antichain;
126/// use timely::progress::{Source, Target};
127/// use timely::progress::reachability::Builder;
128///
129/// // allocate a new empty topology builder.
130/// let mut builder = Builder::<usize>::new();
131///
132/// // Each node with one input connected to one output.
133/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
134/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
135/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
136///
137/// // Connect nodes in sequence, looping around to the first from the last.
138/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
139/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
140/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
141///
142/// // Summarize reachability information.
143/// let (tracker, _) = builder.build(None);
144/// ```
145#[derive(Clone, Debug)]
146pub struct Builder<T: Timestamp> {
147    /// Internal connections within hosted operators.
148    ///
149    /// Indexed by operator index, then input port, then output port. This is the
150    /// same format returned by `initialize`, as if we simply appended
151    /// all of the summaries for the hosted nodes.
152    pub nodes: Vec<Connectivity<T::Summary>>,
153    /// Direct connections from sources to targets.
154    ///
155    /// Edges do not affect timestamps, so we only need to know the connectivity.
156    /// Indexed by operator index then output port.
157    pub edges: Vec<Vec<Vec<Target>>>,
158    /// Numbers of inputs and outputs for each node.
159    pub shape: Vec<(usize, usize)>,
160}
161
162impl<T: Timestamp> Builder<T> {
163
164    /// Create a new empty topology builder.
165    pub fn new() -> Self {
166        Builder {
167            nodes: Vec::new(),
168            edges: Vec::new(),
169            shape: Vec::new(),
170        }
171    }
172
173    /// Add links internal to operators.
174    ///
175    /// This method overwrites any existing summary, instead of anything more sophisticated.
176    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
177
178        // Assert that all summaries exist.
179        debug_assert_eq!(inputs, summary.len());
180        debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)));
181
182        while self.nodes.len() <= index {
183            self.nodes.push(Vec::new());
184            self.edges.push(Vec::new());
185            self.shape.push((0, 0));
186        }
187
188        self.nodes[index] = summary;
189        if self.edges[index].len() != outputs {
190            self.edges[index] = vec![Vec::new(); outputs];
191        }
192        self.shape[index] = (inputs, outputs);
193    }
194
195    /// Add links between operators.
196    ///
197    /// This method does not check that the associated nodes and ports exist. References to
198    /// missing nodes or ports are discovered in `build`.
199    pub fn add_edge(&mut self, source: Source, target: Target) {
200
201        // Assert that the edge is between existing ports.
202        debug_assert!(source.port < self.shape[source.node].1);
203        debug_assert!(target.port < self.shape[target.node].0);
204
205        self.edges[source.node][source.port].push(target);
206    }
207
208    /// Compiles the current nodes and edges into immutable path summaries.
209    ///
210    /// This method has the opportunity to perform some error checking that the path summaries
211    /// are valid, including references to undefined nodes and ports, as well as self-loops with
212    /// default summaries (a serious liveness issue).
213    ///
214    /// The optional logger information is baked into the resulting tracker.
215    pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
216
217        if !self.is_acyclic() {
218            println!("Cycle detected without timestamp increment");
219            println!("{:?}", self);
220        }
221
222        Tracker::allocate_from(self, logger)
223    }
224
225    /// Tests whether the graph includes a cycle of default path summaries.
226    ///
227    /// Graphs containing cycles of default path summaries will most likely
228    /// not work well with progress tracking, as a timestamp can result in
229    /// itself. Such computations can still *run*, but one should not block
230    /// on frontier information before yielding results, as you many never
231    /// unblock.
232    ///
233    /// # Examples
234    ///
235    /// ```rust
236    /// use timely::progress::frontier::Antichain;
237    /// use timely::progress::{Source, Target};
238    /// use timely::progress::reachability::Builder;
239    ///
240    /// // allocate a new empty topology builder.
241    /// let mut builder = Builder::<usize>::new();
242    ///
243    /// // Each node with one input connected to one output.
244    /// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
245    /// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
246    /// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
247    ///
248    /// // Connect nodes in sequence, looping around to the first from the last.
249    /// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
250    /// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
251    ///
252    /// assert!(builder.is_acyclic());
253    ///
254    /// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
255    ///
256    /// assert!(!builder.is_acyclic());
257    /// ```
258    ///
259    /// This test exists because it is possible to describe dataflow graphs that
260    /// do not contain non-incrementing cycles, but without feedback nodes that
261    /// strictly increment timestamps. For example,
262    ///
263    /// ```rust
264    /// use timely::progress::frontier::Antichain;
265    /// use timely::progress::{Source, Target};
266    /// use timely::progress::reachability::Builder;
267    ///
268    /// // allocate a new empty topology builder.
269    /// let mut builder = Builder::<usize>::new();
270    ///
271    /// // Two inputs and outputs, only one of which advances.
272    /// builder.add_node(0, 2, 2, vec![
273    ///     [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(),
274    ///     [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(),
275    /// ]);
276    ///
277    /// // Connect each output to the opposite input.
278    /// builder.add_edge(Source::new(0, 0), Target::new(0, 1));
279    /// builder.add_edge(Source::new(0, 1), Target::new(0, 0));
280    ///
281    /// assert!(builder.is_acyclic());
282    /// ```
283    pub fn is_acyclic(&self) -> bool {
284
285        let locations = self.shape.iter().map(|(targets, sources)| targets + sources).sum();
286        let mut in_degree = HashMap::with_capacity(locations);
287
288        // Load edges as default summaries.
289        for (index, ports) in self.edges.iter().enumerate() {
290            for (output, targets) in ports.iter().enumerate() {
291                let source = Location::new_source(index, output);
292                in_degree.entry(source).or_insert(0);
293                for &target in targets.iter() {
294                    let target = Location::from(target);
295                    *in_degree.entry(target).or_insert(0) += 1;
296                }
297            }
298        }
299
300        // Load default intra-node summaries.
301        for (index, summary) in self.nodes.iter().enumerate() {
302            for (input, outputs) in summary.iter().enumerate() {
303                let target = Location::new_target(index, input);
304                in_degree.entry(target).or_insert(0);
305                for (output, summaries) in outputs.iter_ports() {
306                    let source = Location::new_source(index, output);
307                    for summary in summaries.elements().iter() {
308                        if summary == &Default::default() {
309                            *in_degree.entry(source).or_insert(0) += 1;
310                        }
311                    }
312                }
313            }
314        }
315
316        // A worklist of nodes that cannot be reached from the whole graph.
317        // Initially this list contains observed locations with no incoming
318        // edges, but as the algorithm develops we add to it any locations
319        // that can only be reached by nodes that have been on this list.
320        let mut worklist = Vec::with_capacity(in_degree.len());
321        for (key, val) in in_degree.iter() {
322            if *val == 0 {
323                worklist.push(*key);
324            }
325        }
326        in_degree.retain(|_key, val| val != &0);
327
328        // Repeatedly remove nodes and update adjacent in-edges.
329        while let Some(Location { node, port }) = worklist.pop() {
330            match port {
331                Port::Source(port) => {
332                    for target in self.edges[node][port].iter() {
333                        let target = Location::from(*target);
334                        *in_degree.get_mut(&target).unwrap() -= 1;
335                        if in_degree[&target] == 0 {
336                            in_degree.remove(&target);
337                            worklist.push(target);
338                        }
339                    }
340                },
341                Port::Target(port) => {
342                    for (output, summaries) in self.nodes[node][port].iter_ports() {
343                        let source = Location::new_source(node, output);
344                        for summary in summaries.elements().iter() {
345                            if summary == &Default::default() {
346                                *in_degree.get_mut(&source).unwrap() -= 1;
347                                if in_degree[&source] == 0 {
348                                    in_degree.remove(&source);
349                                    worklist.push(source);
350                                }
351                            }
352                        }
353                    }
354                },
355            }
356        }
357
358        // Acyclic graphs should reduce to empty collections.
359        in_degree.is_empty()
360    }
361}
362
363impl<T: Timestamp> Default for Builder<T> {
364    fn default() -> Self {
365        Self::new()
366    }
367}
368
369/// An interactive tracker of propagated reachability information.
370///
371/// A `Tracker` tracks, for a fixed graph topology, the implications of
372/// pointstamp changes at various node input and output ports. These changes may
373/// alter the potential pointstamps that could arrive at downstream input ports.
374pub struct Tracker<T:Timestamp> {
375
376    /// Internal operator connectivity, columnar form of `Vec<Vec<PortConnectivity<T::Summary>>>`.
377    /// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs.
378    nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
379    /// Edge connectivity, columnar form of `Vec<Vec<Vec<Target>>>`.
380    /// Indexed by `(node, output_port)` to yield target slices.
381    edges: Vecs<Vecs<Vec<Target>>>,
382
383    /// Summaries from each target (operator input) to scope outputs.
384    /// Indexed by `(node, target_port)` to yield `(scope_output, summary)` pairs.
385    target_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
386    /// Summaries from each source (operator output) to scope outputs.
387    /// Indexed by `(node, source_port)` to yield `(scope_output, summary)` pairs.
388    source_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
389
390    /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
391    /// rather than their multiplicities. We separately track the frontiers resulting from propagated
392    /// frontiers, to protect them from transient negativity in inbound target updates.
393    per_operator: Vec<PerOperator<T>>,
394
395    /// Source and target changes are buffered, which allows us to delay processing until propagation,
396    /// and so consolidate updates, but to leap directly to those frontiers that may have changed.
397    target_changes: ChangeBatch<(Target, T)>,
398    source_changes: ChangeBatch<(Source, T)>,
399
400    /// Worklist of updates to perform, ordered by increasing timestamp and target.
401    worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
402
403    /// Buffer of consequent changes.
404    pushed_changes: ChangeBatch<(Location, T)>,
405
406    /// Compiled summaries from each internal location (not scope inputs) to each scope output.
407    output_changes: Vec<ChangeBatch<T>>,
408
409    /// A non-negative sum of post-filtration input changes.
410    ///
411    /// This sum should be zero exactly when the accumulated input changes are zero,
412    /// indicating that the progress tracker is currently tracking nothing. It should
413    /// always be exactly equal to the sum across all operators of the frontier sizes
414    /// of the target and source `pointstamps` member.
415    total_counts: i64,
416
417    /// Optionally, a unique logging identifier and logging for tracking events.
418    logger: Option<logging::TrackerLogger<T>>,
419}
420
421/// Target and source information for each operator.
422pub struct PerOperator<T: Timestamp> {
423    /// Port information for each target.
424    pub targets: Vec<PortInformation<T>>,
425    /// Port information for each source.
426    pub sources: Vec<PortInformation<T>>,
427    /// Sum across outputs of capabilities.
428    pub cap_counts: i64,
429}
430
431impl<T: Timestamp> PerOperator<T> {
432    /// A new PerOperator bundle from numbers of input and output ports.
433    pub fn new(inputs: usize, outputs: usize) -> Self {
434        PerOperator {
435            targets: vec![PortInformation::new(); inputs],
436            sources: vec![PortInformation::new(); outputs],
437            cap_counts: 0,
438        }
439    }
440}
441
442/// Per-port progress-tracking information.
443#[derive(Clone)]
444pub struct PortInformation<T: Timestamp> {
445    /// Current counts of active pointstamps.
446    pub pointstamps: MutableAntichain<T>,
447    /// Current implications of active pointstamps across the dataflow.
448    pub implications: MutableAntichain<T>,
449}
450
451impl<T: Timestamp> PortInformation<T> {
452    /// Creates empty port information.
453    pub fn new() -> Self {
454        PortInformation {
455            pointstamps: MutableAntichain::new(),
456            implications: MutableAntichain::new(),
457        }
458    }
459
460    /// Returns `true` if updates at this pointstamp uniquely block progress.
461    ///
462    /// This method returns `true` if the currently maintained pointstamp
463    /// counts are such that zeroing out outstanding updates at *this*
464    /// pointstamp would change the frontiers at this operator. When the
465    /// method returns `false` it means that, temporarily at least, there
466    /// are outstanding pointstamp updates that are strictly less than
467    /// this pointstamp.
468    #[inline]
469    pub fn is_global(&self, time: &T) -> bool {
470        let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
471        let redundant = self.implications.count_for(time) > 1;
472        !dominated && !redundant
473    }
474}
475
476impl<T: Timestamp> Default for PortInformation<T> {
477    fn default() -> Self {
478        Self::new()
479    }
480}
481
482impl<T:Timestamp> Tracker<T> {
483
484    /// Updates the count for a time at a location.
485    #[inline]
486    pub fn update(&mut self, location: Location, time: T, value: i64) {
487        match location.port {
488            Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
489            Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
490        };
491    }
492
493    /// Updates the count for a time at a target (operator input, scope output).
494    #[inline]
495    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
496        self.target_changes.update((target, time), value);
497    }
498    /// Updates the count for a time at a source (operator output, scope input).
499    #[inline]
500    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
501        self.source_changes.update((source, time), value);
502    }
503
504    /// Indicates if any pointstamps have positive count.
505    pub fn tracking_anything(&mut self) -> bool {
506        !self.source_changes.is_empty() ||
507        !self.target_changes.is_empty() ||
508        self.total_counts > 0
509    }
510
511    /// Allocate a new `Tracker` using the shape from `summaries`.
512    ///
513    /// The result is a pair of tracker, and the summaries from each input port to each
514    /// output port.
515    ///
516    /// If the optional logger is provided, it will be used to log various tracker events.
517    pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
518
519        // Allocate buffer space for each input and input port.
520        let per_operator =
521        builder
522            .shape
523            .iter()
524            .map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
525            .collect::<Vec<_>>();
526
527        // Summary of scope inputs to scope outputs.
528        let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];
529
530        // Compile summaries from each location to each scope output.
531        // Collect into per-node, per-port buckets for flattening.
532        let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
533
534        // Temporary storage: target_sum[node][port] and source_sum[node][port].
535        let mut target_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
536            .map(|&(inputs, _)| vec![PortConnectivity::default(); inputs])
537            .collect();
538        let mut source_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
539            .map(|&(_, outputs)| vec![PortConnectivity::default(); outputs])
540            .collect();
541
542        for (location, summaries) in output_summaries.into_iter() {
543            // Summaries from scope inputs are useful in summarizing the scope.
544            if location.node == 0 {
545                if let Port::Source(port) = location.port {
546                    builder_summary[port] = summaries;
547                }
548                else {
549                    // Ignore (ideally trivial) output to output summaries.
550                }
551            }
552            // Summaries from internal nodes are important for projecting capabilities.
553            else {
554                match location.port {
555                    Port::Target(port) => {
556                        target_sum[location.node][port] = summaries;
557                    },
558                    Port::Source(port) => {
559                        source_sum[location.node][port] = summaries;
560                    },
561                }
562            }
563        }
564
565        // Build columnar nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>.
566        let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| {
567            connectivity.iter().map(|port_conn| {
568                port_conn.iter_ports().flat_map(|(port, antichain)| {
569                    antichain.elements().iter().map(move |s| (port, s.clone()))
570                })
571            })
572        }));
573
574        // Build columnar edges: Vecs<Vecs<Vec<Target>>>.
575        let edges = build_nested_vecs(builder.edges.iter().map(|node_edges| {
576            node_edges.iter().map(|port_edges| port_edges.iter().cloned())
577        }));
578
579        // Build columnar target and source summaries.
580        let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| {
581            ports.iter().map(|port_conn| {
582                port_conn.iter_ports().flat_map(|(port, antichain)| {
583                    antichain.elements().iter().map(move |s| (port, s.clone()))
584                })
585            })
586        }));
587        let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| {
588            ports.iter().map(|port_conn| {
589                port_conn.iter_ports().flat_map(|(port, antichain)| {
590                    antichain.elements().iter().map(move |s| (port, s.clone()))
591                })
592            })
593        }));
594
595        let scope_outputs = builder.shape[0].0;
596        let output_changes = vec![ChangeBatch::new(); scope_outputs];
597
598        let tracker =
599        Tracker {
600            nodes,
601            edges,
602            target_summaries,
603            source_summaries,
604            per_operator,
605            target_changes: ChangeBatch::new(),
606            source_changes: ChangeBatch::new(),
607            worklist: BinaryHeap::new(),
608            pushed_changes: ChangeBatch::new(),
609            output_changes,
610            total_counts: 0,
611            logger,
612        };
613
614        (tracker, builder_summary)
615    }
616
617    /// Propagates all pending updates.
618    ///
619    /// The method drains `self.input_changes` and circulates their implications
620    /// until we cease deriving new implications.
621    pub fn propagate_all(&mut self) {
622
623        // Step 0: If logging is enabled, construct and log inbound changes.
624        if let Some(logger) = &mut self.logger {
625
626            let target_changes =
627            self.target_changes
628                .iter()
629                .map(|((target, time), diff)| (target.node, target.port, time, *diff));
630
631            logger.log_target_updates(target_changes);
632
633            let source_changes =
634            self.source_changes
635                .iter()
636                .map(|((source, time), diff)| (source.node, source.port, time, *diff));
637
638            logger.log_source_updates(source_changes);
639        }
640
641        // Step 1: Drain `self.input_changes` and determine actual frontier changes.
642        //
643        // Not all changes in `self.input_changes` may alter the frontier at a location.
644        // By filtering the changes through `self.pointstamps` we react only to discrete
645        // changes in the frontier, rather than changes in the pointstamp counts that
646        // witness that frontier.
647        use itertools::Itertools;
648        let mut target_changes = self.target_changes.drain().peekable();
649        while let Some(((target, _), _)) = target_changes.peek() {
650
651            let target = *target;
652            let operator = &mut self.per_operator[target.node].targets[target.port];
653            let target_updates = target_changes.peeking_take_while(|((t, _),_)| t == &target).map(|((_,time),diff)| (time,diff));
654            let changes = operator.pointstamps.update_iter(target_updates);
655
656            for (time, diff) in changes {
657                self.total_counts += diff;
658                for &(output, ref summary) in (&self.target_summaries).get(target.node).get(target.port).into_index_iter() {
659                    if let Some(out_time) = summary.results_in(&time) {
660                        self.output_changes[output].update(out_time, diff);
661                    }
662                }
663                self.worklist.push(Reverse((time, Location::from(target), diff)));
664            }
665        }
666
667        let mut source_changes = self.source_changes.drain().peekable();
668        while let Some(((source, _), _)) = source_changes.peek() {
669
670            let source = *source;
671            let operator = &mut self.per_operator[source.node];
672            let op_source = &mut operator.sources[source.port];
673            let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff));
674            let changes = op_source.pointstamps.update_iter(source_updates);
675
676            for (time, diff) in changes {
677                self.total_counts += diff;
678                operator.cap_counts += diff;
679                for &(output, ref summary) in (&self.source_summaries).get(source.node).get(source.port).into_index_iter() {
680                    if let Some(out_time) = summary.results_in(&time) {
681                        self.output_changes[output].update(out_time, diff);
682                    }
683                }
684                self.worklist.push(Reverse((time, Location::from(source), diff)));
685            }
686        }
687
688        // Step 2: Circulate implications of changes to `self.pointstamps`.
689        //
690        // TODO: The argument that this always terminates is subtle, and should be made.
691        //       The intent is that that by moving forward in layers through `time`, we
692        //       will discover zero-change times when we first visit them, as no further
693        //       changes can be made to them once we complete them.
694        while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
695
696            // Drain and accumulate all updates that have the same time and location.
697            while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
698                diff += (self.worklist.pop().unwrap().0).2;
699            }
700
701            // Only act if there is a net change, positive or negative.
702            if diff != 0 {
703
704                match location.port {
705                    // Update to an operator input.
706                    // Propagate any changes forward across the operator.
707                    Port::Target(port_index) => {
708
709                        let changes =
710                        self.per_operator[location.node]
711                            .targets[port_index]
712                            .implications
713                            .update_iter(Some((time, diff)));
714
715                        for (time, diff) in changes {
716                            for &(output_port, ref summary) in (&self.nodes).get(location.node).get(port_index).into_index_iter() {
717                                if let Some(new_time) = summary.results_in(&time) {
718                                    let source = Location { node: location.node, port: Port::Source(output_port) };
719                                    self.worklist.push(Reverse((new_time, source, diff)));
720                                }
721                            }
722                            self.pushed_changes.update((location, time), diff);
723                        }
724                    }
725                    // Update to an operator output.
726                    // Propagate any changes forward along outgoing edges.
727                    Port::Source(port_index) => {
728
729                        let changes =
730                        self.per_operator[location.node]
731                            .sources[port_index]
732                            .implications
733                            .update_iter(Some((time, diff)));
734
735                        for (time, diff) in changes {
736                            for new_target in (&self.edges).get(location.node).get(port_index).into_index_iter() {
737                                self.worklist.push(Reverse((
738                                    time.clone(),
739                                    Location::from(*new_target),
740                                    diff,
741                                )));
742                            }
743                            self.pushed_changes.update((location, time), diff);
744                        }
745                    },
746                };
747            }
748        }
749    }
750
751    /// Implications of maintained capabilities projected to each output.
752    pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
753        &mut self.output_changes[..]
754    }
755
756    /// A mutable reference to the pushed results of changes.
757    pub fn pushed(&mut self) -> (&mut ChangeBatch<(Location, T)>, &[PerOperator<T>]) {
758        (&mut self.pushed_changes, &self.per_operator)
759    }
760
761    /// Reveals per-operator frontier state.
762    pub fn node_state(&self, index: usize) -> &PerOperator<T> {
763        &self.per_operator[index]
764    }
765
766    /// Indicates if pointstamp is in the scope-wide frontier.
767    ///
768    /// Such a pointstamp would, if removed from `self.pointstamps`, cause a change
769    /// to `self.implications`, which is what we track for per operator input frontiers.
770    /// If the above do not hold, then its removal either 1. shouldn't be possible,
771    /// or 2. will not affect the output of `self.implications`.
772    pub fn is_global(&self, location: Location, time: &T) -> bool {
773        match location.port {
774            Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
775            Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
776        }
777    }
778}
779
780/// Determines summaries from locations to scope outputs.
781///
782/// Specifically, for each location whose node identifier is non-zero, we compile
783/// the summaries along which they can reach each output.
784///
785/// Graph locations may be missing from the output, in which case they have no
786/// paths to scope outputs.
787fn summarize_outputs<T: Timestamp>(
788    nodes: &[Connectivity<T::Summary>],
789    edges: &[Vec<Vec<Target>>],
790    ) -> HashMap<Location, PortConnectivity<T::Summary>>
791{
792    // A reverse edge map, to allow us to walk back up the dataflow graph.
793    let mut reverse = HashMap::new();
794    for (node, outputs) in edges.iter().enumerate() {
795        for (output, targets) in outputs.iter().enumerate() {
796            for target in targets.iter() {
797                reverse.insert(
798                    Location::from(*target),
799                    Location { node, port: Port::Source(output) }
800                );
801            }
802        }
803    }
804
805    // A reverse map from operator outputs to inputs, along their internal summaries.
806    let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
807    for (node, connectivity) in nodes.iter().enumerate() {
808        for (input, outputs) in connectivity.iter().enumerate() {
809            for (output, summary) in outputs.iter_ports() {
810                reverse_internal
811                    .entry(Location::new_source(node, output))
812                    .or_default()
813                    .push((input, summary));
814            }
815        }
816    }
817
818    let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
819    let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
820
821    let outputs =
822    edges
823        .iter()
824        .flat_map(|x| x.iter())
825        .flat_map(|x| x.iter())
826        .filter(|target| target.node == 0);
827
828    // The scope may have no outputs, in which case we can do no work.
829    for output_target in outputs {
830        worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
831    }
832
833    // Loop until we stop discovering novel reachability paths.
834    while let Some((location, output, summary)) = worklist.pop_front() {
835        match location.port {
836
837            // This is an output port of an operator, or a scope input.
838            // We want to crawl up the operator, to its inputs.
839            Port::Source(_output_port) => {
840                if let Some(inputs) = reverse_internal.get(&location) {
841                    for (input_port, operator_summary) in inputs.iter() {
842                        let new_location = Location::new_target(location.node, *input_port);
843                        for op_summary in operator_summary.elements().iter() {
844                            if let Some(combined) = op_summary.followed_by(&summary) {
845                                if results.entry(new_location).or_default().insert_ref(output, &combined) {
846                                    worklist.push_back((new_location, output, combined));
847                                }
848                            }
849                        }
850                    }
851                }
852            }
853
854            // This is an input port of an operator, or a scope output.
855            // We want to walk back the edges leading to it.
856            Port::Target(_port) => {
857                // Each target should have (at most) one source.
858                if let Some(&source) = reverse.get(&location) {
859                    if results.entry(source).or_default().insert_ref(output, &summary) {
860                        worklist.push_back((source, output, summary));
861                    }
862                }
863            },
864        }
865    }
866
867    results
868}
869
870/// Logging types for reachability tracking events.
871pub mod logging {
872    use std::time::Duration;
873
874    use timely_container::CapacityContainerBuilder;
875    use timely_logging::TypedLogger;
876    use crate::logging_core::Logger;
877
878    /// A container builder for tracker events.
879    pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;
880
881    /// A logger with additional identifying information about the tracker.
882    pub struct TrackerLogger<T: Clone + 'static> {
883        identifier: usize,
884        logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
885    }
886
887    impl<T: Clone + 'static> TrackerLogger<T> {
888        /// Create a new tracker logger from its fields.
889        pub fn new(identifier: usize, logger: Logger<TrackerEventBuilder<T>>) -> Self {
890            Self { identifier, logger: logger.into() }
891        }
892
893        /// Log source update events with additional identifying information.
894        pub fn log_source_updates<'a, I>(&mut self, updates: I)
895        where
896            I: IntoIterator<Item = (usize, usize, &'a T, i64)>
897        {
898            let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
899            if !updates.is_empty() {
900                self.logger.log({
901                    SourceUpdate {
902                        tracker_id: self.identifier,
903                        updates
904                    }
905                });
906            }
907        }
908        /// Log target update events with additional identifying information.
909        pub fn log_target_updates<'a, I>(&mut self, updates: I)
910        where
911            I: IntoIterator<Item = (usize, usize, &'a T, i64)>
912        {
913            let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
914            if !updates.is_empty() {
915                self.logger.log({
916                    TargetUpdate {
917                        tracker_id: self.identifier,
918                        updates
919                    }
920                });
921            }
922        }
923    }
924
925    /// Events that the tracker may record.
926    #[derive(Debug, Clone)]
927    pub enum TrackerEvent<T> {
928        /// Updates made at a source of data.
929        SourceUpdate(SourceUpdate<T>),
930        /// Updates made at a target of data.
931        TargetUpdate(TargetUpdate<T>),
932    }
933
934    /// An update made at a source of data.
935    #[derive(Debug, Clone)]
936    pub struct SourceUpdate<T> {
937        /// An identifier for the tracker.
938        pub tracker_id: usize,
939        /// Updates themselves, as `(node, port, time, diff)`.
940        pub updates: Vec<(usize, usize, T, i64)>,
941    }
942
943    /// An update made at a target of data.
944    #[derive(Debug, Clone)]
945    pub struct TargetUpdate<T> {
946        /// An identifier for the tracker.
947        pub tracker_id: usize,
948        /// Updates themselves, as `(node, port, time, diff)`.
949        pub updates: Vec<(usize, usize, T, i64)>,
950    }
951
952    impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
953        fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
954    }
955
956    impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
957        fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
958    }
959}
960
961// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
962// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
963// because in all other cases the tracker stays alive while it has outstanding work, leaving no
964// remaining work for this Drop implementation.
965impl<T: Timestamp> Drop for Tracker<T> {
966    fn drop(&mut self) {
967        let logger = if let Some(logger) = &mut self.logger {
968            logger
969        } else {
970            // No cleanup necessary when there is no logger.
971            return;
972        };
973
974        // Retract pending data that `propagate_all` would normally log.
975        for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
976            let target_changes = per_operator.targets
977                .iter_mut()
978                .enumerate()
979                .flat_map(|(port, target)| {
980                    target.pointstamps
981                        .updates()
982                        .map(move |(time, diff)| (index, port, time, -diff))
983                });
984
985            logger.log_target_updates(target_changes);
986
987            let source_changes = per_operator.sources
988                .iter_mut()
989                .enumerate()
990                .flat_map(|(port, source)| {
991                    source.pointstamps
992                        .updates()
993                        .map(move |(time, diff)| (index, port, time, -diff))
994                });
995
996            logger.log_source_updates(source_changes);
997        }
998    }
999}