1use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::arrange::ShutdownButton;
117use differential_dataflow::operators::iterate::Variable;
118use differential_dataflow::trace::{BatchReader, TraceReader};
119use differential_dataflow::{AsCollection, Data, VecCollection};
120use futures::FutureExt;
121use futures::channel::oneshot;
122use itertools::Itertools;
123use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
124use mz_compute_types::dyncfgs::{
125 COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
126 COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
127 ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
128};
129use mz_compute_types::plan::render_plan::{
130 self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
131};
132use mz_compute_types::plan::{ArrangementStrategy, LirId};
133use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
134use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
135use mz_repr::explain::DummyHumanizer;
136use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
137use mz_storage_operators::persist_source;
138use mz_storage_types::controller::CollectionMetadata;
139use mz_timely_util::columnation::ColumnationChunker;
140use mz_timely_util::operator::{CollectionExt, StreamExt};
141use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
142use mz_timely_util::scope_label::ScopeExt;
143use timely::PartialOrder;
144use timely::container::CapacityContainerBuilder;
145use timely::dataflow::channels::pact::Pipeline;
146use timely::dataflow::operators::vec::ToStream;
147use timely::dataflow::operators::vec::{BranchWhen, Filter};
148use timely::dataflow::operators::{Capability, Operator, Probe, probe};
149use timely::dataflow::{Scope, Stream, StreamVec};
150use timely::order::{Product, TotalOrder};
151use timely::progress::timestamp::Refines;
152use timely::progress::{Antichain, Timestamp};
153use timely::scheduling::ActivateOnDrop;
154use timely::worker::Worker as TimelyWorker;
155
156use crate::arrangement::manager::TraceBundle;
157use crate::compute_state::ComputeState;
158use crate::extensions::arrange::{KeyCollection, MzArrange};
159use crate::extensions::reduce::MzReduce;
160use crate::extensions::temporal_bucket::TemporalBucketing;
161use crate::logging::compute::{
162 ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
163};
164use crate::render::context::{ArrangementFlavor, Context};
165use crate::render::errors::DataflowErrorSer;
166use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
167use mz_row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
168
169pub mod context;
170pub(crate) mod errors;
171mod flat_map;
172mod join;
173mod reduce;
174pub mod sinks;
175mod threshold;
176mod top_k;
177
178pub use context::CollectionBundle;
179pub use join::LinearJoinSpec;
180
181struct PressOnDrop<T>(ShutdownButton<T>);
185
186impl<T> Drop for PressOnDrop<T> {
187 fn drop(&mut self) {
188 self.0.press();
189 }
190}
191
/// Builds the timely dataflow described by `dataflow` on `timely_worker`.
///
/// Construction happens in two phases inside a single timely dataflow named
/// `"Dataflow: <debug_name>"`:
///
/// 1. An `InputRegion` that renders one `persist_source` per entry of
///    `dataflow.source_imports`. For each source it:
///    - optionally pushes column demands into the read schema;
///    - suppresses early progress up to the `as_of` when one is set;
///    - optionally applies logical backpressure;
///    - attaches the source's input probe.
///
///    The resulting `(oks, errs)` collections and source tokens are collected for the build
///    phase.
/// 2. A build phase that imports indexes, renders `dataflow.objects_to_build`, and then
///    exports indexes and sinks. If any object plan is recursive, this phase runs inside an
///    `iterative` scope with `PointStamp<u64>` inner timestamps. Otherwise it runs inside a
///    region named `"BuildRegion: <debug_name>"`.
///
/// `start_signal` is cloned into every persist source, index import, and sink. `until` and
/// `dataflow_expiration` are handed to the rendering [`Context`].
pub fn build_compute_dataflow(
    timely_worker: &mut TimelyWorker,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // If any object plan is recursive, the build phase needs an iterative scope.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Index exports paired with `dataflow.depends_on(..)` of the indexed collection. They are
    // computed up front, before `dataflow.objects_to_build` is consumed below.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Sink exports paired with `dataflow.depends_on(..)` of the collection they read from.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        // `(Id::Global(source_id), (oks, errs))` for every persist source import.
        let mut imported_sources = Vec::new();
        // Per-import tokens, keyed by import id. Index and sink exports clone the tokens of
        // their dependencies out of this map.
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        // Shared probe: exports attach their outputs to it, and logical backpressure on the
        // sources consults it.
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    // Narrowed read schema. Set only when column demands are applied.
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        if apply_demands {
                            // Read only the demanded columns from persist. Renumber the
                            // operators' column references from old positions to their
                            // positions in the narrowed schema.
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // Exclude the snapshot only if the import does not request it and the
                    // optimization is enabled. Each exclusion bumps a metric.
                    let snapshot_mode = if import.with_snapshot || !subscribe_snapshot_optimization
                    {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    let suppress_early_progress_as_of = dataflow.as_of.clone();

                    let (mut ok_stream, err_stream, token) =
                        persist_source::persist_source::<DataflowErrorSer>(
                            inner,
                            *source_id,
                            Arc::clone(&compute_state.persist_clients),
                            &compute_state.txns_ctx,
                            import.desc.storage_metadata.clone(),
                            read_schema,
                            dataflow.as_of.clone(),
                            snapshot_mode,
                            until.clone(),
                            mfp.as_mut(),
                            compute_state.dataflow_max_inflight_bytes(),
                            start_signal.clone().into_send_future(),
                            ErrorHandler::Halt("compute_import"),
                        );

                    // The MFP was lent mutably to `persist_source`. It must be absent or
                    // reduced to the identity afterwards (presumably because the source applied
                    // all of it).
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        // Throttle source progress against the shared output probe.
                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Lift both streams out of the per-source and input regions into the
                    // dataflow's root scope.
                    let (oks, errs) = (
                        ok_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                        err_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        if recursive {
            // Recursive objects are rendered in an iterative scope with `PointStamp<u64>`
            // inner timestamps. Coordinates are indexed by recursion level (see
            // `render_recursive_plan`).
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    // Same snapshot-exclusion rule as for persist source imports.
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Each object is rendered in its own region. Its result is bound under
                // `Id::Global(object.id)` so later objects and exports can find it.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Index exports in an iterative scope must leave that scope via `scope`.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        scope,
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        } else {
            // Non-recursive objects are rendered in a plain region at the root timestamp.
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    // Same snapshot-exclusion rule as for persist source imports.
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Each object is rendered in its own region. Its result is bound under
                // `Id::Global(object.id)` so later objects and exports can find it.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        }
    });
}
553
554impl<'g, T> Context<'g, T>
557where
558 T: Refines<mz_repr::Timestamp> + RenderTimestamp,
559{
560 fn import_filtered_index_collection<
564 'outer,
565 Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
566 V: Data,
567 >(
568 &self,
569 arranged: Arranged<'outer, Tr>,
570 start_signal: StartSignal,
571 mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
572 ) -> VecCollection<'g, T, V, Tr::Diff>
573 where
574 mz_repr::Timestamp: TotalOrder,
577 {
578 let oks = arranged.stream.with_start_signal(start_signal).filter({
579 let as_of = self.as_of_frontier.clone();
580 move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
581 });
582 Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
583 }
584
585 pub(crate) fn import_index<'outer>(
586 &mut self,
587 outer: Scope<'outer, mz_repr::Timestamp>,
588 compute_state: &mut ComputeState,
589 tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
590 input_probe: probe::Handle<mz_repr::Timestamp>,
591 idx_id: GlobalId,
592 idx: &IndexDesc,
593 typ: &ReprRelationType,
594 snapshot_mode: SnapshotMode,
595 start_signal: StartSignal,
596 ) {
597 if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
598 assert!(
599 PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
600 "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
601 );
602
603 let token = traces.to_drop().clone();
604
605 let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
606 outer,
607 &format!("Index({}, {:?})", idx.on_id, idx.key),
608 self.as_of_frontier.clone(),
609 self.until.clone(),
610 );
611
612 oks.stream = oks.stream.probe_with(&input_probe);
613
614 let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
615 outer,
616 &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
617 self.as_of_frontier.clone(),
618 self.until.clone(),
619 );
620
621 let bundle = match snapshot_mode {
622 SnapshotMode::Include => {
623 let ok_arranged = oks
624 .enter(self.scope)
625 .with_start_signal(start_signal.clone());
626 let err_arranged = err_arranged
627 .enter(self.scope)
628 .with_start_signal(start_signal);
629 CollectionBundle::from_expressions(
630 idx.key.clone(),
631 ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
632 )
633 }
634 SnapshotMode::Exclude => {
635 let oks = {
642 let mut datums = DatumVec::new();
643 let (permutation, _thinning) =
644 permutation_for_arrangement(&idx.key, typ.arity());
645 self.import_filtered_index_collection(
646 oks,
647 start_signal.clone(),
648 move |k: DatumSeq, v: DatumSeq| {
649 let mut datums_borrow = datums.borrow();
650 datums_borrow.extend(k);
651 datums_borrow.extend(v);
652 SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
653 },
654 )
655 };
656 let errs = self.import_filtered_index_collection(
657 err_arranged,
658 start_signal,
659 |e, _| e.clone(),
660 );
661 CollectionBundle::from_collections(oks, errs)
662 }
663 };
664 self.update_id(Id::Global(idx.on_id), bundle);
665 tokens.insert(
666 idx_id,
667 Rc::new((PressOnDrop(ok_button), PressOnDrop(err_button), token)),
668 );
669 } else {
670 panic!(
671 "import of index {} failed while building dataflow {}",
672 idx_id, self.dataflow_id
673 );
674 }
675 }
676}
677
678impl<'g> Context<'g, mz_repr::Timestamp> {
681 pub(crate) fn export_index(
682 &self,
683 compute_state: &mut ComputeState,
684 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
685 dependency_ids: BTreeSet<GlobalId>,
686 idx_id: GlobalId,
687 idx: &IndexDesc,
688 output_probe: &MzProbeHandle<mz_repr::Timestamp>,
689 ) {
690 let mut needed_tokens = Vec::new();
692 for dep_id in dependency_ids {
693 if let Some(token) = tokens.get(&dep_id) {
694 needed_tokens.push(Rc::clone(token));
695 }
696 }
697 let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
698 panic!(
699 "Arrangement alarmingly absent! id: {:?}",
700 Id::Global(idx_id)
701 )
702 });
703
704 match bundle.arrangement(&idx.key) {
705 Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
706 if let Some(&expiration) = self.dataflow_expiration.as_option() {
709 oks.stream = oks.stream.expire_stream_at(
710 &format!("{}_export_index_oks", self.debug_name),
711 expiration,
712 );
713 errs.stream = errs.stream.expire_stream_at(
714 &format!("{}_export_index_errs", self.debug_name),
715 expiration,
716 );
717 }
718
719 oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
720
721 if let Some(logger) = compute_state.compute_logger.clone() {
723 errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
724 }
725
726 compute_state.traces.set(
727 idx_id,
728 TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
729 );
730 }
731 Some(ArrangementFlavor::Trace(gid, _, _)) => {
732 let trace = compute_state.traces.get(&gid).unwrap().clone();
735 compute_state.traces.set(idx_id, trace);
736 }
737 None => {
738 println!("collection available: {:?}", bundle.collection.is_none());
739 println!(
740 "keys available: {:?}",
741 bundle.arranged.keys().collect::<Vec<_>>()
742 );
743 panic!(
744 "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
745 Id::Global(idx_id),
746 &idx.key
747 );
748 }
749 };
750 }
751}
752
753impl<'g, T> Context<'g, T>
756where
757 T: RenderTimestamp,
758{
759 pub(crate) fn export_index_iterative<'outer>(
760 &self,
761 outer: Scope<'outer, mz_repr::Timestamp>,
762 compute_state: &mut ComputeState,
763 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
764 dependency_ids: BTreeSet<GlobalId>,
765 idx_id: GlobalId,
766 idx: &IndexDesc,
767 output_probe: &MzProbeHandle<mz_repr::Timestamp>,
768 ) {
769 let mut needed_tokens = Vec::new();
771 for dep_id in dependency_ids {
772 if let Some(token) = tokens.get(&dep_id) {
773 needed_tokens.push(Rc::clone(token));
774 }
775 }
776 let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
777 panic!(
778 "Arrangement alarmingly absent! id: {:?}",
779 Id::Global(idx_id)
780 )
781 });
782
783 match bundle.arrangement(&idx.key) {
784 Some(ArrangementFlavor::Local(oks, errs)) => {
785 let mut oks = oks
789 .as_collection(|k, v| (k.to_row(), v.to_row()))
790 .leave(outer)
791 .mz_arrange::<
792 ColumnationChunker<_>,
793 RowRowBatcher<_, _>,
794 RowRowBuilder<_, _>,
795 _,
796 >(
797 "Arrange export iterative",
798 );
799
800 let mut errs = errs
801 .as_collection(|k, v| (k.clone(), v.clone()))
802 .leave(outer)
803 .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
804 "Arrange export iterative err",
805 );
806
807 if let Some(&expiration) = self.dataflow_expiration.as_option() {
810 oks.stream = oks.stream.expire_stream_at(
811 &format!("{}_export_index_iterative_oks", self.debug_name),
812 expiration,
813 );
814 errs.stream = errs.stream.expire_stream_at(
815 &format!("{}_export_index_iterative_err", self.debug_name),
816 expiration,
817 );
818 }
819
820 oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
821
822 if let Some(logger) = compute_state.compute_logger.clone() {
824 errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
825 }
826
827 compute_state.traces.set(
828 idx_id,
829 TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
830 );
831 }
832 Some(ArrangementFlavor::Trace(gid, _, _)) => {
833 let trace = compute_state.traces.get(&gid).unwrap().clone();
836 compute_state.traces.set(idx_id, trace);
837 }
838 None => {
839 println!("collection available: {:?}", bundle.collection.is_none());
840 println!(
841 "keys available: {:?}",
842 bundle.arranged.keys().collect::<Vec<_>>()
843 );
844 panic!(
845 "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
846 Id::Global(idx_id),
847 &idx.key
848 );
849 }
850 };
851 }
852}
853
/// Describes the position of a [`LetFreePlan`] within the plan being rendered.
///
/// In this file it is read by `render_letfree_plan` only to decorate the humanized operator
/// of the plan's final node in the LIR mapping metadata.
enum BindingInfo {
    /// The plan is an object's body. `render_plan` sets `in_let` when the body is preceded by
    /// `Let` bindings. The recursive path sets it from `is_recursive()`.
    Body { in_let: bool },
    /// The plan is the value of the non-recursive binding `id`. `last` is true for the final
    /// `Let` of its bind stage.
    Let { id: LocalId, last: bool },
    /// The plan is the value of the recursive binding `id`. `last` is true for the final
    /// recursive binding of its bind stage.
    LetRec { id: LocalId, last: bool },
}
864
impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
    /// Renders a possibly recursive `plan` at recursion depth `level` of an iterative scope
    /// whose inner timestamp is a `PointStamp<u64>`.
    ///
    /// Bind stages are processed in order:
    /// - Non-recursive `lets` are rendered first, each in its own `Binding(..)` region, and
    ///   bound under `Id::Local(id)`.
    /// - For the recursive `recs`, an ok `Variable` and an error `Variable` are created and
    ///   bound for every id before any value is rendered, so each value may refer to any
    ///   recursive binding of its stage. Their feedback summary is
    ///   `feedback_summary::<u64>(level + 1, 1)`.
    /// - Each recursive value is then rendered recursively at `level + 1`. Its oks are
    ///   consolidated, optionally cut off at the iteration limit, and fed back into the ok
    ///   variable. Its errors, plus limit errors when applicable, are made distinct and fed
    ///   back into the error variable.
    /// - Finally, each recursive binding is rebound to its rendered result after
    ///   `leave_dynamic(level + 1)`.
    ///
    /// The plan's body is rendered last with `binding`.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            // Non-recursive bindings of this stage.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Bind a fresh ok/err variable pair for every recursive id before rendering any
            // recursive value.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Render each recursive value and close its feedback loop.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                // Rebind the id to the rendered value (replacing the variable's collection).
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // Coordinate `level` of the point stamp is this recursion's iteration
                    // index. Times whose `iteration_index + 1 >= max_iters` go to
                    // `over_limit` and are not fed back.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    // Unless the plan asks to return at the limit, each cut-off update
                    // becomes a `LetRecLimitExceeded` error.
                    if !limit.return_at_limit {
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowErrorSer::from(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            ))
                        }));
                    }
                }

                // Arrange the errors and reduce each distinct error to multiplicity one.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<
                        ColumnationChunker<_>,
                        ErrBatcher<_, _>,
                        ErrBuilder<_, _>,
                        ErrSpine<_, _>,
                    >(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(oks);
                err_v.set(errs);
            }
            // Expose each recursive binding outside its dynamic level.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1002
