1use std::rc::Rc;
9use std::cell::RefCell;
10use std::collections::BinaryHeap;
11use std::cmp::Reverse;
12
13use crate::logging::{TimelyLogger as Logger, TimelyProgressEventBuilder};
14use crate::logging::TimelySummaryLogger as SummaryLogger;
15
16use crate::scheduling::Schedule;
17use crate::scheduling::activate::Activations;
18
19use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter};
20use crate::progress::{Timestamp, Operate, operate::SharedProgress};
21use crate::progress::{Location, Port, Source, Target};
22use crate::progress::operate::{Connectivity, PortConnectivity};
23use crate::progress::ChangeBatch;
24use crate::progress::broadcast::Progcaster;
25use crate::progress::reachability;
26use crate::progress::timestamp::Refines;
27
28use crate::worker::ProgressMode;
29
/// A builder for a `Subgraph`.
///
/// The builder accumulates child operators, scope inputs and outputs, and edges
/// between ports. It is consumed by `build`, which produces the finished `Subgraph`.
pub struct SubgraphBuilder<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp,
{
    /// The name of this subgraph; moved into the built `Subgraph`.
    pub name: String,

    /// The path of this subgraph; `new_from` reads its last element as `index`.
    pub path: Rc<[usize]>,

    /// The last element of `path`; used as the node of the `Target`s and `Source`s
    /// returned by `new_input` and `new_output`.
    index: usize,

    /// An identifier for this subgraph, handed to the reachability logger and the
    /// progcaster in `build`.
    identifier: usize,

    /// State for each child operator. Position zero holds a placeholder that `build`
    /// replaces with a representative of the enclosing scope.
    children: Vec<PerOperatorState<TInner>>,
    /// The next identifier `allocate_child_id` will return (starts at 1).
    child_count: usize,

    /// Edges registered through `connect`; they are installed by `build`.
    edge_stash: Vec<(Source, Target)>,

    /// One shared change batch per scope input, in the order of `new_input` calls.
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,

    /// One antichain per scope output, in the order of `new_output` calls.
    output_capabilities: Vec<MutableAntichain<TOuter>>,

