1use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::Variable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123 COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124 COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125 ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129 self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use mz_timely_util::scope_label::ScopeExt;
141use timely::PartialOrder;
142use timely::communication::Allocate;
143use timely::container::CapacityContainerBuilder;
144use timely::dataflow::channels::pact::Pipeline;
145use timely::dataflow::operators::vec::ToStream;
146use timely::dataflow::operators::vec::{BranchWhen, Filter};
147use timely::dataflow::operators::{Capability, Operator, Probe, probe};
148use timely::dataflow::scopes::Child;
149use timely::dataflow::{Scope, Stream, StreamVec};
150use timely::order::{Product, TotalOrder};
151use timely::progress::timestamp::Refines;
152use timely::progress::{Antichain, Timestamp};
153use timely::scheduling::ActivateOnDrop;
154use timely::worker::{AsWorker, Worker as TimelyWorker};
155
156use crate::arrangement::manager::TraceBundle;
157use crate::compute_state::ComputeState;
158use crate::extensions::arrange::{KeyCollection, MzArrange};
159use crate::extensions::reduce::MzReduce;
160use crate::extensions::temporal_bucket::TemporalBucketing;
161use crate::logging::compute::{
162 ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
163};
164use crate::render::context::{ArrangementFlavor, Context};
165use crate::render::continual_task::ContinualTaskCtx;
166use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
167use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
168
169pub mod context;
170pub(crate) mod continual_task;
171mod errors;
172mod flat_map;
173mod join;
174mod reduce;
175pub mod sinks;
176mod threshold;
177mod top_k;
178
179pub use context::CollectionBundle;
180pub use join::LinearJoinSpec;
181
/// Assemble the rendering of a dataflow into a timely dataflow on `timely_worker`.
///
/// Imports the dataflow's persist sources and arranged indexes, builds each of
/// its objects, and exports the declared indexes and sinks. `until` bounds the
/// times of interest for the imported sources; `dataflow_expiration`, when
/// non-empty, is a time at which exported streams are forced to shut down.
///
/// Objects are rendered inside an iterative (`PointStamp<u64>`) scope when any
/// object's plan is recursive, and inside a plain region otherwise.
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Whether any object needs the iterative scope below.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Indexes to export, paired with the ids they depend on.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Sinks to export, paired with the ids they depend on.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = &mut scope.with_label();

        // Tracks continual-task inputs/outputs for this dataflow, if any.
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // Sources imported below, and tokens that keep them alive.
        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        // Import each persist-backed source in its own named region.
        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        // If enabled, project the persist read down to the
                        // columns the MFP actually demands, and permute the
                        // MFP to match the narrowed schema.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // Skip reading the snapshot only when the import allows it
                    // and the optimization is enabled.
                    let mut snapshot_mode =
                        if import.with_snapshot || !subscribe_snapshot_optimization {
                            SnapshotMode::Include
                        } else {
                            compute_state.metrics.inc_subscribe_snapshot_optimization();
                            SnapshotMode::Exclude
                        };
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    // A continual-task source transformer may override the
                    // snapshot mode and the progress-suppression frontier.
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        import.desc.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    // persist_source is expected to have fully absorbed the MFP.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // Hold back progress messages until the as_of, if requested.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    // Optionally apply logical backpressure: limit how far the
                    // source may run ahead of the dataflow's output frontier.
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Probe the input so frontier-lag metrics can be reported.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Apply the continual-task transformation, if present,
                    // recording the times it reports.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    // Leave both nested regions so the collections live in the
                    // outer dataflow scope.
                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Retain the source token for as long as the dataflow lives.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        // Recursive plans are rendered inside an iterative scope whose inner
        // timestamp carries per-level iteration counters.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build each object in its own named region.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    // Associate the dataflow's timely address with the global id
                    // for logging purposes.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes (iterative variant: must leave the
                // iterative scope before publishing traces).
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            // Non-recursive plans are rendered in a plain named region.
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    // Optionally bucket source updates temporally before use.
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build each object in its own named region.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    });
}
570
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
    /// Convert an imported arrangement into a collection, dropping batches
    /// whose upper frontier is at or before the dataflow's `as_of` (i.e.,
    /// skipping snapshot data) and applying `logic` to each retained
    /// key/value pair. The result is entered into the child scope.
    fn import_filtered_index_collection<Tr: TraceReader<Time = G::Timestamp> + Clone, V: Data>(
        &self,
        arranged: Arranged<G, Tr>,
        start_signal: StartSignal,
        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
    ) -> VecCollection<Child<'g, G, T>, V, Tr::Diff>
    where
        G::Timestamp: TotalOrder,
    {
        // Keep only batches not entirely subsumed by the as_of frontier.
        let oks = arranged.stream.with_start_signal(start_signal).filter({
            let as_of = self.as_of_frontier.clone();
            move |b| !<Antichain<G::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
        });
        Arranged::<G, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(&self.scope)
    }

    /// Import the index with id `idx_id` from the compute state's trace
    /// manager and bind it to `idx.on_id` in this context.
    ///
    /// With `SnapshotMode::Include` the traces are imported as arrangements;
    /// with `SnapshotMode::Exclude` they are flattened into collections with
    /// snapshot batches filtered out. Buttons and tokens that keep the traces
    /// alive are stored in `tokens` under `idx_id`.
    ///
    /// # Panics
    ///
    /// Panics if the index is not present in the trace manager, or if its
    /// traces have been compacted beyond the dataflow's `as_of`.
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        typ: &ReprRelationType,
        snapshot_mode: SnapshotMode,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            // Import the ok trace, advancing to the as_of and bounded by until.
            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            // Probe the input so frontier-lag metrics can be reported.
            oks.stream = oks.stream.probe_with(&input_probe);

            // Import the err trace in the same fashion.
            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let bundle = match snapshot_mode {
                SnapshotMode::Include => {
                    let ok_arranged = oks
                        .enter(&self.scope)
                        .with_start_signal(start_signal.clone());
                    let err_arranged = err_arranged
                        .enter(&self.scope)
                        .with_start_signal(start_signal);
                    CollectionBundle::from_expressions(
                        idx.key.clone(),
                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                    )
                }
                SnapshotMode::Exclude => {
                    let oks = {
                        let mut datums = DatumVec::new();
                        // Undo the key/value arrangement permutation so rows
                        // come out in the index's original column order.
                        let (permutation, _thinning) =
                            permutation_for_arrangement(&idx.key, typ.arity());
                        self.import_filtered_index_collection(
                            oks,
                            start_signal.clone(),
                            move |k: DatumSeq, v: DatumSeq| {
                                let mut datums_borrow = datums.borrow();
                                datums_borrow.extend(k);
                                datums_borrow.extend(v);
                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
                            },
                        )
                    };
                    let errs = self.import_filtered_index_collection(
                        err_arranged,
                        start_signal,
                        |e, _| e.clone(),
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            };
            self.update_id(Id::Global(idx.on_id), bundle);
            // Keep the trace import buttons and the drop token alive for the
            // lifetime of the dataflow.
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}
690
691impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
694where
695 G: Scope<Timestamp = mz_repr::Timestamp>,
696{
697 pub(crate) fn export_index(
698 &self,
699 compute_state: &mut ComputeState,
700 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
701 dependency_ids: BTreeSet<GlobalId>,
702 idx_id: GlobalId,
703 idx: &IndexDesc,
704 output_probe: &MzProbeHandle<G::Timestamp>,
705 ) {
706 let mut needed_tokens = Vec::new();
708 for dep_id in dependency_ids {
709 if let Some(token) = tokens.get(&dep_id) {
710 needed_tokens.push(Rc::clone(token));
711 }
712 }
713 let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
714 panic!(
715 "Arrangement alarmingly absent! id: {:?}",
716 Id::Global(idx_id)
717 )
718 });
719
720 match bundle.arrangement(&idx.key) {
721 Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
722 if let Some(&expiration) = self.dataflow_expiration.as_option() {
725 oks.stream = oks.stream.expire_stream_at(
726 &format!("{}_export_index_oks", self.debug_name),
727 expiration,
728 );
729 errs.stream = errs.stream.expire_stream_at(
730 &format!("{}_export_index_errs", self.debug_name),
731 expiration,
732 );
733 }
734
735 oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
736
737 if let Some(logger) = compute_state.compute_logger.clone() {
739 errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
740 }
741
742 compute_state.traces.set(
743 idx_id,
744 TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
745 );
746 }
747 Some(ArrangementFlavor::Trace(gid, _, _)) => {
748 let trace = compute_state.traces.get(&gid).unwrap().clone();
751 compute_state.traces.set(idx_id, trace);
752 }
753 None => {
754 println!("collection available: {:?}", bundle.collection.is_none());
755 println!(
756 "keys available: {:?}",
757 bundle.arranged.keys().collect::<Vec<_>>()
758 );
759 panic!(
760 "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
761 Id::Global(idx_id),
762 &idx.key
763 );
764 }
765 };
766 }
767}
768
769impl<'g, G, T> Context<Child<'g, G, T>>
772where
773 G: Scope<Timestamp = mz_repr::Timestamp>,
774 T: RenderTimestamp,
775{
776 pub(crate) fn export_index_iterative(
777 &self,
778 compute_state: &mut ComputeState,
779 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
780 dependency_ids: BTreeSet<GlobalId>,
781 idx_id: GlobalId,
782 idx: &IndexDesc,
783 output_probe: &MzProbeHandle<G::Timestamp>,
784 ) {
785 let mut needed_tokens = Vec::new();
787 for dep_id in dependency_ids {
788 if let Some(token) = tokens.get(&dep_id) {
789 needed_tokens.push(Rc::clone(token));
790 }
791 }
792 let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
793 panic!(
794 "Arrangement alarmingly absent! id: {:?}",
795 Id::Global(idx_id)
796 )
797 });
798
799 match bundle.arrangement(&idx.key) {
800 Some(ArrangementFlavor::Local(oks, errs)) => {
801 let mut oks = oks
805 .as_collection(|k, v| (k.to_row(), v.to_row()))
806 .leave()
807 .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
808 "Arrange export iterative",
809 );
810
811 let mut errs = errs
812 .as_collection(|k, v| (k.clone(), v.clone()))
813 .leave()
814 .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
815 "Arrange export iterative err",
816 );
817
818 if let Some(&expiration) = self.dataflow_expiration.as_option() {
821 oks.stream = oks.stream.expire_stream_at(
822 &format!("{}_export_index_iterative_oks", self.debug_name),
823 expiration,
824 );
825 errs.stream = errs.stream.expire_stream_at(
826 &format!("{}_export_index_iterative_err", self.debug_name),
827 expiration,
828 );
829 }
830
831 oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
832
833 if let Some(logger) = compute_state.compute_logger.clone() {
835 errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
836 }
837
838 compute_state.traces.set(
839 idx_id,
840 TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
841 );
842 }
843 Some(ArrangementFlavor::Trace(gid, _, _)) => {
844 let trace = compute_state.traces.get(&gid).unwrap().clone();
847 compute_state.traces.set(idx_id, trace);
848 }
849 None => {
850 println!("collection available: {:?}", bundle.collection.is_none());
851 println!(
852 "keys available: {:?}",
853 bundle.arranged.keys().collect::<Vec<_>>()
854 );
855 panic!(
856 "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
857 Id::Global(idx_id),
858 &idx.key
859 );
860 }
861 };
862 }
863}
864
/// Describes how the plan currently being rendered is bound, so the final
/// operator of a rendered (sub)plan can be labeled accordingly in LIR
/// metadata (see `render_letfree_plan`).
enum BindingInfo {
    /// The plan is a query body; `in_let` records whether any `Let` bindings
    /// preceded it.
    Body { in_let: bool },
    /// The plan is the value of a `Let` binding; `last` marks the final
    /// binding in its stage.
    Let { id: LocalId, last: bool },
    /// The plan is the value of a `LetRec` binding; `last` marks the final
    /// binding in its stage.
    LetRec { id: LocalId, last: bool },
}
875
impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Render a plan that may contain recursive (`LetRec`) bindings, at
    /// iteration nesting depth `level`, returning the bundle for its body.
    ///
    /// Non-recursive `Let` bindings are rendered in their own regions.
    /// Recursive bindings are wired through differential `Variable`s: each
    /// binding is first bound to a variable, its value is rendered (possibly
    /// recursing at `level + 1`), and the result is fed back into the
    /// variable. Iteration limits, when present, are enforced by branching on
    /// the iteration counter in the `PointStamp` timestamp.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render non-recursive bindings, tracking which one is last so
            // its operator can be labeled accordingly.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Pre-bind each recursive id to a pair of feedback variables
            // (oks and errs) so bindings can refer to one another before
            // their values are rendered.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Advance the iteration counter at this nesting depth by one
                // per round.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) = Variable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let (err_v, err_collection) =
                    Variable::new(&mut self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Render each recursive binding and close its feedback loop.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Consolidate before feeding back, to keep iteration state small.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // Split off updates that would exceed the iteration limit
                    // at this nesting depth.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    // Unless the plan asks to return the value at the limit,
                    // turn over-limit updates into errors.
                    if !limit.return_at_limit {
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Distinct the errors so repeated iterations do not multiply
                // error counts in the feedback loop.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(oks);
                err_v.set(errs);
            }
            // Rebind each recursive id to its stabilized result, leaving the
            // dynamic (iteration) coordinate at this depth.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1013
1014impl<G> Context<G>
1015where
1016 G: Scope,
1017 G::Timestamp: RenderTimestamp,
1018{
1019 fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
1030 let mut in_let = false;
1031 for BindStage { lets, recs } in plan.binds {
1032 assert!(recs.is_empty());
1033
1034 let mut let_iter = lets.into_iter().peekable();
1035 while let Some(LetBind { id, value }) = let_iter.next() {
1036 in_let = true;
1038 let bundle =
1039 self.scope
1040 .clone()
1041 .region_named(&format!("Binding({:?})", id), |region| {
1042 let depends = value.depends();
1043 let last = let_iter.peek().is_none();
1044 let binding = BindingInfo::Let { id, last };
1045 self.enter_region(region, Some(&depends))
1046 .render_letfree_plan(object_id, value, binding)
1047 .leave_region()
1048 });
1049 self.insert_id(Id::Local(id), bundle);
1050 }
1051 }
1052
1053 self.scope.clone().region_named("Main Body", |region| {
1054 let depends = plan.body.depends();
1055 self.enter_region(region, Some(&depends))
1056 .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1057 .leave_region()
1058 })
1059 }
1060
    /// Render a plan with no bindings, returning the bundle of its root node.
    ///
    /// Nodes are rendered in topological order, so every input of a node has
    /// been rendered (and is present in `collections`) by the time the node
    /// itself is rendered. When a compute logger is attached, this also
    /// collects LIR metadata — a humanized operator name and the span of
    /// timely operator ids each node occupies — and logs the mapping.
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered bundles, keyed by LIR id.
        let mut collections = BTreeMap::new();

        // Only compute LIR metadata when there is a logger to receive it.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // Label the plan's final operator according to how the plan
                // is bound (query body, `Let`, or `LetRec`).
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Timely operator ids created while rendering this node will
                // fall in [operator_id_start, operator_id_end) — see below.
                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1150
    /// Render a single plan expression into a collection bundle, looking up
    /// its inputs (by LIR id) in `collections`.
    ///
    /// # Panics
    ///
    /// Panics if an input LIR id is missing from `collections`, or if a
    /// `Get` refers to an id not bound in the context.
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // A constant is either rows or a single error.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance row times to the as_of and drop rows at or beyond
                // `until`, which can never be observed.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                // Errors are emitted at the earliest time at or after the as_of.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Fetch the bound collection and present it as the plan demands.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // The bound bundle must already offer everything the
                        // plan requires; retain only the requested keys.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read out of an arrangement, applying an MFP.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Read out of the raw collection, applying an MFP.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // An identity MFP needs no new operator at all.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Dispatch on the join strategy chosen by planning.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only pass the post-reduce MFP along if it does something.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                // Negate the rows; errors pass through unchanged.
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate all inputs' ok and err collections.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                // Ensure the requested arrangement forms exist, building any
                // that are missing.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }
1362
1363 fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1364 if let Some(logger) = &self.compute_logger {
1365 logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1366 dataflow_index,
1367 global_id,
1368 }));
1369 }
1370 }
1371
1372 fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1373 if let Some(logger) = &self.compute_logger {
1374 logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1375 }
1376 }
1377
1378 fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
1379 match bundle.arranged.values_mut().next() {
1399 Some(arrangement) => {
1400 use ArrangementFlavor::*;
1401
1402 match arrangement {
1403 Local(a, _) => {
1404 a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1405 }
1406 Trace(_, a, _) => {
1407 a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1408 }
1409 }
1410 }
1411 None => {
1412 let (oks, _) = bundle
1413 .collection
1414 .as_mut()
1415 .expect("CollectionBundle invariant");
1416 let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1417 *oks = stream.as_collection();
1418 }
1419 }
1420 }
1421
1422 fn log_operator_hydration_inner<D>(&self, stream: Stream<G, D>, lir_id: LirId) -> Stream<G, D>
1423 where
1424 D: timely::Container + Clone + 'static,
1425 {
1426 let Some(logger) = self.compute_logger.clone() else {
1427 return stream.clone(); };
1429
1430 let export_ids = self.export_ids.clone();
1431
1432 let mut hydration_frontier = Antichain::new();
1440 for time in self.as_of_frontier.iter() {
1441 if let Some(time) = time.try_step_forward() {
1442 hydration_frontier.insert(Refines::to_inner(time));
1443 }
1444 }
1445
1446 let name = format!("LogOperatorHydration ({lir_id})");
1447 stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1448 let mut hydrated = false;
1449
1450 for &export_id in &export_ids {
1451 logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1452 export_id,
1453 lir_id,
1454 hydrated,
1455 }));
1456 }
1457
1458 move |(input, frontier), output| {
1459 input.for_each(|cap, data| {
1461 output.session(&cap).give_container(data);
1462 });
1463
1464 if hydrated {
1465 return;
1466 }
1467
1468 if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1469 hydrated = true;
1470
1471 for &export_id in &export_ids {
1472 logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1473 export_id,
1474 lir_id,
1475 hydrated,
1476 }));
1477 }
1478 }
1479 }
1480 })
1481 }
1482}
1483
/// A timestamp type usable in compute dataflow rendering.
///
/// Exposes the "system" and "event" portions of a timestamp, both expressed
/// as `mz_repr::Timestamp`, plus summaries that advance each portion and a
/// saturating step backwards.
// NOTE(review): `dead_code` is allowed here without explanation — presumably
// some items are only used in certain configurations; confirm and document.
#[allow(dead_code)] pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// Mutable access to the system-time portion of the timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// A timestamp summary that advances the system-time portion by `delay`.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event-time portion of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// Mutable access to the event-time portion of the timestamp.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// A timestamp summary that advances the event-time portion by `delay`.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back by one, saturating at the minimum.
    fn step_back(&self) -> Self;
}
1504
/// For the plain timestamp type, the system-time and event-time portions both
/// coincide with the timestamp itself.
impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturating: stepping back from zero stays at zero.
        self.saturating_sub(1)
    }
}
1525
1526impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1527 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1528 &mut self.outer
1529 }
1530 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1531 Product::new(delay, Default::default())
1532 }
1533 fn event_time(&self) -> mz_repr::Timestamp {
1534 self.outer
1535 }
1536 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1537 &mut self.outer
1538 }
1539 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1540 Product::new(delay, Default::default())
1541 }
1542 fn step_back(&self) -> Self {
1543 let inner = self.inner.clone();
1547 let mut vec = inner.into_inner();
1548 for item in vec.iter_mut() {
1549 *item = item.saturating_sub(1);
1550 }
1551 Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1552 }
1553}
1554
/// A shared signal that dataflow operators can await before starting to
/// produce output.
///
/// The signal "fires" when the token returned by [`StartSignal::new`] is
/// dropped, which closes the underlying oneshot channel.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// Shared future that resolves once the sender token is dropped.
    /// `Infallible` guarantees no value can ever be sent; only channel
    /// closure is observable.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// Weak reference to the token: lets us observe whether the signal has
    /// fired (strong count zero) and attach extra state to drop on fire.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1574