1003impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
1004 fn render_plan(
1015 &mut self,
1016 object_id: GlobalId,
1017 plan: RenderPlan,
1018 ) -> CollectionBundle<'scope, T> {
1019 let mut in_let = false;
1020 for BindStage { lets, recs } in plan.binds {
1021 assert!(recs.is_empty());
1022
1023 let mut let_iter = lets.into_iter().peekable();
1024 while let Some(LetBind { id, value }) = let_iter.next() {
1025 in_let = true;
1027 let bundle =
1028 self.scope
1029 .clone()
1030 .region_named(&format!("Binding({:?})", id), |region| {
1031 let depends = value.depends();
1032 let last = let_iter.peek().is_none();
1033 let binding = BindingInfo::Let { id, last };
1034 self.enter_region(region, Some(&depends))
1035 .render_letfree_plan(object_id, value, binding)
1036 .leave_region(self.scope)
1037 });
1038 self.insert_id(Id::Local(id), bundle);
1039 }
1040 }
1041
1042 self.scope.clone().region_named("Main Body", |region| {
1043 let depends = plan.body.depends();
1044 self.enter_region(region, Some(&depends))
1045 .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1046 .leave_region(self.scope)
1047 })
1048 }
1049
1050 fn render_letfree_plan(
1052 &self,
1053 object_id: GlobalId,
1054 plan: LetFreePlan,
1055 binding: BindingInfo,
1056 ) -> CollectionBundle<'scope, T> {
1057 let (mut nodes, root_id, topological_order) = plan.destruct();
1058
1059 let mut collections = BTreeMap::new();
1061
1062 let should_compute_lir_metadata = self.compute_logger.is_some();
1068 let mut lir_mapping_metadata = if should_compute_lir_metadata {
1069 Some(Vec::with_capacity(nodes.len()))
1070 } else {
1071 None
1072 };
1073
1074 let mut topo_iter = topological_order.into_iter().peekable();
1075 while let Some(lir_id) = topo_iter.next() {
1076 let node = nodes.remove(&lir_id).unwrap();
1077
1078 let metadata = if should_compute_lir_metadata {
1082 let operator = node.expr.humanize(&DummyHumanizer);
1083
1084 let operator = if topo_iter.peek().is_none() {
1086 match &binding {
1087 BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
1088 BindingInfo::Body { in_let: false } => operator,
1089 BindingInfo::Let { id, last: true } => {
1090 format!("With {id} = {operator}")
1091 }
1092 BindingInfo::Let { id, last: false } => {
1093 format!("{id} = {operator}")
1094 }
1095 BindingInfo::LetRec { id, last: true } => {
1096 format!("With Recursive {id} = {operator}")
1097 }
1098 BindingInfo::LetRec { id, last: false } => {
1099 format!("{id} = {operator}")
1100 }
1101 }
1102 } else {
1103 operator
1104 };
1105
1106 let operator_id_start = self.scope.worker().peek_identifier();
1107 Some((operator, operator_id_start))
1108 } else {
1109 None
1110 };
1111
1112 let mut bundle = self.render_plan_expr(node.expr, &collections);
1113
1114 if let Some((operator, operator_id_start)) = metadata {
1115 let operator_id_end = self.scope.worker().peek_identifier();
1116 let operator_span = (operator_id_start, operator_id_end);
1117
1118 if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
1119 lir_mapping_metadata.push((
1120 lir_id,
1121 LirMetadata::new(operator, node.parent, node.nesting, operator_span),
1122 ))
1123 }
1124 }
1125
1126 self.log_operator_hydration(&mut bundle, lir_id);
1127
1128 collections.insert(lir_id, bundle);
1129 }
1130
1131 if let Some(lir_mapping_metadata) = lir_mapping_metadata {
1132 self.log_lir_mapping(object_id, lir_mapping_metadata);
1133 }
1134
1135 collections
1136 .remove(&root_id)
1137 .expect("LetFreePlan invariant (1)")
1138 }
1139
    /// Renders a single LIR node `expr` and returns its bundle.
    ///
    /// Inputs are looked up by `LirId` in `collections` (rendered earlier in topological order)
    /// and cloned out, leaving `collections` unchanged.
    ///
    /// # Panics
    ///
    /// Panics if an input id is missing from `collections`, or if a `Get` names an id not
    /// bound in this context. Also panics if a `PassArrangements` get requests an arrangement,
    /// or a raw collection, that the bound bundle lacks. `Union` panics when the number of
    /// inputs and bucketing strategies differ.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Either a list of `(row, time, diff)` updates or a single error.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance each update's time to the `as_of`. Drop updates whose advanced time
                // is at or beyond `until`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // Errors are emitted once, at the minimum time advanced to the `as_of`.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowErrorSer::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Every requested arrangement must exist. `keys.raw` implies that a
                        // raw collection is present (`bool` `<=` reads as implication).
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        // Keep only the arrangements that were asked for.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read through the arrangement `key` (with optional `row` lookup) and
                        // apply `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                if mfp.is_identity() {
                    // Pass the input bundle, including its arrangements, through unchanged.
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                // An identity post-MFP is passed as `None`.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(
                    input_key,
                    input,
                    key_val_plan,
                    plan,
                    mfp_option,
                    temporal_bucketing_strategy,
                )
            }
            TopK {
                input,
                top_k_plan,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan, temporal_bucketing_strategy)
            }
            Negate { input } => {
                let input = expect_input(input);
                // Only oks are negated; errors pass through.
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
                temporal_bucketing_strategies,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                // One bucketing strategy per input; `zip_eq` panics on a length mismatch.
                for (input, strategy) in inputs.into_iter().zip_eq(temporal_bucketing_strategies) {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    // Bucket by time only if the plan asks for it and the dyncfg enables it.
                    let os = if matches!(strategy, ArrangementStrategy::TemporalBucketing)
                        && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
                    {
                        let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                            .get(&self.config_set)
                            .try_into()
                            .expect("must fit");
                        T::maybe_apply_temporal_bucketing(
                            os.inner,
                            self.as_of_frontier.clone(),
                            summary,
                        )
                    } else {
                        os
                    };
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
                strategy,
            } => {
                let input = expect_input(input);
                // Extend the input bundle with the requested forms (arrangements and/or raw).
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.as_of_frontier.clone(),
                    self.until.clone(),
                    &self.config_set,
                    strategy,
                )
            }
        }
    }
