1use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::TimelyLogger as Logger;
14use crate::logging::TimelySummaryLogger as SummaryLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
30pub struct SubgraphBuilder<TInner>
40where
41 TInner: Timestamp,
42{
43 pub name: String,
45
46 pub path: Rc<[usize]>,
48
49 index: usize,
51
52 identifier: usize,
54
55 children: Vec<(Box<dyn Operate<TInner>>, usize, usize)>,
57 child_count: usize,
58
59 edge_stash: Vec<(Source, Target)>,
60
61 input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
63
64 outputs: usize,
66
67}
68
69impl<TInner> SubgraphBuilder<TInner>
70where
71 TInner: Timestamp,
72{
73 pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
75 self.input_messages.push(shared_counts);
76 Target::new(self.index, self.input_messages.len() - 1)
77 }
78
79 pub fn new_output(&mut self) -> Source {
81 self.outputs += 1;
82 Source::new(self.index, self.outputs - 1)
83 }
84
85 pub fn connect(&mut self, source: Source, target: Target) {
90 self.edge_stash.push((source, target));
91 }
92
93 pub fn new_from(
96 path: Rc<[usize]>,
97 identifier: usize,
98 name: &str,
99 )
100 -> SubgraphBuilder<TInner>
101 {
102 let index = path[path.len() - 1];
103
104 SubgraphBuilder {
105 name: name.to_owned(),
106 path,
107 index,
108 identifier,
109 children: Vec::new(),
110 child_count: 1,
111 edge_stash: Vec::new(),
112 input_messages: Vec::new(),
113 outputs: 0,
114 }
115 }
116
117 pub fn allocate_child_id(&mut self) -> usize {
119 self.child_count += 1;
120 self.child_count - 1
121 }
122
123 pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
127 self.children.push((child, index, identifier));
128 }
129
130 pub fn build<TOuter: Timestamp>(mut self, worker: &crate::worker::Worker) -> Subgraph<TOuter, TInner> {
132 let inputs = self.input_messages.len();
138 let outputs = self.outputs;
139
140 let type_name = std::any::type_name::<TInner>();
141 let mut logging = worker.logging();
142 let mut summary_logging = worker.logger_for(&format!("timely/summary/{type_name}"));
143
144 self.children.sort_unstable_by_key(|&(_, index, _)| index);
146 let mut children: Vec<_> = [PerOperatorState::empty(outputs, inputs)]
147 .into_iter()
148 .chain(self.children.into_iter().map(|(operator, index, identifier)| {
149 let child = PerOperatorState::new(operator, index, identifier, logging.clone(), &mut summary_logging);
150 if let Some(l) = &mut logging {
151 let mut child_path = Vec::with_capacity(self.path.len() + 1);
152 child_path.extend_from_slice(&self.path[..]);
153 child_path.push(index);
154 l.log(crate::logging::OperatesEvent {
155 id: identifier,
156 addr: child_path,
157 name: child.name.to_owned(),
158 });
159 }
160 child
161 }))
162 .collect();
163 assert!(children.iter().enumerate().all(|(i,x)| i == x.index));
164
165 let mut builder = reachability::Builder::new();
166
167 let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
169 builder.add_node(0, outputs, inputs, summary);
170 for (index, child) in children.iter().enumerate().skip(1) {
171 builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
172 }
173
174 for (source, target) in self.edge_stash {
175 children[source.node].edges[source.port].push(target);
176 builder.add_edge(source, target);
177 }
178
179 let reachability_logging =
180 worker.logger_for(&format!("timely/reachability/{type_name}"))
181 .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
182 let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
183 let (tracker, scope_summary) = builder.build(reachability_logging);
184
185 let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, logging, progress_logging);
186
187 let mut incomplete = vec![true; children.len()];
188 incomplete[0] = false;
189 let incomplete_count = incomplete.len() - 1;
190
191 let activations = worker.activations();
192
193 activations.borrow_mut().activate(&self.path[..]);
194
195 let max_interest = children.iter()
197 .flat_map(|c| c.notify.iter().copied())
198 .max()
199 .unwrap_or(FrontierInterest::Never);
200 let notify_me: Vec<FrontierInterest> = vec![max_interest; inputs];
201
202 Subgraph {
203 name: self.name,
204 path: self.path,
205 inputs,
206 outputs,
207 incomplete,
208 incomplete_count,
209 activations,
210 temp_active: BinaryHeap::new(),
211 maybe_shutdown: Vec::new(),
212 children,
213 input_messages: self.input_messages,
214 output_capabilities: vec![MutableAntichain::new(); self.outputs],
215
216 local_pointstamp: ChangeBatch::new(),
217 final_pointstamp: ChangeBatch::new(),
218 progcaster,
219 pointstamp_tracker: tracker,
220
221 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
222 scope_summary,
223
224 progress_mode: worker.config().progress_mode,
225 notify_me,
226 }
227 }
228}
229
230
231pub struct Subgraph<TOuter, TInner>
236where
237 TOuter: Timestamp,
238 TInner: Timestamp,
239{
240 name: String, pub path: Rc<[usize]>,
243 inputs: usize, outputs: usize, children: Vec<PerOperatorState<TInner>>,
248
249 incomplete: Vec<bool>, incomplete_count: usize, activations: Rc<RefCell<Activations>>,
254 temp_active: BinaryHeap<Reverse<usize>>,
255 maybe_shutdown: Vec<usize>,
256
257 input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
259
260 output_capabilities: Vec<MutableAntichain<TOuter>>,
262
263 local_pointstamp: ChangeBatch<(Location, TInner)>,
265 final_pointstamp: ChangeBatch<(Location, TInner)>,
266
267 pointstamp_tracker: reachability::Tracker<TInner>,
270
271 progcaster: Progcaster<TInner>,
273
274 shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
275 scope_summary: Connectivity<TInner::Summary>,
276
277 progress_mode: ProgressMode,
278
279 notify_me: Vec<FrontierInterest>,
280}
281
282impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
283where
284 TOuter: Timestamp,
285 TInner: Timestamp+Refines<TOuter>,
286{
287 fn name(&self) -> &str { &self.name }
288
289 fn path(&self) -> &[usize] { &self.path }
290
291 fn schedule(&mut self) -> bool {
292
293 self.accept_frontier(); self.harvest_inputs(); self.progcaster.recv(&mut self.final_pointstamp);
303
304 self.propagate_pointstamps();
306
307 { let temp_active = &mut self.temp_active;
309 self.activations
310 .borrow_mut()
311 .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
312 }
313
314 let mut previous = 0;
319 while let Some(Reverse(index)) = self.temp_active.pop() {
320 if index > previous {
322 self.activate_child(index);
324 previous = index;
325 }
326 }
327
328 self.send_progress();
330
331 if !self.final_pointstamp.is_empty() {
333 self.activations.borrow_mut().activate(&self.path[..]);
334 }
335
336 let incomplete = self.incomplete_count > 0;
338 let tracking = self.pointstamp_tracker.tracking_anything();
339
340 incomplete || tracking
341 }
342}
343
344
345impl<TOuter, TInner> Subgraph<TOuter, TInner>
346where
347 TOuter: Timestamp,
348 TInner: Timestamp+Refines<TOuter>,
349{
350 fn activate_child(&mut self, child_index: usize) -> bool {
354
355 let child = &mut self.children[child_index];
356
357 let incomplete = child.schedule();
358
359 if incomplete != self.incomplete[child_index] {
360 if incomplete { self.incomplete_count += 1; }
361 else { self.incomplete_count -= 1; }
362 self.incomplete[child_index] = incomplete;
363 }
364
365 if !incomplete {
366 let child_state = self.pointstamp_tracker.node_state(child_index);
368 let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
369 let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
370 if frontiers_empty && no_capabilities {
371 child.shut_down();
372 }
373 }
374 else {
375 #[cfg(debug_assertions)] {
377 child.validate_progress(self.pointstamp_tracker.node_state(child_index));
378 }
379 }
380
381 if child.local {
383 child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
384 }
385 else {
386 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
387 }
388
389 incomplete
390 }
391
392 fn accept_frontier(&mut self) {
394 for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
395 let source = Source::new(0, port);
396 for (time, value) in changes.drain() {
397 self.pointstamp_tracker.update_source(
398 source,
399 TInner::to_inner(time),
400 value
401 );
402 }
403 }
404 }
405
406 fn harvest_inputs(&mut self) {
413 for input in 0 .. self.inputs {
414 let source = Location::new_source(0, input);
415 let mut borrowed = self.input_messages[input].borrow_mut();
416 for (time, delta) in borrowed.drain() {
417 for target in &self.children[0].edges[input] {
418 self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
419 }
420 self.local_pointstamp.update((source, time), -delta);
421 }
422 }
423 }
424
425 fn propagate_pointstamps(&mut self) {
441
442 for ((location, timestamp), delta) in self.final_pointstamp.drain() {
444
445 if location.node == 0 {
447 match location.port {
448 Port::Source(scope_input) => {
451 self.shared_progress
452 .borrow_mut()
453 .consumeds[scope_input]
454 .update(timestamp.to_outer(), -delta);
455 },
456 Port::Target(scope_output) => {
460 self.shared_progress
461 .borrow_mut()
462 .produceds[scope_output]
463 .update(timestamp.to_outer(), delta);
464 },
465 }
466 }
467 else {
468 self.pointstamp_tracker.update(location, timestamp, delta);
469 }
470 }
471
472 self.pointstamp_tracker.propagate_all();
474
475 let (pushed, operators) = self.pointstamp_tracker.pushed();
477 for ((location, time), diff) in pushed.drain() {
478 self.maybe_shutdown.push(location.node);
479 if let crate::progress::Port::Target(port) = location.port {
481 let activate = match self.children[location.node].notify[port] {
483 FrontierInterest::Always => true,
484 FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 }
485 FrontierInterest::Never => false,
486 };
487 if activate { self.temp_active.push(Reverse(location.node)); }
488
489 self.children[location.node]
491 .shared_progress
492 .borrow_mut()
493 .frontiers[port]
494 .update(time, diff);
495 }
496 }
497
498 self.maybe_shutdown.sort_unstable();
500 self.maybe_shutdown.dedup();
501 for child_index in self.maybe_shutdown.drain(..) {
502 let child_state = self.pointstamp_tracker.node_state(child_index);
503 let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
504 let no_capabilities = child_state.cap_counts == 0;
505 if frontiers_empty && no_capabilities {
506 self.temp_active.push(Reverse(child_index));
507 }
508 }
509
510 for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
512 self.pointstamp_tracker
513 .pushed_output()[output]
514 .drain()
515 .map(|(time, diff)| (time.to_outer(), diff))
516 .filter_through(&mut self.output_capabilities[output])
517 .for_each(|(time, diff)| internal.update(time, diff));
518 }
519 }
520
521 fn send_progress(&mut self) {
526
527 let must_send = self.progress_mode == ProgressMode::Eager || {
530 let tracker = &mut self.pointstamp_tracker;
531 self.local_pointstamp
532 .iter()
533 .any(|((location, time), diff)|
534 tracker.is_global(*location, time) && *diff < 0 ||
536 location.node == 0
538 )
539 };
540
541 if must_send {
542 self.progcaster.send(&mut self.local_pointstamp);
543 }
544 }
545}
546
547
548impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
549where
550 TOuter: Timestamp,
551 TInner: Timestamp+Refines<TOuter>,
552{
553 fn local(&self) -> bool { false }
554 fn inputs(&self) -> usize { self.inputs }
555 fn outputs(&self) -> usize { self.outputs }
556
557 fn initialize(mut self: Box<Self>) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, Box<dyn Schedule>) {
560
561 assert_eq!(self.children[0].outputs, self.inputs());
563 assert_eq!(self.children[0].inputs, self.outputs());
564
565 let mut internal_summary = vec![PortConnectivity::default(); self.inputs()];
569 for (input_idx, input) in self.scope_summary.iter().enumerate() {
570 for (output_idx, output) in input.iter_ports() {
571 for outer in output.elements().iter().cloned().map(TInner::summarize) {
572 internal_summary[input_idx].insert(output_idx, outer);
573 }
574 }
575 }
576
577 debug_assert_eq!(
578 internal_summary.len(),
579 self.inputs(),
580 "the internal summary should have as many elements as there are inputs",
581 );
582 debug_assert!(
583 internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())),
584 "each element of the internal summary should only reference valid outputs",
585 );
586
587 for child in self.children.iter_mut() {
591 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
592 }
593
594 self.propagate_pointstamps(); (internal_summary, Rc::clone(&self.shared_progress), self)
598 }
599
600 fn notify_me(&self) -> &[FrontierInterest] { &self.notify_me }
601}
602
603struct PerOperatorState<T: Timestamp> {
604
605 name: String, index: usize, id: usize, local: bool, notify: Vec<FrontierInterest>,
611 inputs: usize, outputs: usize, operator: Option<Box<dyn Schedule>>,
615
616 edges: Vec<Vec<Target>>, shared_progress: Rc<RefCell<SharedProgress<T>>>,
619
620 internal_summary: Connectivity<T::Summary>, logging: Option<Logger>,
623}
624
625impl<T: Timestamp> PerOperatorState<T> {
626
627 fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
628 PerOperatorState {
629 name: "External".to_owned(),
630 operator: None,
631 index: 0,
632 id: usize::MAX,
633 local: false,
634 notify: vec![FrontierInterest::IfCapability; inputs],
635 inputs,
636 outputs,
637
638 edges: vec![Vec::new(); outputs],
639
640 logging: None,
641
642 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
643 internal_summary: Vec::new(),
644 }
645 }
646
647 pub fn new(
648 scope: Box<dyn Operate<T>>,
649 index: usize,
650 identifier: usize,
651 logging: Option<Logger>,
652 summary_logging: &mut Option<SummaryLogger<T::Summary>>,
653 ) -> PerOperatorState<T>
654 {
655 let local = scope.local();
656 let inputs = scope.inputs();
657 let outputs = scope.outputs();
658 let notify = scope.notify_me().to_vec();
659
660 let (internal_summary, shared_progress, operator) = scope.initialize();
661
662 if let Some(l) = summary_logging {
663 l.log(crate::logging::OperatesSummaryEvent {
664 id: identifier,
665 summary: internal_summary.clone(),
666 })
667 }
668
669 assert_eq!(
670 internal_summary.len(),
671 inputs,
672 "operator summary has {} inputs when {} were expected",
673 internal_summary.len(),
674 inputs,
675 );
676 assert!(
677 internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)),
678 "operator summary references invalid output port",
679 );
680
681 PerOperatorState {
682 name: operator.name().to_owned(),
683 operator: Some(operator),
684 index,
685 id: identifier,
686 local,
687 notify,
688 inputs,
689 outputs,
690 edges: vec![vec![]; outputs],
691
692 logging,
693
694 shared_progress,
695 internal_summary,
696 }
697 }
698
699 pub fn schedule(&mut self) -> bool {
700
701 if let Some(ref mut operator) = self.operator {
702
703 if let Some(l) = self.logging.as_mut() {
705 let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
709 if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
710 l.log(crate::logging::PushProgressEvent { op_id: self.id })
711 }
712
713 l.log(crate::logging::ScheduleEvent::start(self.id));
714 }
715
716 let incomplete = operator.schedule();
717
718 if let Some(l) = self.logging.as_mut() {
720 l.log(crate::logging::ScheduleEvent::stop(self.id));
721 }
722
723 incomplete
724 }
725 else {
726
727 if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
729 println!("Operator prematurely shut down: {}", self.name);
730 println!(" {:?}", self.notify);
731 println!(" {:?}", self.shared_progress.borrow_mut().frontiers);
732 panic!();
733 }
734
735 false
737 }
738 }
739
740 fn shut_down(&mut self) {
741 if self.operator.is_some() {
742 if let Some(l) = self.logging.as_mut() {
743 l.log(crate::logging::ShutdownEvent{ id: self.id });
744 }
745 self.operator = None;
746 }
747 }
748
749 fn extract_progress(&self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
751
752 let shared_progress = &mut *self.shared_progress.borrow_mut();
753
754 for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
756 let target = Location::new_target(self.index, input);
757 for (time, delta) in consumed.drain() {
758 pointstamps.update((target, time), -delta);
759 }
760 }
761 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
762 let source = Location::new_source(self.index, output);
763 for (time, delta) in internal.drain() {
764 pointstamps.update((source, time.clone()), delta);
765 }
766 }
767 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
768 for (time, delta) in produced.drain() {
769 for target in &self.edges[output] {
770 pointstamps.update((Location::from(*target), time.clone()), delta);
771 temp_active.push(Reverse(target.node));
772 }
773 }
774 }
775 }
776
777 #[allow(dead_code)]
782 fn validate_progress(&self, child_state: &reachability::PerOperator<T>) {
783
784 let shared_progress = &mut *self.shared_progress.borrow_mut();
785
786 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
788 for (time, diff) in internal.iter() {
789 if *diff > 0 {
790 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
791 let internal = child_state.sources[output].implications.less_equal(time);
792 if !consumed && !internal {
793 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
794 panic!("Progress error; internal {:?}", self.name);
795 }
796 }
797 }
798 }
799 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
800 for (time, diff) in produced.iter() {
801 if *diff > 0 {
802 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
803 let internal = child_state.sources[output].implications.less_equal(time);
804 if !consumed && !internal {
805 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
806 panic!("Progress error; produced {:?}", self.name);
807 }
808 }
809 }
810 }
811 }
812}
813
814impl<T: Timestamp> Drop for PerOperatorState<T> {
816 fn drop(&mut self) {
817 self.shut_down();
818 }
819}