1use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::iterate::Variable;
117use differential_dataflow::trace::{BatchReader, TraceReader};
118use differential_dataflow::{AsCollection, Data, VecCollection};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123 COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124 COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125 ENABLE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129 self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use mz_timely_util::scope_label::ScopeExt;
141use timely::PartialOrder;
142use timely::container::CapacityContainerBuilder;
143use timely::dataflow::channels::pact::Pipeline;
144use timely::dataflow::operators::vec::ToStream;
145use timely::dataflow::operators::vec::{BranchWhen, Filter};
146use timely::dataflow::operators::{Capability, Operator, Probe, probe};
147use timely::dataflow::{Scope, Stream, StreamVec};
148use timely::order::{Product, TotalOrder};
149use timely::progress::timestamp::Refines;
150use timely::progress::{Antichain, Timestamp};
151use timely::scheduling::ActivateOnDrop;
152use timely::worker::Worker as TimelyWorker;
153
154use crate::arrangement::manager::TraceBundle;
155use crate::compute_state::ComputeState;
156use crate::extensions::arrange::{KeyCollection, MzArrange};
157use crate::extensions::reduce::MzReduce;
158use crate::extensions::temporal_bucket::TemporalBucketing;
159use crate::logging::compute::{
160 ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
161};
162use crate::render::context::{ArrangementFlavor, Context};
163use crate::render::continual_task::ContinualTaskCtx;
164use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
165use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
166
167pub mod context;
168pub(crate) mod continual_task;
169mod errors;
170mod flat_map;
171mod join;
172mod reduce;
173pub mod sinks;
174mod threshold;
175mod top_k;
176
177pub use context::CollectionBundle;
178pub use join::LinearJoinSpec;
179
/// Assembles the flow of data and indexes described by `dataflow` in the given
/// timely worker.
///
/// Builds one timely dataflow with three stages: (1) import all persist
/// sources inside a dedicated input region, (2) render every object to build
/// inside either an iterative scope (when any plan is recursive) or a flat
/// build region, and (3) export the requested indexes and sinks.
///
/// * `start_signal` - gates source/index consumption until fired (passed
///   through to imports; see `persist_source` and `import_index` calls below).
/// * `until` - frontier beyond which updates need not be produced.
/// * `dataflow_expiration` - frontier used to expire exported streams.
pub fn build_compute_dataflow(
    timely_worker: &mut TimelyWorker,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // A dataflow with any recursive plan must be rendered in an iterative
    // scope; this decides which of the two branches below we take.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Pre-collect exported indexes as (id, dependencies, description) so the
    // export loops below don't need to borrow `dataflow` again.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Same for exported sinks.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    // Human-readable names for the dataflow and its two inner regions.
    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        // Tracks continual-task state (source transformers, input times).
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // Sources imported in the input region, keyed by global id, to be
        // re-entered into whichever build scope we use below.
        let mut imported_sources = Vec::new();
        // Shutdown/keepalive tokens per imported id; exports pick the ones
        // they depend on.
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        // Stage 1: import sources in a dedicated region, one sub-region per
        // source, so they can be dropped independently of the rest.
        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        if apply_demands {
                            // Project the read schema down to only the
                            // columns the MFP demands, and permute the MFP
                            // to match the narrowed arity.
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // Skip the snapshot when the import allows it and the
                    // optimization is enabled; count how often we do.
                    let mut snapshot_mode =
                        if import.with_snapshot || !subscribe_snapshot_optimization {
                            SnapshotMode::Include
                        } else {
                            compute_state.metrics.inc_subscribe_snapshot_optimization();
                            SnapshotMode::Exclude
                        };
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    // Continual-task sources may override snapshot mode and
                    // the progress-suppression frontier.
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        import.desc.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    // `persist_source` fuses the MFP into the read path;
                    // whatever remains must be a no-op.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // Hold back progress announcements until the as_of, so
                    // downstream operators don't see pre-as_of frontiers.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    // Optionally apply logical backpressure: limit how far
                    // the source may run ahead of the dataflow's output.
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach the input probe used for hydration/lag tracking.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Continual tasks transform the streams and surface the
                    // times at which inputs changed.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx
                                .ct_times
                                .push(ct_times.leave_region(region).leave_region(scope));
                            (oks.inner, errs.inner)
                        }
                    };

                    // Leave the per-source region and the input region so the
                    // collections live at the dataflow's root scope.
                    let (oks, errs) = (
                        ok_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                        err_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Retain the source token for as long as any export needs it.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        // Stage 2 + 3: build and export. Recursive dataflows need an
        // iterative (PointStamp) scope; others use a flat region.
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring imported sources into the iterative scope.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import the indexes the dataflow depends on.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Render each object in its own named region, recursively.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;

                    // Log the association between this dataflow's timely
                    // address root and the object's global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export the requested indexes (iterative variant leaves the
                // inner scope before arranging).
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        scope,
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export the requested sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(scope),
                        &output_probe,
                        scope,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring imported sources into the build region, optionally
                // bucketing updates by time to smooth ingestion.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import the indexes the dataflow depends on.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Render each object in its own named region (non-recursive).
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Export the requested indexes.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                // Export the requested sinks.
                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(scope),
                        &output_probe,
                        scope,
                    );
                }
            });
        }
    });
}
581
impl<'g, T> Context<'g, T>
where
    T: Refines<mz_repr::Timestamp> + RenderTimestamp,
{
    /// Imports an arrangement's batches as an unarranged collection, keeping
    /// only batches whose upper is not beyond the dataflow `as_of` — i.e.,
    /// dropping the pre-as_of snapshot batches. `logic` maps each (key, val)
    /// pair of a retained batch to an output value.
    ///
    /// Used by `import_index` in `SnapshotMode::Exclude` mode below.
    fn import_filtered_index_collection<
        'outer,
        Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
        V: Data,
    >(
        &self,
        arranged: Arranged<'outer, Tr>,
        start_signal: StartSignal,
        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
    ) -> VecCollection<'g, T, V, Tr::Diff>
    where
        mz_repr::Timestamp: TotalOrder,
    {
        // A batch entirely at or before the as_of (upper <= as_of) is part of
        // the snapshot and is filtered out.
        let oks = arranged.stream.with_start_signal(start_signal).filter({
            let as_of = self.as_of_frontier.clone();
            move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
        });
        Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
    }

    /// Imports the index `idx_id` from the compute state's trace manager and
    /// binds the result to `idx.on_id` in this context.
    ///
    /// With `SnapshotMode::Include` the ok/err traces are imported as
    /// arrangements; with `SnapshotMode::Exclude` they are flattened into
    /// collections with snapshot batches filtered out. Drop tokens for both
    /// trace halves are recorded in `tokens` under `idx_id`.
    ///
    /// # Panics
    ///
    /// Panics if the index's traces are absent or have been allowed to
    /// compact beyond the dataflow's `as_of`.
    pub(crate) fn import_index<'outer>(
        &mut self,
        outer: Scope<'outer, mz_repr::Timestamp>,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        typ: &ReprRelationType,
        snapshot_mode: SnapshotMode,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            // Import the ok trace, advanced to our as_of and bounded by until.
            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                outer,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            // Import the err trace the same way.
            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                outer,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let bundle = match snapshot_mode {
                SnapshotMode::Include => {
                    // Keep the arranged form; just enter our scope and gate
                    // on the start signal.
                    let ok_arranged = oks
                        .enter(self.scope)
                        .with_start_signal(start_signal.clone());
                    let err_arranged = err_arranged
                        .enter(self.scope)
                        .with_start_signal(start_signal);
                    CollectionBundle::from_expressions(
                        idx.key.clone(),
                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                    )
                }
                SnapshotMode::Exclude => {
                    // Flatten to collections, skipping snapshot batches.
                    // Keys/values are re-packed into rows in the original
                    // column order via the arrangement's permutation.
                    let oks = {
                        let mut datums = DatumVec::new();
                        let (permutation, _thinning) =
                            permutation_for_arrangement(&idx.key, typ.arity());
                        self.import_filtered_index_collection(
                            oks,
                            start_signal.clone(),
                            move |k: DatumSeq, v: DatumSeq| {
                                let mut datums_borrow = datums.borrow();
                                datums_borrow.extend(k);
                                datums_borrow.extend(v);
                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
                            },
                        )
                    };
                    let errs = self.import_filtered_index_collection(
                        err_arranged,
                        start_signal,
                        |e, _| e.clone(),
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            };
            // The imported index stands in for the collection it indexes.
            self.update_id(Id::Global(idx.on_id), bundle);
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}
705
impl<'g> Context<'g, mz_repr::Timestamp> {
    /// Exports the arrangement for `idx_id` into the compute state's trace
    /// manager, bundled with the drop tokens of its dependencies.
    ///
    /// A locally built arrangement is published directly (after optional
    /// expiration and error logging); an arrangement imported from another
    /// index is re-published by cloning that index's trace bundle.
    ///
    /// # Panics
    ///
    /// Panics if no collection or matching arrangement is bound to `idx_id`.
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
    ) {
        // Keep the tokens of everything this index depends on alive for as
        // long as the exported trace lives.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // Panic the streams past the expiration frontier, if set.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                    );
                }

                // The output probe observes the exported ok stream.
                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Surface any dataflow errors through compute logging.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // The index simply re-exports an imported trace; clone it.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                // Dump diagnostic state before panicking.
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}
780
impl<'g, T> Context<'g, T>
where
    T: RenderTimestamp,
{
    /// Exports the arrangement for `idx_id` from an iterative (inner) scope
    /// into the compute state's trace manager.
    ///
    /// Unlike `export_index`, the locally built arrangement lives in the
    /// inner timestamp `T`, so it is first flattened to a collection, left
    /// into `outer`, and re-arranged at `mz_repr::Timestamp` before being
    /// published.
    ///
    /// # Panics
    ///
    /// Panics if no collection or matching arrangement is bound to `idx_id`.
    pub(crate) fn export_index_iterative<'outer>(
        &self,
        outer: Scope<'outer, mz_repr::Timestamp>,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
    ) {
        // Keep the tokens of everything this index depends on alive for as
        // long as the exported trace lives.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // Leave the iterative scope and re-arrange at the outer
                // timestamp, for both the ok and err halves.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave(outer)
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave(outer)
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // Panic the streams past the expiration frontier, if set.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                    );
                }

                // The output probe observes the exported ok stream.
                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                // Surface any dataflow errors through compute logging.
                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // The index simply re-exports an imported trace; clone it.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                // Dump diagnostic state before panicking.
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}
876
/// Describes where a rendered plan sits relative to `Let`/`LetRec` bindings.
///
/// Used by `render_letfree_plan` to decorate the humanized name of a plan's
/// final operator (e.g. "With {id} = ...", "Returning ..."), so `last` marks
/// the last binding in its group and `in_let` marks a body preceded by
/// bindings.
enum BindingInfo {
    // The body of a plan; `in_let` is true when the plan had any bindings.
    Body { in_let: bool },
    // A non-recursive binding for `id`; `last` if it closes its bind group.
    Let { id: LocalId, last: bool },
    // A recursive binding for `id`; `last` if it closes its bind group.
    LetRec { id: LocalId, last: bool },
}
887
impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
    /// Renders a plan that may contain recursive (`LetRec`) bindings, at
    /// iteration nesting depth `level`, returning the bundle for its body.
    ///
    /// Non-recursive `lets` are rendered in their own regions and bound by
    /// local id. Each group of `recs` is wired through differential
    /// `Variable`s: a variable per binding is inserted first so references
    /// resolve, then each binding's value is rendered (recursively, one
    /// level deeper), consolidated, optionally iteration-limited, and fed
    /// back via `Variable::set`. Afterwards the bindings are rebound to the
    /// variables' outputs, leaving one dynamic level.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            // Render the non-recursive bindings, each in its own region.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Create a feedback variable per recursive binding up front, so
            // any binding can reference any other in the group.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Advance coordinate `level + 1` by one per iteration.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Render each recursive binding and close its feedback loop.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Consolidate before feeding back, to keep iteration state
                // compact.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                // Enforce a user-provided iteration limit, optionally
                // reporting an error instead of returning at the limit.
                if let Some(limit) = limit {
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // Iteration count for this nesting level.
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Distinct the errors so they don't accumulate across
                // iterations before closing the error feedback loop.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(oks);
                err_v.set(errs);
            }
            // Rebind each recursive id to its post-iteration result, leaving
            // the dynamic coordinate introduced at this level.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1020
