timely/progress/reachability.rs
1//! Manages pointstamp reachability within a timely dataflow graph.
2//!
3//! Timely dataflow is concerned with understanding and communicating the potential
4//! for capabilities to reach nodes in a directed graph, by following paths through
5//! the graph (along edges and through nodes). This module contains one abstraction
6//! for managing this information.
7//!
8//! # Examples
9//!
10//! ```rust
11//! use timely::progress::{Location, Port};
12//! use timely::progress::frontier::Antichain;
13//! use timely::progress::{Source, Target};
14//! use timely::progress::reachability::{Builder, Tracker};
15//!
16//! // allocate a new empty topology builder.
17//! let mut builder = Builder::<usize>::new();
18//!
19//! // Each node with one input connected to one output.
20//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
21//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
22//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
23//!
24//! // Connect nodes in sequence, looping around to the first from the last.
25//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
26//! builder.add_edge(Source::new(1, 0), Target::new(2, 0));
27//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
28//!
29//! // Construct a reachability tracker.
30//! let (mut tracker, _) = builder.build(None);
31//!
32//! // Introduce a pointstamp at the output of the first node.
33//! tracker.update_source(Source::new(0, 0), 17, 1);
34//!
35//! // Propagate changes; until this call updates are simply buffered.
36//! tracker.propagate_all();
37//!
38//! let mut results =
39//! tracker
40//! .pushed()
41//! .drain()
42//! .filter(|((location, time), delta)| location.is_target())
43//! .collect::<Vec<_>>();
44//!
45//! results.sort();
46//!
47//! println!("{:?}", results);
48//!
49//! assert_eq!(results.len(), 3);
50//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), 1));
51//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), 1));
52//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1));
53//!
//! // Retract the pointstamp at the output of the first node.
55//! tracker.update_source(Source::new(0, 0), 17, -1);
56//!
57//! // Propagate changes; until this call updates are simply buffered.
58//! tracker.propagate_all();
59//!
60//! let mut results =
61//! tracker
62//! .pushed()
63//! .drain()
64//! .filter(|((location, time), delta)| location.is_target())
65//! .collect::<Vec<_>>();
66//!
67//! results.sort();
68//!
69//! assert_eq!(results.len(), 3);
70//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), -1));
71//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), -1));
72//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), -1));
73//! ```
74
75use std::collections::{BinaryHeap, HashMap, VecDeque};
76use std::cmp::Reverse;
77
78use crate::progress::Timestamp;
79use crate::progress::{Source, Target};
80use crate::progress::ChangeBatch;
81use crate::progress::{Location, Port};
82use crate::progress::operate::{Connectivity, PortConnectivity};
83use crate::progress::frontier::MutableAntichain;
84use crate::progress::timestamp::PathSummary;
85
86
/// A topology builder, which can summarize reachability along paths.
///
/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
/// a static summary of the minimal actions a timestamp must endure going from any
/// input or output port to a destination input port.
///
/// A graph is provided as (i) several indexed nodes, each with some number of input
/// and output ports, and each with a summary of the internal paths connecting each
/// input to each output, and (ii) a set of edges connecting output ports to input
/// ports. Edges do not adjust timestamps; only nodes do this.
///
/// The resulting summary describes, for each origin port in the graph and destination
/// input port, a set of incomparable path summaries, each describing what happens to
/// a timestamp as it moves along the path. There may be multiple summaries for each
/// pair of origin and destination due to the fact that the actions on timestamps may
/// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of
/// the timestamp and seven").
///
/// # Examples
///
/// ```rust
/// use timely::progress::frontier::Antichain;
/// use timely::progress::{Source, Target};
/// use timely::progress::reachability::Builder;
///
/// // allocate a new empty topology builder.
/// let mut builder = Builder::<usize>::new();
///
/// // Each node with one input connected to one output.
/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
///
/// // Connect nodes in sequence, looping around to the first from the last.
/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
///
/// // Summarize reachability information.
/// let (tracker, _) = builder.build(None);
/// ```
#[derive(Clone, Debug)]
pub struct Builder<T: Timestamp> {
    /// Internal connections within hosted operators.
    ///
    /// Indexed by operator index, then input port, then output port. This is the
    /// same format returned by `get_internal_summary`, as if we simply appended
    /// all of the summaries for the hosted nodes.
    pub nodes: Vec<Connectivity<T::Summary>>,
    /// Direct connections from sources to targets.
    ///
    /// Edges do not affect timestamps, so we only need to know the connectivity.
    /// Indexed by operator index then output port; each entry lists the targets
    /// that the output port is connected to.
    pub edges: Vec<Vec<Vec<Target>>>,
    /// Numbers of inputs and outputs for each node, as `(inputs, outputs)`.
    pub shape: Vec<(usize, usize)>,
}
144
145impl<T: Timestamp> Builder<T> {
146
147 /// Create a new empty topology builder.
148 pub fn new() -> Self {
149 Builder {
150 nodes: Vec::new(),
151 edges: Vec::new(),
152 shape: Vec::new(),
153 }
154 }
155
156 /// Add links internal to operators.
157 ///
158 /// This method overwrites any existing summary, instead of anything more sophisticated.
159 pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
160
161 // Assert that all summaries exist.
162 debug_assert_eq!(inputs, summary.len());
163 debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)));
164
165 while self.nodes.len() <= index {
166 self.nodes.push(Vec::new());
167 self.edges.push(Vec::new());
168 self.shape.push((0, 0));
169 }
170
171 self.nodes[index] = summary;
172 if self.edges[index].len() != outputs {
173 self.edges[index] = vec![Vec::new(); outputs];
174 }
175 self.shape[index] = (inputs, outputs);
176 }
177
    /// Add links between operators.
    ///
    /// Records that the output port `source` is connected to the input port `target`.
    ///
    /// In debug builds this asserts that both ports lie within the shapes recorded
    /// by `add_node`. In all builds the edge list is indexed by `source.node` and
    /// `source.port`, so a source that has not been added causes an out-of-bounds
    /// panic. The target is not otherwise checked here.
    pub fn add_edge(&mut self, source: Source, target: Target) {

        // Assert that the edge is between existing ports.
        debug_assert!(source.port < self.shape[source.node].1);
        debug_assert!(target.port < self.shape[target.node].0);

        self.edges[source.node][source.port].push(target);
    }
