use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::trace::{BatchReader, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Filter, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::{Product, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
};
use crate::render::context::{ArrangementFlavor, Context};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

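/// Assembles the dataflow described by `dataflow` on the given Timely worker:
/// imports the required persist sources and indexes, builds each object in
/// `objects_to_build`, and exports the requested indexes and sinks.
///
/// Dataflows containing recursive objects are rendered in an iterative scope
/// with a `PointStamp` iteration coordinate; all others in a flat region.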
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Whether the dataflow contains any recursive object; such dataflows must
    // be rendered in an iterative scope.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Collect the exported indexes, paired with their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Collect the exported sinks, paired with their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        // Import declared sources into the rendering context.
        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        // If enabled, restrict the read to only the demanded columns.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode =
                        if import.with_snapshot || !subscribe_snapshot_optimization {
                            SnapshotMode::Include
                        } else {
                            compute_state.metrics.inc_subscribe_snapshot_optimization();
                            SnapshotMode::Exclude
                        };
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        import.desc.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    // The persist source must have absorbed the MFP in full.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    // If logical backpressure is enabled, apply it to the
                    // source's data stream, gated on the output probe.
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate returned tokens with the source identifier.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        // If the dataflow is recursive, render it in an iterative scope;
        // otherwise render it in a flat region.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    // If enabled, buffer the source data into temporal buckets.
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import declared indexes into the rendering context.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Build declared objects.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export declared indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export declared sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
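    /// Imports an arrangement as a collection of values produced by `logic`,
    /// dropping batches that are not beyond the context's `as_of` frontier.
    ///
    /// This supports `SnapshotMode::Exclude` for index imports: batches not
    /// beyond the `as_of` contain only snapshot data and are filtered out.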
    fn import_filtered_index_collection<Tr: TraceReader<Time = G::Timestamp> + Clone, V: Data>(
        &self,
        arranged: Arranged<G, Tr>,
        start_signal: StartSignal,
        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
    ) -> VecCollection<Child<'g, G, T>, V, Tr::Diff>
    where
        G::Timestamp: TotalOrder,
    {
        let oks = arranged.stream.with_start_signal(start_signal).filter({
            let as_of = self.as_of_frontier.clone();
            move |b| !<Antichain<G::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
        });
        Arranged::<G, Tr>::flat_map_batches(&oks, move |a, b| [logic(a, b)]).enter(&self.scope)
    }

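    /// Imports the index with `idx_id` from the compute state's traces and
    /// binds it to `idx.on_id` in the rendering context.
    ///
    /// Panics if the index is not present in the compute state, or if it has
    /// been allowed to compact beyond the dataflow's `as_of` frontier.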
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        snapshot_mode: SnapshotMode,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let bundle = match snapshot_mode {
                SnapshotMode::Include => {
                    let ok_arranged = oks
                        .enter(&self.scope)
                        .with_start_signal(start_signal.clone());
                    let err_arranged = err_arranged
                        .enter(&self.scope)
                        .with_start_signal(start_signal);
                    CollectionBundle::from_expressions(
                        idx.key.clone(),
                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                    )
                }
                SnapshotMode::Exclude => {
                    // With the snapshot excluded the arrangement is incomplete,
                    // so import the index as a plain collection instead.
                    let oks = {
                        let mut datums = DatumVec::new();
                        self.import_filtered_index_collection(
                            oks,
                            start_signal.clone(),
                            move |k: DatumSeq, v: DatumSeq| {
                                let mut datums_borrow = datums.borrow();
                                datums_borrow.extend(k);
                                datums_borrow.extend(v);
                                SharedRow::pack(&**datums_borrow)
                            },
                        )
                    };
                    let errs = self.import_filtered_index_collection(
                        err_arranged,
                        start_signal,
                        |e, _| e.clone(),
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            };
            self.update_id(Id::Global(idx.on_id), bundle);
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
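    /// Exports the arrangement bound to `idx_id` to the compute state's
    /// traces, keeping the tokens of its dependencies alive for as long as
    /// the export exists.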
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together the tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // Ensure that the frontier does not advance past the
                // expiration time, if set.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // An existing arrangement can be reused; no new trace is needed.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
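    /// Exports the arrangement bound to `idx_id` from an iterative scope to
    /// the compute state's traces. The data is lifted out of the iterative
    /// scope and rearranged in the outer scope before export.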
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together the tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // Lift the arrangement out of the iterative scope and
                // rearrange it in the outer scope.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // Ensure that the frontier does not advance past the
                // expiration time, if set.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Attach logging of dataflow errors.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // An existing arrangement can be reused; no new trace is needed.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

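/// Information about the binding a rendered plan belongs to, used to label
/// the plan's final operator in the logged LIR mapping.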
enum BindingInfo {
    Body { in_let: bool },
    Let { id: LocalId, last: bool },
    LetRec { id: LocalId, last: bool },
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
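    /// Renders a plan containing recursive bindings at the given level of
    /// nesting.
    ///
    /// Each recursive binding is backed by a `SemigroupVariable` that feeds
    /// the binding's output back to its uses, with an optional iteration
    /// limit enforced on the `PointStamp` coordinate of this level.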
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the non-recursive bindings.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define a variable for each recursive binding, so uses can refer
            // to it before the binding itself is rendered.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner),
                );

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Render the recursive bindings.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Consolidate the output to ensure the iteration can converge.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                if let Some(limit) = limit {
                    // Split the stream by whether updates fall within the
                    // iteration limit at this nesting level.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    // Unless the plan opts into returning the result at the
                    // limit, turn over-limit updates into an error.
                    if !limit.return_at_limit {
                        err = err.concat(&VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Distinct the errors, so they do not accumulate across
                // iterations.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Extract each of the recursive bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
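    /// Renders a non-recursive plan into a collection bundle.
    ///
    /// Panics if the plan contains any recursive bindings; those must be
    /// rendered with `render_recursive_plan` instead.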
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }

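    /// Renders a let-free plan by rendering each of its nodes in topological
    /// order and stitching the resulting collection bundles together.
    ///
    /// When compute logging is enabled, this also records, for each `LirId`,
    /// a human-readable operator name and the range of timely operator IDs
    /// the node was rendered into.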
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // To save overhead, LIR mapping metadata is only computed when a
        // compute logger is present.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            // Capture the operator name and the start of the timely operator
            // ID range before rendering, if metadata is being computed.
            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // The last node in the topological order is the binding's
                // result; label it accordingly.
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

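    /// Renders a single plan expression, looking up its inputs in
    /// `collections`.
    ///
    /// Panics if an input is missing, which indicates the plan was not
    /// processed in a valid topological order.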
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Split the constant into its ok rows and errors.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance row timestamps to the `as_of`, and discard rows
                // beyond the `until`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection or arrangements bound to `id`.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that the annotated arrangements are present.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only the requested arrangements.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is the identity, there is nothing to apply.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        // Attach the hydration logging operator to one of the bundle's parts;
        // arrangements are preferred, the raw collection is the fallback.
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone();
        };

        let export_ids = self.export_ids.clone();

        // The operator is considered hydrated once its input frontier has
        // advanced past the `as_of`.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            // Log the initial `hydrated = false` state for each export.
            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                // Pass data through unchanged.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
}

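/// A timestamp type usable for rendering, exposing its system and event time
/// components and helpers for constructing timestamp summaries.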
#[allow(dead_code)]
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, mutably.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back, so that logical compaction to the output
    /// does not conflate the result with its predecessor.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // Step back both coordinates of the product timestamp, including
        // every coordinate of the `PointStamp`.
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

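/// A signal that can be awaited by operators to delay their startup.
///
/// The signal fires when the token returned by `new` is dropped. A minimal
/// usage sketch (illustrative only):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// assert!(!signal.has_fired());
/// drop(token);
/// assert!(signal.has_fired());
/// ```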
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that resolves when the signal fires, i.e., when the token
    /// returned by `new` is dropped.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the signal token, used to attach further objects
    /// that should be dropped when the signal fires.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Creates a new `StartSignal` and a token that fires it when dropped.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

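/// Extension trait to attach a `StartSignal` to a stream or arrangement,
/// holding back its data until the signal fires.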
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that the attached operator buffers all incoming data, so this
    /// should only be applied to inputs expected to produce little data
    /// before the signal fires.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            // Activate the operator when the signal fires, so it can flush
            // its stash.
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // Stash incoming updates, capabilities included, as long as
                // the start signal has not fired.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release all stashed updates.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                // Pass through all remaining updates.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

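/// Suppresses frontier progress for times not beyond the given `as_of`.
///
/// Data passes through unchanged, but updates at times not beyond the `as_of`
/// are emitted at a capability retained from operator construction, so
/// downstream consumers observe no progress until the input frontier has
/// passed the `as_of`.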
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    // Data beyond the `as_of` passes through unchanged.
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    // Data at or before the `as_of` is emitted at the
                    // retained early capability.
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            // Release the early capability once the input frontier has
            // advanced beyond the `as_of`.
            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

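/// Extension trait to limit how far a stream's capabilities can run ahead of
/// a downstream probe, providing logical backpressure for sources.
///
/// Observed times are rounded up to the next multiple of `slack_ms`, at most
/// `limit` rounded times are retained, and only times not beyond `upper` are
/// ever held back.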
trait LimitProgress<T: Timestamp> {
    /// Limits the stream's progress relative to the frontier of `handle`,
    /// rounding retained times up by `slack_ms` and keeping at most `limit`
    /// pending times.
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}

impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times observed on the input, rounded up by `slack_ms`.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // A capability for the earliest pending time, if any.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                // Reschedule the operator when the probe's frontier advances.
                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // Round each data time, plus the slack, up to the next
                        // multiple of `slack_ms`, and remember it unless it is
                        // beyond `upper`.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        // Swap in this capability if it is not later than the
                        // retained one and not beyond `upper`.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    });

                    // Discard pending times the probe has moved past.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the limit on the number of pending times.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the earliest
                    // pending time, or drop it if none remain.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // Release all state on shutdown.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}

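/// Helper to display a set of pending times compactly, as the first time
/// followed by the deltas to each subsequent time, e.g. `[10, +15, +15]` for
/// the times 10, 25, and 40.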
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

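/// Helper for merging and splitting rows at a fixed arity boundary.
///
/// A sketch of the intended round trip (illustrative only):
///
/// ```ignore
/// let pairer = Pairer::new(2);
/// let row = pairer.merge(key_datums, val_datums); // `key_datums` has 2 datums
/// let (key_row, val_row) = pairer.split(row.iter());
/// ```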
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer with the given arity for the first row.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges two datum iterators into a single row.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a datum iterator into two rows at the pairer's split arity.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}