    /// Optional logger; receives an `OperatesEvent` for each added child and is
    /// cloned into each child's state.
    logging: Option<Logger>,
    /// Optional logger handed to `PerOperatorState::new` for each added child.
    summary_logging: Option<SummaryLogger<TInner::Summary>>,
}
73
74impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
75where
76 TOuter: Timestamp,
77 TInner: Timestamp+Refines<TOuter>,
78{
79 pub fn new_input(&mut self, shared_counts: Rc<RefCell<ChangeBatch<TInner>>>) -> Target {
81 self.input_messages.push(shared_counts);
82 Target::new(self.index, self.input_messages.len() - 1)
83 }
84
85 pub fn new_output(&mut self) -> Source {
87 self.output_capabilities.push(MutableAntichain::new());
88 Source::new(self.index, self.output_capabilities.len() - 1)
89 }
90
91 pub fn connect(&mut self, source: Source, target: Target) {
96 self.edge_stash.push((source, target));
97 }
98
99 pub fn new_from(
102 path: Rc<[usize]>,
103 identifier: usize,
104 logging: Option<Logger>,
105 summary_logging: Option<SummaryLogger<TInner::Summary>>,
106 name: &str,
107 )
108 -> SubgraphBuilder<TOuter, TInner>
109 {
110 let children = vec![PerOperatorState::empty(0, 0)];
112 let index = path[path.len() - 1];
113
114 SubgraphBuilder {
115 name: name.to_owned(),
116 path,
117 index,
118 identifier,
119 children,
120 child_count: 1,
121 edge_stash: Vec::new(),
122 input_messages: Vec::new(),
123 output_capabilities: Vec::new(),
124 logging,
125 summary_logging,
126 }
127 }
128
129 pub fn allocate_child_id(&mut self) -> usize {
131 self.child_count += 1;
132 self.child_count - 1
133 }
134
135 pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
137 if let Some(l) = &mut self.logging {
138 let mut child_path = Vec::with_capacity(self.path.len() + 1);
139 child_path.extend_from_slice(&self.path[..]);
140 child_path.push(index);
141
142 l.log(crate::logging::OperatesEvent {
143 id: identifier,
144 addr: child_path,
145 name: child.name().to_owned(),
146 });
147 }
148 self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging));
149 }
150
/// Consumes the builder and produces the finished `Subgraph`.
///
/// The children are ordered by index, child zero is replaced by a representative of
/// the enclosing scope, and the stashed edges are installed. A reachability tracker
/// and a progcaster are then created and the new subgraph is activated.
///
/// # Panics
///
/// Panics if, once sorted, the children's indices are not exactly `0, 1, 2, ...`.
pub fn build<A: crate::worker::AsWorker>(mut self, worker: &mut A) -> Subgraph<TOuter, TInner> {
    // Order children by index, and check that each sits at the position of its index.
    self.children.sort_by(|x,y| x.index.cmp(&y.index));
    assert!(self.children.iter().enumerate().all(|(i,x)| i == x.index));

    let inputs = self.input_messages.len();
    let outputs = self.output_capabilities.len();

    // Child zero stands in for the enclosing scope, with the roles of inputs and
    // outputs swapped: it has `outputs` inputs and `inputs` outputs.
    self.children[0] = PerOperatorState::empty(outputs, inputs);

    let mut builder = reachability::Builder::new();

    // Child zero is added with a default (empty) connectivity for each of its inputs.
    let summary = (0..outputs).map(|_| PortConnectivity::default()).collect();
    builder.add_node(0, outputs, inputs, summary);
    // Every other child is added with the internal summary it reported.
    for (index, child) in self.children.iter().enumerate().skip(1) {
        builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone());
    }

    // Install the stashed edges, both in the per-child edge lists and in the builder.
    // This moves `edge_stash` out of `self`; the remaining fields are used below.
    for (source, target) in self.edge_stash {
        self.children[source.node].edges[source.port].push(target);
        builder.add_edge(source, target);
    }

    // Loggers are looked up under names that include the inner timestamp's type name.
    let type_name = std::any::type_name::<TInner>();
    let reachability_logging =
        worker.log_register()
            .get::<reachability::logging::TrackerEventBuilder<TInner>>(&format!("timely/reachability/{type_name}"))
            .map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
    let progress_logging = worker.log_register().get::<TimelyProgressEventBuilder<TInner>>(&format!("timely/progress/{type_name}"));
    let (tracker, scope_summary) = builder.build(reachability_logging);

    let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);

    // Every child other than child zero starts out marked as incomplete.
    let mut incomplete = vec![true; self.children.len()];
    incomplete[0] = false;
    let incomplete_count = incomplete.len() - 1;

    let activations = worker.activations();

    // Activate the subgraph at its own path.
    activations.borrow_mut().activate(&self.path[..]);

    Subgraph {
        name: self.name,
        path: self.path,
        inputs,
        outputs,
        incomplete,
        incomplete_count,
        activations,
        temp_active: BinaryHeap::new(),
        maybe_shutdown: Vec::new(),
        children: self.children,
        input_messages: self.input_messages,
        output_capabilities: self.output_capabilities,

        local_pointstamp: ChangeBatch::new(),
        final_pointstamp: ChangeBatch::new(),
        progcaster,
        pointstamp_tracker: tracker,

        shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
        scope_summary,

        progress_mode: worker.config().progress_mode,
    }
}
226}
227
228
/// A dataflow subgraph: a collection of child operators scheduled together, with
/// progress tracked for them in the inner timestamp `TInner`.
///
/// Instances are produced by `SubgraphBuilder::build`.
pub struct Subgraph<TOuter, TInner>
where
    TOuter: Timestamp,
    TInner: Timestamp+Refines<TOuter>,
{
    /// The name of this subgraph, as reported by `Schedule::name`.
    name: String,
    /// The path of this subgraph, as reported by `Schedule::path`; also used to
    /// activate the subgraph and to enumerate activated children.
    pub path: Rc<[usize]>,
    /// The number of inputs of the subgraph.
    inputs: usize,
    /// The number of outputs of the subgraph.
    outputs: usize,
    /// State for each child; position zero represents the enclosing scope.
    children: Vec<PerOperatorState<TInner>>,

    /// For each child, the result of its most recent `schedule` call (initially
    /// `true` for every child except child zero).
    incomplete: Vec<bool>,
    /// The number of `true` entries in `incomplete`.
    incomplete_count: usize,
    /// Shared activation state obtained from the worker.
    activations: Rc<RefCell<Activations>>,
    /// Child indices awaiting scheduling, popped in increasing order.
    temp_active: BinaryHeap<Reverse<usize>>,
    /// Scratch list of children that received propagated progress information and
    /// should be examined for shutdown.
    maybe_shutdown: Vec<usize>,

    /// One shared change batch per scope input, drained by `harvest_inputs`.
    input_messages: Vec<Rc<RefCell<ChangeBatch<TInner>>>>,

    /// One antichain per scope output, through which output frontier changes are
    /// filtered before being reported as internal capability changes.
    output_capabilities: Vec<MutableAntichain<TOuter>>,

    /// Progress updates that have not yet been handed to the progcaster.
    local_pointstamp: ChangeBatch<(Location, TInner)>,
    /// Progress updates ready to be committed by `propagate_pointstamps`.
    final_pointstamp: ChangeBatch<(Location, TInner)>,

    /// Tracks pointstamps and propagates their implications among the children.
    pointstamp_tracker: reachability::Tracker<TInner>,

    /// Sends updates from `local_pointstamp` and receives updates into `final_pointstamp`.
    progcaster: Progcaster<TInner>,

    /// Progress information shared with the enclosing scope.
    shared_progress: Rc<RefCell<SharedProgress<TOuter>>>,
    /// Connectivity produced by the reachability builder; summarized to the outer
    /// timestamp in `get_internal_summary`.
    scope_summary: Connectivity<TInner::Summary>,

    /// Determines whether `send_progress` always sends, or only when required.
    progress_mode: ProgressMode,
}
277
278impl<TOuter, TInner> Schedule for Subgraph<TOuter, TInner>
279where
280 TOuter: Timestamp,
281 TInner: Timestamp+Refines<TOuter>,
282{
283 fn name(&self) -> &str { &self.name }
284
285 fn path(&self) -> &[usize] { &self.path }
286
287 fn schedule(&mut self) -> bool {
288
289 self.accept_frontier(); self.harvest_inputs(); self.progcaster.recv(&mut self.final_pointstamp);
299
300 self.propagate_pointstamps();
302
303 { let temp_active = &mut self.temp_active;
305 self.activations
306 .borrow_mut()
307 .for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
308 }
309
310 let mut previous = 0;
315 while let Some(Reverse(index)) = self.temp_active.pop() {
316 if index > previous {
318 self.activate_child(index);
320 previous = index;
321 }
322 }
323
324 self.send_progress();
326
327 if !self.final_pointstamp.is_empty() {
329 self.activations.borrow_mut().activate(&self.path[..]);
330 }
331
332 let incomplete = self.incomplete_count > 0;
334 let tracking = self.pointstamp_tracker.tracking_anything();
335
336 incomplete || tracking
337 }
338}
339
340
341impl<TOuter, TInner> Subgraph<TOuter, TInner>
342where
343 TOuter: Timestamp,
344 TInner: Timestamp+Refines<TOuter>,
345{
346 fn activate_child(&mut self, child_index: usize) -> bool {
350
351 let child = &mut self.children[child_index];
352
353 let incomplete = child.schedule();
354
355 if incomplete != self.incomplete[child_index] {
356 if incomplete { self.incomplete_count += 1; }
357 else { self.incomplete_count -= 1; }
358 self.incomplete[child_index] = incomplete;
359 }
360
361 if !incomplete {
362 let child_state = self.pointstamp_tracker.node_state(child_index);
364 let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
365 let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
366 if frontiers_empty && no_capabilities {
367 child.shut_down();
368 }
369 }
370 else {
371 #[cfg(debug_assertions)] {
373 child.validate_progress(self.pointstamp_tracker.node_state(child_index));
374 }
375 }
376
377 if child.local {
379 child.extract_progress(&mut self.local_pointstamp, &mut self.temp_active);
380 }
381 else {
382 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
383 }
384
385 incomplete
386 }
387
388 fn accept_frontier(&mut self) {
390 for (port, changes) in self.shared_progress.borrow_mut().frontiers.iter_mut().enumerate() {
391 let source = Source::new(0, port);
392 for (time, value) in changes.drain() {
393 self.pointstamp_tracker.update_source(
394 source,
395 TInner::to_inner(time),
396 value
397 );
398 }
399 }
400 }
401
402 fn harvest_inputs(&mut self) {
409 for input in 0 .. self.inputs {
410 let source = Location::new_source(0, input);
411 let mut borrowed = self.input_messages[input].borrow_mut();
412 for (time, delta) in borrowed.drain() {
413 for target in &self.children[0].edges[input] {
414 self.local_pointstamp.update((Location::from(*target), time.clone()), delta);
415 }
416 self.local_pointstamp.update((source, time), -delta);
417 }
418 }
419 }
420
/// Commits the updates in `final_pointstamp` and propagates their consequences.
///
/// Updates at child zero are reported to the enclosing scope through the shared
/// progress state; all other updates are applied to the tracker. After propagation,
/// frontier changes are pushed to the affected children, children that may be ready
/// to shut down are enqueued for scheduling, and changes to the scope's output
/// frontiers are reported as internal capability changes.
fn propagate_pointstamps(&mut self) {

    for ((location, timestamp), delta) in self.final_pointstamp.drain() {

        // Child zero stands for the enclosing scope; its updates are reported
        // upward rather than tracked.
        if location.node == 0 {
            match location.port {
                // An update at an output of child zero is reported as messages
                // consumed on that scope input, with the sign of `delta` flipped.
                Port::Source(scope_input) => {
                    self.shared_progress
                        .borrow_mut()
                        .consumeds[scope_input]
                        .update(timestamp.to_outer(), -delta);
                },
                // An update at an input of child zero is reported as messages
                // produced on that scope output.
                Port::Target(scope_output) => {
                    self.shared_progress
                        .borrow_mut()
                        .produceds[scope_output]
                        .update(timestamp.to_outer(), delta);
                },
            }
        }
        else {
            self.pointstamp_tracker.update(location, timestamp, delta);
        }
    }

    self.pointstamp_tracker.propagate_all();

    // Drain the propagated changes into the shared progress state of each child.
    for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() {
        // Any node that received a change is a candidate for shutdown (checked below).
        self.maybe_shutdown.push(location.node);
        // Only changes at targets (operator inputs) are forwarded.
        if let crate::progress::Port::Target(port) = location.port {
            // Schedule the child if it asked to be notified.
            if self.children[location.node].notify {
                self.temp_active.push(Reverse(location.node));
            }
            // The frontier change is recorded whether or not the child is notified.
            self.children[location.node]
                .shared_progress
                .borrow_mut()
                .frontiers[port]
                .update(time, diff);
        }
    }

    // Enqueue each candidate whose input frontiers and capabilities are all empty;
    // `activate_child` shuts such a child down once it reports completion.
    self.maybe_shutdown.sort();
    self.maybe_shutdown.dedup();
    for child_index in self.maybe_shutdown.drain(..) {
        let child_state = self.pointstamp_tracker.node_state(child_index);
        let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty());
        let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty());
        if frontiers_empty && no_capabilities {
            self.temp_active.push(Reverse(child_index));
        }
    }

    // Report changes at the scope outputs as internal capability changes, converted
    // to the outer timestamp and filtered through `output_capabilities`.
    for (output, internal) in self.shared_progress.borrow_mut().internals.iter_mut().enumerate() {
        self.pointstamp_tracker
            .pushed_output()[output]
            .drain()
            .map(|(time, diff)| (time.to_outer(), diff))
            .filter_through(&mut self.output_capabilities[output])
            .for_each(|(time, diff)| internal.update(time, diff));
    }
}
512
513 fn send_progress(&mut self) {
518
519 let must_send = self.progress_mode == ProgressMode::Eager || {
522 let tracker = &mut self.pointstamp_tracker;
523 self.local_pointstamp
524 .iter()
525 .any(|((location, time), diff)|
526 tracker.is_global(*location, time) && *diff < 0
528 )
529 };
530
531 if must_send {
532 self.progcaster.send(&mut self.local_pointstamp);
533 }
534 }
535}
536
537
538impl<TOuter, TInner> Operate<TOuter> for Subgraph<TOuter, TInner>
539where
540 TOuter: Timestamp,
541 TInner: Timestamp+Refines<TOuter>,
542{
/// A subgraph always reports itself as not local.
fn local(&self) -> bool { false }
/// The number of inputs of the subgraph.
fn inputs(&self) -> usize { self.inputs }
/// The number of outputs of the subgraph.
fn outputs(&self) -> usize { self.outputs }
546
547 fn get_internal_summary(&mut self) -> (Connectivity<TOuter::Summary>, Rc<RefCell<SharedProgress<TOuter>>>) {
550
551 assert_eq!(self.children[0].outputs, self.inputs());
553 assert_eq!(self.children[0].inputs, self.outputs());
554
555 let mut internal_summary = vec![PortConnectivity::default(); self.inputs()];
559 for (input_idx, input) in self.scope_summary.iter().enumerate() {
560 for (output_idx, output) in input.iter_ports() {
561 for outer in output.elements().iter().cloned().map(TInner::summarize) {
562 internal_summary[input_idx].insert(output_idx, outer);
563 }
564 }
565 }
566
567 debug_assert_eq!(
568 internal_summary.len(),
569 self.inputs(),
570 "the internal summary should have as many elements as there are inputs",
571 );
572 debug_assert!(
573 internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < self.outputs())),
574 "each element of the internal summary should only reference valid outputs",
575 );
576
577 for child in self.children.iter_mut() {
581 child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active);
582 }
583
584 self.propagate_pointstamps(); (internal_summary, Rc::clone(&self.shared_progress))
588 }
589
590 fn set_external_summary(&mut self) {
591 self.accept_frontier();
592 self.propagate_pointstamps(); self.children
594 .iter_mut()
595 .flat_map(|child| child.operator.as_mut())
596 .for_each(|op| op.set_external_summary());
597 }
598}
599
/// The state a subgraph keeps for one of its children.
struct PerOperatorState<T: Timestamp> {