1385
1386 fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1387 if let Some(logger) = &self.compute_logger {
1388 logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1389 dataflow_index,
1390 global_id,
1391 }));
1392 }
1393 }
1394
1395 fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1396 if let Some(logger) = &self.compute_logger {
1397 logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1398 }
1399 }
1400
1401 fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
1402 match bundle.arranged.values_mut().next() {
1422 Some(arrangement) => {
1423 use ArrangementFlavor::*;
1424
1425 match arrangement {
1426 Local(a, _) => {
1427 a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1428 }
1429 Trace(_, a, _) => {
1430 a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1431 }
1432 }
1433 }
1434 None => {
1435 let (oks, _) = bundle
1436 .collection
1437 .as_mut()
1438 .expect("CollectionBundle invariant");
1439 let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1440 *oks = stream.as_collection();
1441 }
1442 }
1443 }
1444
1445 fn log_operator_hydration_inner<D>(
1446 &self,
1447 stream: Stream<'scope, T, D>,
1448 lir_id: LirId,
1449 ) -> Stream<'scope, T, D>
1450 where
1451 D: timely::Container + Clone + 'static,
1452 {
1453 let Some(logger) = self.compute_logger.clone() else {
1454 return stream.clone(); };
1456
1457 let export_ids = self.export_ids.clone();
1458
1459 let mut hydration_frontier = Antichain::new();
1467 for time in self.as_of_frontier.iter() {
1468 if let Some(time) = time.try_step_forward() {
1469 hydration_frontier.insert(Refines::to_inner(time));
1470 }
1471 }
1472
1473 let name = format!("LogOperatorHydration ({lir_id})");
1474 stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1475 let mut hydrated = false;
1476
1477 for &export_id in &export_ids {
1478 logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1479 export_id,
1480 lir_id,
1481 hydrated,
1482 }));
1483 }
1484
1485 move |(input, frontier), output| {
1486 input.for_each(|cap, data| {
1488 output.session(&cap).give_container(data);
1489 });
1490
1491 if hydrated {
1492 return;
1493 }
1494
1495 if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1496 hydrated = true;
1497
1498 for &export_id in &export_ids {
1499 logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1500 export_id,
1501 lir_id,
1502 hydrated,
1503 }));
1504 }
1505 }
1506 }
1507 })
1508 }
1509}
1510
/// Timestamp types that dataflows can be rendered with.
///
/// Each implementor contains an `mz_repr::Timestamp` component. The methods below expose
/// that component as the "system" time and as the "event" time. In this file's
/// implementations (`mz_repr::Timestamp` itself and
/// `Product<mz_repr::Timestamp, PointStamp<u64>>`), both refer to the same component.
// NOTE(review): the reason for `allow(dead_code)` is not visible here; presumably some
// methods are unused in some builds — confirm and document.
#[allow(dead_code)]
pub trait RenderTimestamp: MzTimestamp + Default + Refines<mz_repr::Timestamp> {
    /// Mutable access to the system-time component of this timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Builds a summary of `Self` that advances the system time by `delay`.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event-time component of this timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// Mutable access to the event-time component of this timestamp.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Builds a summary of `Self` that advances the event time by `delay`.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Returns this timestamp moved back by one step. The implementations in this file
    /// saturate at zero instead of underflowing.
    fn step_back(&self) -> Self;
}
1531
/// Timestamp types that may support temporal bucketing of update streams.
pub trait MaybeBucketByTime: Timestamp {
    /// Converts `stream` into a collection, applying temporal bucketing if this timestamp
    /// type supports it.
    ///
    /// `as_of` and `summary` are forwarded to the bucketing operator. Types that do not
    /// support bucketing ignore them and return the stream unchanged as a collection.
    fn maybe_apply_temporal_bucketing<'scope, D>(
        stream: StreamVec<'scope, Self, (D, Self, Diff)>,
        as_of: Antichain<mz_repr::Timestamp>,
        summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, D, Diff>
    where
        D: differential_dataflow::ExchangeData
            + crate::typedefs::MzData
            + differential_dataflow::Hashable;
}
1549
1550impl RenderTimestamp for mz_repr::Timestamp {
1551 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1552 self
1553 }
1554 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1555 delay
1556 }
1557 fn event_time(&self) -> mz_repr::Timestamp {
1558 *self
1559 }
1560 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1561 self
1562 }
1563 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1564 delay
1565 }
1566 fn step_back(&self) -> Self {
1567 self.saturating_sub(1)
1568 }
1569}
1570
1571impl MaybeBucketByTime for mz_repr::Timestamp {
1572 fn maybe_apply_temporal_bucketing<'scope, D>(
1573 stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1574 as_of: Antichain<mz_repr::Timestamp>,
1575 summary: mz_repr::Timestamp,
1576 ) -> VecCollection<'scope, Self, D, Diff>
1577 where
1578 D: differential_dataflow::ExchangeData
1579 + crate::typedefs::MzData
1580 + differential_dataflow::Hashable,
1581 {
1582 stream
1583 .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
1584 .as_collection()
1585 }
1586}
1587
1588impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1589 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1590 &mut self.outer
1591 }
1592 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1593 Product::new(delay, Default::default())
1594 }
1595 fn event_time(&self) -> mz_repr::Timestamp {
1596 self.outer
1597 }
1598 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1599 &mut self.outer
1600 }
1601 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1602 Product::new(delay, Default::default())
1603 }
1604 fn step_back(&self) -> Self {
1605 let inner = self.inner.clone();
1609 let mut vec = inner.into_inner();
1610 for item in vec.iter_mut() {
1611 *item = item.saturating_sub(1);
1612 }
1613 Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1614 }
1615}
1616
1617impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
1618 fn maybe_apply_temporal_bucketing<'scope, D>(
1619 stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1620 _as_of: Antichain<mz_repr::Timestamp>,
1621 _summary: mz_repr::Timestamp,
1622 ) -> VecCollection<'scope, Self, D, Diff>
1623 where
1624 D: differential_dataflow::ExchangeData
1625 + crate::typedefs::MzData
1626 + differential_dataflow::Hashable,
1627 {
1628 stream.as_collection()
1630 }
1631}
1632
/// A signal that fires once the token returned by `StartSignal::new` is dropped.
///
/// The signal can be awaited as a future, checked with `has_fired`, and used to defer
/// dropping values until it fires via `drop_on_fire`.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// Receiver whose sender lives inside the token. The channel carries `Infallible`, so
    /// no value can be sent; the receiver resolves only once the sender is dropped. It is
    /// wrapped in `Shared` so that cloned signals can each await it.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// Weak handle to the token. The signal has fired once no strong references remain.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1652
