timely/progress/
reachability.rs

1//! Manages pointstamp reachability within a timely dataflow graph.
2//!
3//! Timely dataflow is concerned with understanding and communicating the potential
4//! for capabilities to reach nodes in a directed graph, by following paths through
5//! the graph (along edges and through nodes). This module contains one abstraction
6//! for managing this information.
7//!
8//! # Examples
9//!
10//! ```rust
11//! use timely::progress::{Location, Port};
12//! use timely::progress::frontier::Antichain;
13//! use timely::progress::{Source, Target};
14//! use timely::progress::reachability::{Builder, Tracker};
15//!
16//! // allocate a new empty topology builder.
17//! let mut builder = Builder::<usize>::new();
18//!
19//! // Each node with one input connected to one output.
20//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
21//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
22//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
23//!
24//! // Connect nodes in sequence, looping around to the first from the last.
25//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
26//! builder.add_edge(Source::new(1, 0), Target::new(2, 0));
27//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
28//!
29//! // Construct a reachability tracker.
30//! let (mut tracker, _) = builder.build(None);
31//!
32//! // Introduce a pointstamp at the output of the first node.
33//! tracker.update_source(Source::new(0, 0), 17, 1);
34//!
35//! // Propagate changes; until this call updates are simply buffered.
36//! tracker.propagate_all();
37//!
38//! let mut results =
39//! tracker
40//!     .pushed()
41//!     .drain()
42//!     .filter(|((location, time), delta)| location.is_target())
43//!     .collect::<Vec<_>>();
44//!
45//! results.sort();
46//!
47//! println!("{:?}", results);
48//!
49//! assert_eq!(results.len(), 3);
50//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), 1));
51//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), 1));
52//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1));
53//!
54//! // Introduce a pointstamp at the output of the first node.
55//! tracker.update_source(Source::new(0, 0), 17, -1);
56//!
57//! // Propagate changes; until this call updates are simply buffered.
58//! tracker.propagate_all();
59//!
60//! let mut results =
61//! tracker
62//!     .pushed()
63//!     .drain()
64//!     .filter(|((location, time), delta)| location.is_target())
65//!     .collect::<Vec<_>>();
66//!
67//! results.sort();
68//!
69//! assert_eq!(results.len(), 3);
70//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), -1));
71//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), -1));
72//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), -1));
73//! ```
74
75use std::collections::{BinaryHeap, HashMap, VecDeque};
76use std::cmp::Reverse;
77
78use crate::progress::Timestamp;
79use crate::progress::{Source, Target};
80use crate::progress::ChangeBatch;
81use crate::progress::{Location, Port};
82use crate::progress::operate::{Connectivity, PortConnectivity};
83use crate::progress::frontier::MutableAntichain;
84use crate::progress::timestamp::PathSummary;
85
86
87/// A topology builder, which can summarize reachability along paths.
88///
89/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
90/// a static summary of the minimal actions a timestamp must endure going from any
91/// input or output port to a destination input port.
92///
93/// A graph is provides as (i) several indexed nodes, each with some number of input
94/// and output ports, and each with a summary of the internal paths connecting each
95/// input to each output, and (ii) a set of edges connecting output ports to input
96/// ports. Edges do not adjust timestamps; only nodes do this.
97///
98/// The resulting summary describes, for each origin port in the graph and destination
99/// input port, a set of incomparable path summaries, each describing what happens to
100/// a timestamp as it moves along the path. There may be multiple summaries for each
101/// part of origin and destination due to the fact that the actions on timestamps may
102/// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of
103/// the timestamp and seven").
104///
105/// # Examples
106///
107/// ```rust
108/// use timely::progress::frontier::Antichain;
109/// use timely::progress::{Source, Target};
110/// use timely::progress::reachability::Builder;
111///
112/// // allocate a new empty topology builder.
113/// let mut builder = Builder::<usize>::new();
114///
115/// // Each node with one input connected to one output.
116/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
117/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
118/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
119///
120/// // Connect nodes in sequence, looping around to the first from the last.
121/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
122/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
123/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
124///
125/// // Summarize reachability information.
126/// let (tracker, _) = builder.build(None);
127/// ```
128#[derive(Clone, Debug)]
129pub struct Builder<T: Timestamp> {
130    /// Internal connections within hosted operators.
131    ///
132    /// Indexed by operator index, then input port, then output port. This is the
133    /// same format returned by `get_internal_summary`, as if we simply appended
134    /// all of the summaries for the hosted nodes.
135    pub nodes: Vec<Connectivity<T::Summary>>,
136    /// Direct connections from sources to targets.
137    ///
138    /// Edges do not affect timestamps, so we only need to know the connectivity.
139    /// Indexed by operator index then output port.
140    pub edges: Vec<Vec<Vec<Target>>>,
141    /// Numbers of inputs and outputs for each node.
142    pub shape: Vec<(usize, usize)>,
143}
144
145impl<T: Timestamp> Builder<T> {
146
147    /// Create a new empty topology builder.
148    pub fn new() -> Self {
149        Builder {
150            nodes: Vec::new(),
151            edges: Vec::new(),
152            shape: Vec::new(),
153        }
154    }
155
156    /// Add links internal to operators.
157    ///
158    /// This method overwrites any existing summary, instead of anything more sophisticated.
159    pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
160
161        // Assert that all summaries exist.
162        debug_assert_eq!(inputs, summary.len());
163        debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)));
164
165        while self.nodes.len() <= index {
166            self.nodes.push(Vec::new());
167            self.edges.push(Vec::new());
168            self.shape.push((0, 0));
169        }
170
171        self.nodes[index] = summary;
172        if self.edges[index].len() != outputs {
173            self.edges[index] = vec![Vec::new(); outputs];
174        }
175        self.shape[index] = (inputs, outputs);
176    }
177
178    /// Add links between operators.
179    ///
180    /// This method does not check that the associated nodes and ports exist. References to
181    /// missing nodes or ports are discovered in `build`.
182    pub fn add_edge(&mut self, source: Source, target: Target) {
183
184        // Assert that the edge is between existing ports.
185        debug_assert!(source.port < self.shape[source.node].1);
186        debug_assert!(target.port < self.shape[target.node].0);
187
188        self.edges[source.node][source.port].push(target);
189    }
190
191    /// Compiles the current nodes and edges into immutable path summaries.
192    ///
193    /// This method has the opportunity to perform some error checking that the path summaries
194    /// are valid, including references to undefined nodes and ports, as well as self-loops with
195    /// default summaries (a serious liveness issue).
196    ///
197    /// The optional logger information is baked into the resulting tracker.
198    pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
199
200        if !self.is_acyclic() {
201            println!("Cycle detected without timestamp increment");
202            println!("{:?}", self);
203        }
204
205        Tracker::allocate_from(self, logger)
206    }
207
208    /// Tests whether the graph a cycle of default path summaries.
209    ///
210    /// Graphs containing cycles of default path summaries will most likely
211    /// not work well with progress tracking, as a timestamp can result in
212    /// itself. Such computations can still *run*, but one should not block
213    /// on frontier information before yielding results, as you many never
214    /// unblock.
215    ///
216    /// # Examples
217    ///
218    /// ```rust
219    /// use timely::progress::frontier::Antichain;
220    /// use timely::progress::{Source, Target};
221    /// use timely::progress::reachability::Builder;
222    ///
223    /// // allocate a new empty topology builder.
224    /// let mut builder = Builder::<usize>::new();
225    ///
226    /// // Each node with one input connected to one output.
227    /// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
228    /// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
229    /// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
230    ///
231    /// // Connect nodes in sequence, looping around to the first from the last.
232    /// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
233    /// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
234    ///
235    /// assert!(builder.is_acyclic());
236    ///
237    /// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
238    ///
239    /// assert!(!builder.is_acyclic());
240    /// ```
241    ///
242    /// This test exists because it is possible to describe dataflow graphs that
243    /// do not contain non-incrementing cycles, but without feedback nodes that
244    /// strictly increment timestamps. For example,
245    ///
246    /// ```rust
247    /// use timely::progress::frontier::Antichain;
248    /// use timely::progress::{Source, Target};
249    /// use timely::progress::reachability::Builder;
250    ///
251    /// // allocate a new empty topology builder.
252    /// let mut builder = Builder::<usize>::new();
253    ///
254    /// // Two inputs and outputs, only one of which advances.
255    /// builder.add_node(0, 2, 2, vec![
256    ///     [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(),
257    ///     [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(),
258    /// ]);
259    ///
260    /// // Connect each output to the opposite input.
261    /// builder.add_edge(Source::new(0, 0), Target::new(0, 1));
262    /// builder.add_edge(Source::new(0, 1), Target::new(0, 0));
263    ///
264    /// assert!(builder.is_acyclic());
265    /// ```
266    pub fn is_acyclic(&self) -> bool {
267
268        let locations = self.shape.iter().map(|(targets, sources)| targets + sources).sum();
269        let mut in_degree = HashMap::with_capacity(locations);
270
271        // Load edges as default summaries.
272        for (index, ports) in self.edges.iter().enumerate() {
273            for (output, targets) in ports.iter().enumerate() {
274                let source = Location::new_source(index, output);
275                in_degree.entry(source).or_insert(0);
276                for &target in targets.iter() {
277                    let target = Location::from(target);
278                    *in_degree.entry(target).or_insert(0) += 1;
279                }
280            }
281        }
282
283        // Load default intra-node summaries.
284        for (index, summary) in self.nodes.iter().enumerate() {
285            for (input, outputs) in summary.iter().enumerate() {
286                let target = Location::new_target(index, input);
287                in_degree.entry(target).or_insert(0);
288                for (output, summaries) in outputs.iter_ports() {
289                    let source = Location::new_source(index, output);
290                    for summary in summaries.elements().iter() {
291                        if summary == &Default::default() {
292                            *in_degree.entry(source).or_insert(0) += 1;
293                        }
294                    }
295                }
296            }
297        }
298
299        // A worklist of nodes that cannot be reached from the whole graph.
300        // Initially this list contains observed locations with no incoming
301        // edges, but as the algorithm develops we add to it any locations
302        // that can only be reached by nodes that have been on this list.
303        let mut worklist = Vec::with_capacity(in_degree.len());
304        for (key, val) in in_degree.iter() {
305            if *val == 0 {
306                worklist.push(*key);
307            }
308        }
309        in_degree.retain(|_key, val| val != &0);
310
311        // Repeatedly remove nodes and update adjacent in-edges.
312        while let Some(Location { node, port }) = worklist.pop() {
313            match port {
314                Port::Source(port) => {
315                    for target in self.edges[node][port].iter() {
316                        let target = Location::from(*target);
317                        *in_degree.get_mut(&target).unwrap() -= 1;
318                        if in_degree[&target] == 0 {
319                            in_degree.remove(&target);
320                            worklist.push(target);
321                        }
322                    }
323                },
324                Port::Target(port) => {
325                    for (output, summaries) in self.nodes[node][port].iter_ports() {
326                        let source = Location::new_source(node, output);
327                        for summary in summaries.elements().iter() {
328                            if summary == &Default::default() {
329                                *in_degree.get_mut(&source).unwrap() -= 1;
330                                if in_degree[&source] == 0 {
331                                    in_degree.remove(&source);
332                                    worklist.push(source);
333                                }
334                            }
335                        }
336                    }
337                },
338            }
339        }
340
341        // Acyclic graphs should reduce to empty collections.
342        in_degree.is_empty()
343    }
344}
345
346impl<T: Timestamp> Default for Builder<T> {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352/// An interactive tracker of propagated reachability information.
353///
354/// A `Tracker` tracks, for a fixed graph topology, the implications of
355/// pointstamp changes at various node input and output ports. These changes may
356/// alter the potential pointstamps that could arrive at downstream input ports.
357pub struct Tracker<T:Timestamp> {
358
359    /// Internal connections within hosted operators.
360    ///
361    /// Indexed by operator index, then input port, then output port. This is the
362    /// same format returned by `get_internal_summary`, as if we simply appended
363    /// all of the summaries for the hosted nodes.
364    nodes: Vec<Connectivity<T::Summary>>,
365    /// Direct connections from sources to targets.
366    ///
367    /// Edges do not affect timestamps, so we only need to know the connectivity.
368    /// Indexed by operator index then output port.
369    edges: Vec<Vec<Vec<Target>>>,
370
371    // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
372    //       It seems we should be able to flatten most of these so that there are a few allocations
373    //       independent of the numbers of nodes and ports and such.
374    //
375    // TODO: We could also change the internal representation to be a graph of targets, using usize
376    //       identifiers for each, so that internally we needn't use multiple levels of indirection.
377    //       This may make more sense once we commit to topologically ordering the targets.
378
379    /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
380    /// rather than their multiplicities. We separately track the frontiers resulting from propagated
381    /// frontiers, to protect them from transient negativity in inbound target updates.
382    per_operator: Vec<PerOperator<T>>,
383
384    /// Source and target changes are buffered, which allows us to delay processing until propagation,
385    /// and so consolidate updates, but to leap directly to those frontiers that may have changed.
386    target_changes: ChangeBatch<(Target, T)>,
387    source_changes: ChangeBatch<(Source, T)>,
388
389    /// Worklist of updates to perform, ordered by increasing timestamp and target.
390    worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
391
392    /// Buffer of consequent changes.
393    pushed_changes: ChangeBatch<(Location, T)>,
394
395    /// Compiled summaries from each internal location (not scope inputs) to each scope output.
396    output_changes: Vec<ChangeBatch<T>>,
397
398    /// A non-negative sum of post-filtration input changes.
399    ///
400    /// This sum should be zero exactly when the accumulated input changes are zero,
401    /// indicating that the progress tracker is currently tracking nothing. It should
402    /// always be exactly equal to the sum across all operators of the frontier sizes
403    /// of the target and source `pointstamps` member.
404    total_counts: i64,
405
406    /// Optionally, a unique logging identifier and logging for tracking events.
407    logger: Option<logging::TrackerLogger<T>>,
408}
409
410/// Target and source information for each operator.
411pub struct PerOperator<T: Timestamp> {
412    /// Port information for each target.
413    pub targets: Vec<PortInformation<T>>,
414    /// Port information for each source.
415    pub sources: Vec<PortInformation<T>>,
416}
417
418impl<T: Timestamp> PerOperator<T> {
419    /// A new PerOperator bundle from numbers of input and output ports.
420    pub fn new(inputs: usize, outputs: usize) -> Self {
421        PerOperator {
422            targets: vec![PortInformation::new(); inputs],
423            sources: vec![PortInformation::new(); outputs],
424        }
425    }
426}
427
428/// Per-port progress-tracking information.
429#[derive(Clone)]
430pub struct PortInformation<T: Timestamp> {
431    /// Current counts of active pointstamps.
432    pub pointstamps: MutableAntichain<T>,
433    /// Current implications of active pointstamps across the dataflow.
434    pub implications: MutableAntichain<T>,
435    /// Path summaries to each of the scope outputs.
436    pub output_summaries: PortConnectivity<T::Summary>,
437}
438
439impl<T: Timestamp> PortInformation<T> {
440    /// Creates empty port information.
441    pub fn new() -> Self {
442        PortInformation {
443            pointstamps: MutableAntichain::new(),
444            implications: MutableAntichain::new(),
445            output_summaries: PortConnectivity::default(),
446        }
447    }
448
449    /// Returns `true` if updates at this pointstamp uniquely block progress.
450    ///
451    /// This method returns `true` if the currently maintained pointstamp
452    /// counts are such that zeroing out outstanding updates at *this*
453    /// pointstamp would change the frontiers at this operator. When the
454    /// method returns `false` it means that, temporarily at least, there
455    /// are outstanding pointstamp updates that are strictly less than
456    /// this pointstamp.
457    #[inline]
458    pub fn is_global(&self, time: &T) -> bool {
459        let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
460        let redundant = self.implications.count_for(time) > 1;
461        !dominated && !redundant
462    }
463}
464
465impl<T: Timestamp> Default for PortInformation<T> {
466    fn default() -> Self {
467        Self::new()
468    }
469}
470
471impl<T:Timestamp> Tracker<T> {
472
473    /// Updates the count for a time at a location.
474    #[inline]
475    pub fn update(&mut self, location: Location, time: T, value: i64) {
476        match location.port {
477            Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
478            Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
479        };
480    }
481
482    /// Updates the count for a time at a target (operator input, scope output).
483    #[inline]
484    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
485        self.target_changes.update((target, time), value);
486    }
487    /// Updates the count for a time at a source (operator output, scope input).
488    #[inline]
489    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
490        self.source_changes.update((source, time), value);
491    }
492
493    /// Indicates if any pointstamps have positive count.
494    pub fn tracking_anything(&mut self) -> bool {
495        !self.source_changes.is_empty() ||
496        !self.target_changes.is_empty() ||
497        self.total_counts > 0
498    }
499
500    /// Allocate a new `Tracker` using the shape from `summaries`.
501    ///
502    /// The result is a pair of tracker, and the summaries from each input port to each
503    /// output port.
504    ///
505    /// If the optional logger is provided, it will be used to log various tracker events.
506    pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
507
508        // Allocate buffer space for each input and input port.
509        let mut per_operator =
510        builder
511            .shape
512            .iter()
513            .map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
514            .collect::<Vec<_>>();
515
516        // Summary of scope inputs to scope outputs.
517        let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];
518
519        // Compile summaries from each location to each scope output.
520        let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
521        for (location, summaries) in output_summaries.into_iter() {
522            // Summaries from scope inputs are useful in summarizing the scope.
523            if location.node == 0 {
524                if let Port::Source(port) = location.port {
525                    builder_summary[port] = summaries;
526                }
527                else {
528                    // Ignore (ideally trivial) output to output summaries.
529                }
530            }
531            // Summaries from internal nodes are important for projecting capabilities.
532            else {
533                match location.port {
534                    Port::Target(port) => {
535                        per_operator[location.node].targets[port].output_summaries = summaries;
536                    },
537                    Port::Source(port) => {
538                        per_operator[location.node].sources[port].output_summaries = summaries;
539                    },
540                }
541            }
542        }
543
544        let scope_outputs = builder.shape[0].0;
545        let output_changes = vec![ChangeBatch::new(); scope_outputs];
546
547        let tracker =
548        Tracker {
549            nodes: builder.nodes,
550            edges: builder.edges,
551            per_operator,
552            target_changes: ChangeBatch::new(),
553            source_changes: ChangeBatch::new(),
554            worklist: BinaryHeap::new(),
555            pushed_changes: ChangeBatch::new(),
556            output_changes,
557            total_counts: 0,
558            logger,
559        };
560
561        (tracker, builder_summary)
562    }
563
564    /// Propagates all pending updates.
565    ///
566    /// The method drains `self.input_changes` and circulates their implications
567    /// until we cease deriving new implications.
568    pub fn propagate_all(&mut self) {
569
570        // Step 0: If logging is enabled, construct and log inbound changes.
571        if let Some(logger) = &mut self.logger {
572
573            let target_changes =
574            self.target_changes
575                .iter()
576                .map(|((target, time), diff)| (target.node, target.port, time, *diff));
577
578            logger.log_target_updates(target_changes);
579
580            let source_changes =
581            self.source_changes
582                .iter()
583                .map(|((source, time), diff)| (source.node, source.port, time, *diff));
584
585            logger.log_source_updates(source_changes);
586        }
587
588        // Step 1: Drain `self.input_changes` and determine actual frontier changes.
589        //
590        // Not all changes in `self.input_changes` may alter the frontier at a location.
591        // By filtering the changes through `self.pointstamps` we react only to discrete
592        // changes in the frontier, rather than changes in the pointstamp counts that
593        // witness that frontier.
594        for ((target, time), diff) in self.target_changes.drain() {
595
596            let operator = &mut self.per_operator[target.node].targets[target.port];
597            let changes = operator.pointstamps.update_iter(Some((time, diff)));
598
599            for (time, diff) in changes {
600                self.total_counts += diff;
601                for (output, summaries) in operator.output_summaries.iter_ports() {
602                    let output_changes = &mut self.output_changes[output];
603                    summaries
604                        .elements()
605                        .iter()
606                        .flat_map(|summary| summary.results_in(&time))
607                        .for_each(|out_time| output_changes.update(out_time, diff));
608                }
609                self.worklist.push(Reverse((time, Location::from(target), diff)));
610            }
611        }
612
613        for ((source, time), diff) in self.source_changes.drain() {
614
615            let operator = &mut self.per_operator[source.node].sources[source.port];
616            let changes = operator.pointstamps.update_iter(Some((time, diff)));
617
618            for (time, diff) in changes {
619                self.total_counts += diff;
620                for (output, summaries) in operator.output_summaries.iter_ports() {
621                    let output_changes = &mut self.output_changes[output];
622                    summaries
623                        .elements()
624                        .iter()
625                        .flat_map(|summary| summary.results_in(&time))
626                        .for_each(|out_time| output_changes.update(out_time, diff));
627                }
628                self.worklist.push(Reverse((time, Location::from(source), diff)));
629            }
630        }
631
632        // Step 2: Circulate implications of changes to `self.pointstamps`.
633        //
634        // TODO: The argument that this always terminates is subtle, and should be made.
635        //       The intent is that that by moving forward in layers through `time`, we
636        //       will discover zero-change times when we first visit them, as no further
637        //       changes can be made to them once we complete them.
638        while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
639
640            // Drain and accumulate all updates that have the same time and location.
641            while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
642                diff += (self.worklist.pop().unwrap().0).2;
643            }
644
645            // Only act if there is a net change, positive or negative.
646            if diff != 0 {
647
648                match location.port {
649                    // Update to an operator input.
650                    // Propagate any changes forward across the operator.
651                    Port::Target(port_index) => {
652
653                        let changes =
654                        self.per_operator[location.node]
655                            .targets[port_index]
656                            .implications
657                            .update_iter(Some((time, diff)));
658
659                        for (time, diff) in changes {
660                            let nodes = &self.nodes[location.node][port_index];
661                            for (output_port, summaries) in nodes.iter_ports() {
662                                let source = Location { node: location.node, port: Port::Source(output_port) };
663                                for summary in summaries.elements().iter() {
664                                    if let Some(new_time) = summary.results_in(&time) {
665                                        self.worklist.push(Reverse((new_time, source, diff)));
666                                    }
667                                }
668                            }
669                            self.pushed_changes.update((location, time), diff);
670                        }
671                    }
672                    // Update to an operator output.
673                    // Propagate any changes forward along outgoing edges.
674                    Port::Source(port_index) => {
675
676                        let changes =
677                        self.per_operator[location.node]
678                            .sources[port_index]
679                            .implications
680                            .update_iter(Some((time, diff)));
681
682                        for (time, diff) in changes {
683                            for new_target in self.edges[location.node][port_index].iter() {
684                                self.worklist.push(Reverse((
685                                    time.clone(),
686                                    Location::from(*new_target),
687                                    diff,
688                                )));
689                            }
690                            self.pushed_changes.update((location, time), diff);
691                        }
692                    },
693                };
694            }
695        }
696    }
697
698    /// Implications of maintained capabilities projected to each output.
699    pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
700        &mut self.output_changes[..]
701    }
702
703    /// A mutable reference to the pushed results of changes.
704    pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
705        &mut self.pushed_changes
706    }
707
708    /// Reveals per-operator frontier state.
709    pub fn node_state(&self, index: usize) -> &PerOperator<T> {
710        &self.per_operator[index]
711    }
712
713    /// Indicates if pointstamp is in the scope-wide frontier.
714    ///
715    /// Such a pointstamp would, if removed from `self.pointstamps`, cause a change
716    /// to `self.implications`, which is what we track for per operator input frontiers.
717    /// If the above do not hold, then its removal either 1. shouldn't be possible,
718    /// or 2. will not affect the output of `self.implications`.
719    pub fn is_global(&self, location: Location, time: &T) -> bool {
720        match location.port {
721            Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
722            Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
723        }
724    }
725}
726
727/// Determines summaries from locations to scope outputs.
728///
729/// Specifically, for each location whose node identifier is non-zero, we compile
730/// the summaries along which they can reach each output.
731///
732/// Graph locations may be missing from the output, in which case they have no
733/// paths to scope outputs.
734fn summarize_outputs<T: Timestamp>(
735    nodes: &[Connectivity<T::Summary>],
736    edges: &[Vec<Vec<Target>>],
737    ) -> HashMap<Location, PortConnectivity<T::Summary>>
738{
739    // A reverse edge map, to allow us to walk back up the dataflow graph.
740    let mut reverse = HashMap::new();
741    for (node, outputs) in edges.iter().enumerate() {
742        for (output, targets) in outputs.iter().enumerate() {
743            for target in targets.iter() {
744                reverse.insert(
745                    Location::from(*target),
746                    Location { node, port: Port::Source(output) }
747                );
748            }
749        }
750    }
751
752    // A reverse map from operator outputs to inputs, along their internal summaries.
753    let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
754    for (node, connectivity) in nodes.iter().enumerate() {
755        for (input, outputs) in connectivity.iter().enumerate() {
756            for (output, summary) in outputs.iter_ports() {
757                reverse_internal
758                    .entry(Location::new_source(node, output))
759                    .or_default()
760                    .push((input, summary));
761            }
762        }
763    }
764    
765    let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
766    let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
767
768    let outputs =
769    edges
770        .iter()
771        .flat_map(|x| x.iter())
772        .flat_map(|x| x.iter())
773        .filter(|target| target.node == 0);
774
775    // The scope may have no outputs, in which case we can do no work.
776    for output_target in outputs {
777        worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
778    }
779
780    // Loop until we stop discovering novel reachability paths.
781    while let Some((location, output, summary)) = worklist.pop_front() {
782        match location.port {
783
784            // This is an output port of an operator, or a scope input.
785            // We want to crawl up the operator, to its inputs.
786            Port::Source(_output_port) => {
787                if let Some(inputs) = reverse_internal.get(&location) {
788                    for (input_port, operator_summary) in inputs.iter() {
789                        let new_location = Location::new_target(location.node, *input_port);
790                        for op_summary in operator_summary.elements().iter() {
791                            if let Some(combined) = op_summary.followed_by(&summary) {
792                                if results.entry(new_location).or_default().insert_ref(output, &combined) {
793                                    worklist.push_back((new_location, output, combined));
794                                }
795                            }
796                        }
797                    }
798                }
799            }
800
801            // This is an input port of an operator, or a scope output.
802            // We want to walk back the edges leading to it.
803            Port::Target(_port) => {
804                // Each target should have (at most) one source.
805                if let Some(&source) = reverse.get(&location) {
806                    if results.entry(source).or_default().insert_ref(output, &summary) {
807                        worklist.push_back((source, output, summary));
808                    }
809                }
810            },
811        }
812    }
813
814    results
815}
816
817/// Logging types for reachability tracking events.
818pub mod logging {
819    use std::time::Duration;
820
821    use timely_container::CapacityContainerBuilder;
822    use timely_logging::TypedLogger;
823    use crate::logging_core::Logger;
824
825    /// A container builder for tracker events.
826    pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;
827
828    /// A logger with additional identifying information about the tracker.
829    pub struct TrackerLogger<T: Clone + 'static> {
830        identifier: usize,
831        logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
832    }
833
834    impl<T: Clone + 'static> TrackerLogger<T> {
835        /// Create a new tracker logger from its fields.
836        pub fn new(identifier: usize, logger: Logger<TrackerEventBuilder<T>>) -> Self {
837            Self { identifier, logger: logger.into() }
838        }
839
840        /// Log source update events with additional identifying information.
841        pub fn log_source_updates<'a, I>(&mut self, updates: I)
842        where
843            I: IntoIterator<Item = (usize, usize, &'a T, i64)>
844        {
845            let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
846            if !updates.is_empty() {
847                self.logger.log({
848                    SourceUpdate {
849                        tracker_id: self.identifier,
850                        updates
851                    }
852                });
853            }
854        }
855        /// Log target update events with additional identifying information.
856        pub fn log_target_updates<'a, I>(&mut self, updates: I)
857        where
858            I: IntoIterator<Item = (usize, usize, &'a T, i64)>
859        {
860            let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
861            if !updates.is_empty() {
862                self.logger.log({
863                    TargetUpdate {
864                        tracker_id: self.identifier,
865                        updates
866                    }
867                });
868            }
869        }
870    }
871
872    /// Events that the tracker may record.
873    #[derive(Debug, Clone)]
874    pub enum TrackerEvent<T> {
875        /// Updates made at a source of data.
876        SourceUpdate(SourceUpdate<T>),
877        /// Updates made at a target of data.
878        TargetUpdate(TargetUpdate<T>),
879    }
880
881    /// An update made at a source of data.
882    #[derive(Debug, Clone)]
883    pub struct SourceUpdate<T> {
884        /// An identifier for the tracker.
885        pub tracker_id: usize,
886        /// Updates themselves, as `(node, port, time, diff)`.
887        pub updates: Vec<(usize, usize, T, i64)>,
888    }
889
890    /// An update made at a target of data.
891    #[derive(Debug, Clone)]
892    pub struct TargetUpdate<T> {
893        /// An identifier for the tracker.
894        pub tracker_id: usize,
895        /// Updates themselves, as `(node, port, time, diff)`.
896        pub updates: Vec<(usize, usize, T, i64)>,
897    }
898
899    impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
900        fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
901    }
902
903    impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
904        fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
905    }
906}
907
908// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
909// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
910// because in all other cases the tracker stays alive while it has outstanding work, leaving no
911// remaining work for this Drop implementation.
912impl<T: Timestamp> Drop for Tracker<T> {
913    fn drop(&mut self) {
914        let logger = if let Some(logger) = &mut self.logger {
915            logger
916        } else {
917            // No cleanup necessary when there is no logger.
918            return;
919        };
920
921        // Retract pending data that `propagate_all` would normally log.
922        for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
923            let target_changes = per_operator.targets
924                .iter_mut()
925                .enumerate()
926                .flat_map(|(port, target)| {
927                    target.pointstamps
928                        .updates()
929                        .map(move |(time, diff)| (index, port, time, -diff))
930                });
931
932            logger.log_target_updates(target_changes);
933
934            let source_changes = per_operator.sources
935                .iter_mut()
936                .enumerate()
937                .flat_map(|(port, source)| {
938                    source.pointstamps
939                        .updates()
940                        .map(move |(time, diff)| (index, port, time, -diff))
941                });
942
943            logger.log_source_updates(source_changes);
944        }
945    }
946}