Skip to main content

timely/progress/
subgraph.rs

//! A dataflow subgraph.
//!
//! Timely dataflow graphs can be nested hierarchically, where some region of
//! the graph is grouped and presented upwards as a single operator. This
//! grouping needs some care, to make sure that the presented operator reflects
//! the behavior of the grouped operators.
7
8use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::TimelyLogger as Logger;
14use crate::logging::TimelySummaryLogger as SummaryLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
// IMPORTANT : by convention, a child identifier of zero is used to indicate inputs and outputs of
// the Subgraph itself. An identifier greater than zero corresponds to an actual child, which can
// be found at position `id` in the `children` field of the Subgraph (position zero holds a
// placeholder entry standing in for the Subgraph's own inputs and outputs).
33
/// A builder for interactively initializing a `Subgraph`.
///
/// This collects all the information necessary to get a `Subgraph` up and
/// running, and is important largely through its `build` method which
/// actually creates a `Subgraph`.
///
/// Children and edges are only stashed while building; they are turned into
/// per-operator state and reachability information when `build` is called.
pub struct SubgraphBuilder<TInner>
where
    TInner: Timestamp,
{
    /// The name of this subgraph.
    pub name: String,

    /// A sequence of integers uniquely identifying the subgraph.
    pub path: Rc<[usize]>,

    /// The index assigned to the subgraph by its parent.
    index: usize,

    /// A global identifier for this subgraph.
    identifier: usize,

    // Deferred children: (operator, index, identifier). Built into PerOperatorState at build time.
    children: Vec<(Box<dyn Operate<TInner>>, usize, usize)>,
    // The next child index to hand out from `allocate_child_id`. Index zero is reserved for the
    // subgraph itself, so counting starts at one.
    child_count: usize,

    // Edges registered through `connect`, applied to the children and the reachability builder
    // at build time.
    edge_stash: Vec<(Source, Target)>,

    // shared state written to by the datapath, counting records entering this subgraph instance.
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,

    // The number of outputs allocated so far through `new_output`.
    outputs: usize,

}
68
69impl<TInner> SubgraphBuilder<TInner>
70where
71    TInner: Timestamp,
72{
73    /// Allocates a new input to the subgraph and returns the target to that input in the outer graph.
74    pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
75        self.input_messages.push(shared_counts);
76        Target::new(self.index, self.input_messages.len() - 1)
77    }
78
79    /// Allocates a new output from the subgraph and returns the source of that output in the outer graph.
80    pub fn new_output(&mut self) -> Source {
81        self.outputs += 1;
82        Source::new(self.index, self.outputs - 1)
83    }
84
85    /// Introduces a dependence from the source to the target.
86    ///
87    /// This method does not effect data movement, but rather reveals to the progress tracking infrastructure
88    /// that messages produced by `source` should be expected to be consumed at `target`.
89    pub fn connect(&mut self, source: Source, target: Target) {
90        self.edge_stash.push((source, target));
91    }
92
93    /// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph,
94    /// terminating with the local index of the new subgraph itself.
95    pub fn new_from(
96        path: Rc<[usize]>,
97        identifier: usize,
98        name: &str,
99    )
100        -> SubgraphBuilder<TInner>
101    {
102        let index = path[path.len() - 1];
103
104        SubgraphBuilder {
105            name: name.to_owned(),
106            path,
107            index,
108            identifier,
109            children: Vec::new(),
110            child_count: 1,
111            edge_stash: Vec::new(),
112            input_messages: Vec::new(),
113            outputs: 0,
114        }
115    }
116
117    /// Allocates a new child identifier, for later use.
118    pub fn allocate_child_id(&mut self) -> usize {
119        self.child_count += 1;
120        self.child_count - 1
121    }
122
123    /// Adds a new child to the subgraph.
124    ///
125    /// The child will be initialized and logged when [`build`] is called.
126    pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
127        self.children.push((child, index, identifier));
128    }
129
130    /// Now that initialization is complete, actually build a subgraph.
131    pub fn build<TOuter: Timestamp>(mut self, worker: &crate::worker::Worker) -> Subgraph<TOuter, TInner> {
132        // at this point, the subgraph is frozen. we should initialize any internal state which
133        // may have been determined after construction (e.g. the numbers of inputs and outputs).
134        // we also need to determine what to return as a summary and initial capabilities, which
135        // will depend on child summaries and capabilities, as well as edges in the subgraph.
136
137        let inputs = self.input_messages.len();
138        let outputs = self.outputs;
139
140        let type_name = std::any::type_name::<TInner>();
141        let mut logging = worker.logging();
142        let mut summary_logging = worker.logger_for(&format!("timely/summary/{type_name}"));
143
144        // Sort stashed children by index, and preface with a child zero mirroring the subgraph shape.
145        self.children.sort_unstable_by_key(|&(_, index, _)| index);
146        let mut children: Vec<_> = [PerOperatorState::empty(outputs, inputs)]
147            .into_iter()
148            .chain(self.children.into_iter().map(|(operator, index, identifier)| {
149                let child = PerOperatorState::new(operator, index, identifier, logging.clone(), &mut summary_logging);
150                if let Some(l) = &mut logging {
151                    let mut child_path = Vec::with_capacity(self.path.len() + 1);
152                    child_path.extend_from_slice(&self.path[..]);
153                    child_path.push(index);
154                    l.log(crate::logging::OperatesEvent {
155                        id: identifier,
156                        addr: child_path,
157                        name: child.name.to_owned(),
158                    });
159                }
160                child
161            }))
162            .collect();
163        assert!(children.iter().enumerate().all(|(i,x)| i == x.index));
164
165        let mut builder = reachability::Builder::new();
166
167        // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
168        let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
169        builder.add_node(0, outputs, inputs, summary);
170        for (index, child) in children.iter().enumerate().skip(1) {
171            builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
172        }
173
174        for (source, target) in self.edge_stash {
175            children[source.node].edges[source.port].push(target);
176            builder.add_edge(source, target);
177        }
178
179        let reachability_logging =
180        worker.logger_for(&format!("timely/reachability/{type_name}"))
181              .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
182        let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
183        let (tracker, scope_summary) = builder.build(reachability_logging);
184
185        let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, logging, progress_logging);
186
187        let mut incomplete = vec![true; children.len()];
188        incomplete[0] = false;
189        let incomplete_count = incomplete.len() - 1;
190
191        let activations = worker.activations();
192
193        activations.borrow_mut().activate(&self.path[..]);
194
195        // The subgraph's per-input interest is conservatively the max across all children's inputs.
196        let max_interest = children.iter()
197            .flat_map(|c| c.notify.iter().copied())
198            .max()
199            .unwrap_or(FrontierInterest::Never);
200        let notify_me: Vec<FrontierInterest> = vec![max_interest; inputs];
201
202        Subgraph {
203            name: self.name,
204            path: self.path,
205            inputs,
206            outputs,
207            incomplete,
208            incomplete_count,
209            activations,
210            temp_active: BinaryHeap::new(),
211            maybe_shutdown: Vec::new(),
212            children,
213            input_messages: self.input_messages,
214            output_capabilities: vec![MutableAntichain::new(); self.outputs],
215
216            local_pointstamp: ChangeBatch::new(),
217            final_pointstamp: ChangeBatch::new(),
218            progcaster,
219            pointstamp_tracker: tracker,
220
221            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
222            scope_summary,
223
224            progress_mode: worker.config().progress_mode,
225            notify_me,
226        }
227    }
228}
229
230
/// A dataflow subgraph.
///
/// The subgraph type contains the infrastructure required to describe the topology of and track
/// progress within a dataflow subgraph.
pub struct Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp,
{
    name: String,           // an informative name.
    /// Path of identifiers from the root.
    pub path: Rc<[usize]>,
    inputs: usize,          // number of inputs.
    outputs: usize,         // number of outputs.

