1use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::TimelyLogger as Logger;
14use crate::logging::TimelySummaryLogger as SummaryLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
30pub struct SubgraphBuilder<TOuter, TInner>
40where
41 TOuter: Timestamp,
42 TInner: Timestamp,
43{
44 pub name: String,
46
47 pub path: Rc<[usize]>,
49
50 index: usize,
52
53 identifier: usize,
55
56 children: Vec<PerOperatorState<TInner>>,
58 child_count: usize,
59
60 edge_stash: Vec<(Source, Target)>,
61
62 input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
64
65 output_capabilities: Vec<MutableAntichain<TOuter>>,
67
68 logging: Option<Logger>,
70 summary_logging: Option<SummaryLogger<TInner::Summary>>,
72}
73
74impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
75where
76 TOuter: Timestamp,
77 TInner: Timestamp+Refines<TOuter>,
78{
79 pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
81 self.input_messages.push(shared_counts);
82 Target::new(self.index, self.input_messages.len() - 1)
83 }
84
85 pub fn new_output(&mut self) -> Source {
87 self.output_capabilities.push(MutableAntichain::new());
88 Source::new(self.index, self.output_capabilities.len() - 1)
89 }
90
91 pub fn connect(&mut self, source: Source, target: Target) {
96 self.edge_stash.push((source, target));
97 }
98
99 pub fn new_from(
102 path: Rc<[usize]>,
103 identifier: usize,
104 logging: Option<Logger>,
105 summary_logging: Option<SummaryLogger<TInner::Summary>>,
106 name: &str,
107 )
108 -> SubgraphBuilder<TOuter, TInner>
109 {
110 let children = vec![PerOperatorState::empty(0, 0)];
112 let index = path[path.len() - 1];
113
114 SubgraphBuilder {
115 name: name.to_owned(),
116 path,
117 index,
118 identifier,
119 children,
120 child_count: 1,
121 edge_stash: Vec::new(),
122 input_messages: Vec::new(),
123 output_capabilities: Vec::new(),
124 logging,
125 summary_logging,
126 }
127 }
128
129 pub fn allocate_child_id(&mut self) -> usize {
131 self.child_count += 1;
132 self.child_count - 1
133 }
134
135 pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
137 let child = PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging);
138 if let Some(l) = &mut self.logging {
139 let mut child_path = Vec::with_capacity(self.path.len() + 1);
140 child_path.extend_from_slice(&self.path[..]);
141 child_path.push(index);
142
143 l.log(crate::logging::OperatesEvent {
144 id: identifier,
145 addr: child_path,
146 name: child.name.to_owned(),
147 });
148 }
149 self.children.push(child);
150 }
151
152 pub fn build<A: crate::worker::AsWorker>(mut self, worker: &mut A) -> Subgraph<TOuter, TInner> {
154 self.children.sort_unstable_by(|x,y| x.index.cmp(&y.index));
161 assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));
162
163 let inputs = self.input_messages.len();
164 let outputs = self.output_capabilities.len();
165
166 self.children[0] = PerOperatorState::empty(outputs, inputs);
168
169 let mut builder = reachability::Builder::new();
170
171 let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
173 builder.add_node(0, outputs, inputs, summary);
174 for (index, child) in self.children.iter().enumerate().skip(1) {
175 builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
176 }
177
178 for (source, target) in self.edge_stash {
179 self.children[source.node].edges[source.port].push(target);
180 builder.add_edge(source, target);
181 }
182
183 let type_name = std::any::type_name::<TInner>();
185 let reachability_logging =
186 worker.logger_for(&format!("timely/reachability/{type_name}"))
187 .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
188 let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
189 let (tracker, scope_summary) = builder.build(reachability_logging);
190
191 let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);
192
193 let mut incomplete = vec![true; self.children.len()];
194 incomplete[0] = false;
195 let incomplete_count = incomplete.len() - 1;
196
197 let activations = worker.activations();
198
199 activations.borrow_mut().activate(&self.path[..]);
200
201 let max_interest = self.children.iter()
203 .flat_map(|c| c.notify.iter().copied())
204 .max()
205 .unwrap_or(FrontierInterest::Never);
206 let notify_me: Vec<FrontierInterest> = vec![max_interest; inputs];
207
208 Subgraph {
209 name: self.name,
210 path: self.path,
211 inputs,
212 outputs,
213 incomplete,
214 incomplete_count,
215 activations,
216 temp_active: BinaryHeap::new(),
217 maybe_shutdown: Vec::new(),
218 children: self.children,
219 input_messages: self.input_messages,
220 output_capabilities: self.output_capabilities,
221
222 local_pointstamp: ChangeBatch::new(),
223 final_pointstamp: ChangeBatch::new(),
224 progcaster,
225 pointstamp_tracker: tracker,
226
227 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
228 scope_summary,
229
230 progress_mode: worker.config().progress_mode,
231 notify_me,
232 }
233 }
234}
235
236
237pub struct Subgraph<TOuter, TInner>
242where
243 TOuter: Timestamp,
244 TInner: Timestamp+Refines<TOuter>,
245{
246 name: String, pub path: Rc<[usize]>,
249 inputs: usize, outputs: usize, children: Vec<PerOperatorState<TInner>>,
254
255 incomplete: Vec<bool>, incomplete_count: usize, activations: Rc<RefCell<Activations>>,
260 temp_active: BinaryHeap<Reverse<usize>>,
261 maybe_shutdown: Vec<usize>,
262
263 input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,
265
266 output_capabilities: Vec<MutableAntichain<TOuter>>,
268
269 local_pointstamp: ChangeBatch<(Location, TInner)>,
271 final_pointstamp: ChangeBatch<(Location, TInner)>,
272
273 pointstamp_tracker: reachability::Tracker<TInner>,
276
277 progcaster: Progcaster<TInner>,
279
280 shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
281 scope_summary: Connectivity<TInner::Summary>,
282
283 progress_mode: ProgressMode,
284
285 notify_me: Vec<FrontierInterest>,
286}
287
288impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
289where
290 TOuter: Timestamp,
291 TInner: Timestamp+Refines<TOuter>,
292{
293 fn name(&self) -> &str { &self.name }
294
295 fn path(&self) -> &[usize] { &self.path }
296
297 fn schedule(&mut self) -> bool {
298
299 self.accept_frontier(); self.harvest_inputs(); self.progcaster.recv(&mut self.final_pointstamp);
309
310 self.propagate_pointstamps();
312
313 { let temp_active = &mut self.temp_active;
315 self.activations
316 .borrow_mut()
317 .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
318 }
319
320 let mut previous = 0;
325 while let Some(Reverse(index)) = self.temp_active.pop() {
326 if index > previous {
328 self.activate_child(index);
330 previous = index;
331 }
332 }
333
334 self.send_progress();
336
337 if !self.final_pointstamp.is_empty() {
339 self.activations.borrow_mut().activate(&self.path[..]);
340 }
341
342 let incomplete = self.incomplete_count > 0;
344 let tracking = self.pointstamp_tracker.tracking_anything();
345
346 incomplete || tracking
347 }
348}
349
350
impl<TOuter, TInner> Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    /// Schedules child `child_index` once and folds its reported progress back into the scope.
    ///
    /// The steps are:
    /// - keep `incomplete`/`incomplete_count` in sync with the child's result;
    /// - shut the child down if it reported completion, the tracker shows empty input
    ///   frontiers, and its sources hold no pointstamps;
    /// - move its progress into `local_pointstamp` (local children) or `final_pointstamp`
    ///   (all others).
    ///
    /// Returns the child's `schedule` result.
    fn activate_child(&mut self, child_index: usize) -> bool {

        let child = &mut self.children[child_index];

        let incomplete = child.schedule();

        // Maintain `incomplete_count` as the number of `true` entries in `incomplete`.
        if incomplete != self.incomplete[child_index] {
            if incomplete { self.incomplete_count += 1; }
            else { self.incomplete_count -= 1; }
            self.incomplete[child_index] = incomplete;
        }

        if !incomplete {
            // A finished child is shut down once it has empty input frontiers and its
            // sources hold no pointstamps.
            let child_state = self.pointstamp_tracker.node_state(child_index);
            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
            let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
            if frontiers_empty && no_capabilities {
                child.shut_down();
            }
        }
        else {
            // Debug builds only: check the child's reported increments against the tracker.
            #[cfg(debug_assertions)] {
                child.validate_progress(self.pointstamp_tracker.node_state(child_index));
            }
        }

        // Local children's updates wait in `local_pointstamp` for `send_progress`. Other
        // children's updates go to `final_pointstamp` for `propagate_pointstamps`.
        if child.local {
            child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
        }
        else {
            child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
        }

        incomplete
    }

    /// Drains frontier changes supplied by the parent into the tracker.
    ///
    /// Each change is converted with `TInner::to_inner` and applied at the matching child 0
    /// source.
    fn accept_frontier(&mut self) {
        for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
            let source = Source::new(0, port);
            for (time, value) in changes.drain() {
                self.pointstamp_tracker.update_source(
                    source,
                    TInner::to_inner(time),
                    value
                );
            }
        }
    }

    /// Drains the counts of messages that entered the scope into `local_pointstamp`.
    ///
    /// Each count is added at every target connected to the corresponding child 0 output,
    /// and subtracted at that child 0 source.
    fn harvest_inputs(&mut self) {
        for input in 0 .. self.inputs {
            let source = Location::new_source(0, input);
            let mut borrowed = self.input_messages[input].borrow_mut();
            for (time, delta) in borrowed.drain() {
                for target in &self.children[0].edges[input] {
                    self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
                }
                self.local_pointstamp.update((source, time), -delta);
            }
        }
    }

    /// Applies `final_pointstamp` to the tracker, propagates, and distributes the results.
    ///
    /// The steps are:
    /// - Updates located at child 0 are not given to the tracker. Instead they are reported
    ///   to the parent through `shared_progress`: source changes become consumed counts
    ///   (sign flipped), and target changes become produced counts.
    /// - After propagation, frontier changes at child targets are written into the
    ///   children's shared progress. Children are queued for activation according to their
    ///   `FrontierInterest`.
    /// - Children that may now be eligible for shutdown are queued.
    /// - Changes at the scope outputs are converted to outer timestamps, filtered through
    ///   `output_capabilities`, and reported upward via `internals`.
    fn propagate_pointstamps(&mut self) {

        for ((location, timestamp), delta) in self.final_pointstamp.drain() {

            // Child 0 represents the enclosing scope. Its updates are reported upward, not tracked.
            if location.node == 0 {
                match location.port {
                    // `-delta` flips the sign of child 0 source changes to report consumed counts.
                    Port::Source(scope_input) => {
                        self.shared_progress
                            .borrow_mut()
                            .consumeds[scope_input]
                            .update(timestamp.to_outer(), -delta);
                    },
                    // Changes at child 0 targets (scope outputs) are reported as produced counts.
                    Port::Target(scope_output) => {
                        self.shared_progress
                            .borrow_mut()
                            .produceds[scope_output]
                            .update(timestamp.to_outer(), delta);
                    },
                }
            }
            else {
                self.pointstamp_tracker.update(location, timestamp, delta);
            }
        }

        self.pointstamp_tracker.propagate_all();

        let (pushed, operators) = self.pointstamp_tracker.pushed();
        for ((location, time), diff) in pushed.drain() {
            self.maybe_shutdown.push(location.node);
            // Only changes at targets (operator inputs) are delivered to children.
            if let crate::progress::Port::Target(port) = location.port {
                // `IfCapability` activates only when the tracker's `cap_counts` for the
                // operator is positive.
                let activate = match self.children[location.node].notify[port] {
                    FrontierInterest::Always => true,
                    FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 }
                    FrontierInterest::Never => false,
                };
                if activate { self.temp_active.push(Reverse(location.node)); }

                // The frontier change is recorded whether or not the child is activated.
                self.children[location.node]
                    .shared_progress
                    .borrow_mut()
                    .frontiers[port]
                    .update(time, diff);
            }
        }

        // Queue children whose input frontiers are empty and whose `cap_counts` is zero,
        // so that `activate_child` can consider shutting them down.
        // NOTE(review): `activate_child` instead checks that every source's `pointstamps`
        // is empty; confirm the two conditions agree.
        self.maybe_shutdown.sort_unstable();
        self.maybe_shutdown.dedup();
        for child_index in self.maybe_shutdown.drain(..) {
            let child_state = self.pointstamp_tracker.node_state(child_index);
            let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
            let no_capabilities = child_state.cap_counts == 0;
            if frontiers_empty && no_capabilities {
                self.temp_active.push(Reverse(child_index));
            }
        }

        // Report changes at the scope outputs upward as internal capability changes, in
        // outer timestamps, after filtering through `output_capabilities`.
        for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
            self.pointstamp_tracker
                .pushed_output()[output]
                .drain()
                .map(|(time, diff)| (time.to_outer(), diff))
                .filter_through(&mut self.output_capabilities[output])
                .for_each(|(time, diff)| internal.update(time, diff));
        }
    }

    /// Sends `local_pointstamp` through the progcaster when required.
    ///
    /// Sending is required in `ProgressMode::Eager`. Otherwise it is required when some
    /// pending update is either a negative change that `is_global` reports, or located at
    /// child 0.
    fn send_progress(&mut self) {

        let must_send = self.progress_mode == ProgressMode::Eager || {
            let tracker = &mut self.pointstamp_tracker;
            // Parsed as `(is_global && diff < 0) || node == 0`.
            self.local_pointstamp
                .iter()
                .any(|((location, time), diff)|
                    tracker.is_global(*location, time) && *diff < 0 ||
                    location.node == 0
                )
        };

        if must_send {
            self.progcaster.send(&mut self.local_pointstamp);
        }
    }
}
552
553
554impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
555where
556 TOuter: Timestamp,
557 TInner: Timestamp+Refines<TOuter>,
558{
559 fn local(&self) -> bool { false }
560 fn inputs(&self) -> usize { self.inputs }
561 fn outputs(&self) -> usize { self.outputs }
562
563 fn initialize(mut self: Box<Self>) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>, Box<dyn Schedule>) {
566
567 assert_eq!(self.children[0].outputs, self.inputs());
569 assert_eq!(self.children[0].inputs, self.outputs());
570
571 let mut internal_summary = vec![PortConnectivity::default(); self.inputs()];
575 for (input_idx, input) in self.scope_summary.iter().enumerate() {
576 for (output_idx, output) in input.iter_ports() {
577 for outer in output.elements().iter().cloned().map(TInner::summarize) {
578 internal_summary[input_idx].insert(output_idx, outer);
579 }
580 }
581 }
582
583 debug_assert_eq!(
584 internal_summary.len(),
585 self.inputs(),
586 "the internal summary should have as many elements as there are inputs",
587 );
588 debug_assert!(
589 internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())),
590 "each element of the internal summary should only reference valid outputs",
591 );
592
593 for child in self.children.iter_mut() {
597 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
598 }
599
600 self.propagate_pointstamps(); (internal_summary, Rc::clone(&self.shared_progress), self)
604 }
605
606 fn notify_me(&self) -> &[FrontierInterest] { &self.notify_me }
607}
608
609struct PerOperatorState<T: Timestamp> {
610
611 name: String, index: usize, id: usize, local: bool, notify: Vec<FrontierInterest>,
617 inputs: usize, outputs: usize, operator: Option<Box<dyn Schedule>>,
621
622 edges: Vec<Vec<Target>>, shared_progress: Rc<RefCell<SharedProgress<T>>>,
625
626 internal_summary: Connectivity<T::Summary>, logging: Option<Logger>,
629}
630
631impl<T: Timestamp> PerOperatorState<T> {
632
633 fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
634 PerOperatorState {
635 name: "External".to_owned(),
636 operator: None,
637 index: 0,
638 id: usize::MAX,
639 local: false,
640 notify: vec![FrontierInterest::IfCapability; inputs],
641 inputs,
642 outputs,
643
644 edges: vec![Vec::new(); outputs],
645
646 logging: None,
647
648 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
649 internal_summary: Vec::new(),
650 }
651 }
652
653 pub fn new(
654 scope: Box<dyn Operate<T>>,
655 index: usize,
656 identifier: usize,
657 logging: Option<Logger>,
658 summary_logging: &mut Option<SummaryLogger<T::Summary>>,
659 ) -> PerOperatorState<T>
660 {
661 let local = scope.local();
662 let inputs = scope.inputs();
663 let outputs = scope.outputs();
664 let notify = scope.notify_me().to_vec();
665
666 let (internal_summary, shared_progress, operator) = scope.initialize();
667
668 if let Some(l) = summary_logging {
669 l.log(crate::logging::OperatesSummaryEvent {
670 id: identifier,
671 summary: internal_summary.clone(),
672 })
673 }
674
675 assert_eq!(
676 internal_summary.len(),
677 inputs,
678 "operator summary has {} inputs when {} were expected",
679 internal_summary.len(),
680 inputs,
681 );
682 assert!(
683 internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)),
684 "operator summary references invalid output port",
685 );
686
687 PerOperatorState {
688 name: operator.name().to_owned(),
689 operator: Some(operator),
690 index,
691 id: identifier,
692 local,
693 notify,
694 inputs,
695 outputs,
696 edges: vec![vec![]; outputs],
697
698 logging,
699
700 shared_progress,
701 internal_summary,
702 }
703 }
704
705 pub fn schedule(&mut self) -> bool {
706
707 if let Some(ref mut operator) = self.operator {
708
709 if let Some(l) = self.logging.as_mut() {
711 let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
715 if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
716 l.log(crate::logging::PushProgressEvent { op_id: self.id })
717 }
718
719 l.log(crate::logging::ScheduleEvent::start(self.id));
720 }
721
722 let incomplete = operator.schedule();
723
724 if let Some(l) = self.logging.as_mut() {
726 l.log(crate::logging::ScheduleEvent::stop(self.id));
727 }
728
729 incomplete
730 }
731 else {
732
733 if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
735 println!("Operator prematurely shut down: {}", self.name);
736 println!(" {:?}", self.notify);
737 println!(" {:?}", self.shared_progress.borrow_mut().frontiers);
738 panic!();
739 }
740
741 false
743 }
744 }
745
746 fn shut_down(&mut self) {
747 if self.operator.is_some() {
748 if let Some(l) = self.logging.as_mut() {
749 l.log(crate::logging::ShutdownEvent{ id: self.id });
750 }
751 self.operator = None;
752 }
753 }
754
755 fn extract_progress(&self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
757
758 let shared_progress = &mut *self.shared_progress.borrow_mut();
759
760 for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
762 let target = Location::new_target(self.index, input);
763 for (time, delta) in consumed.drain() {
764 pointstamps.update((target, time), -delta);
765 }
766 }
767 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
768 let source = Location::new_source(self.index, output);
769 for (time, delta) in internal.drain() {
770 pointstamps.update((source, time.clone()), delta);
771 }
772 }
773 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
774 for (time, delta) in produced.drain() {
775 for target in &self.edges[output] {
776 pointstamps.update((Location::from(*target), time.clone()), delta);
777 temp_active.push(Reverse(target.node));
778 }
779 }
780 }
781 }
782
783 #[allow(dead_code)]
788 fn validate_progress(&self, child_state: &reachability::PerOperator<T>) {
789
790 let shared_progress = &mut *self.shared_progress.borrow_mut();
791
792 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
794 for (time, diff) in internal.iter() {
795 if *diff > 0 {
796 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
797 let internal = child_state.sources[output].implications.less_equal(time);
798 if !consumed && !internal {
799 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
800 panic!("Progress error; internal {:?}", self.name);
801 }
802 }
803 }
804 }
805 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
806 for (time, diff) in produced.iter() {
807 if *diff > 0 {
808 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
809 let internal = child_state.sources[output].implications.less_equal(time);
810 if !consumed && !internal {
811 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
812 panic!("Progress error; produced {:?}", self.name);
813 }
814 }
815 }
816 }
817 }
818}
819
820impl<T: Timestamp> Drop for PerOperatorState<T> {
822 fn drop(&mut self) {
823 self.shut_down();
824 }
825}