Skip to main content

timely/progress/
subgraph.rs

//! A dataflow subgraph
//!
//! Timely dataflow graphs can be nested hierarchically, where some region of the
//! graph is grouped and presents upwards as an operator. This grouping needs
//! some care, to make sure that the presented operator reflects the behavior
//! of the grouped operators.
7
8use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::TimelyLogger as Logger;
14use crate::logging::TimelySummaryLogger as SummaryLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
// IMPORTANT: by convention, a child index of zero is used to indicate the inputs and outputs of
// the Subgraph itself. An index greater than zero corresponds to an actual child, which can be
// found at position `index` in the `children` field of the Subgraph (position zero holds a
// placeholder standing for the Subgraph's own inputs and outputs).
33
34/// A builder for interactively initializing a `Subgraph`.
35///
36/// This collects all the information necessary to get a `Subgraph` up and
37/// running, and is important largely through its `build` method which
38/// actually creates a `Subgraph`.
39pub struct SubgraphBuilder<TOuter, TInner>
40where
41    TOuter: Timestamp,
42    TInner: Timestamp,
43{
44    /// The name of this subgraph.
45    pub name: String,
46
47    /// A sequence of integers uniquely identifying the subgraph.
48    pub path: Rc<[usize]>,
49
50    /// The index assigned to the subgraph by its parent.
51    index: usize,
52
53    /// A global identifier for this subgraph.
54    identifier: usize,
55
56    // handles to the children of the scope. index i corresponds to entry i-1, unless things change.
57    children: Vec<PerOperatorState<TInner>>,
58    child_count: usize,
59
60    edge_stash: Vec<(Source, Target)>,
61
62    // shared state written to by the datapath, counting records entering this subgraph instance.
63    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
64
65    // expressed capabilities, used to filter changes against.
66    output_capabilities: Vec<MutableAntichain<TOuter>>,
67
68    /// Logging handle
69    logging: Option<Logger>,
70    /// Typed logging handle for operator summaries.
71    summary_logging: Option<SummaryLogger<TInner::Summary>>,
72}
73
74impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
75where
76    TOuter: Timestamp,
77    TInner: Timestamp+Refines<TOuter>,
78{
79    /// Allocates a new input to the subgraph and returns the target to that input in the outer graph.
80    pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
81        self.input_messages.push(shared_counts);
82        Target::new(self.index, self.input_messages.len() - 1)
83    }
84
85    /// Allocates a new output from the subgraph and returns the source of that output in the outer graph.
86    pub fn new_output(&mut self) -> Source {
87        self.output_capabilities.push(MutableAntichain::new());
88        Source::new(self.index, self.output_capabilities.len() - 1)
89    }
90
91    /// Introduces a dependence from the source to the target.
92    ///
93    /// This method does not effect data movement, but rather reveals to the progress tracking infrastructure
94    /// that messages produced by `source` should be expected to be consumed at `target`.
95    pub fn connect(&mut self, source: Source, target: Target) {
96        self.edge_stash.push((source, target));
97    }
98
99    /// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph,
100    /// terminating with the local index of the new subgraph itself.
101    pub fn new_from(
102        path: Rc<[usize]>,
103        identifier: usize,
104        logging: Option<Logger>,
105        summary_logging: Option<SummaryLogger<TInner::Summary>>,
106        name: &str,
107    )
108        -> SubgraphBuilder<TOuter, TInner>
109    {
110        // Put an empty placeholder for "outer scope" representative.
111        let children = vec![PerOperatorState::empty(0, 0)];
112        let index = path[path.len() - 1];
113
114        SubgraphBuilder {
115            name: name.to_owned(),
116            path,
117            index,
118            identifier,
119            children,
120            child_count: 1,
121            edge_stash: Vec::new(),
122            input_messages: Vec::new(),
123            output_capabilities: Vec::new(),
124            logging,
125            summary_logging,
126        }
127    }
128
129    /// Allocates a new child identifier, for later use.
130    pub fn allocate_child_id(&mut self) -> usize {
131        self.child_count += 1;
132        self.child_count - 1
133    }
134
135    /// Adds a new child to the subgraph.
136    pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
137        let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging);
138        if let Some(l) = &mut self.logging {
139            let mut child_path = Vec::with_capacity(self.path.len() + 1);
140            child_path.extend_from_slice(&self.path[..]);
141            child_path.push(index);
142
143            l.log(crate::logging::OperatesEvent {
144                id: identifier,
145                addr: child_path,
146                name: child.name.to_owned(),
147            });
148        }
149        self.children.push(child);
150    }
151
152    /// Now that initialization is complete, actually build a subgraph.
153    pub fn build<A: crate::worker::AsWorker>(mut self, worker: &mut A) -> Subgraph<TOuter, TInner> {
154        // at this point, the subgraph is frozen. we should initialize any internal state which
155        // may have been determined after construction (e.g. the numbers of inputs and outputs).
156        // we also need to determine what to return as a summary and initial capabilities, which
157        // will depend on child summaries and capabilities, as well as edges in the subgraph.
158
159        // perhaps first check that the children are sanely identified
160        self.children.sort_unstable_by(|x,y| x.index.cmp(&y.index));
161        assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));
162
163        let inputs = self.input_messages.len();
164        let outputs = self.output_capabilities.len();
165
166        // Create empty child zero representative.
167        self.children[0] = PerOperatorState::empty(outputs, inputs);
168
169        let mut builder = reachability::Builder::new();
170
171        // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected.
172        let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
173        builder.add_node(0, outputs, inputs, summary);
174        for (index, child) in self.children.iter().enumerate().skip(1) {
175            builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
176        }
177
178        for (source, target) in self.edge_stash {
179            self.children[source.node].edges[source.port].push(target);
180            builder.add_edge(source, target);
181        }
182
183        // The `None` argument is optional logging infrastructure.
184        let type_name = std::any::type_name::<TInner>();
185        let reachability_logging =
186        worker.logger_for(&format!("timely/reachability/{type_name}"))
187              .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
188        let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
189        let (tracker, scope_summary) = builder.build(reachability_logging);
190
191        let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);
192
193        let mut incomplete = vec![true; self.children.len()];
194        incomplete[0] = false;
195        let incomplete_count = incomplete.len() - 1;
196
197        let activations = worker.activations();
198
199        activations.borrow_mut().activate(&self.path[..]);
200
201        // The subgraph's per-input interest is conservatively the max across all children's inputs.
202        let max_interest = self.children.iter()
203            .flat_map(|c| c.notify.iter().copied())
204            .max()
205            .unwrap_or(FrontierInterest::Never);
206        let notify_me: Vec<FrontierInterest> = vec![max_interest; inputs];
207
208        Subgraph {
209            name: self.name,
210            path: self.path,
211            inputs,
212            outputs,
213            incomplete,
214            incomplete_count,
215            activations,
216            temp_active: BinaryHeap::new(),
217            maybe_shutdown: Vec::new(),
218            children: self.children,
219            input_messages: self.input_messages,
220            output_capabilities: self.output_capabilities,
221
222            local_pointstamp: ChangeBatch::new(),
223            final_pointstamp: ChangeBatch::new(),
224            progcaster,
225            pointstamp_tracker: tracker,
226
227            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
228            scope_summary,
229
230            progress_mode: worker.config().progress_mode,
231            notify_me,
232        }
233    }
234}
235
236
237/// A dataflow subgraph.
238///
239/// The subgraph type contains the infrastructure required to describe the topology of and track
240/// progress within a dataflow subgraph.
241pub struct Subgraph<TOuter, TInner>
242where
243    TOuter: Timestamp,
244    TInner: Timestamp+Refines<TOuter>,
245{
246    name: String,           // an informative name.
247    /// Path of identifiers from the root.
248    pub path: Rc<[usize]>,
249    inputs: usize,          // number of inputs.
250    outputs: usize,         // number of outputs.
251
252    // handles to the children of the scope. index i corresponds to entry i-1, unless things change.
253    children: Vec<PerOperatorState<TInner>>,
254
255    incomplete: Vec<bool>,   // the incompletion status of each child.
256    incomplete_count: usize, // the number of incomplete children.
257
258    // shared activations (including children).
259    activations: Rc<RefCell<Activations>>,
260    temp_active: BinaryHeap<Reverse<usize>>,
261    maybe_shutdown: Vec<usize>,
262
263    // shared state written to by the datapath, counting records entering this subgraph instance.
264    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
265
266    // expressed capabilities, used to filter changes against.
267    output_capabilities: Vec<MutableAntichain<TOuter>>,
268
269    // pointstamp messages to exchange. ultimately destined for `messages` or `internal`.
270    local_pointstamp: ChangeBatch<(Location, TInner)>,
271    final_pointstamp: ChangeBatch<(Location, TInner)>,
272
273    // Graph structure and pointstamp tracker.
274    // pointstamp_builder: reachability::Builder<TInner>,
275    pointstamp_tracker: reachability::Tracker<TInner>,
276
277    // channel / whatever used to communicate pointstamp updates to peers.
278    progcaster: Progcaster<TInner>,
279
280    shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
281    scope_summary: Connectivity<TInner::Summary>,
282
283    progress_mode: ProgressMode,
284
285    notify_me: Vec<FrontierInterest>,
286}
287
288impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
289where
290    TOuter: Timestamp,
291    TInner: Timestamp+Refines<TOuter>,
292{
293    fn name(&self) -> &str { &self.name }
294
295    fn path(&self) -> &[usize] { &self.path }
296
297    fn schedule(&mut self) -> bool {
298
299        // This method performs several actions related to progress tracking
300        // and child operator scheduling. The actions have been broken apart
301        // into atomic actions that should be able to be safely executed in
302        // isolation, by a potentially clueless user (yours truly).
303
304        self.accept_frontier();         // Accept supplied frontier changes.
305        self.harvest_inputs();          // Count records entering the scope.
306
307        // Receive post-exchange progress updates.
308        self.progcaster.recv(&mut self.final_pointstamp);
309
310        // Commit and propagate final pointstamps.
311        self.propagate_pointstamps();
312
313        {   // Enqueue active children; scoped to let borrow drop.
314            let temp_active = &mut self.temp_active;
315            self.activations
316                .borrow_mut()
317                .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
318        }
319
320        // Schedule child operators.
321        //
322        // We should be able to schedule arbitrary subsets of children, as
323        // long as we eventually schedule all children that need to do work.
324        let mut previous = 0;
325        while let Some(Reverse(index)) = self.temp_active.pop() {
326            // De-duplicate, and don't revisit.
327            if index > previous {
328                // TODO: This is a moment where a scheduling decision happens.
329                self.activate_child(index);
330                previous = index;
331            }
332        }
333
334        // Transmit produced progress updates.
335        self.send_progress();
336
337        // If child scopes surface more final pointstamp updates we must re-execute.
338        if !self.final_pointstamp.is_empty() {
339            self.activations.borrow_mut().activate(&self.path[..]);
340        }
341
342        // A subgraph is incomplete if any child is incomplete, or there are outstanding messages.
343        let incomplete = self.incomplete_count > 0;
344        let tracking = self.pointstamp_tracker.tracking_anything();
345
346        incomplete || tracking
347    }
348}
349
350
351impl<TOuter, TInner> Subgraph<TOuter, TInner>
352where
353    TOuter: Timestamp,
354    TInner: Timestamp+Refines<TOuter>,
355{
356    /// Schedules a child operator and collects progress statements.
357    ///
358    /// The return value indicates that the child task cannot yet shut down.
359    fn activate_child(&mut self, child_index: usize) -> bool {
360
361        let child = &mut self.children[child_index];
362
363        let incomplete = child.schedule();
364
365        if incomplete != self.incomplete[child_index] {
366            if incomplete { self.incomplete_count += 1; }
367            else          { self.incomplete_count -= 1; }
368            self.incomplete[child_index] = incomplete;
369        }
370
371        if !incomplete {
372            // Consider shutting down the child, if neither capabilities nor input frontier.
373            let child_state = self.pointstamp_tracker.node_state(child_index);
374            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
375            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
376            if frontiers_empty && no_capabilities {
377                child.shut_down();
378            }
379        }
380        else {
381            // In debug mode, check that the progress statements do not violate invariants.
382            #[cfg(debug_assertions)] {
383                child.validate_progress(self.pointstamp_tracker.node_state(child_index));
384            }
385        }
386
387        // Extract progress statements into either pre- or post-exchange buffers.
388        if child.local {
389            child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
390        }
391        else {
392            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
393        }
394
395        incomplete
396    }
397
398    /// Move frontier changes from parent into progress statements.
399    fn accept_frontier(&mut self) {
400        for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
401            let source = Source::new(0, port);
402            for (time, value) in changes.drain() {
403                self.pointstamp_tracker.update_source(
404                    source,
405                    TInner::to_inner(time),
406                    value
407                );
408            }
409        }
410    }
411
412    /// Collects counts of records entering the scope.
413    ///
414    /// This method moves message counts from the output of child zero to the inputs to
415    /// attached operators. This is a bit of a hack, because normally one finds capabilities
416    /// at an operator output, rather than message counts. These counts are used only at
417    /// mark [XXX] where they are reported upwards to the parent scope.
418    fn harvest_inputs(&mut self) {
419        for input in 0 .. self.inputs {
420            let source = Location::new_source(0, input);
421            let mut borrowed = self.input_messages[input].borrow_mut();
422            for (time, delta) in borrowed.drain() {
423                for target in &self.children[0].edges[input] {
424                    self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
425                }
426                self.local_pointstamp.update((source, time), -delta);
427            }
428        }
429    }
430
431    /// Commits pointstamps in `self.final_pointstamp`.
432    ///
433    /// This method performs several steps that for reasons of correctness must
434    /// be performed atomically, before control is returned. These are:
435    ///
436    /// 1. Changes to child zero's outputs are reported as consumed messages.
437    /// 2. Changes to child zero's inputs are reported as produced messages.
438    /// 3. Frontiers for child zero's inputs are reported as internal capabilities.
439    ///
440    /// Perhaps importantly, the frontiers for child zero are determined *without*
441    /// the messages that are produced for child zero inputs, as we only want to
442    /// report retained internal capabilities, and not now-external messages.
443    ///
444    /// In the course of propagating progress changes, we also propagate progress
445    /// changes for all of the managed child operators.
446    fn propagate_pointstamps(&mut self) {
447
448        // Process exchanged pointstamps. Handle child 0 statements carefully.
449        for ((location, timestamp), delta) in self.final_pointstamp.drain() {
450
451            // Child 0 corresponds to the parent scope and has special handling.
452            if location.node == 0 {
453                match location.port {
454                    // [XXX] Report child 0's capabilities as consumed messages.
455                    //       Note the re-negation of delta, to make counts positive.
456                    Port::Source(scope_input) => {
457                        self.shared_progress
458                            .borrow_mut()
459                            .consumeds[scope_input]
460                            .update(timestamp.to_outer(), -delta);
461                    },
462                    // [YYY] Report child 0's input messages as produced messages.
463                    //       Do not otherwise record, as we will not see subtractions,
464                    //       and we do not want to present their implications upward.
465                    Port::Target(scope_output) => {
466                        self.shared_progress
467                            .borrow_mut()
468                            .produceds[scope_output]
469                            .update(timestamp.to_outer(), delta);
470                    },
471                }
472            }
473            else {
474                self.pointstamp_tracker.update(location, timestamp, delta);
475            }
476        }
477
478        // Propagate implications of progress changes.
479        self.pointstamp_tracker.propagate_all();
480
481        // Drain propagated information into shared progress structure.
482        let (pushed, operators) = self.pointstamp_tracker.pushed();
483        for ((location, time), diff) in pushed.drain() {
484            self.maybe_shutdown.push(location.node);
485            // Targets are actionable, sources are not.
486            if let crate::progress::Port::Target(port) = location.port {
487                // Activate based on expressed frontier interest for this input.
488                let activate = match self.children[location.node].notify[port] {
489                    FrontierInterest::Always => true,
490                    FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 }
491                    FrontierInterest::Never => false,
492                };
493                if activate { self.temp_active.push(Reverse(location.node)); }
494
495                // Keep this current independent of the interest.
496                self.children[location.node]
497                    .shared_progress
498                    .borrow_mut()
499                    .frontiers[port]
500                    .update(time, diff);
501            }
502        }
503
504        // Consider scheduling each recipient of progress information to shut down.
505        self.maybe_shutdown.sort_unstable();
506        self.maybe_shutdown.dedup();
507        for child_index in self.maybe_shutdown.drain(..) {
508            let child_state = self.pointstamp_tracker.node_state(child_index);
509            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
510            let no_capabilities = child_state.cap_counts == 0;
511            if frontiers_empty && no_capabilities {
512                self.temp_active.push(Reverse(child_index));
513            }
514        }
515
516        // Extract child zero frontier changes and report as internal capability changes.
517        for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
518            self.pointstamp_tracker
519                .pushed_output()[output]
520                .drain()
521                .map(|(time, diff)| (time.to_outer(), diff))
522                .filter_through(&mut self.output_capabilities[output])
523                .for_each(|(time, diff)| internal.update(time, diff));
524        }
525    }
526
527    /// Sends local progress updates to all workers.
528    ///
529    /// This method does not guarantee that all of `self.local_pointstamps` are
530    /// sent, but that no blocking pointstamps remain
531    fn send_progress(&mut self) {
532
533        // If we are requested to eagerly send progress updates, or if there are
534        // updates visible in the scope-wide frontier, we must send all updates.
535        let must_send = self.progress_mode == ProgressMode::Eager || {
536            let tracker = &mut self.pointstamp_tracker;
537            self.local_pointstamp
538                .iter()
539                .any(|((location, time), diff)|
540                    // Must publish scope-wide visible subtractions.
541                    tracker.is_global(*location, time) && *diff < 0 ||
542                    // Must confirm the receipt of inbound messages.
543                    location.node == 0
544                )
545        };
546
547        if must_send {
548            self.progcaster.send(&mut self.local_pointstamp);
549        }
550    }
551}
552
553
554impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
555where
556    TOuter: Timestamp,
557    TInner: Timestamp+Refines<TOuter>,
558{
559    fn local(&self) -> bool { false }
560    fn inputs(&self)  -> usize { self.inputs }
561    fn outputs(&self) -> usize { self.outputs }
562
563    // produces connectivity summaries from inputs to outputs, and reports initial internal
564    // capabilities on each of the outputs (projecting capabilities from contained scopes).
565    fn initialize(mut self: Box<Self>) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, Box<dyn Schedule>) {
566
567        // double-check that child 0 (the outside world) is correctly shaped.
568        assert_eq!(self.children[0].outputs, self.inputs());
569        assert_eq!(self.children[0].inputs, self.outputs());
570
571        // Note that we need to have `self.inputs()` elements in the summary
572        // with each element containing `self.outputs()` antichains regardless
573        // of how long `self.scope_summary` is
574        let mut internal_summary = vec![PortConnectivity::default(); self.inputs()];
575        for (input_idx, input) in self.scope_summary.iter().enumerate() {
576            for (output_idx, output) in input.iter_ports() {
577                for outer in output.elements().iter().cloned().map(TInner::summarize) {
578                    internal_summary[input_idx].insert(output_idx, outer);
579                }
580            }
581        }
582
583        debug_assert_eq!(
584            internal_summary.len(),
585            self.inputs(),
586            "the internal summary should have as many elements as there are inputs",
587        );
588        debug_assert!(
589            internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())),
590            "each element of the internal summary should only reference valid outputs",
591        );
592
593        // Each child has expressed initial capabilities (their `shared_progress.internals`).
594        // We introduce these into the progress tracker to determine the scope's initial
595        // internal capabilities.
596        for child in self.children.iter_mut() {
597            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
598        }
599
600        self.propagate_pointstamps();  // Propagate expressed capabilities to output frontiers.
601
602        // Return summaries and shared progress information.
603        (internal_summary, Rc::clone(&self.shared_progress), self)
604    }
605
606    fn notify_me(&self) -> &[FrontierInterest] { &self.notify_me }
607}
608
609struct PerOperatorState<T: Timestamp> {
610
611    name: String,       // name of the operator
612    index: usize,       // index of the operator within its parent scope
613    id: usize,          // worker-unique identifier
614
615    local: bool,        // indicates whether the operator will exchange data or not
616    notify: Vec<FrontierInterest>,
617    inputs: usize,      // number of inputs to the operator
618    outputs: usize,     // number of outputs from the operator
619
620    operator: Option<Box<dyn Schedule>>,
621
622    edges: Vec<Vec<Target>>,    // edges from the outputs of the operator
623
624    shared_progress: Rc<RefCell<SharedProgress<T>>>,
625
626    internal_summary: Connectivity<T::Summary>,   // cached result from initialize.
627
628    logging: Option<Logger>,
629}
630
631impl<T: Timestamp> PerOperatorState<T> {
632
633    fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
634        PerOperatorState {
635            name:       "External".to_owned(),
636            operator:   None,
637            index:      0,
638            id:         usize::MAX,
639            local:      false,
640            notify:     vec![FrontierInterest::IfCapability; inputs],
641            inputs,
642            outputs,
643
644            edges: vec![Vec::new(); outputs],
645
646            logging: None,
647
648            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
649            internal_summary: Vec::new(),
650        }
651    }
652
653    pub fn new(
654        scope: Box<dyn Operate<T>>,
655        index: usize,
656        identifier: usize,
657        logging: Option<Logger>,
658        summary_logging: &mut Option<SummaryLogger<T::Summary>>,
659    ) -> PerOperatorState<T>
660    {
661        let local = scope.local();
662        let inputs = scope.inputs();
663        let outputs = scope.outputs();
664        let notify = scope.notify_me().to_vec();
665
666        let (internal_summary, shared_progress, operator) = scope.initialize();
667
668        if let Some(l) = summary_logging {
669            l.log(crate::logging::OperatesSummaryEvent {
670                id: identifier,
671                summary: internal_summary.clone(),
672            })
673        }
674
675        assert_eq!(
676            internal_summary.len(),
677            inputs,
678            "operator summary has {} inputs when {} were expected",
679            internal_summary.len(),
680            inputs,
681        );
682        assert!(
683            internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)),
684            "operator summary references invalid output port",
685        );
686
687        PerOperatorState {
688            name:               operator.name().to_owned(),
689            operator:           Some(operator),
690            index,
691            id:                 identifier,
692            local,
693            notify,
694            inputs,
695            outputs,
696            edges:              vec![vec![]; outputs],
697
698            logging,
699
700            shared_progress,
701            internal_summary,
702        }
703    }
704
705    pub fn schedule(&mut self) -> bool {
706
707        if let Some(ref mut operator) = self.operator {
708
709            // Perhaps log information about the start of the schedule call.
710            if let Some(l) = self.logging.as_mut() {
711                // FIXME: There is no contract that the operator must consume frontier changes.
712                //        This report could be spurious.
713                // TODO:  Perhaps fold this in to `ScheduleEvent::start()` as a "reason"?
714                let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
715                if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
716                    l.log(crate::logging::PushProgressEvent { op_id: self.id })
717                }
718
719                l.log(crate::logging::ScheduleEvent::start(self.id));
720            }
721
722            let incomplete = operator.schedule();
723
724            // Perhaps log information about the stop of the schedule call.
725            if let Some(l) = self.logging.as_mut() {
726                l.log(crate::logging::ScheduleEvent::stop(self.id));
727            }
728
729            incomplete
730        }
731        else {
732
733            // If the operator is closed and we are reporting progress at it, something has surely gone wrong.
734            if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
735                println!("Operator prematurely shut down: {}", self.name);
736                println!("  {:?}", self.notify);
737                println!("  {:?}", self.shared_progress.borrow_mut().frontiers);
738                panic!();
739            }
740
741            // A closed operator shouldn't keep anything open.
742            false
743        }
744    }
745
746    fn shut_down(&mut self) {
747        if self.operator.is_some() {
748            if let Some(l) = self.logging.as_mut() {
749                l.log(crate::logging::ShutdownEvent{ id: self.id });
750            }
751            self.operator = None;
752        }
753    }
754
755    /// Extracts shared progress information and converts to pointstamp changes.
756    fn extract_progress(&self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
757
758        let shared_progress = &mut *self.shared_progress.borrow_mut();
759
760        // Migrate consumeds, internals, produceds into progress statements.
761        for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
762            let target = Location::new_target(self.index, input);
763            for (time, delta) in consumed.drain() {
764                pointstamps.update((target, time), -delta);
765            }
766        }
767        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
768            let source = Location::new_source(self.index, output);
769            for (time, delta) in internal.drain() {
770                pointstamps.update((source, time.clone()), delta);
771            }
772        }
773        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
774            for (time, delta) in produced.drain() {
775                for target in &self.edges[output] {
776                    pointstamps.update((Location::from(*target), time.clone()), delta);
777                    temp_active.push(Reverse(target.node));
778                }
779            }
780        }
781    }
782
783    /// Test the validity of `self.shared_progress`.
784    ///
785    /// The validity of shared progress information depends on both the external frontiers and the
786    /// internal capabilities, as events can occur that cannot be explained locally otherwise.
787    #[allow(dead_code)]
788    fn validate_progress(&self, child_state: &reachability::PerOperator<T>) {
789
790        let shared_progress = &mut *self.shared_progress.borrow_mut();
791
792        // Increments to internal capabilities require a consumed input message, a
793        for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
794            for (time, diff) in internal.iter() {
795                if *diff > 0 {
796                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
797                    let internal = child_state.sources[output].implications.less_equal(time);
798                    if !consumed && !internal {
799                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
800                        panic!("Progress error; internal {:?}", self.name);
801                    }
802                }
803            }
804        }
805        for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
806            for (time, diff) in produced.iter() {
807                if *diff > 0 {
808                    let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
809                    let internal = child_state.sources[output].implications.less_equal(time);
810                    if !consumed && !internal {
811                        println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
812                        panic!("Progress error; produced {:?}", self.name);
813                    }
814                }
815            }
816        }
817    }
818}
819
820// Explicitly shut down the operator to get logged information.
821impl<T: Timestamp> Drop for PerOperatorState<T> {
822    fn drop(&mut self) {
823        self.shut_down();
824    }
825}