1021impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders a non-recursive plan, returning the bundle for its body.
    ///
    /// Each `Let` binding is rendered in its own region and bound by local
    /// id; the body is rendered last in a "Main Body" region. Recursive
    /// plans must go through `render_recursive_plan` instead.
    ///
    /// # Panics
    ///
    /// Panics if the plan contains any `LetRec` bindings.
    fn render_plan(
        &mut self,
        object_id: GlobalId,
        plan: RenderPlan,
    ) -> CollectionBundle<'scope, T> {
        // Whether any `Let` binding was rendered; threaded into the body's
        // `BindingInfo` for operator naming.
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            // This renderer handles non-recursive plans only.
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region(self.scope)
        })
    }
1067
    /// Renders a let-free plan, returning the bundle of its root expression.
    ///
    /// Nodes are rendered in topological order, with each node's inputs
    /// looked up from previously rendered nodes. When a compute logger is
    /// present, per-node LIR metadata (humanized operator name, parent,
    /// nesting, and the span of timely operator ids created while rendering
    /// the node) is collected and logged, and operator hydration is tracked.
    fn render_letfree_plan(
        &self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, T> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered bundles, keyed by LIR id, for input lookup.
        let mut collections = BTreeMap::new();

        // LIR metadata is only computed when it will actually be logged.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // Decorate the final node's name with its binding context
                // ("With ... =", "Returning ...", etc.).
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Timely operator ids created for this node fall between
                // this identifier and the one peeked after rendering.
                let operator_id_start = self.scope.worker().peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.worker().peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        // The root must have been rendered; anything else violates the
        // LetFreePlan structure.
        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1157
    /// Renders a single plan expression into a collection bundle,
    /// dispatching on the expression variant and resolving input LIR ids
    /// from `collections`.
    ///
    /// # Panics
    ///
    /// Panics if a referenced input id has not been rendered yet, or a `Get`
    /// target is not bound in the context.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        // Inputs are rendered before their consumers (topological order in
        // `render_letfree_plan`), so a missing id is a planning bug.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // A constant is either rows or a single error.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance each row's time to the as_of and drop rows at or
                // beyond `until`, then lift into the inner timestamp.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // Errors are emitted once, at the advanced minimum time.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Look up a bound collection (source, index, or binding).
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Pass through, keeping only the requested
                        // arrangement forms.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read out of a specific arrangement (optionally
                        // seeking `row`), applying `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Read the raw collection, applying `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // An identity MFP is a no-op; pass the input through.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Dispatch to the linear or delta join implementation.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only pass a post-reduce MFP when it actually does work.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                // Negate ok diffs; errors pass through unnegated.
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate all inputs' ok and err collections.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                // Ensure the requested arrangement forms exist on the input.
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }
1369
1370 fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1371 if let Some(logger) = &self.compute_logger {
1372 logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1373 dataflow_index,
1374 global_id,
1375 }));
1376 }
1377 }
1378
1379 fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1380 if let Some(logger) = &self.compute_logger {
1381 logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1382 }
1383 }
1384
    /// Instrument `bundle` so that hydration of the operator identified by
    /// `lir_id` gets logged (see `log_operator_hydration_inner`).
    ///
    /// Only a single stream of the bundle is instrumented: the first arranged
    /// flavor if one exists, otherwise the raw ok-collection.
    /// NOTE(review): this assumes all parts of the bundle hydrate together —
    /// confirm before relying on per-part hydration reporting.
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                // Replace the arrangement's stream with an instrumented copy.
                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
                    }
                }
            }
            None => {
                // No arrangement exists; instrument the ok-collection's inner
                // stream instead.
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
                *oks = stream.as_collection();
            }
        }
    }