    /// The name of the operator ("External" for the placeholder made by `empty`).
    name: String,
    /// The index of the operator within its parent.
    index: usize,
    /// The identifier used in logged events (`usize::MAX` for the placeholder).
    id: usize,
    /// The value reported by the operator's `local()`; selects the buffer into which
    /// the parent extracts this child's progress statements.
    local: bool,
    /// The value reported by the operator's `notify_me()`; when set, the parent
    /// schedules this child on frontier changes at its inputs.
    notify: bool,
    /// The number of inputs of the operator.
    inputs: usize,
    /// The number of outputs of the operator.
    outputs: usize,
    /// The operator itself; `None` for the placeholder and after `shut_down`.
    operator: Option<Box<dyn Operate<T>>>,

    /// For each output, the targets it is connected to.
    edges: Vec<Vec<Target>>,
    /// Progress information shared with the operator.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,

    /// The connectivity the operator reported from `get_internal_summary`.
    internal_summary: Connectivity<T::Summary>,
    /// Optional logger for schedule, push-progress and shutdown events.
    logging: Option<Logger>,
}
621
622impl<T: Timestamp> PerOperatorState<T> {
623
624 fn empty(inputs: usize, outputs: usize) -> PerOperatorState<T> {
625 PerOperatorState {
626 name: "External".to_owned(),
627 operator: None,
628 index: 0,
629 id: usize::MAX,
630 local: false,
631 notify: true,
632 inputs,
633 outputs,
634
635 edges: vec![Vec::new(); outputs],
636
637 logging: None,
638
639 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))),
640 internal_summary: Vec::new(),
641 }
642 }
643
644 pub fn new(
645 mut scope: Box<dyn Operate<T>>,
646 index: usize,
647 identifier: usize,
648 logging: Option<Logger>,
649 summary_logging: &mut Option<SummaryLogger<T::Summary>>,
650 ) -> PerOperatorState<T>
651 {
652 let local = scope.local();
653 let inputs = scope.inputs();
654 let outputs = scope.outputs();
655 let notify = scope.notify_me();
656
657 let (internal_summary, shared_progress) = scope.get_internal_summary();
658
659 if let Some(l) = summary_logging {
660 l.log(crate::logging::OperatesSummaryEvent {
661 id: identifier,
662 summary: internal_summary.clone(),
663 })
664 }
665
666 assert_eq!(
667 internal_summary.len(),
668 inputs,
669 "operator summary has {} inputs when {} were expected",
670 internal_summary.len(),
671 inputs,
672 );
673 assert!(
674 internal_summary.iter().all(|os| os.iter_ports().all(|(o,_)| o < outputs)),
675 "operator summary references invalid output port",
676 );
677
678 PerOperatorState {
679 name: scope.name().to_owned(),
680 operator: Some(scope),
681 index,
682 id: identifier,
683 local,
684 notify,
685 inputs,
686 outputs,
687 edges: vec![vec![]; outputs],
688
689 logging,
690
691 shared_progress,
692 internal_summary,
693 }
694 }
695
696 pub fn schedule(&mut self) -> bool {
697
698 if let Some(ref mut operator) = self.operator {
699
700 if let Some(l) = self.logging.as_mut() {
702 let frontiers = &mut self.shared_progress.borrow_mut().frontiers[..];
706 if frontiers.iter_mut().any(|buffer| !buffer.is_empty()) {
707 l.log(crate::logging::PushProgressEvent { op_id: self.id })
708 }
709
710 l.log(crate::logging::ScheduleEvent::start(self.id));
711 }
712
713 let incomplete = operator.schedule();
714
715 if let Some(l) = self.logging.as_mut() {
717 l.log(crate::logging::ScheduleEvent::stop(self.id));
718 }
719
720 incomplete
721 }
722 else {
723
724 if self.shared_progress.borrow_mut().frontiers.iter_mut().any(|x| !x.is_empty()) {
726 println!("Operator prematurely shut down: {}", self.name);
727 println!(" {:?}", self.notify);
728 println!(" {:?}", self.shared_progress.borrow_mut().frontiers);
729 panic!();
730 }
731
732 false
734 }
735 }
736
737 fn shut_down(&mut self) {
738 if self.operator.is_some() {
739 if let Some(l) = self.logging.as_mut() {
740 l.log(crate::logging::ShutdownEvent{ id: self.id });
741 }
742 self.operator = None;
743 }
744 }
745
746 fn extract_progress(&self, pointstamps: &mut ChangeBatch<(Location, T)>, temp_active: &mut BinaryHeap<Reverse<usize>>) {
748
749 let shared_progress = &mut *self.shared_progress.borrow_mut();
750
751 for (input, consumed) in shared_progress.consumeds.iter_mut().enumerate() {
753 let target = Location::new_target(self.index, input);
754 for (time, delta) in consumed.drain() {
755 pointstamps.update((target, time), -delta);
756 }
757 }
758 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
759 let source = Location::new_source(self.index, output);
760 for (time, delta) in internal.drain() {
761 pointstamps.update((source, time.clone()), delta);
762 }
763 }
764 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
765 for (time, delta) in produced.drain() {
766 for target in &self.edges[output] {
767 pointstamps.update((Location::from(*target), time.clone()), delta);
768 temp_active.push(Reverse(target.node));
769 }
770 }
771 }
772 }
773
774 #[allow(dead_code)]
779 fn validate_progress(&self, child_state: &reachability::PerOperator<T>) {
780
781 let shared_progress = &mut *self.shared_progress.borrow_mut();
782
783 for (output, internal) in shared_progress.internals.iter_mut().enumerate() {
785 for (time, diff) in internal.iter() {
786 if *diff > 0 {
787 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
788 let internal = child_state.sources[output].implications.less_equal(time);
789 if !consumed && !internal {
790 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
791 panic!("Progress error; internal {:?}", self.name);
792 }
793 }
794 }
795 }
796 for (output, produced) in shared_progress.produceds.iter_mut().enumerate() {
797 for (time, diff) in produced.iter() {
798 if *diff > 0 {
799 let consumed = shared_progress.consumeds.iter_mut().any(|x| x.iter().any(|(t,d)| *d > 0 && t.less_equal(time)));
800 let internal = child_state.sources[output].implications.less_equal(time);
801 if !consumed && !internal {
802 println!("Increment at {:?}, not supported by\n\tconsumed: {:?}\n\tinternal: {:?}", time, shared_progress.consumeds, child_state.sources[output].implications);
803 panic!("Progress error; produced {:?}", self.name);
804 }
805 }
806 }
807 }
808 }
809}
810
/// Shuts the operator down when its state is dropped, so that the shutdown is logged
/// (when logging is enabled) even if the operator was never shut down explicitly.
impl<T: Timestamp> Drop for PerOperatorState<T> {
    fn drop(&mut self) {
        // A no-op if the operator has already been shut down.
        self.shut_down();
    }
}