timely/progress/reachability.rs
1//! Manages pointstamp reachability within a timely dataflow graph.
2//!
3//! Timely dataflow is concerned with understanding and communicating the potential
4//! for capabilities to reach nodes in a directed graph, by following paths through
5//! the graph (along edges and through nodes). This module contains one abstraction
6//! for managing this information.
7//!
8//! # Examples
9//!
10//! ```rust
11//! use timely::progress::{Location, Port};
12//! use timely::progress::frontier::Antichain;
13//! use timely::progress::{Source, Target};
14//! use timely::progress::reachability::{Builder, Tracker};
15//!
16//! // allocate a new empty topology builder.
17//! let mut builder = Builder::<usize>::new();
18//!
19//! // Each node with one input connected to one output.
20//! builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
21//! builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
22//! builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
23//!
24//! // Connect nodes in sequence, looping around to the first from the last.
25//! builder.add_edge(Source::new(0, 0), Target::new(1, 0));
26//! builder.add_edge(Source::new(1, 0), Target::new(2, 0));
27//! builder.add_edge(Source::new(2, 0), Target::new(0, 0));
28//!
29//! // Construct a reachability tracker.
30//! let (mut tracker, _) = builder.build(None);
31//!
32//! // Introduce a pointstamp at the output of the first node.
33//! tracker.update_source(Source::new(0, 0), 17, 1);
34//!
35//! // Propagate changes; until this call updates are simply buffered.
36//! tracker.propagate_all();
37//!
38//! let mut results =
39//! tracker
40//! .pushed()
41//! .0
42//! .drain()
43//! .filter(|((location, time), delta)| location.is_target())
44//! .collect::<Vec<_>>();
45//!
46//! results.sort();
47//!
48//! println!("{:?}", results);
49//!
50//! assert_eq!(results.len(), 3);
51//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), 1));
52//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), 1));
53//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), 1));
54//!
//! // Retract the pointstamp at the output of the first node.
56//! tracker.update_source(Source::new(0, 0), 17, -1);
57//!
58//! // Propagate changes; until this call updates are simply buffered.
59//! tracker.propagate_all();
60//!
61//! let mut results =
62//! tracker
63//! .pushed()
64//! .0
65//! .drain()
66//! .filter(|((location, time), delta)| location.is_target())
67//! .collect::<Vec<_>>();
68//!
69//! results.sort();
70//!
71//! assert_eq!(results.len(), 3);
72//! assert_eq!(results[0], ((Location::new_target(0, 0), 18), -1));
73//! assert_eq!(results[1], ((Location::new_target(1, 0), 17), -1));
74//! assert_eq!(results[2], ((Location::new_target(2, 0), 17), -1));
75//! ```
76
77use std::collections::{BinaryHeap, HashMap, VecDeque};
78use std::cmp::Reverse;
79
80use columnar::{Vecs, Index as ColumnarIndex};
81
82use crate::progress::Timestamp;
83use crate::progress::{Source, Target};
84use crate::progress::ChangeBatch;
85use crate::progress::{Location, Port};
86use crate::progress::operate::{Connectivity, PortConnectivity};
87use crate::progress::frontier::MutableAntichain;
88use crate::progress::timestamp::PathSummary;
89
90/// Build a `Vecs<Vecs<Vec<S>>>` from nested iterators.
91///
92/// The outer iterator yields nodes, each node yields ports, each port yields data items.
93fn build_nested_vecs<S>(nodes: impl Iterator<Item = impl Iterator<Item = impl Iterator<Item = S>>>) -> Vecs<Vecs<Vec<S>>> {
94 let mut result: Vecs<Vecs<Vec<S>>> = Default::default();
95 for node in nodes {
96 for port in node {
97 result.values.push_iter(port);
98 }
99 result.bounds.push(result.values.bounds.len() as u64);
100 }
101 result
102}
103
104/// A topology builder, which can summarize reachability along paths.
105///
106/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
107/// a static summary of the minimal actions a timestamp must endure going from any
108/// input or output port to a destination input port.
109///
110/// A graph is provides as (i) several indexed nodes, each with some number of input
111/// and output ports, and each with a summary of the internal paths connecting each
112/// input to each output, and (ii) a set of edges connecting output ports to input
113/// ports. Edges do not adjust timestamps; only nodes do this.
114///
115/// The resulting summary describes, for each origin port in the graph and destination
116/// input port, a set of incomparable path summaries, each describing what happens to
117/// a timestamp as it moves along the path. There may be multiple summaries for each
118/// part of origin and destination due to the fact that the actions on timestamps may
119/// not be totally ordered (e.g., "increment the timestamp" and "take the maximum of
120/// the timestamp and seven").
121///
122/// # Examples
123///
124/// ```rust
125/// use timely::progress::frontier::Antichain;
126/// use timely::progress::{Source, Target};
127/// use timely::progress::reachability::Builder;
128///
129/// // allocate a new empty topology builder.
130/// let mut builder = Builder::<usize>::new();
131///
132/// // Each node with one input connected to one output.
133/// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
134/// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
135/// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(1))].into_iter().collect()]);
136///
137/// // Connect nodes in sequence, looping around to the first from the last.
138/// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
139/// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
140/// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
141///
142/// // Summarize reachability information.
143/// let (tracker, _) = builder.build(None);
144/// ```
145#[derive(Clone, Debug)]
146pub struct Builder<T: Timestamp> {
147 /// Internal connections within hosted operators.
148 ///
149 /// Indexed by operator index, then input port, then output port. This is the
150 /// same format returned by `initialize`, as if we simply appended
151 /// all of the summaries for the hosted nodes.
152 pub nodes: Vec<Connectivity<T::Summary>>,
153 /// Direct connections from sources to targets.
154 ///
155 /// Edges do not affect timestamps, so we only need to know the connectivity.
156 /// Indexed by operator index then output port.
157 pub edges: Vec<Vec<Vec<Target>>>,
158 /// Numbers of inputs and outputs for each node.
159 pub shape: Vec<(usize, usize)>,
160}
161
162impl<T: Timestamp> Builder<T> {
163
164 /// Create a new empty topology builder.
165 pub fn new() -> Self {
166 Builder {
167 nodes: Vec::new(),
168 edges: Vec::new(),
169 shape: Vec::new(),
170 }
171 }
172
173 /// Add links internal to operators.
174 ///
175 /// This method overwrites any existing summary, instead of anything more sophisticated.
176 pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Connectivity<T::Summary>) {
177
178 // Assert that all summaries exist.
179 debug_assert_eq!(inputs, summary.len());
180 debug_assert!(summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)));
181
182 while self.nodes.len() <= index {
183 self.nodes.push(Vec::new());
184 self.edges.push(Vec::new());
185 self.shape.push((0, 0));
186 }
187
188 self.nodes[index] = summary;
189 if self.edges[index].len() != outputs {
190 self.edges[index] = vec![Vec::new(); outputs];
191 }
192 self.shape[index] = (inputs, outputs);
193 }
194
195 /// Add links between operators.
196 ///
197 /// This method does not check that the associated nodes and ports exist. References to
198 /// missing nodes or ports are discovered in `build`.
199 pub fn add_edge(&mut self, source: Source, target: Target) {
200
201 // Assert that the edge is between existing ports.
202 debug_assert!(source.port < self.shape[source.node].1);
203 debug_assert!(target.port < self.shape[target.node].0);
204
205 self.edges[source.node][source.port].push(target);
206 }
207
208 /// Compiles the current nodes and edges into immutable path summaries.
209 ///
210 /// This method has the opportunity to perform some error checking that the path summaries
211 /// are valid, including references to undefined nodes and ports, as well as self-loops with
212 /// default summaries (a serious liveness issue).
213 ///
214 /// The optional logger information is baked into the resulting tracker.
215 pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Connectivity<T::Summary>) {
216
217 if !self.is_acyclic() {
218 println!("Cycle detected without timestamp increment");
219 println!("{:?}", self);
220 }
221
222 Tracker::allocate_from(self, logger)
223 }
224
225 /// Tests whether the graph includes a cycle of default path summaries.
226 ///
227 /// Graphs containing cycles of default path summaries will most likely
228 /// not work well with progress tracking, as a timestamp can result in
229 /// itself. Such computations can still *run*, but one should not block
230 /// on frontier information before yielding results, as you many never
231 /// unblock.
232 ///
233 /// # Examples
234 ///
235 /// ```rust
236 /// use timely::progress::frontier::Antichain;
237 /// use timely::progress::{Source, Target};
238 /// use timely::progress::reachability::Builder;
239 ///
240 /// // allocate a new empty topology builder.
241 /// let mut builder = Builder::<usize>::new();
242 ///
243 /// // Each node with one input connected to one output.
244 /// builder.add_node(0, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
245 /// builder.add_node(1, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
246 /// builder.add_node(2, 1, 1, vec![[(0, Antichain::from_elem(0))].into_iter().collect()]);
247 ///
248 /// // Connect nodes in sequence, looping around to the first from the last.
249 /// builder.add_edge(Source::new(0, 0), Target::new(1, 0));
250 /// builder.add_edge(Source::new(1, 0), Target::new(2, 0));
251 ///
252 /// assert!(builder.is_acyclic());
253 ///
254 /// builder.add_edge(Source::new(2, 0), Target::new(0, 0));
255 ///
256 /// assert!(!builder.is_acyclic());
257 /// ```
258 ///
259 /// This test exists because it is possible to describe dataflow graphs that
260 /// do not contain non-incrementing cycles, but without feedback nodes that
261 /// strictly increment timestamps. For example,
262 ///
263 /// ```rust
264 /// use timely::progress::frontier::Antichain;
265 /// use timely::progress::{Source, Target};
266 /// use timely::progress::reachability::Builder;
267 ///
268 /// // allocate a new empty topology builder.
269 /// let mut builder = Builder::<usize>::new();
270 ///
271 /// // Two inputs and outputs, only one of which advances.
272 /// builder.add_node(0, 2, 2, vec![
273 /// [(0,Antichain::from_elem(0)),(1,Antichain::new())].into_iter().collect(),
274 /// [(0,Antichain::new()),(1,Antichain::from_elem(1))].into_iter().collect(),
275 /// ]);
276 ///
277 /// // Connect each output to the opposite input.
278 /// builder.add_edge(Source::new(0, 0), Target::new(0, 1));
279 /// builder.add_edge(Source::new(0, 1), Target::new(0, 0));
280 ///
281 /// assert!(builder.is_acyclic());
282 /// ```
283 pub fn is_acyclic(&self) -> bool {
284
285 let locations = self.shape.iter().map(|(targets, sources)| targets + sources).sum();
286 let mut in_degree = HashMap::with_capacity(locations);
287
288 // Load edges as default summaries.
289 for (index, ports) in self.edges.iter().enumerate() {
290 for (output, targets) in ports.iter().enumerate() {
291 let source = Location::new_source(index, output);
292 in_degree.entry(source).or_insert(0);
293 for &target in targets.iter() {
294 let target = Location::from(target);
295 *in_degree.entry(target).or_insert(0) += 1;
296 }
297 }
298 }
299
300 // Load default intra-node summaries.
301 for (index, summary) in self.nodes.iter().enumerate() {
302 for (input, outputs) in summary.iter().enumerate() {
303 let target = Location::new_target(index, input);
304 in_degree.entry(target).or_insert(0);
305 for (output, summaries) in outputs.iter_ports() {
306 let source = Location::new_source(index, output);
307 for summary in summaries.elements().iter() {
308 if summary == &Default::default() {
309 *in_degree.entry(source).or_insert(0) += 1;
310 }
311 }
312 }
313 }
314 }
315
316 // A worklist of nodes that cannot be reached from the whole graph.
317 // Initially this list contains observed locations with no incoming
318 // edges, but as the algorithm develops we add to it any locations
319 // that can only be reached by nodes that have been on this list.
320 let mut worklist = Vec::with_capacity(in_degree.len());
321 for (key, val) in in_degree.iter() {
322 if *val == 0 {
323 worklist.push(*key);
324 }
325 }
326 in_degree.retain(|_key, val| val != &0);
327
328 // Repeatedly remove nodes and update adjacent in-edges.
329 while let Some(Location { node, port }) = worklist.pop() {
330 match port {
331 Port::Source(port) => {
332 for target in self.edges[node][port].iter() {
333 let target = Location::from(*target);
334 *in_degree.get_mut(&target).unwrap() -= 1;
335 if in_degree[&target] == 0 {
336 in_degree.remove(&target);
337 worklist.push(target);
338 }
339 }
340 },
341 Port::Target(port) => {
342 for (output, summaries) in self.nodes[node][port].iter_ports() {
343 let source = Location::new_source(node, output);
344 for summary in summaries.elements().iter() {
345 if summary == &Default::default() {
346 *in_degree.get_mut(&source).unwrap() -= 1;
347 if in_degree[&source] == 0 {
348 in_degree.remove(&source);
349 worklist.push(source);
350 }
351 }
352 }
353 }
354 },
355 }
356 }
357
358 // Acyclic graphs should reduce to empty collections.
359 in_degree.is_empty()
360 }
361}
362
363impl<T: Timestamp> Default for Builder<T> {
364 fn default() -> Self {
365 Self::new()
366 }
367}
368
369/// An interactive tracker of propagated reachability information.
370///
371/// A `Tracker` tracks, for a fixed graph topology, the implications of
372/// pointstamp changes at various node input and output ports. These changes may
373/// alter the potential pointstamps that could arrive at downstream input ports.
374pub struct Tracker<T:Timestamp> {
375
376 /// Internal operator connectivity, columnar form of `Vec<Vec<PortConnectivity<T::Summary>>>`.
377 /// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs.
378 nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
379 /// Edge connectivity, columnar form of `Vec<Vec<Vec<Target>>>`.
380 /// Indexed by `(node, output_port)` to yield target slices.
381 edges: Vecs<Vecs<Vec<Target>>>,
382
383 /// Summaries from each target (operator input) to scope outputs.
384 /// Indexed by `(node, target_port)` to yield `(scope_output, summary)` pairs.
385 target_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
386 /// Summaries from each source (operator output) to scope outputs.
387 /// Indexed by `(node, source_port)` to yield `(scope_output, summary)` pairs.
388 source_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
389
390 /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
391 /// rather than their multiplicities. We separately track the frontiers resulting from propagated
392 /// frontiers, to protect them from transient negativity in inbound target updates.
393 per_operator: Vec<PerOperator<T>>,
394
395 /// Source and target changes are buffered, which allows us to delay processing until propagation,
396 /// and so consolidate updates, but to leap directly to those frontiers that may have changed.
397 target_changes: ChangeBatch<(Target, T)>,
398 source_changes: ChangeBatch<(Source, T)>,
399
400 /// Worklist of updates to perform, ordered by increasing timestamp and target.
401 worklist: BinaryHeap<Reverse<(T, Location, i64)>>,
402
403 /// Buffer of consequent changes.
404 pushed_changes: ChangeBatch<(Location, T)>,
405
406 /// Compiled summaries from each internal location (not scope inputs) to each scope output.
407 output_changes: Vec<ChangeBatch<T>>,
408
409 /// A non-negative sum of post-filtration input changes.
410 ///
411 /// This sum should be zero exactly when the accumulated input changes are zero,
412 /// indicating that the progress tracker is currently tracking nothing. It should
413 /// always be exactly equal to the sum across all operators of the frontier sizes
414 /// of the target and source `pointstamps` member.
415 total_counts: i64,
416
417 /// Optionally, a unique logging identifier and logging for tracking events.
418 logger: Option<logging::TrackerLogger<T>>,
419}
420
421/// Target and source information for each operator.
422pub struct PerOperator<T: Timestamp> {
423 /// Port information for each target.
424 pub targets: Vec<PortInformation<T>>,
425 /// Port information for each source.
426 pub sources: Vec<PortInformation<T>>,
427 /// Sum across outputs of capabilities.
428 pub cap_counts: i64,
429}
430
431impl<T: Timestamp> PerOperator<T> {
432 /// A new PerOperator bundle from numbers of input and output ports.
433 pub fn new(inputs: usize, outputs: usize) -> Self {
434 PerOperator {
435 targets: vec![PortInformation::new(); inputs],
436 sources: vec![PortInformation::new(); outputs],
437 cap_counts: 0,
438 }
439 }
440}
441
442/// Per-port progress-tracking information.
443#[derive(Clone)]
444pub struct PortInformation<T: Timestamp> {
445 /// Current counts of active pointstamps.
446 pub pointstamps: MutableAntichain<T>,
447 /// Current implications of active pointstamps across the dataflow.
448 pub implications: MutableAntichain<T>,
449}
450
451impl<T: Timestamp> PortInformation<T> {
452 /// Creates empty port information.
453 pub fn new() -> Self {
454 PortInformation {
455 pointstamps: MutableAntichain::new(),
456 implications: MutableAntichain::new(),
457 }
458 }
459
460 /// Returns `true` if updates at this pointstamp uniquely block progress.
461 ///
462 /// This method returns `true` if the currently maintained pointstamp
463 /// counts are such that zeroing out outstanding updates at *this*
464 /// pointstamp would change the frontiers at this operator. When the
465 /// method returns `false` it means that, temporarily at least, there
466 /// are outstanding pointstamp updates that are strictly less than
467 /// this pointstamp.
468 #[inline]
469 pub fn is_global(&self, time: &T) -> bool {
470 let dominated = self.implications.frontier().iter().any(|t| t.less_than(time));
471 let redundant = self.implications.count_for(time) > 1;
472 !dominated && !redundant
473 }
474}
475
476impl<T: Timestamp> Default for PortInformation<T> {
477 fn default() -> Self {
478 Self::new()
479 }
480}
481
482impl<T:Timestamp> Tracker<T> {
483
484 /// Updates the count for a time at a location.
485 #[inline]
486 pub fn update(&mut self, location: Location, time: T, value: i64) {
487 match location.port {
488 Port::Target(port) => self.update_target(Target::new(location.node, port), time, value),
489 Port::Source(port) => self.update_source(Source::new(location.node, port), time, value),
490 };
491 }
492
493 /// Updates the count for a time at a target (operator input, scope output).
494 #[inline]
495 pub fn update_target(&mut self, target: Target, time: T, value: i64) {
496 self.target_changes.update((target, time), value);
497 }
498 /// Updates the count for a time at a source (operator output, scope input).
499 #[inline]
500 pub fn update_source(&mut self, source: Source, time: T, value: i64) {
501 self.source_changes.update((source, time), value);
502 }
503
504 /// Indicates if any pointstamps have positive count.
505 pub fn tracking_anything(&mut self) -> bool {
506 !self.source_changes.is_empty() ||
507 !self.target_changes.is_empty() ||
508 self.total_counts > 0
509 }
510
511 /// Allocate a new `Tracker` using the shape from `summaries`.
512 ///
513 /// The result is a pair of tracker, and the summaries from each input port to each
514 /// output port.
515 ///
516 /// If the optional logger is provided, it will be used to log various tracker events.
517 pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {
518
519 // Allocate buffer space for each input and input port.
520 let per_operator =
521 builder
522 .shape
523 .iter()
524 .map(|&(inputs, outputs)| PerOperator::new(inputs, outputs))
525 .collect::<Vec<_>>();
526
527 // Summary of scope inputs to scope outputs.
528 let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];
529
530 // Compile summaries from each location to each scope output.
531 // Collect into per-node, per-port buckets for flattening.
532 let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);
533
534 // Temporary storage: target_sum[node][port] and source_sum[node][port].
535 let mut target_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
536 .map(|&(inputs, _)| vec![PortConnectivity::default(); inputs])
537 .collect();
538 let mut source_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
539 .map(|&(_, outputs)| vec![PortConnectivity::default(); outputs])
540 .collect();
541
542 for (location, summaries) in output_summaries.into_iter() {
543 // Summaries from scope inputs are useful in summarizing the scope.
544 if location.node == 0 {
545 if let Port::Source(port) = location.port {
546 builder_summary[port] = summaries;
547 }
548 else {
549 // Ignore (ideally trivial) output to output summaries.
550 }
551 }
552 // Summaries from internal nodes are important for projecting capabilities.
553 else {
554 match location.port {
555 Port::Target(port) => {
556 target_sum[location.node][port] = summaries;
557 },
558 Port::Source(port) => {
559 source_sum[location.node][port] = summaries;
560 },
561 }
562 }
563 }
564
565 // Build columnar nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>.
566 let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| {
567 connectivity.iter().map(|port_conn| {
568 port_conn.iter_ports().flat_map(|(port, antichain)| {
569 antichain.elements().iter().map(move |s| (port, s.clone()))
570 })
571 })
572 }));
573
574 // Build columnar edges: Vecs<Vecs<Vec<Target>>>.
575 let edges = build_nested_vecs(builder.edges.iter().map(|node_edges| {
576 node_edges.iter().map(|port_edges| port_edges.iter().cloned())
577 }));
578
579 // Build columnar target and source summaries.
580 let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| {
581 ports.iter().map(|port_conn| {
582 port_conn.iter_ports().flat_map(|(port, antichain)| {
583 antichain.elements().iter().map(move |s| (port, s.clone()))
584 })
585 })
586 }));
587 let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| {
588 ports.iter().map(|port_conn| {
589 port_conn.iter_ports().flat_map(|(port, antichain)| {
590 antichain.elements().iter().map(move |s| (port, s.clone()))
591 })
592 })
593 }));
594
595 let scope_outputs = builder.shape[0].0;
596 let output_changes = vec![ChangeBatch::new(); scope_outputs];
597
598 let tracker =
599 Tracker {
600 nodes,
601 edges,
602 target_summaries,
603 source_summaries,
604 per_operator,
605 target_changes: ChangeBatch::new(),
606 source_changes: ChangeBatch::new(),
607 worklist: BinaryHeap::new(),
608 pushed_changes: ChangeBatch::new(),
609 output_changes,
610 total_counts: 0,
611 logger,
612 };
613
614 (tracker, builder_summary)
615 }
616
617 /// Propagates all pending updates.
618 ///
619 /// The method drains `self.input_changes` and circulates their implications
620 /// until we cease deriving new implications.
621 pub fn propagate_all(&mut self) {
622
623 // Step 0: If logging is enabled, construct and log inbound changes.
624 if let Some(logger) = &mut self.logger {
625
626 let target_changes =
627 self.target_changes
628 .iter()
629 .map(|((target, time), diff)| (target.node, target.port, time, *diff));
630
631 logger.log_target_updates(target_changes);
632
633 let source_changes =
634 self.source_changes
635 .iter()
636 .map(|((source, time), diff)| (source.node, source.port, time, *diff));
637
638 logger.log_source_updates(source_changes);
639 }
640
641 // Step 1: Drain `self.input_changes` and determine actual frontier changes.
642 //
643 // Not all changes in `self.input_changes` may alter the frontier at a location.
644 // By filtering the changes through `self.pointstamps` we react only to discrete
645 // changes in the frontier, rather than changes in the pointstamp counts that
646 // witness that frontier.
647 use itertools::Itertools;
648 let mut target_changes = self.target_changes.drain().peekable();
649 while let Some(((target, _), _)) = target_changes.peek() {
650
651 let target = *target;
652 let operator = &mut self.per_operator[target.node].targets[target.port];
653 let target_updates = target_changes.peeking_take_while(|((t, _),_)| t == &target).map(|((_,time),diff)| (time,diff));
654 let changes = operator.pointstamps.update_iter(target_updates);
655
656 for (time, diff) in changes {
657 self.total_counts += diff;
658 for &(output, ref summary) in (&self.target_summaries).get(target.node).get(target.port).into_index_iter() {
659 if let Some(out_time) = summary.results_in(&time) {
660 self.output_changes[output].update(out_time, diff);
661 }
662 }
663 self.worklist.push(Reverse((time, Location::from(target), diff)));
664 }
665 }
666
667 let mut source_changes = self.source_changes.drain().peekable();
668 while let Some(((source, _), _)) = source_changes.peek() {
669
670 let source = *source;
671 let operator = &mut self.per_operator[source.node];
672 let op_source = &mut operator.sources[source.port];
673 let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff));
674 let changes = op_source.pointstamps.update_iter(source_updates);
675
676 for (time, diff) in changes {
677 self.total_counts += diff;
678 operator.cap_counts += diff;
679 for &(output, ref summary) in (&self.source_summaries).get(source.node).get(source.port).into_index_iter() {
680 if let Some(out_time) = summary.results_in(&time) {
681 self.output_changes[output].update(out_time, diff);
682 }
683 }
684 self.worklist.push(Reverse((time, Location::from(source), diff)));
685 }
686 }
687
688 // Step 2: Circulate implications of changes to `self.pointstamps`.
689 //
690 // TODO: The argument that this always terminates is subtle, and should be made.
691 // The intent is that that by moving forward in layers through `time`, we
692 // will discover zero-change times when we first visit them, as no further
693 // changes can be made to them once we complete them.
694 while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {
695
696 // Drain and accumulate all updates that have the same time and location.
697 while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
698 diff += (self.worklist.pop().unwrap().0).2;
699 }
700
701 // Only act if there is a net change, positive or negative.
702 if diff != 0 {
703
704 match location.port {
705 // Update to an operator input.
706 // Propagate any changes forward across the operator.
707 Port::Target(port_index) => {
708
709 let changes =
710 self.per_operator[location.node]
711 .targets[port_index]
712 .implications
713 .update_iter(Some((time, diff)));
714
715 for (time, diff) in changes {
716 for &(output_port, ref summary) in (&self.nodes).get(location.node).get(port_index).into_index_iter() {
717 if let Some(new_time) = summary.results_in(&time) {
718 let source = Location { node: location.node, port: Port::Source(output_port) };
719 self.worklist.push(Reverse((new_time, source, diff)));
720 }
721 }
722 self.pushed_changes.update((location, time), diff);
723 }
724 }
725 // Update to an operator output.
726 // Propagate any changes forward along outgoing edges.
727 Port::Source(port_index) => {
728
729 let changes =
730 self.per_operator[location.node]
731 .sources[port_index]
732 .implications
733 .update_iter(Some((time, diff)));
734
735 for (time, diff) in changes {
736 for new_target in (&self.edges).get(location.node).get(port_index).into_index_iter() {
737 self.worklist.push(Reverse((
738 time.clone(),
739 Location::from(*new_target),
740 diff,
741 )));
742 }
743 self.pushed_changes.update((location, time), diff);
744 }
745 },
746 };
747 }
748 }
749 }
750
751 /// Implications of maintained capabilities projected to each output.
752 pub fn pushed_output(&mut self) -> &mut [ChangeBatch<T>] {
753 &mut self.output_changes[..]
754 }
755
756 /// A mutable reference to the pushed results of changes.
757 pub fn pushed(&mut self) -> (&mut ChangeBatch<(Location, T)>, &[PerOperator<T>]) {
758 (&mut self.pushed_changes, &self.per_operator)
759 }
760
761 /// Reveals per-operator frontier state.
762 pub fn node_state(&self, index: usize) -> &PerOperator<T> {
763 &self.per_operator[index]
764 }
765
766 /// Indicates if pointstamp is in the scope-wide frontier.
767 ///
768 /// Such a pointstamp would, if removed from `self.pointstamps`, cause a change
769 /// to `self.implications`, which is what we track for per operator input frontiers.
770 /// If the above do not hold, then its removal either 1. shouldn't be possible,
771 /// or 2. will not affect the output of `self.implications`.
772 pub fn is_global(&self, location: Location, time: &T) -> bool {
773 match location.port {
774 Port::Target(port) => self.per_operator[location.node].targets[port].is_global(time),
775 Port::Source(port) => self.per_operator[location.node].sources[port].is_global(time),
776 }
777 }
778}
779
780/// Determines summaries from locations to scope outputs.
781///
782/// Specifically, for each location whose node identifier is non-zero, we compile
783/// the summaries along which they can reach each output.
784///
785/// Graph locations may be missing from the output, in which case they have no
786/// paths to scope outputs.
787fn summarize_outputs<T: Timestamp>(
788 nodes: &[Connectivity<T::Summary>],
789 edges: &[Vec<Vec<Target>>],
790 ) -> HashMap<Location, PortConnectivity<T::Summary>>
791{
792 // A reverse edge map, to allow us to walk back up the dataflow graph.
793 let mut reverse = HashMap::new();
794 for (node, outputs) in edges.iter().enumerate() {
795 for (output, targets) in outputs.iter().enumerate() {
796 for target in targets.iter() {
797 reverse.insert(
798 Location::from(*target),
799 Location { node, port: Port::Source(output) }
800 );
801 }
802 }
803 }
804
805 // A reverse map from operator outputs to inputs, along their internal summaries.
806 let mut reverse_internal: HashMap<_, Vec<_>> = HashMap::new();
807 for (node, connectivity) in nodes.iter().enumerate() {
808 for (input, outputs) in connectivity.iter().enumerate() {
809 for (output, summary) in outputs.iter_ports() {
810 reverse_internal
811 .entry(Location::new_source(node, output))
812 .or_default()
813 .push((input, summary));
814 }
815 }
816 }
817
818 let mut results: HashMap<Location, PortConnectivity<T::Summary>> = HashMap::new();
819 let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();
820
821 let outputs =
822 edges
823 .iter()
824 .flat_map(|x| x.iter())
825 .flat_map(|x| x.iter())
826 .filter(|target| target.node == 0);
827
828 // The scope may have no outputs, in which case we can do no work.
829 for output_target in outputs {
830 worklist.push_back((Location::from(*output_target), output_target.port, Default::default()));
831 }
832
833 // Loop until we stop discovering novel reachability paths.
834 while let Some((location, output, summary)) = worklist.pop_front() {
835 match location.port {
836
837 // This is an output port of an operator, or a scope input.
838 // We want to crawl up the operator, to its inputs.
839 Port::Source(_output_port) => {
840 if let Some(inputs) = reverse_internal.get(&location) {
841 for (input_port, operator_summary) in inputs.iter() {
842 let new_location = Location::new_target(location.node, *input_port);
843 for op_summary in operator_summary.elements().iter() {
844 if let Some(combined) = op_summary.followed_by(&summary) {
845 if results.entry(new_location).or_default().insert_ref(output, &combined) {
846 worklist.push_back((new_location, output, combined));
847 }
848 }
849 }
850 }
851 }
852 }
853
854 // This is an input port of an operator, or a scope output.
855 // We want to walk back the edges leading to it.
856 Port::Target(_port) => {
857 // Each target should have (at most) one source.
858 if let Some(&source) = reverse.get(&location) {
859 if results.entry(source).or_default().insert_ref(output, &summary) {
860 worklist.push_back((source, output, summary));
861 }
862 }
863 },
864 }
865 }
866
867 results
868}
869
870/// Logging types for reachability tracking events.
871pub mod logging {
872 use std::time::Duration;
873
874 use timely_container::CapacityContainerBuilder;
875 use timely_logging::TypedLogger;
876 use crate::logging_core::Logger;
877
878 /// A container builder for tracker events.
879 pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;
880
881 /// A logger with additional identifying information about the tracker.
882 pub struct TrackerLogger<T: Clone + 'static> {
883 identifier: usize,
884 logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
885 }
886
887 impl<T: Clone + 'static> TrackerLogger<T> {
888 /// Create a new tracker logger from its fields.
889 pub fn new(identifier: usize, logger: Logger<TrackerEventBuilder<T>>) -> Self {
890 Self { identifier, logger: logger.into() }
891 }
892
893 /// Log source update events with additional identifying information.
894 pub fn log_source_updates<'a, I>(&mut self, updates: I)
895 where
896 I: IntoIterator<Item = (usize, usize, &'a T, i64)>
897 {
898 let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
899 if !updates.is_empty() {
900 self.logger.log({
901 SourceUpdate {
902 tracker_id: self.identifier,
903 updates
904 }
905 });
906 }
907 }
908 /// Log target update events with additional identifying information.
909 pub fn log_target_updates<'a, I>(&mut self, updates: I)
910 where
911 I: IntoIterator<Item = (usize, usize, &'a T, i64)>
912 {
913 let updates: Vec<_> = updates.into_iter().map(|(a,b,c,d)| (a,b,c.clone(),d)).collect();
914 if !updates.is_empty() {
915 self.logger.log({
916 TargetUpdate {
917 tracker_id: self.identifier,
918 updates
919 }
920 });
921 }
922 }
923 }
924
925 /// Events that the tracker may record.
926 #[derive(Debug, Clone)]
927 pub enum TrackerEvent<T> {
928 /// Updates made at a source of data.
929 SourceUpdate(SourceUpdate<T>),
930 /// Updates made at a target of data.
931 TargetUpdate(TargetUpdate<T>),
932 }
933
934 /// An update made at a source of data.
935 #[derive(Debug, Clone)]
936 pub struct SourceUpdate<T> {
937 /// An identifier for the tracker.
938 pub tracker_id: usize,
939 /// Updates themselves, as `(node, port, time, diff)`.
940 pub updates: Vec<(usize, usize, T, i64)>,
941 }
942
943 /// An update made at a target of data.
944 #[derive(Debug, Clone)]
945 pub struct TargetUpdate<T> {
946 /// An identifier for the tracker.
947 pub tracker_id: usize,
948 /// Updates themselves, as `(node, port, time, diff)`.
949 pub updates: Vec<(usize, usize, T, i64)>,
950 }
951
952 impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
953 fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
954 }
955
956 impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
957 fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
958 }
959}
960
961// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
962// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
963// because in all other cases the tracker stays alive while it has outstanding work, leaving no
964// remaining work for this Drop implementation.
965impl<T: Timestamp> Drop for Tracker<T> {
966 fn drop(&mut self) {
967 let logger = if let Some(logger) = &mut self.logger {
968 logger
969 } else {
970 // No cleanup necessary when there is no logger.
971 return;
972 };
973
974 // Retract pending data that `propagate_all` would normally log.
975 for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
976 let target_changes = per_operator.targets
977 .iter_mut()
978 .enumerate()
979 .flat_map(|(port, target)| {
980 target.pointstamps
981 .updates()
982 .map(move |(time, diff)| (index, port, time, -diff))
983 });
984
985 logger.log_target_updates(target_changes);
986
987 let source_changes = per_operator.sources
988 .iter_mut()
989 .enumerate()
990 .flat_map(|(port, source)| {
991 source.pointstamps
992 .updates()
993 .map(move |(time, diff)| (index, port, time, -diff))
994 });
995
996 logger.log_source_updates(source_changes);
997 }
998 }
999}