1428
    /// Wrap `stream` in a pass-through operator that logs an
    /// `OperatorHydration` event per export: once with `hydrated: false` at
    /// construction, and once with `hydrated: true` when the input frontier
    /// reaches the as-of frontier stepped forward by one tick.
    fn log_operator_hydration_inner<D>(
        &self,
        stream: Stream<'scope, T, D>,
        lir_id: LirId,
    ) -> Stream<'scope, T, D>
    where
        D: timely::Container + Clone + 'static,
    {
        // Without a logger there is nothing to record; return the stream as-is.
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone(); };

        let export_ids = self.export_ids.clone();

        // Frontier at which the operator counts as hydrated: every as-of time
        // stepped forward by one tick (presumably because snapshot data occurs
        // at the as-of itself — confirm).
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            // Announce the operator as not-yet-hydrated for every export.
            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                // Forward all data unchanged.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                // Once the input frontier has passed the hydration frontier,
                // log the transition to hydrated, exactly once.
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
1493}
1494
/// A timestamp type usable for rendering dataflows, exposing accessors for its
/// system- and event-time components.
///
/// NOTE(review): in the impls below both components map onto the same
/// coordinate (`self`, resp. `self.outer`); the distinction appears to be
/// conceptual — confirm against callers.
#[allow(dead_code)] pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// Mutable access to the system-time component of the timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// A timestamp summary expressing `delay` in terms of the system time.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event-time component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// Mutable access to the event-time component of the timestamp.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// A timestamp summary expressing `delay` in terms of the event time.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The timestamp stepped back by one tick, saturating at the minimum
    /// (both impls below use `saturating_sub(1)`).
    fn step_back(&self) -> Self;
}
1515
/// `RenderTimestamp` for the plain system timestamp: system and event time are
/// the same single coordinate, and delays pass through unchanged.
impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturates at zero, so stepping back the minimum timestamp is safe.
        self.saturating_sub(1)
    }
}
1536
1537impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1538 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1539 &mut self.outer
1540 }
1541 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1542 Product::new(delay, Default::default())
1543 }
1544 fn event_time(&self) -> mz_repr::Timestamp {
1545 self.outer
1546 }
1547 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1548 &mut self.outer
1549 }
1550 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1551 Product::new(delay, Default::default())
1552 }
1553 fn step_back(&self) -> Self {
1554 let inner = self.inner.clone();
1558 let mut vec = inner.into_inner();
1559 for item in vec.iter_mut() {
1560 *item = item.saturating_sub(1);
1561 }
1562 Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1563 }
1564}
1565
/// A shareable signal that fires when its associated token is dropped.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// Future that resolves once the oneshot sender inside the token is
    /// dropped. Since `Infallible` can never be sent, resolution only happens
    /// through cancellation, i.e., the token being dropped.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// Weak reference to the token holding the sender; once all strong
    /// references are gone the signal counts as fired.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1585