190
191 /// Compiles the current nodes and edges into immutable path summaries.
192 ///
193 /// This method has the opportunity to perform some error checking that the path summaries
194 /// are valid, including references to undefined nodes and ports, as well as self-loops with
195 /// default summaries (a serious liveness issue).
196 ///
197 /// The optional logger information is baked into the resulting tracker.
198 pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
199
200 if !self.is_acyclic() {
201 println!("Cycle detected without timestamp increment");
202 println!("{:?}", self);
203 }
204
205 Tracker::allocate_from(self, logger)
206 }
207
208 /// Tests whether the graph a cycle of default path summaries.
209 ///
210 /// Graphs containing cycles of default path summaries will most likely
211 /// not work well with progress tracking, as a timestamp can result in
212 /// itself. Such computations can still *run*, but one should not block
213 /// on frontier information before yielding results, as you many never
214 /// unblock.
215 ///
216 /// # Examples
217 ///
218 /// ```rust
219 /// use timely::progress::frontier::Antichain;
220 /// use timely::progress::{Source, Target};
221 /// use timely::progress::reachability::Builder;
222 ///
223 /// // allocate a new empty topology builder.
224 /// let mut builder = Builder::<usize>::new();
225 ///
226 /// // Each node with one input connected to one output.
227 /// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
228 /// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
229 /// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
230 ///
231 /// // Connect nodes in sequence, looping around to the first from the last.
232 /// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
233 /// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
234 ///
235 /// assert!(builder.is_acyclic());
236 ///
237 /// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
238 ///
239 /// assert!(!builder.is_acyclic());
240 /// ```
241 ///
242 /// This test exists because it is possible to describe dataflow graphs that
243 /// do not contain non-incrementing cycles, but without feedback nodes that
244 /// strictly increment timestamps. For example,
245 ///
246 /// ```rust
247 /// use timely::progress::frontier::Antichain;
248 /// use timely::progress::{Source, Target};
249 /// use timely::progress::reachability::Builder;
250 ///
251 /// // allocate a new empty topology builder.
252 /// let mut builder = Builder::<usize>::new();
253 ///
254 /// // Two inputs and outputs, only one of which advances.
255 /// builder.add_node(0, 2, 2, vec![
256 /// [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(),
257 /// [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(),
258 /// ]);
259 ///
260 /// // Connect each output to the opposite input.
261 /// builder.add_edge(Source::new(0, 0), Target::new(0, 1));
262 /// builder.add_edge(Source::new(0, 1), Target::new(0, 0));
263 ///
264 /// assert!(builder.is_acyclic());
265 /// ```
266 pub fn is_acyclic(&self) -> bool {
267
268 let locations = self.shape.iter().map(|(targets, sources)| targets + sources).sum();
269 let mut in_degree = HashMap::with_capacity(locations);
270
271 // Load edges as default summaries.
272 for (index, ports) in self.edges.iter().enumerate() {
273 for (output, targets) in ports.iter().enumerate() {
274 let source = Location::new_source(index, output);
275 in_degree.entry(source).or_insert(0);
276 for &target in targets.iter() {
277 let target = Location::from(target);
278 *in_degree.entry(target).or_insert(0) += 1;
279 }
280 }
281 }
282
283 // Load default intra-node summaries.
284 for (index, summary) in self.nodes.iter().enumerate() {
285 for (input, outputs) in summary.iter().enumerate() {
286 let target = Location::new_target(index, input);
287 in_degree.entry(target).or_insert(0);
288 for (output, summaries) in outputs.iter_ports() {
289 let source = Location::new_source(index, output);
290 for summary in summaries.elements().iter() {
291 if summary == &Default::default() {
292 *in_degree.entry(source).or_insert(0) += 1;
293 }
294 }
295 }
296 }
297 }
298
299 // A worklist of nodes that cannot be reached from the whole graph.
300 // Initially this list contains observed locations with no incoming
301 // edges, but as the algorithm develops we add to it any locations
302 // that can only be reached by nodes that have been on this list.
303 let mut worklist = Vec::with_capacity(in_degree.len());
304 for (key, val) in in_degree.iter() {
305 if *val == 0 {
306 worklist.push(*key);
307 }
308 }
309 in_degree.retain(|_key, val| val != &0);
310
311 // Repeatedly remove nodes and update adjacent in-edges.
312 while let Some(Location { node, port }) = worklist.pop() {
313 match port {
314 Port::Source(port) => {
315 for target in self.edges[node][port].iter() {
316 let target = Location::from(*target);
317 *in_degree.get_mut(&target).unwrap() -= 1;
318 if in_degree[&target] == 0 {
319 in_degree.remove(&target);
320 worklist.push(target);
321 }
322 }
323 },
324 Port::Target(port) => {
325 for (output, summaries) in self.nodes[node][port].iter_ports() {
326 let source = Location::new_source(node, output);
327 for summary in summaries.elements().iter() {
328 if summary == &Default::default() {
329 *in_degree.get_mut(&source).unwrap() -= 1;
330 if in_degree[&source] == 0 {
331 in_degree.remove(&source);
332 worklist.push(source);
333 }
334 }
335 }
336 }
337 },
338 }
339 }
340
341 // Acyclic graphs should reduce to empty collections.
342 in_degree.is_empty()
343 }
344}
345
346impl<T: Timestamp> Default for Builder<T> {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
/// An interactive tracker of propagated reachability information.
///
/// A `Tracker` tracks, for a fixed graph topology, the implications of
/// pointstamp changes at various node input and output ports. These changes may
/// alter the potential pointstamps that could arrive at downstream input ports.
pub struct Tracker<T:Timestamp> {

    /// Internal connections within hosted operators.
    ///
    /// Indexed by operator index, then input port, then output port. This is the
    /// same format returned by `get_internal_summary`, as if we simply appended
    /// all of the summaries for the hosted nodes.
    nodes: Vec<Connectivity<T::Summary>>,
    /// Direct connections from sources to targets.
    ///
    /// Edges do not affect timestamps, so we only need to know the connectivity.
    /// Indexed by operator index then output port.
    edges: Vec<Vec<Vec<Target>>>,

    // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
    //       It seems we should be able to flatten most of these so that there are a few allocations
    //       independent of the numbers of nodes and ports and such.
    //
    // TODO: We could also change the internal representation to be a graph of targets, using usize
    //       identifiers for each, so that internally we needn't use multiple levels of indirection.
    //       This may make more sense once we commit to topologically ordering the targets.

    /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
    /// rather than their multiplicities. We separately track the frontiers resulting from propagated
    /// frontiers, to protect them from transient negativity in inbound target updates.
    per_operator: Vec<PerOperator<T>>,

    /// Source and target changes are buffered, which allows us to delay processing until propagation,
    /// and so consolidate updates, but to leap directly to those frontiers that may have changed.
    target_changes: ChangeBatch<(Target, T)>,
    /// Buffered changes at sources, the counterpart of `target_changes`.
    source_changes: ChangeBatch<(Source, T)>,

    /// Worklist of updates to perform, ordered by increasing timestamp and target.
    worklist: BinaryHeap<Reverse<(T, Location, i64)>>,

    /// Buffer of consequent changes.
    pushed_changes: ChangeBatch<(Location, T)>,

    /// Changes projected to each scope output, indexed by scope output port.
    ///
    /// `propagate_all` accumulates these by applying each port's `output_summaries`
    /// to the changes in that port's pointstamp frontier.
    output_changes: Vec<ChangeBatch<T>>,

    /// A non-negative sum of post-filtration input changes.
    ///
    /// This sum should be zero exactly when the accumulated input changes are zero,
    /// indicating that the progress tracker is currently tracking nothing. It should
    /// always be exactly equal to the sum across all operators of the frontier sizes
    /// of the target and source `pointstamps` member.
    total_counts: i64,

    /// Optionally, a unique logging identifier and logging for tracking events.
    logger: Option<logging::TrackerLogger<T>>,
}
409
/// Target and source information for each operator.
pub struct PerOperator<T: Timestamp> {
    /// Port information for each target, indexed by input port.
    pub targets: Vec<PortInformation<T>>,
    /// Port information for each source, indexed by output port.
    pub sources: Vec<PortInformation<T>>,
}
417
418impl<T: Timestamp> PerOperator<T> {
419 /// A new PerOperator bundle from numbers of input and output ports.
420 pub fn new(inputs: usize, outputs: usize) -> Self {
421 PerOperator {
422 targets: vec![PortInformation::new(); inputs],
423 sources: vec![PortInformation::new(); outputs],
424 }
425 }
426}
427
/// Per-port progress-tracking information.
#[derive(Clone)]
pub struct PortInformation<T: Timestamp> {
    /// Current counts of active pointstamps.
    pub pointstamps: MutableAntichain<T>,
    /// Current implications of active pointstamps across the dataflow.
    pub implications: MutableAntichain<T>,
    /// Path summaries to each of the scope outputs.
    pub output_summaries: PortConnectivity<T::Summary>,
}
438
439impl<T: Timestamp> PortInformation<T> {
440 /// Creates empty port information.
441 pub fn new() -> Self {
442 PortInformation {
443 pointstamps: MutableAntichain::new(),
444 implications: MutableAntichain::new(),
445 output_summaries: PortConnectivity::default(),
446 }
447 }
448
449 /// Returns `true` if updates at this pointstamp uniquely block progress.
450 ///
451 /// This method returns `true` if the currently maintained pointstamp
452 /// counts are such that zeroing out outstanding updates at *this*
453 /// pointstamp would change the frontiers at this operator. When the
454 /// method returns `false` it means that, temporarily at least, there
455 /// are outstanding pointstamp updates that are strictly less than
456 /// this pointstamp.
457 #[inline]
458 pub fn is_global(&self, time: &T) -> bool {
459 let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
460 let redundant = self.implications.count_for(time) > 1;
461 !dominated && !redundant
462 }
463}
464
465impl<T: Timestamp> Default for PortInformation<T> {
466 fn default() -> Self {
467 Self::new()
468 }
469}
470
471impl<T:Timestamp> Tracker<T> {
472
473 /// Updates the count for a time at a location.
474 #[inline]
475 pub fn update(&mut self, location: Location, time: T, value: i64) {
476 match location.port {
477 Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
478 Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
479 };
480 }
481
    /// Updates the count for a time at a target (operator input, scope output).
    ///
    /// The change is only buffered in `target_changes`; it takes effect on the
    /// next call to `propagate_all`.
    #[inline]
    pub fn update_target(&mut self, target: Target, time: T, value: i64) {
        self.target_changes.update((target, time), value);
    }
    /// Updates the count for a time at a source (operator output, scope input).
    ///
    /// The change is only buffered in `source_changes`; it takes effect on the
    /// next call to `propagate_all`.
    #[inline]
    pub fn update_source(&mut self, source: Source, time: T, value: i64) {
        self.source_changes.update((source, time), value);
    }
492
493 /// Indicates if any pointstamps have positive count.
494 pub fn tracking_anything(&mut self) -> bool {
495 !self.source_changes.is_empty() ||
496 !self.target_changes.is_empty() ||
497 self.total_counts > 0
498 }
499
500 /// Allocate a new `Tracker` using the shape from `summaries`.
501 ///
502 /// The result is a pair of tracker, and the summaries from each input port to each
503 /// output port.
504 ///
505 /// If the optional logger is provided, it will be used to log various tracker events.
506 pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
507
508 // Allocate buffer space for each input and input port.
509 let mut per_operator =
510 builder
511 .shape
512 .iter()
513 .map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
514 .collect::<Vec<_>>();
515
516 // Summary of scope inputs to scope outputs.
517 let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];
518
519 // Compile summaries from each location to each scope output.
520 let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
521 for (location, summaries) in output_summaries.into_iter() {
522 // Summaries from scope inputs are useful in summarizing the scope.
523 if location.node == 0 {
524 if let Port::Source(port) = location.port {
525 builder_summary[port] = summaries;
526 }
527 else {
528 // Ignore (ideally trivial) output to output summaries.
529 }
530 }
531 // Summaries from internal nodes are important for projecting capabilities.
532 else {
533 match location.port {
534 Port::Target(port) => {
535 per_operator[location.node].targets[port].output_summaries = summaries;
536 },
537 Port::Source(port) => {
538 per_operator[location.node].sources[port].output_summaries = summaries;
539 },
540 }
541 }
542 }
543
544 let scope_outputs = builder.shape[0].0;
545 let output_changes = vec![ChangeBatch::new(); scope_outputs];
546
547 let tracker =
548 Tracker {
549 nodes: builder.nodes,
550 edges: builder.edges,
551 per_operator,
552 target_changes: ChangeBatch::new(),
553 source_changes: ChangeBatch::new(),
554 worklist: BinaryHeap::new(),
555 pushed_changes: ChangeBatch::new(),
556 output_changes,
557 total_counts: 0,
558 logger,
559 };
560
561 (tracker, builder_summary)
562 }
563
564 /// Propagates all pending updates.
565 ///
566 /// The method drains `self.input_changes` and circulates their implications
567 /// until we cease deriving new implications.
568 pub fn propagate_all(&mut self) {
569
570 // Step 0: If logging is enabled, construct and log inbound changes.
571 if let Some(logger) = &mut self.logger {
572
573 let target_changes =
574 self.target_changes
575 .iter()
576 .map(|((target, time), diff)| (target.node, target.port, time, *diff));
577
578 logger.log_target_updates(target_changes);
579
580 let source_changes =
581 self.source_changes
582 .iter()
583 .map(|((source, time), diff)| (source.node, source.port, time, *diff));
584
585 logger.log_source_updates(source_changes);
586 }
587
588 // Step 1: Drain `self.input_changes` and determine actual frontier changes.
589 //
590 // Not all changes in `self.input_changes` may alter the frontier at a location.
591 // By filtering the changes through `self.pointstamps` we react only to discrete
592 // changes in the frontier, rather than changes in the pointstamp counts that
593 // witness that frontier.
594 for ((target, time), diff) in self.target_changes.drain() {
595
596 let operator = &mut self.per_operator[target.node].targets[target.port];
597 let changes = operator.pointstamps.update_iter(Some((time, diff)));
598
599 for (time, diff) in changes {
600 self.total_counts += diff;
601 for (output, summaries) in operator.output_summaries.iter_ports() {
602 let output_changes = &mut self.output_changes[output];
603 summaries
604 .elements()
605 .iter()
606 .flat_map(|summary| summary.results_in(&time))
607 .for_each(|out_time| output_changes.update(out_time, diff));
608 }
609 self.worklist.push(Reverse((time, Location::from(target), diff)));
610 }
611 }
612
613 for ((source, time), diff) in self.source_changes.drain() {
614
615 let operator = &mut self.per_operator[source.node].sources[source.port];
616 let changes = operator.pointstamps.update_iter(Some((time, diff)));
617
618 for (time, diff) in changes {
619 self.total_counts += diff;
620 for (output, summaries) in operator.output_summaries.iter_ports() {
621 let output_changes = &mut self.output_changes[output];
622 summaries
623 .elements()
624 .iter()
625 .flat_map(|summary| summary.results_in(&time))
626 .for_each(|out_time| output_changes.update(out_time, diff));
627 }
628 self.worklist.push(Reverse((time, Location::from(source), diff)));
629 }
630 }
631
632 // Step 2: Circulate implications of changes to `self.pointstamps`.
633 //
634 // TODO: The argument that this always terminates is subtle, and should be made.
635 // The intent is that that by moving forward in layers through `time`, we
636 // will discover zero-change times when we first visit them, as no further
637 // changes can be made to them once we complete them.
638 while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
639
640 // Drain and accumulate all updates that have the same time and location.
641 while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
642 diff += (self.worklist.pop().unwrap().0).2;
643 }
644
645 // Only act if there is a net change, positive or negative.
646 if diff != 0 {
647
648 match location.port {
649 // Update to an operator input.
650 // Propagate any changes forward across the operator.
651 Port::Target(port_index) => {
652
653 let changes =
654 self.per_operator[location.node]
655 .targets[port_index]
656 .implications
657 .update_iter(Some((time, diff)));
658
659 for (time, diff) in changes {
660 let nodes = &self.nodes[location.node][port_index];
661 for (output_port, summaries) in nodes.iter_ports() {
662 let source = Location { node: location.node, port: Port::Source(output_port) };
663 for summary in summaries.elements().iter() {
664 if let Some(new_time) = summary.results_in(&time) {
665 self.worklist.push(Reverse((new_time, source, diff)));
666 }
667 }
668 }
669 self.pushed_changes.update((location, time), diff);
670 }
671 }
672 // Update to an operator output.
673 // Propagate any changes forward along outgoing edges.
674 Port::Source(port_index) => {
675
676 let changes =
677 self.per_operator[location.node]
678 .sources[port_index]
679 .implications
680 .update_iter(Some((time, diff)));
681
682 for (time, diff) in changes {
683 for new_target in self.edges[location.node][port_index].iter() {
684 self.worklist.push(Reverse((
685 time.clone(),
686 Location::from(*new_target),
687 diff,
688 )));
689 }
690 self.pushed_changes.update((location, time), diff);
691 }
692 },
693 };
694 }
695 }
696 }
697
698 /// Implications of maintained capabilities projected to each output.
699 pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
700 &mut self.output_changes[..]
701 }
702
    /// A mutable reference to the pushed results of changes.
    ///
    /// `propagate_all` records in this buffer each change it makes to the
    /// `implications` frontier of a location.
    pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
        &mut self.pushed_changes
    }