    // per-child state, indexed by child index. entry zero is a placeholder standing in for the
    // subgraph's own inputs and outputs; real children start at index one.
    children: Vec<PerOperatorState<TInner>>,

    incomplete: Vec<bool>,   // the incompletion status of each child.
    incomplete_count: usize, // the number of incomplete children.

    // shared activations (including children).
    activations: Rc<RefCell<Activations>>,
    // child indexes waiting to be scheduled, popped smallest index first.
    temp_active: BinaryHeap<Reverse<usize>>,
    // child indexes that received progress updates and should be considered for shutdown.
    maybe_shutdown: Vec<usize>,

    // shared state written to by the datapath, counting records entering this subgraph instance.
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,

    // expressed capabilities, used to filter changes against.
    output_capabilities: Vec<MutableAntichain<TOuter>>,

    // pointstamp messages to exchange. ultimately destined for `messages` or `internal`.
    // `local_pointstamp` holds updates not yet sent through the progcaster; `final_pointstamp`
    // holds updates ready to be committed to the tracker.
    local_pointstamp: ChangeBatch<(Location, TInner)>,
    final_pointstamp: ChangeBatch<(Location, TInner)>,

    // Graph structure and pointstamp tracker.
    // pointstamp_builder: reachability::Builder<TInner>,
    pointstamp_tracker: reachability::Tracker<TInner>,