1575impl StartSignal {
1576 pub fn new() -> (Self, Rc<dyn Any>) {
1579 let (tx, rx) = oneshot::channel::<Infallible>();
1580 let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1581 let signal = Self {
1582 fut: rx.shared(),
1583 token_ref: Rc::downgrade(&token),
1584 };
1585 (signal, token)
1586 }
1587
1588 pub fn has_fired(&self) -> bool {
1589 self.token_ref.strong_count() == 0
1590 }
1591
1592 pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1593 if let Some(token) = self.token_ref.upgrade() {
1594 let mut token = token.borrow_mut();
1595 let inner = std::mem::replace(&mut *token, Box::new(()));
1596 *token = Box::new((inner, to_drop));
1597 }
1598 }
1599}
1600
1601impl Future for StartSignal {
1602 type Output = ();
1603
1604 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1605 self.fut.poll_unpin(cx).map(|_| ())
1606 }
1607}
1608
/// Extension trait for attaching a [`StartSignal`] to timely/differential
/// streams.
pub(crate) trait WithStartSignal {
    /// Returns a version of `self` that holds back incoming data until
    /// `signal` fires, then releases it.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1617
1618impl<S, Tr> WithStartSignal for Arranged<S, Tr>
1619where
1620 S: Scope,
1621 S::Timestamp: RenderTimestamp,
1622 Tr: TraceReader + Clone,
1623{
1624 fn with_start_signal(self, signal: StartSignal) -> Self {
1625 Arranged {
1626 stream: self.stream.with_start_signal(signal),
1627 trace: self.trace,
1628 }
1629 }
1630}
1631
1632impl<S, D> WithStartSignal for Stream<S, D>
1633where
1634 S: Scope,
1635 D: timely::Container + Clone + 'static,
1636{
1637 fn with_start_signal(self, signal: StartSignal) -> Self {
1638 let activations = self.scope().activations();
1639 self.unary(Pipeline, "StartSignal", |_cap, info| {
1640 let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1641 signal.drop_on_fire(token);
1642
1643 let mut stash = Vec::new();
1644
1645 move |input, output| {
1646 if !signal.has_fired() {
1648 input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1649 return;
1650 }
1651
1652 for (cap, mut data) in std::mem::take(&mut stash) {
1654 output.session(&cap).give_container(&mut data);
1655 }
1656
1657 input.for_each(|cap, data| {
1659 output.session(&cap).give_container(data);
1660 });
1661 }
1662 })
1663 }
1664}
1665
1666fn suppress_early_progress<G, D>(
1688 stream: Stream<G, D>,
1689 as_of: Antichain<G::Timestamp>,
1690) -> Stream<G, D>
1691where
1692 G: Scope,
1693 D: Data + timely::Container,
1694{
1695 stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1696 let mut early_cap = Some(default_cap);
1697
1698 move |(input, frontier), output| {
1699 input.for_each_time(|data_cap, data| {
1700 if as_of.less_than(data_cap.time()) {
1701 let mut session = output.session(&data_cap);
1702 for data in data {
1703 session.give_container(data);
1704 }
1705 } else {
1706 let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1707 let mut session = output.session(&cap);
1708 for data in data {
1709 session.give_container(data);
1710 }
1711 }
1712 });
1713
1714 if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1715 early_cap.take();
1716 }
1717 }
1718 })
1719}
1720
/// Limits the progress of a stream based on a probe's reported frontier,
/// implementing a form of logical backpressure: downstream progress (not
/// data) is held back for times the probe has not yet caught up to.
trait LimitProgress<T: Timestamp> {
    /// Returns a stream equal to `self` whose progress is held back until
    /// `handle`'s frontier has caught up, within `slack_ms` milliseconds.
    ///
    /// * `handle` - probe reporting the downstream frontier.
    /// * `slack_ms` - granularity (ms) to which held-back times are rounded.
    ///   NOTE(review): the implementation below divides by `slack_ms`, so it
    ///   appears to assume a nonzero value — confirm at call sites.
    /// * `limit` - optional maximum number of distinct retained times.
    /// * `upper` - times at or beyond this frontier are never held back.
    /// * `name` - used in the operator name and debug logging.
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1759
/// Implementation for streams of `(data, time, diff)` updates at
/// `mz_repr::Timestamp`. Data passes through unchanged; only the operator's
/// output capability (progress) is held back.
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamVec<G, (D, mz_repr::Timestamp, R)>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: Clone + 'static,
    R: Clone + 'static,
{
    fn limit_progress(
        self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let scope = self.scope();
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Slack-rounded times for which progress is still held back,
                // waiting for the probe frontier to pass them.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability guarding the smallest pending time; `None` once
                // nothing is held back.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                // Reactivate this operator whenever the probe frontier moves,
                // so held-back capabilities can be released promptly.
                let activator = scope.activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // For each datum, record the slack-adjusted time the
                        // probe must reach before we release progress. Times
                        // whose `+ slack_ms` overflows are skipped.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            // Round up to the next multiple of `slack_ms` to
                            // bound the number of distinct pending times.
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            // Never hold back times at or beyond `upper`.
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        // Data itself passes through unchanged.
                        output.session(&cap).give_container(data);
                        // Keep a capability at the smallest observed time that
                        // is not beyond `upper`.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain(0));
                        }
                    });

                    // Discard pending times the probe frontier has passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the retained-times cap by releasing the oldest.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the smallest still
                    // pending time, or drop it when nothing is pending.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // On shutdown (empty input frontier), release everything.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}