707
    /// Reveals per-operator frontier state.
    ///
    /// Panics if `index` is not the index of a node known to the tracker.
    pub fn node_state(&self, index: usize) -> &PerOperator<T> {
        &self.per_operator[index]
    }
712
713 /// Indicates if pointstamp is in the scope-wide frontier.
714 ///
715 /// Such a pointstamp would, if removed from `self.pointstamps`, cause a change
716 /// to `self.implications`, which is what we track for per operator input frontiers.
717 /// If the above do not hold, then its removal either 1. shouldn't be possible,
718 /// or 2. will not affect the output of `self.implications`.
719 pub fn is_global(&self, location: Location, time: &T) -> bool {
720 match location.port {
721 Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
722 Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
723 }
724 }
725}
726
727/// Determines summaries from locations to scope outputs.
728///
729/// Specifically, for each location whose node identifier is non-zero, we compile
730/// the summaries along which they can reach each output.
731///
732/// Graph locations may be missing from the output, in which case they have no
733/// paths to scope outputs.
734fn summarize_outputs<T: Timestamp>(
735 nodes: &[Connectivity<T::Summary>],
736 edges: &[Vec<Vec<Target>>],
737 ) -> HashMap<Location, PortConnectivity<T::Summary>>
738{
739 // A reverse edge map, to allow us to walk back up the dataflow graph.
740 let mut reverse = HashMap::new();
741 for (node, outputs) in edges.iter().enumerate() {
742 for (output, targets) in outputs.iter().enumerate() {
743 for target in targets.iter() {
744 reverse.insert(
745 Location::from(*target),
746 Location { node, port: Port::Source(output) }
747 );
748 }
749 }
750 }
751
752 // A reverse map from operator outputs to inputs, along their internal summaries.
753 let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
754 for (node, connectivity) in nodes.iter().enumerate() {
755 for (input, outputs) in connectivity.iter().enumerate() {
756 for (output, summary) in outputs.iter_ports() {
757 reverse_internal
758 .entry(Location::new_source(node, output))
759 .or_default()
760 .push((input, summary));
761 }
762 }
763 }
764
765 let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
766 let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
767
768 let outputs =
769 edges
770 .iter()
771 .flat_map(|x| x.iter())
772 .flat_map(|x| x.iter())
773 .filter(|target| target.node == 0);
774
775 // The scope may have no outputs, in which case we can do no work.
776 for output_target in outputs {
777 worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
778 }
779
780 // Loop until we stop discovering novel reachability paths.
781 while let Some((location, output, summary)) = worklist.pop_front() {
782 match location.port {
783
784 // This is an output port of an operator, or a scope input.
785 // We want to crawl up the operator, to its inputs.
786 Port::Source(_output_port) => {
787 if let Some(inputs) = reverse_internal.get(&location) {
788 for (input_port, operator_summary) in inputs.iter() {
789 let new_location = Location::new_target(location.node, *input_port);
790 for op_summary in operator_summary.elements().iter() {
791 if let Some(combined) = op_summary.followed_by(&summary) {
792 if results.entry(new_location).or_default().insert_ref(output, &combined) {
793 worklist.push_back((new_location, output, combined));
794 }
795 }
796 }
797 }
798 }
799 }
800
801 // This is an input port of an operator, or a scope output.
802 // We want to walk back the edges leading to it.
803 Port::Target(_port) => {
804 // Each target should have (at most) one source.
805 if let Some(&source) = reverse.get(&location) {
806 if results.entry(source).or_default().insert_ref(output, &summary) {
807 worklist.push_back((source, output, summary));
808 }
809 }
810 },
811 }
812 }
813
814 results
815}
816
817/// Logging types for reachability tracking events.
818pub mod logging {
819 use std::time::Duration;
820
821 use timely_container::CapacityContainerBuilder;
822 use timely_logging::TypedLogger;
823 use crate::logging_core::Logger;
824
825 /// A container builder for tracker events.
826 pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;
827
828 /// A logger with additional identifying information about the tracker.
829 pub struct TrackerLogger<T: Clone + 'static> {
830 identifier: usize,
831 logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
832 }
833
834 impl<T: Clone + 'static> TrackerLogger<T> {
835 /// Create a new tracker logger from its fields.
836 pub fn new(identifier: usize, logger: Logger<TrackerEventBuilder<T>>) -> Self {
837 Self { identifier, logger: logger.into() }
838 }
839
840 /// Log source update events with additional identifying information.
841 pub fn log_source_updates<'a, I>(&mut self, updates: I)
842 where
843 I: IntoIterator<Item = (usize, usize, &'a T, i64)>
844 {
845 let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
846 if !updates.is_empty() {
847 self.logger.log({
848 SourceUpdate {
849 tracker_id: self.identifier,
850 updates
851 }
852 });
853 }
854 }
855 /// Log target update events with additional identifying information.
856 pub fn log_target_updates<'a, I>(&mut self, updates: I)
857 where
858 I: IntoIterator<Item = (usize, usize, &'a T, i64)>
859 {
860 let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
861 if !updates.is_empty() {
862 self.logger.log({
863 TargetUpdate {
864 tracker_id: self.identifier,
865 updates
866 }
867 });
868 }
869 }
870 }
871
872 /// Events that the tracker may record.
873 #[derive(Debug, Clone)]
874 pub enum TrackerEvent<T> {
875 /// Updates made at a source of data.
876 SourceUpdate(SourceUpdate<T>),
877 /// Updates made at a target of data.
878 TargetUpdate(TargetUpdate<T>),
879 }
880
881 /// An update made at a source of data.
882 #[derive(Debug, Clone)]
883 pub struct SourceUpdate<T> {
884 /// An identifier for the tracker.
885 pub tracker_id: usize,
886 /// Updates themselves, as `(node, port, time, diff)`.
887 pub updates: Vec<(usize, usize, T, i64)>,
888 }
889
890 /// An update made at a target of data.
891 #[derive(Debug, Clone)]
892 pub struct TargetUpdate<T> {
893 /// An identifier for the tracker.
894 pub tracker_id: usize,
895 /// Updates themselves, as `(node, port, time, diff)`.
896 pub updates: Vec<(usize, usize, T, i64)>,
897 }
898
899 impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
900 fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
901 }
902
903 impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
904 fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
905 }
906}
907
908// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
909// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
910// because in all other cases the tracker stays alive while it has outstanding work, leaving no
911// remaining work for this Drop implementation.
912impl<T: Timestamp> Drop for Tracker<T> {
913 fn drop(&mut self) {
914 let logger = if let Some(logger) = &mut self.logger {
915 logger
916 } else {
917 // No cleanup necessary when there is no logger.
918 return;
919 };
920
921 // Retract pending data that `propagate_all` would normally log.
922 for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
923 let target_changes = per_operator.targets
924 .iter_mut()
925 .enumerate()
926 .flat_map(|(port, target)| {
927 target.pointstamps
928 .updates()
929 .map(move |(time, diff)| (index, port, time, -diff))
930 });
931
932 logger.log_target_updates(target_changes);
933
934 let source_changes = per_operator.sources
935 .iter_mut()
936 .enumerate()
937 .flat_map(|(port, source)| {
938 source.pointstamps
939 .updates()
940 .map(move |(time, diff)| (index, port, time, -diff))
941 });
942
943 logger.log_source_updates(source_changes);
944 }
945 }
946}