use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::trace::{BatchReader, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, SharedRow, SqlRelationType};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use mz_timely_util::scope_label::ScopeExt;
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Filter, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::{Product, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
};
use crate::render::context::{ArrangementFlavor, Context};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

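/// Assembles a compute dataflow in the given timely worker, as described by
/// `dataflow`: imports the required sources and indexes, builds each object,
/// and exports the described indexes and sinks. Dataflows containing
/// recursive plans are rendered in an iterative scope; all others in a flat
/// build region.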
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
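    // Whether the dataflow contains recursive bindings, which forces the
    // iterative rendering path below.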
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

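    // Pair each exported index with the dependencies of the object it indexes.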
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

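    // Pair each exported sink with the dependencies of its input.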
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

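    // Construct the dataflow: sources are imported in an input region, and
    // objects are built and exported in either an iterative scope or a flat
    // build region, depending on whether the dataflow is recursive.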
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = &mut scope.with_label();

        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

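        // Import declared sources into the rendering context, each in its own
        // named region.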
        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode =
                        if import.with_snapshot || !subscribe_snapshot_optimization {
                            SnapshotMode::Include
                        } else {
                            compute_state.metrics.inc_subscribe_snapshot_optimization();
                            SnapshotMode::Exclude
                        };
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        import.desc.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

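                    // The persist source applies the MFP itself, so by this
                    // point it must have been reduced to the identity.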
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

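                    // Hold back progress statements for times not beyond the
                    // `as_of`, so downstream operators only observe complete
                    // times.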
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

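                    // Apply logical backpressure: limit how far the source may
                    // run ahead of the dataflow's output probe.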
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

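                    // If this source feeds a continual task, apply the
                    // corresponding transformation to both streams.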
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        if recursive {
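            // Recursive plans are rendered in an iterative scope whose inner
            // timestamp is a `PointStamp`, with one coordinate per level of
            // nested recursion.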
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

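                // Build each object in its own named region and log the
                // mapping from the dataflow address to the global ID.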
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
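            // Non-recursive plans are rendered in a flat build region.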
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    });
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
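    /// Imports an arrangement as a collection of mapped key-value pairs,
    /// while dropping any batch that lies entirely at or before the dataflow
    /// `as_of`. Used to skip the snapshot when importing an index with
    /// `SnapshotMode::Exclude`.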
    fn import_filtered_index_collection<Tr: TraceReader<Time = G::Timestamp> + Clone, V: Data>(
        &self,
        arranged: Arranged<G, Tr>,
        start_signal: StartSignal,
        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
    ) -> VecCollection<Child<'g, G, T>, V, Tr::Diff>
    where
        G::Timestamp: TotalOrder,
    {
        let oks = arranged.stream.with_start_signal(start_signal).filter({
            let as_of = self.as_of_frontier.clone();
            move |b| !<Antichain<G::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
        });
        Arranged::<G, Tr>::flat_map_batches(&oks, move |a, b| [logic(a, b)]).enter(&self.scope)
    }

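    /// Imports the index with `idx_id` from the compute state's traces and
    /// binds it under the identifier of the indexed object. Panics if the
    /// trace is absent or has been allowed to compact beyond the dataflow
    /// `as_of`.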
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        typ: &SqlRelationType,
        snapshot_mode: SnapshotMode,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let bundle = match snapshot_mode {
                SnapshotMode::Include => {
                    let ok_arranged = oks
                        .enter(&self.scope)
                        .with_start_signal(start_signal.clone());
                    let err_arranged = err_arranged
                        .enter(&self.scope)
                        .with_start_signal(start_signal);
                    CollectionBundle::from_expressions(
                        idx.key.clone(),
                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                    )
                }
                SnapshotMode::Exclude => {
                    let oks = {
                        let mut datums = DatumVec::new();
                        let (permutation, _thinning) =
                            permutation_for_arrangement(&idx.key, typ.arity());
                        self.import_filtered_index_collection(
                            oks,
                            start_signal.clone(),
                            move |k: DatumSeq, v: DatumSeq| {
                                let mut datums_borrow = datums.borrow();
                                datums_borrow.extend(k);
                                datums_borrow.extend(v);
                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
                            },
                        )
                    };
                    let errs = self.import_filtered_index_collection(
                        err_arranged,
                        start_signal,
                        |e, _| e.clone(),
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            };
            self.update_id(Id::Global(idx.on_id), bundle);
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
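    /// Exports the arrangement for `idx_id` to the compute state's traces,
    /// attaching the tokens of all dependencies so that they are kept alive
    /// for as long as the export.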
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
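    /// Exports an index from within an iterative scope: the arrangement is
    /// brought back into the outer scope and re-arranged before being handed
    /// to the compute state's traces.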
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

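/// Describes how the plan currently being rendered relates to `Let` and
/// `LetRec` bindings, so that the final operator of each binding can be
/// labeled accordingly in the logged LIR metadata.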
enum BindingInfo {
    Body { in_let: bool },
    Let { id: LocalId, last: bool },
    LetRec { id: LocalId, last: bool },
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
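    /// Renders a plan that may contain recursive bindings, at the given level
    /// of nesting. `Let` bindings are rendered as usual, while `LetRec`
    /// bindings are rendered through `SemigroupVariable`s that feed each
    /// binding's output back to its uses, optionally enforcing an iteration
    /// limit.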
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner),
                );

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                if let Some(limit) = limit {
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
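    /// Renders a non-recursive plan: each `Let` binding is rendered in its
    /// own region and bound in the context, then the body is rendered in a
    /// final "Main Body" region.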
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }

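    /// Renders a let-free plan by rendering its nodes in topological order,
    /// optionally collecting LIR metadata for logging.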
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        let mut collections = BTreeMap::new();

        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

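    /// Renders a single plan expression, looking up already-rendered inputs
    /// in `collections`.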
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

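    /// Attaches operator hydration logging to the bundle, instrumenting the
    /// first arrangement if one exists and the raw collection otherwise.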
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone();
        };

        let export_ids = self.export_ids.clone();

        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
}

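/// A timestamp type usable for rendering: exposes the system and event time
/// components of the timestamp, the corresponding delay summaries, and a
/// saturating step backwards in time.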
#[allow(dead_code)]
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// A mutable reference to the system time component of the timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// The timestamp summary that advances the system time by `delay`.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event time component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// A mutable reference to the event time component of the timestamp.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// The timestamp summary that advances the event time by `delay`.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back by one unit, saturating at the minimum.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // Step back the outer timestamp and every inner coordinate.
        let inner = self.inner.clone();
        let mut vec = inner.into_inner();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

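/// A signal that dataflow operators can await before starting to do work. The
/// signal fires when the token returned by [`StartSignal::new`] is dropped;
/// it never produces a value, so awaiting it only completes once the sending
/// side is gone.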
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes once the signal has fired.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token whose drop fires the signal.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Creates a new `StartSignal`, along with the token that fires it when
    /// dropped.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

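/// Extension trait for attaching a [`StartSignal`] to a stream or
/// arrangement: input is stashed until the signal has fired, then released.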
pub(crate) trait WithStartSignal {
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            // Ensure the operator is re-scheduled when the signal fires.
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // Stash inputs until the signal has fired.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release any stashed data, then pass inputs through.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

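/// Delays a stream's progress statements until its frontier has advanced
/// beyond the `as_of`. Data passes through unchanged, but updates not beyond
/// the `as_of` are emitted under a capability held at the operator's default
/// capability, so downstream consumers do not observe times at or before the
/// `as_of` as complete.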
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    // Data beyond the as_of passes through unchanged.
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    // Early data is emitted under the retained capability.
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            // Release the early capability once the input frontier has
            // advanced beyond the as_of.
            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

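/// Logical backpressure: limits how far a stream's capabilities may run ahead
/// of the frontier observed through a probe `handle`. Pending times are
/// rounded up to multiples of `slack_ms`, at most `limit` of them are
/// retained, and a capability is held at the earliest pending time until the
/// probe's frontier catches up. Times at or beyond `upper` are exempt, and
/// `name` identifies the operator in logs.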
trait LimitProgress<T: Timestamp> {
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}

impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times we still hold back, rounded up to multiples of `slack_ms`.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // A capability held at the earliest pending time.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    });

                    // Discard pending times the probe frontier has passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Bound the number of retained times.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}

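/// Displays a sequence of timestamps as the first value followed by the
/// deltas between consecutive values, to keep log lines short.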
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

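/// A helper for merging two rows into one and splitting one row into two at a
/// fixed column boundary.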
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer that splits rows after `split_arity` columns.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges two datum iterators into a single row.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a datum iterator into two rows at the configured arity.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}