    // channel / whatever used to communicate pointstamp updates to peers.
    progcaster: Progcaster<TInner>,

    // progress information shared with the parent scope.
    shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
    // connectivity summary produced when the reachability tracker was built.
    scope_summary: Connectivity<TInner::Summary>,

    // how eagerly progress updates are sent; read from the worker configuration.
    progress_mode: ProgressMode,

    // the frontier interest reported to the parent, one entry per input.
    notify_me: Vec<FrontierInterest>,
}
281
282impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
283where
284    TOuter: Timestamp,
285    TInner: Timestamp+Refines<TOuter>,
286{
287    fn name(&self) -> &str { &self.name }
288
289    fn path(&self) -> &[usize] { &self.path }
290
291    fn schedule(&mut self) -> bool {
292
293        // This method performs several actions related to progress tracking
294        // and child operator scheduling. The actions have been broken apart
295        // into atomic actions that should be able to be safely executed in
296        // isolation, by a potentially clueless user (yours truly).
297
298        self.accept_frontier();         // Accept supplied frontier changes.
299        self.harvest_inputs();          // Count records entering the scope.
300
301        // Receive post-exchange progress updates.
302        self.progcaster.recv(&mut self.final_pointstamp);
303
304        // Commit and propagate final pointstamps.
305        self.propagate_pointstamps();
306
307        {   // Enqueue active children; scoped to let borrow drop.
308            let temp_active = &mut self.temp_active;
309            self.activations
310                .borrow_mut()
311                .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
312        }
313
314        // Schedule child operators.
315        //
316        // We should be able to schedule arbitrary subsets of children, as
317        // long as we eventually schedule all children that need to do work.
318        let mut previous = 0;
319        while let Some(Reverse(index)) = self.temp_active.pop() {
320            // De-duplicate, and don't revisit.
321            if index > previous {
322                // TODO: This is a moment where a scheduling decision happens.
323                self.activate_child(index);
324                previous = index;
325            }
326        }
327
328        // Transmit produced progress updates.
329        self.send_progress();
330
331        // If child scopes surface more final pointstamp updates we must re-execute.
332        if !self.final_pointstamp.is_empty() {
333            self.activations.borrow_mut().activate(&self.path[..]);
334        }
335
336        // A subgraph is incomplete if any child is incomplete, or there are outstanding messages.
337        let incomplete = self.incomplete_count > 0;
338        let tracking = self.pointstamp_tracker.tracking_anything();
339
340        incomplete || tracking
341    }
342}
343
344
impl<TOuter, TInner> Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    /// Schedules a child operator and collects progress statements.
    ///
    /// The child's completion status is recorded in `self.incomplete`, keeping
    /// `self.incomplete_count` in step with it. A child that reports completion and has
    /// neither input frontier nor capabilities according to the tracker is shut down.
    ///
    /// The return value indicates that the child task cannot yet shut down.
    fn activate_child(&mut self, child_index: usize) -> bool {

        let child = &mut self.children[child_index];

        let incomplete = child.schedule();

        // Only adjust the count when the child's status actually flips.
        if incomplete != self.incomplete[child_index] {
            if incomplete { self.incomplete_count += 1; }
            else          { self.incomplete_count -= 1; }
            self.incomplete[child_index] = incomplete;
        }

        if !incomplete {
            // Consider shutting down the child, if neither capabilities nor input frontier.
            let child_state = self.pointstamp_tracker.node_state(child_index);
            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
            if frontiers_empty && no_capabilities {
                child.shut_down();
            }
        }
        else {
            // In debug mode, check that the progress statements do not violate invariants.
            #[cfg(debug_assertions)] {
                child.validate_progress(self.pointstamp_tracker.node_state(child_index));
            }
        }

        // Extract progress statements into either pre- or post-exchange buffers.
        // Statements from `local` children go to the pre-exchange buffer, which is
        // handed to the progcaster by `send_progress`; all others are already final.
        if child.local {
            child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
        }
        else {
            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
        }

        incomplete
    }

    /// Move frontier changes from parent into progress statements.
    ///
    /// Each change to the frontier of scope input `port` is converted to the inner
    /// timestamp type and applied to the tracker at output `port` of child zero.
    fn accept_frontier(&mut self) {
        for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
            let source = Source::new(0, port);
            for (time, value) in changes.drain() {
                self.pointstamp_tracker.update_source(
                    source,
                    TInner::to_inner(time),
                    value
                );
            }
        }
    }

    /// Collects counts of records entering the scope.
    ///
    /// This method moves message counts from the output of child zero to the inputs to
    /// attached operators. This is a bit of a hack, because normally one finds capabilities
    /// at an operator output, rather than message counts. These counts are used only at
    /// mark [XXX] where they are reported upwards to the parent scope.
    fn harvest_inputs(&mut self) {
        for input in 0 .. self.inputs {
            let source = Location::new_source(0, input);
            let mut borrowed = self.input_messages[input].borrow_mut();
            for (time, delta) in borrowed.drain() {
                // Credit every operator input attached to this scope input ...
                for target in &self.children[0].edges[input] {
                    self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
                }
                // ... and record the negated count at child zero's output, which [XXX]
                // negates again when reporting consumed messages.
                self.local_pointstamp.update((source, time), -delta);
            }
        }
    }

    /// Commits pointstamps in `self.final_pointstamp`.
    ///
    /// This method performs several steps that for reasons of correctness must
    /// be performed atomically, before control is returned. These are:
    ///
    /// 1. Changes to child zero's outputs are reported as consumed messages.
    /// 2. Changes to child zero's inputs are reported as produced messages.
    /// 3. Frontiers for child zero's inputs are reported as internal capabilities.
    ///
    /// Perhaps importantly, the frontiers for child zero are determined *without*
    /// the messages that are produced for child zero inputs, as we only want to
    /// report retained internal capabilities, and not now-external messages.
    ///
    /// In the course of propagating progress changes, we also propagate progress
    /// changes for all of the managed child operators.
    fn propagate_pointstamps(&mut self) {

        // Process exchanged pointstamps. Handle child 0 statements carefully.
        for ((location, timestamp), delta) in self.final_pointstamp.drain() {

            // Child 0 corresponds to the parent scope and has special handling.
            if location.node == 0 {
                match location.port {
                    // [XXX] Report child 0's capabilities as consumed messages.
                    //       Note the re-negation of delta, to make counts positive.
                    Port::Source(scope_input) => {
                        self.shared_progress
                            .borrow_mut()
                            .consumeds[scope_input]
                            .update(timestamp.to_outer(), -delta);
                    },
                    // [YYY] Report child 0's input messages as produced messages.
                    //       Do not otherwise record, as we will not see subtractions,
                    //       and we do not want to present their implications upward.
                    Port::Target(scope_output) => {
                        self.shared_progress
                            .borrow_mut()
                            .produceds[scope_output]
                            .update(timestamp.to_outer(), delta);
                    },
                }
            }
            else {
                self.pointstamp_tracker.update(location, timestamp, delta);
            }
        }

        // Propagate implications of progress changes.
        self.pointstamp_tracker.propagate_all();

        // Drain propagated information into shared progress structure.
        let (pushed, operators) = self.pointstamp_tracker.pushed();
        for ((location, time), diff) in pushed.drain() {
            // Any operator that received an update is a candidate for shutdown, below.
            self.maybe_shutdown.push(location.node);
            // Targets are actionable, sources are not.
            if let crate::progress::Port::Target(port) = location.port {
                // Activate based on expressed frontier interest for this input.
                let activate = match self.children[location.node].notify[port] {
                    FrontierInterest::Always => true,
                    FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 }
                    FrontierInterest::Never => false,
                };
                if activate { self.temp_active.push(Reverse(location.node)); }

                // Keep this current independent of the interest.
                self.children[location.node]
                    .shared_progress
                    .borrow_mut()
                    .frontiers[port]
                    .update(time, diff);
            }
        }

        // Consider scheduling each recipient of progress information to shut down.
        self.maybe_shutdown.sort_unstable();
        self.maybe_shutdown.dedup();
        for child_index in self.maybe_shutdown.drain(..) {
            let child_state = self.pointstamp_tracker.node_state(child_index);
            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
            let no_capabilities = child_state.cap_counts == 0;
            // Scheduling the child is what gives `activate_child` the chance to shut it down.
            if frontiers_empty && no_capabilities {
                self.temp_active.push(Reverse(child_index));
            }
        }

        // Extract child zero frontier changes and report as internal capability changes.
        // Changes are converted to the outer timestamp type and filtered through the
        // per-output `output_capabilities` before being reported.
        for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
            self.pointstamp_tracker
                .pushed_output()[output]
                .drain()
                .map(|(time, diff)| (time.to_outer(), diff))
                .filter_through(&mut self.output_capabilities[output])
                .for_each(|(time, diff)| internal.update(time, diff));
        }
    }

    /// Sends local progress updates to all workers.
    ///
    /// This method does not guarantee that all of `self.local_pointstamp` are
    /// sent, but that no blocking pointstamps remain.
    fn send_progress(&mut self) {

        // If we are requested to eagerly send progress updates, or if there are
        // updates visible in the scope-wide frontier, we must send all updates.
        let must_send = self.progress_mode == ProgressMode::Eager || {
            let tracker = &mut self.pointstamp_tracker;
            self.local_pointstamp
                .iter()
                .any(|((location, time), diff)|
                    // Must publish scope-wide visible subtractions.
                    tracker.is_global(*location, time) && *diff < 0 ||
                    // Must confirm the receipt of inbound messages.
                    location.node == 0
                )
        };

        if must_send {
            self.progcaster.send(&mut self.local_pointstamp);
        }
    }
}
546
547
548impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
549where
550    TOuter: Timestamp,
551    TInner: Timestamp+Refines<TOuter>,
552{
553    fn local(&self) -> bool { false }
554    fn inputs(&self)  -> usize { self.inputs }
555    fn outputs(&self) -> usize { self.outputs }
556
557    // produces connectivity summaries from inputs to outputs, and reports initial internal
558    // capabilities on each of the outputs (projecting capabilities from contained scopes).
559    fn initialize(mut self: Box<Self>) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, Box<dyn Schedule>) {
560
561        // double-check that child 0 (the outside world) is correctly shaped.
562        assert_eq!(self.children[0].outputs, self.inputs());
563        assert_eq!(self.children[0].inputs, self.outputs());
564
565        // Note that we need to have `self.inputs()` elements in the summary
566        // with each element containing `self.outputs()` antichains regardless
567        // of how long `self.scope_summary` is
568        let mut internal_summary = vec![PortConnectivity::default(); self.inputs()];
569        for (input_idx, input) in self.scope_summary.iter().enumerate() {
570            for (output_idx, output) in input.iter_ports() {
571                for outer in output.elements().iter().cloned().map(TInner::summarize) {
572                    internal_summary[input_idx].insert(output_idx, outer);
573                }
574            }
575        }
576
577        debug_assert_eq!(
578            internal_summary.len(),
579            self.inputs(),
580            "the internal summary should have as many elements as there are inputs",
581        );
582        debug_assert!(
583            internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())),
584            "each element of the internal summary should only reference valid outputs",
585        );
586
587        // Each child has expressed initial capabilities (their `shared_progress.internals`).
588        // We introduce these into the progress tracker to determine the scope's initial
589        // internal capabilities.
590        for child in self.children.iter_mut() {
591            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
592        }
593
594        self.propagate_pointstamps();  // Propagate expressed capabilities to output frontiers.
595
596        // Return summaries and shared progress information.
597        (internal_summary, Rc::clone(&self.shared_progress), self)
598    }
599
600    fn notify_me(&self) -> &[FrontierInterest] { &self.notify_me }
601}
602
/// The state a subgraph maintains for each of its children.
///
/// The entry for child zero is built by `empty` and has no operator; it stands in for the
/// subgraph's own inputs and outputs.
struct PerOperatorState<T: Timestamp> {