1847
/// Display helper that renders an iterator of timestamps compactly: the first
/// value in full, followed by deltas between consecutive values.
struct PendingTimesDisplay<T>(T);
1852impl<T> std::fmt::Display for PendingTimesDisplay<T>
1853where
1854 T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1855{
1856 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1857 let mut iter = self.0.clone().into_iter();
1858 write!(f, "[")?;
1859 if let Some(first) = iter.next() {
1860 write!(f, "{}", first)?;
1861 let mut last = u64::from(first);
1862 for time in iter {
1863 write!(f, ", +{}", u64::from(time) - last)?;
1864 last = u64::from(time);
1865 }
1866 }
1867 write!(f, "]")?;
1868 Ok(())
1869 }
1870}
1871
/// Helper for packing two datum sequences into a single `Row` and for
/// splitting such a row back into its two halves.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    /// Number of leading datums that form the first half when splitting.
    split_arity: usize,
}
1878
1879impl Pairer {
1880 fn new(split_arity: usize) -> Self {
1882 Self { split_arity }
1883 }
1884
1885 fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1887 where
1888 I1: IntoIterator<Item = Datum<'a>>,
1889 I2: IntoIterator<Item = Datum<'a>>,
1890 {
1891 SharedRow::pack(first.into_iter().chain(second))
1892 }
1893
1894 fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1896 let mut datum_iter = datum_iter.into_iter();
1897 let mut row_builder = SharedRow::get();
1898 let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1899 let second = row_builder.pack_using(datum_iter);
1900 (first, second)
1901 }
1902}