1653impl StartSignal {
1654 pub fn new() -> (Self, Rc<dyn Any>) {
1657 let (tx, rx) = oneshot::channel::<Infallible>();
1658 let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1659 let signal = Self {
1660 fut: rx.shared(),
1661 token_ref: Rc::downgrade(&token),
1662 };
1663 (signal, token)
1664 }
1665
1666 pub fn has_fired(&self) -> bool {
1667 self.token_ref.strong_count() == 0
1668 }
1669
1670 pub fn into_send_future(self) -> impl Future<Output = ()> + Send {
1675 use futures::FutureExt;
1676 self.fut.map(|_| ())
1677 }
1678
1679 pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1680 if let Some(token) = self.token_ref.upgrade() {
1681 let mut token = token.borrow_mut();
1682 let inner = std::mem::replace(&mut *token, Box::new(()));
1683 *token = Box::new((inner, to_drop));
1684 }
1685 }
1686}
1687
1688impl Future for StartSignal {
1689 type Output = ();
1690
1691 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1692 self.fut.poll_unpin(cx).map(|_| ())
1693 }
1694}
1695
/// Extension trait for gating a dataflow edge on a `StartSignal`.
pub(crate) trait WithStartSignal {
    /// Returns `self` modified so that its data is held back until `signal` fires.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1704
1705impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
1706where
1707 Tr: TraceReader<Time: RenderTimestamp> + Clone,
1708{
1709 fn with_start_signal(self, signal: StartSignal) -> Self {
1710 Arranged {
1711 stream: self.stream.with_start_signal(signal),
1712 trace: self.trace,
1713 }
1714 }
1715}
1716
1717impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1718where
1719 D: timely::Container + Clone + 'static,
1720{
1721 fn with_start_signal(self, signal: StartSignal) -> Self {
1722 let activations = self.scope().activations();
1723 self.unary(Pipeline, "StartSignal", |_cap, info| {
1724 let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1725 signal.drop_on_fire(token);
1726
1727 let mut stash = Vec::new();
1728
1729 move |input, output| {
1730 if !signal.has_fired() {
1732 input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1733 return;
1734 }
1735
1736 for (cap, mut data) in std::mem::take(&mut stash) {
1738 output.session(&cap).give_container(&mut data);
1739 }
1740
1741 input.for_each(|cap, data| {
1743 output.session(&cap).give_container(data);
1744 });
1745 }
1746 })
1747 }
1748}
1749
1750fn suppress_early_progress<'scope, T: Timestamp, D>(
1772 stream: Stream<'scope, T, D>,
1773 as_of: Antichain<T>,
1774) -> Stream<'scope, T, D>
1775where
1776 D: Data + timely::Container,
1777{
1778 stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1779 let mut early_cap = Some(default_cap);
1780
1781 move |(input, frontier), output| {
1782 input.for_each_time(|data_cap, data| {
1783 if as_of.less_than(data_cap.time()) {
1784 let mut session = output.session(&data_cap);
1785 for data in data {
1786 session.give_container(data);
1787 }
1788 } else {
1789 let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1790 let mut session = output.session(&cap);
1791 for data in data {
1792 session.give_container(data);
1793 }
1794 }
1795 });
1796
1797 if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1798 early_cap.take();
1799 }
1800 }
1801 })
1802}
1803
/// Extension trait that limits how far a stream's output frontier may run ahead of a probe.
trait LimitProgress<T: Timestamp> {
    /// Returns a stream carrying the same data as `self`, whose output frontier is held
    /// back at times the probe `handle` has not yet reached.
    ///
    /// As implemented for `mz_repr::Timestamp` streams in this file:
    /// * `handle`: probe whose frontier releases held-back times.
    /// * `slack_ms`: added to each data time before the time is tracked.
    /// * `limit`: if set, the maximum number of tracked times; the earliest are dropped first.
    /// * `upper`: times beyond this frontier are not tracked.
    /// * `name`: labels the operator and its debug logs.
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1842
1843impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
1846 for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
1847where
1848 D: Clone + 'static,
1849 R: Clone + 'static,
1850{
1851 fn limit_progress(
1852 self,
1853 handle: MzProbeHandle<mz_repr::Timestamp>,
1854 slack_ms: u64,
1855 limit: Option<usize>,
1856 upper: Antichain<mz_repr::Timestamp>,
1857 name: String,
1858 ) -> Self {
1859 let scope = self.scope();
1860 let stream =
1861 self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1862 let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
1864 let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;
1866
1867 let activator = scope.activator_for(info.address);
1868 handle.activate(activator.clone());
1869
1870 move |(input, frontier), output| {
1871 input.for_each(|cap, data| {
1872 for time in data
1873 .iter()
1874 .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1875 {
1876 let rounded_time = if slack_ms == 0 {
1880 time
1881 } else {
1882 (time / slack_ms).saturating_add(1).saturating_mul(slack_ms)
1883 };
1884 if !upper.less_than(&rounded_time.into()) {
1885 pending_times.insert(rounded_time.into());
1886 }
1887 }
1888 output.session(&cap).give_container(data);
1889 if retained_cap.as_ref().is_none_or(|c| {
1890 !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1891 }) {
1892 retained_cap = Some(cap.retain(0));
1893 }
1894 });
1895
1896 handle.with_frontier(|f| {
1897 while pending_times
1898 .first()
1899 .map_or(false, |retained_time| !f.less_than(&retained_time))
1900 {
1901 let _ = pending_times.pop_first();
1902 }
1903 });
1904
1905 while limit.map_or(false, |limit| pending_times.len() > limit) {
1906 let _ = pending_times.pop_first();
1907 }
1908
1909 match (retained_cap.as_mut(), pending_times.first()) {
1910 (Some(cap), Some(first)) => cap.downgrade(first),
1911 (_, None) => retained_cap = None,
1912 _ => {}
1913 }
1914
1915 if frontier.is_empty() {
1916 retained_cap = None;
1917 pending_times.clear();
1918 }
1919
1920 if !pending_times.is_empty() {
1921 tracing::debug!(
1922 name,
1923 info.global_id,
1924 pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1925 frontier = ?frontier.frontier().get(0),
1926 probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1927 ?upper,
1928 "pending times",
1929 );
1930 }
1931 }
1932 });
1933 stream
1934 }
1935}
1936
1937struct PendingTimesDisplay<T>(T);
1940
1941impl<T> std::fmt::Display for PendingTimesDisplay<T>
1942where
1943 T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1944{
1945 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1946 let mut iter = self.0.clone().into_iter();
1947 write!(f, "[")?;
1948 if let Some(first) = iter.next() {
1949 write!(f, "{}", first)?;
1950 let mut last = u64::from(first);
1951 for time in iter {
1952 write!(f, ", +{}", u64::from(time) - last)?;
1953 last = u64::from(time);
1954 }
1955 }
1956 write!(f, "]")?;
1957 Ok(())
1958 }
1959}
1960
1961#[derive(Clone, Copy, Debug)]
1964struct Pairer {
1965 split_arity: usize,
1966}
1967
1968impl Pairer {
1969 fn new(split_arity: usize) -> Self {
1971 Self { split_arity }
1972 }
1973
1974 fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1976 where
1977 I1: IntoIterator<Item = Datum<'a>>,
1978 I2: IntoIterator<Item = Datum<'a>>,
1979 {
1980 SharedRow::pack(first.into_iter().chain(second))
1981 }
1982
1983 fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1985 let mut datum_iter = datum_iter.into_iter();
1986 let mut row_builder = SharedRow::get();
1987 let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1988 let second = row_builder.pack_using(datum_iter);
1989 (first, second)
1990 }
1991}