    name: String,       // name of the operator
    index: usize,       // index of the operator within its parent scope
    id: usize,          // worker-unique identifier

    local: bool,        // indicates whether the operator will exchange data or not
    notify: Vec<FrontierInterest>, // frontier interest, one entry per input
    inputs: usize,      // number of inputs to the operator
    outputs: usize,     // number of outputs from the operator

    // the schedulable operator; `None` for child zero, and once the operator has been shut down.
    operator: Option<Box<dyn Schedule>>,

    edges: Vec<Vec<Target>>,    // edges from the outputs of the operator

    // progress information shared between the operator and the subgraph.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,

    internal_summary: Connectivity<T::Summary>,   // cached result from initialize.

    // logger for schedule, progress, and shutdown events, if logging is enabled.
    logging: Option<Logger>,
}
624
625impl<T: Timestamp> PerOperatorState<T> {
626
627    fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
628        PerOperatorState {
629            name:       "External".to_owned(),
630            operator:   None,
631            index:      0,
632            id:         usize::MAX,
633            local:      false,
634            notify:     vec![FrontierInterest::IfCapability; inputs],
635            inputs,
636            outputs,
637
638            edges: vec![Vec::new(); outputs],
639
640            logging: None,
641
642            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
643            internal_summary: Vec::new(),
644        }
645    }
646
647    pub fn new(
648        scope: Box<dyn Operate<T>>,
649        index: usize,
650        identifier: usize,
651        logging: Option<Logger>,
652        summary_logging: &mut Option<SummaryLogger<T::Summary>>,
653    ) -> PerOperatorState<T>
654    {
655        let local = scope.local();
656        let inputs = scope.inputs();
657        let outputs = scope.outputs();
658        let notify = scope.notify_me().to_vec();
659
660        let (internal_summary, shared_progress, operator) = scope.initialize();
661
662        if let Some(l) = summary_logging {
663            l.log(crate::logging::OperatesSummaryEvent {
664                id: identifier,
665                summary: internal_summary.clone(),
666            })
667        }
668
669        assert_eq!(
670            internal_summary.len(),
671            inputs,
672            "operator summary has {} inputs when {} were expected",
673            internal_summary.len(),
674            inputs,
675        );
676        assert!(
677            internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)),
678            "operator summary references invalid output port",
679        );
680
681        PerOperatorState {
682            name:               operator.name().to_owned(),
683            operator:           Some(operator),
684            index,
685            id:                 identifier,
686            local,
687            notify,
688            inputs,
689            outputs,
690            edges:              vec![vec![]; outputs],
691
692            logging,
693
694            shared_progress,
695            internal_summary,
696        }
697    }
698
699    pub fn schedule(&mut self) -> bool {
700
701        if let Some(ref mut operator) = self.operator {
702
703            // Perhaps log information about the start of the schedule call.
704            if let Some(l) = self.logging.as_mut() {
705                // FIXME: There is no contract that the operator must consume frontier changes.
706                //        This report could be spurious.
707                // TODO:  Perhaps fold this in to `ScheduleEvent::start()` as a "reason"?
708                let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
709                if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
710                    l.log(crate::logging::PushProgressEvent { op_id: self.id })
711                }
712
713                l.log(crate::logging::ScheduleEvent::start(self.id));
714            }
715
716            let incomplete = operator.schedule();
717
718            // Perhaps log information about the stop of the schedule call.
719            if let Some(l) = self.logging.as_mut() {
720                l.log(crate::logging::ScheduleEvent::stop(self.id));
721            }
722
723            incomplete
724        }
725        else {
726
727            // If the operator is closed and we are reporting progress at it, something has surely gone wrong.
728            if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
729                println!("Operator prematurely shut down: {}", self.name);
730                println!("  {:?}", self.notify);
731                println!("  {:?}", self.shared_progress.borrow_mut().frontiers);
732                panic!();
733            }
734
735            // A closed operator shouldn't keep anything open.
736            false
737        }
738    }
739
740    fn shut_down(&mut self) {
741        if self.operator.is_some() {
742            if let Some(l) = self.logging.as_mut() {
743                l.log(crate::logging::ShutdownEvent{ id: self.id });
744            }
745            self.operator = None;
746        }
747    }
748
749    /// Extracts shared progress information and converts to pointstamp changes.
750    fn extract_progress(&self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
751
752        let shared_progress = &mut *self.shared_progress.borrow_mut();
753
754        // Migrate consumeds, internals, produceds into progress statements.
755        for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
756            let target = Location::new_target(self.index, input);
757            for (time, delta) in consumed.drain() {
758                pointstamps.update((target, time), -delta);
759            }
760        }
761        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
762            let source = Location::new_source(self.index, output);
763            for (time, delta) in internal.drain() {
764                pointstamps.update((source, time.clone()), delta);
765            }
766        }
767        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
768            for (time, delta) in produced.drain() {
769                for target in &self.edges[output] {
770                    pointstamps.update((Location::from(*target), time.clone()), delta);
771                    temp_active.push(Reverse(target.node));
772                }
773            }
774        }
775    }
776
777    /// Test the validity of `self.shared_progress`.
778    ///
779    /// The validity of shared progress information depends on both the external frontiers and the
780    /// internal capabilities, as events can occur that cannot be explained locally otherwise.
781    #[allow(dead_code)]
782    fn validate_progress(&self, child_state: &reachability::PerOperator<T>) {
783
784        let shared_progress = &mut *self.shared_progress.borrow_mut();
785
786        // Increments to internal capabilities require a consumed input message, a
787        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
788            for (time, diff) in internal.iter() {
789                if *diff > 0 {
790                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
791                    let internal = child_state.sources[output].implications.less_equal(time);
792                    if !consumed && !internal {
793                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
794                        panic!("Progress error; internal {:?}", self.name);
795                    }
796                }
797            }
798        }
799        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
800            for (time, diff) in produced.iter() {
801                if *diff > 0 {
802                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
803                    let internal = child_state.sources[output].implications.less_equal(time);
804                    if !consumed && !internal {
805                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
806                        panic!("Progress error; produced {:?}", self.name);
807                    }
808                }
809            }
810        }
811    }
812}
813
814// Explicitly shut down the operator to get logged information.
815impl<T: Timestamp> Drop for PerOperatorState<T> {
816    fn drop(&mut self) {
817        self.shut_down();
818    }
819}