1586impl StartSignal {
1587 pub fn new() -> (Self, Rc<dyn Any>) {
1590 let (tx, rx) = oneshot::channel::<Infallible>();
1591 let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1592 let signal = Self {
1593 fut: rx.shared(),
1594 token_ref: Rc::downgrade(&token),
1595 };
1596 (signal, token)
1597 }
1598
1599 pub fn has_fired(&self) -> bool {
1600 self.token_ref.strong_count() == 0
1601 }
1602
1603 pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1604 if let Some(token) = self.token_ref.upgrade() {
1605 let mut token = token.borrow_mut();
1606 let inner = std::mem::replace(&mut *token, Box::new(()));
1607 *token = Box::new((inner, to_drop));
1608 }
1609 }
1610}
1611
1612impl Future for StartSignal {
1613 type Output = ();
1614
1615 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1616 self.fut.poll_unpin(cx).map(|_| ())
1617 }
1618}
1619
/// Extension trait for attaching a [`StartSignal`] to a stream or arrangement.
pub(crate) trait WithStartSignal {
    /// Return a version of `self` that buffers incoming data and holds back
    /// output until `signal` has fired (see the `Stream` impl below).
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1628
1629impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
1630where
1631 Tr: TraceReader<Time: RenderTimestamp> + Clone,
1632{
1633 fn with_start_signal(self, signal: StartSignal) -> Self {
1634 Arranged {
1635 stream: self.stream.with_start_signal(signal),
1636 trace: self.trace,
1637 }
1638 }
1639}
1640
1641impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1642where
1643 D: timely::Container + Clone + 'static,
1644{
1645 fn with_start_signal(self, signal: StartSignal) -> Self {
1646 let activations = self.scope().activations();
1647 self.unary(Pipeline, "StartSignal", |_cap, info| {
1648 let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1649 signal.drop_on_fire(token);
1650
1651 let mut stash = Vec::new();
1652
1653 move |input, output| {
1654 if !signal.has_fired() {
1656 input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1657 return;
1658 }
1659
1660 for (cap, mut data) in std::mem::take(&mut stash) {
1662 output.session(&cap).give_container(&mut data);
1663 }
1664
1665 input.for_each(|cap, data| {
1667 output.session(&cap).give_container(data);
1668 });
1669 }
1670 })
1671 }
1672}
1673
1674fn suppress_early_progress<'scope, T: Timestamp, D>(
1696 stream: Stream<'scope, T, D>,
1697 as_of: Antichain<T>,
1698) -> Stream<'scope, T, D>
1699where
1700 D: Data + timely::Container,
1701{
1702 stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1703 let mut early_cap = Some(default_cap);
1704
1705 move |(input, frontier), output| {
1706 input.for_each_time(|data_cap, data| {
1707 if as_of.less_than(data_cap.time()) {
1708 let mut session = output.session(&data_cap);
1709 for data in data {
1710 session.give_container(data);
1711 }
1712 } else {
1713 let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1714 let mut session = output.session(&cap);
1715 for data in data {
1716 session.give_container(data);
1717 }
1718 }
1719 });
1720
1721 if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1722 early_cap.take();
1723 }
1724 }
1725 })
1726}
1727
/// Extension trait to limit the progress a stream reports, implementing
/// logical backpressure driven by a downstream probe.
trait LimitProgress<T: Timestamp> {
    /// Hold back the stream's progress until the frontier observed through
    /// `handle` has caught up, while letting data itself flow through.
    ///
    /// * `handle` — probe observing the downstream frontier.
    /// * `slack_ms` — granularity (in milliseconds) to which held-back times
    ///   are rounded up; must be positive (a value of 0 would divide by zero
    ///   in the implementation below).
    /// * `limit` — optional maximum number of retained pending times.
    /// * `upper` — frontier beyond which times are never held back
    ///   (presumably the dataflow's upper — confirm at call sites).
    /// * `name` — name for the operator, used in diagnostics.
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}
1766
1767impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
1770 for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
1771where
1772 D: Clone + 'static,
1773 R: Clone + 'static,
1774{
1775 fn limit_progress(
1776 self,
1777 handle: MzProbeHandle<mz_repr::Timestamp>,
1778 slack_ms: u64,
1779 limit: Option<usize>,
1780 upper: Antichain<mz_repr::Timestamp>,
1781 name: String,
1782 ) -> Self {
1783 let scope = self.scope();
1784 let stream =
1785 self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1786 let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
1788 let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;
1790
1791 let activator = scope.activator_for(info.address);
1792 handle.activate(activator.clone());
1793
1794 move |(input, frontier), output| {
1795 input.for_each(|cap, data| {
1796 for time in data
1797 .iter()
1798 .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1799 {
1800 let rounded_time =
1801 (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
1802 if !upper.less_than(&rounded_time.into()) {
1803 pending_times.insert(rounded_time.into());
1804 }
1805 }
1806 output.session(&cap).give_container(data);
1807 if retained_cap.as_ref().is_none_or(|c| {
1808 !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1809 }) {
1810 retained_cap = Some(cap.retain(0));
1811 }
1812 });
1813
1814 handle.with_frontier(|f| {
1815 while pending_times
1816 .first()
1817 .map_or(false, |retained_time| !f.less_than(&retained_time))
1818 {
1819 let _ = pending_times.pop_first();
1820 }
1821 });
1822
1823 while limit.map_or(false, |limit| pending_times.len() > limit) {
1824 let _ = pending_times.pop_first();
1825 }
1826
1827 match (retained_cap.as_mut(), pending_times.first()) {
1828 (Some(cap), Some(first)) => cap.downgrade(first),
1829 (_, None) => retained_cap = None,
1830 _ => {}
1831 }
1832
1833 if frontier.is_empty() {
1834 retained_cap = None;
1835 pending_times.clear();
1836 }
1837
1838 if !pending_times.is_empty() {
1839 tracing::debug!(
1840 name,
1841 info.global_id,
1842 pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1843 frontier = ?frontier.frontier().get(0),
1844 probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1845 ?upper,
1846 "pending times",
1847 );
1848 }
1849 }
1850 });
1851 stream
1852 }
1853}
1854
1855struct PendingTimesDisplay<T>(T);
1858
1859impl<T> std::fmt::Display for PendingTimesDisplay<T>
1860where
1861 T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1862{
1863 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1864 let mut iter = self.0.clone().into_iter();
1865 write!(f, "[")?;
1866 if let Some(first) = iter.next() {
1867 write!(f, "{}", first)?;
1868 let mut last = u64::from(first);
1869 for time in iter {
1870 write!(f, ", +{}", u64::from(time) - last)?;
1871 last = u64::from(time);
1872 }
1873 }
1874 write!(f, "]")?;
1875 Ok(())
1876 }
1877}
1878
/// Helper to merge the datums of two row halves into one row, and to split a
/// row's datums back into a fixed-arity prefix and the remainder.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    // Number of datums in the first half when splitting.
    split_arity: usize,
}
1885
1886impl Pairer {
1887 fn new(split_arity: usize) -> Self {
1889 Self { split_arity }
1890 }
1891
1892 fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1894 where
1895 I1: IntoIterator<Item = Datum<'a>>,
1896 I2: IntoIterator<Item = Datum<'a>>,
1897 {
1898 SharedRow::pack(first.into_iter().chain(second))
1899 }
1900
1901 fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1903 let mut datum_iter = datum_iter.into_iter();
1904 let mut row_builder = SharedRow::get();
1905 let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1906 let second = row_builder.pack_using(datum_iter);
1907 (first, second)
1908 }
1909}