use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::trace::{BatchReader, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    SUBSCRIBE_SNAPSHOT_OPTIMIZATION,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use mz_timely_util::scope_label::ScopeExt;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::vec::{BranchWhen, Filter, ToStream};
use timely::dataflow::operators::{Capability, Operator, Probe, probe};
use timely::dataflow::{Scope, Stream, StreamVec};
use timely::order::{Product, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::Worker as TimelyWorker;

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
};
use crate::render::context::{ArrangementFlavor, Context};
use crate::render::errors::DataflowErrorSer;
use crate::row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

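/// Assembles a compute dataflow from the given `DataflowDescription`: imports the required
/// persist sources and indexes, renders every object in `objects_to_build` (inside an iterative
/// scope when any plan is recursive), and exports the requested indexes and sinks.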
pub fn build_compute_dataflow(
    timely_worker: &mut TimelyWorker,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
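    // Plans with recursive bindings must be rendered inside an iterative scope that carries an
    // extra `PointStamp` timestamp coordinate, so detect them up front.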
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        let mut imported_sources = Vec::new();
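        // Tokens that keep imported sources and indexes alive; exports hold on to the tokens of
        // the imports they depend on.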
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

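        // Import declared sources into the rendering context, each in its own nested region.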
        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
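                        // When column demands are applied, restrict the persist read to the
                        // columns the MFP actually uses and re-map its column references.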
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let snapshot_mode = if import.with_snapshot || !subscribe_snapshot_optimization
                    {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    let suppress_early_progress_as_of = dataflow.as_of.clone();

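                    // Read the source from persist, handing the MFP to the source operator so it
                    // can apply as much of it as possible close to the data.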
                    let (mut ok_stream, err_stream, token) =
                        persist_source::persist_source::<DataflowErrorSer>(
                            inner,
                            *source_id,
                            Arc::clone(&compute_state.persist_clients),
                            &compute_state.txns_ctx,
                            import.desc.storage_metadata.clone(),
                            read_schema,
                            dataflow.as_of.clone(),
                            snapshot_mode,
                            until.clone(),
                            mfp.as_mut(),
                            compute_state.dataflow_max_inflight_bytes(),
                            start_signal.clone().into_send_future(),
                            ErrorHandler::Halt("compute_import"),
                        );

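                    // The persist source has taken over the MFP; whatever remains must be the
                    // identity.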
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

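                    // Optionally apply logical backpressure: limit how far the source may run
                    // ahead of the dataflow's output probe, bounding in-flight data.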
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    let (oks, errs) = (
                        ok_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                        err_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

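        // Recursive plans render inside an iterative scope with an additional `PointStamp`
        // coordinate; everything else renders in an ordinary build region.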
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        scope,
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        }
    });
}

impl<'g, T> Context<'g, T>
where
    T: Refines<mz_repr::Timestamp> + RenderTimestamp,
{
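    /// Imports the batches of an arranged index as a collection, dropping batches whose upper
    /// frontier is not beyond the dataflow `as_of` and applying `logic` to each key/value pair.
    /// Used for `SnapshotMode::Exclude` index imports.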
    fn import_filtered_index_collection<
        'outer,
        Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
        V: Data,
    >(
        &self,
        arranged: Arranged<'outer, Tr>,
        start_signal: StartSignal,
        mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
    ) -> VecCollection<'g, T, V, Tr::Diff>
    where
        mz_repr::Timestamp: TotalOrder,
    {
        let oks = arranged.stream.with_start_signal(start_signal).filter({
            let as_of = self.as_of_frontier.clone();
            move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
        });
        Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
    }

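    /// Imports the index identified by `idx_id` from the compute state's trace manager, either
    /// as an arrangement (with snapshot) or as a filtered collection of updates beyond the
    /// `as_of` (without snapshot), and records the tokens that keep it alive.
    ///
    /// Panics if the index is absent or has been allowed to compact beyond the dataflow `as_of`.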
    pub(crate) fn import_index<'outer>(
        &mut self,
        outer: Scope<'outer, mz_repr::Timestamp>,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        typ: &ReprRelationType,
        snapshot_mode: SnapshotMode,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                outer,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                outer,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let bundle = match snapshot_mode {
                SnapshotMode::Include => {
                    let ok_arranged = oks
                        .enter(self.scope)
                        .with_start_signal(start_signal.clone());
                    let err_arranged = err_arranged
                        .enter(self.scope)
                        .with_start_signal(start_signal);
                    CollectionBundle::from_expressions(
                        idx.key.clone(),
                        ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                    )
                }
                SnapshotMode::Exclude => {
                    let oks = {
                        let mut datums = DatumVec::new();
                        let (permutation, _thinning) =
                            permutation_for_arrangement(&idx.key, typ.arity());
                        self.import_filtered_index_collection(
                            oks,
                            start_signal.clone(),
                            move |k: DatumSeq, v: DatumSeq| {
                                let mut datums_borrow = datums.borrow();
                                datums_borrow.extend(k);
                                datums_borrow.extend(v);
                                SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
                            },
                        )
                    };
                    let errs = self.import_filtered_index_collection(
                        err_arranged,
                        start_signal,
                        |e, _| e.clone(),
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            };
            self.update_id(Id::Global(idx.on_id), bundle);
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

impl<'g> Context<'g, mz_repr::Timestamp> {
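    /// Exports the arrangement for `idx_id` to the compute state's trace manager, attaching the
    /// tokens of its dependencies, the output probe, and optional expiration and error logging.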
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<'g, T> Context<'g, T>
where
    T: RenderTimestamp,
{
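    /// Exports an index from an iterative (recursive) dataflow: the arrangement is first brought
    /// back into the outer scope and re-arranged before being handed to the trace manager.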
    pub(crate) fn export_index_iterative<'outer>(
        &self,
        outer: Scope<'outer, mz_repr::Timestamp>,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<mz_repr::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave(outer)
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave(outer)
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

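/// Describes how a rendered (sub)plan is bound, used to label its final operator: either the body
/// of an object, a `Let` binding, or a `LetRec` binding, with `last` marking the final binding of
/// a stage.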
enum BindingInfo {
    Body { in_let: bool },
    Let { id: LocalId, last: bool },
    LetRec { id: LocalId, last: bool },
}

impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
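    /// Renders a plan that may contain recursive bindings, at the given nesting `level`.
    ///
    /// `Let` bindings are rendered in order; `LetRec` bindings are rendered as differential
    /// `Variable`s that feed back along the `PointStamp` coordinate for their level, with
    /// optional iteration limits enforced by diverting over-limit updates into the error stream.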
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowErrorSer::from(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            ))
                        }));
                    }
                }

                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(oks);
                err_v.set(errs);
            }
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
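    /// Renders a non-recursive plan: each `Let` binding is rendered in its own region, followed
    /// by the plan body in a "Main Body" region. Panics if the plan contains `LetRec` bindings.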
    fn render_plan(
        &mut self,
        object_id: GlobalId,
        plan: RenderPlan,
    ) -> CollectionBundle<'scope, T> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region(self.scope)
        })
    }

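    /// Renders a let-free plan by rendering its nodes in topological order, optionally collecting
    /// LIR metadata and attaching hydration logging along the way. Returns the bundle produced by
    /// the root node.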
    fn render_letfree_plan(
        &self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, T> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        let mut collections = BTreeMap::new();

        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.worker().peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.worker().peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

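    /// Renders a single [`render_plan::Expr`], looking up its inputs in `collections`.
    ///
    /// Panics if a required input has not been rendered yet.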
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowErrorSer::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
                strategy,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.as_of_frontier.clone(),
                    self.until.clone(),
                    &self.config_set,
                    strategy,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

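    /// Attaches hydration logging to the first arrangement of the bundle, or to its collection if
    /// it carries no arrangement, emitting an event once the operator's frontier passes the
    /// dataflow `as_of`.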
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(
        &self,
        stream: Stream<'scope, T, D>,
        lir_id: LirId,
    ) -> Stream<'scope, T, D>
    where
        D: timely::Container + Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream;
        };

        let export_ids = self.export_ids.clone();

        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
}

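/// A timestamp type usable for rendering: it refines `mz_repr::Timestamp` and exposes its system
/// and event time components, matching delay summaries, and a `step_back` operation.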
#[allow(dead_code)]
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    fn event_time(&self) -> mz_repr::Timestamp;
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    fn step_back(&self) -> Self;
}

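/// Timestamps that can optionally route updates through temporal bucketing: the plain
/// `mz_repr::Timestamp` implementation buckets the stream, while the iterative `Product`
/// timestamp passes it through unchanged.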
pub trait MaybeBucketByTime: Timestamp {
    fn maybe_apply_temporal_bucketing<'scope>(
        stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
        as_of: Antichain<mz_repr::Timestamp>,
        summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, Row, Diff>;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl MaybeBucketByTime for mz_repr::Timestamp {
    fn maybe_apply_temporal_bucketing<'scope>(
        stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
        as_of: Antichain<mz_repr::Timestamp>,
        summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, Row, Diff> {
        stream
            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
            .as_collection()
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        let inner = self.inner.clone();
        let mut vec = inner.into_inner();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn maybe_apply_temporal_bucketing<'scope>(
        stream: StreamVec<'scope, Self, (Row, Self, Diff)>,
        _as_of: Antichain<mz_repr::Timestamp>,
        _summary: mz_repr::Timestamp,
    ) -> VecCollection<'scope, Self, Row, Diff> {
        stream.as_collection()
    }
}

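/// A shared signal that tells operators when they may start doing work. The signal fires when its
/// token is dropped; until then, operators wrapped with `with_start_signal` stash their input.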
#[derive(Clone)]
pub(crate) struct StartSignal {
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn into_send_future(self) -> impl Future<Output = ()> + Send {
        use futures::FutureExt;
        self.fut.map(|_| ())
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

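/// Extension trait for attaching a [`StartSignal`] to a stream or arrangement, buffering its data
/// until the signal has fired.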
pub(crate) trait WithStartSignal {
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
where
    Tr: TraceReader<Time: RenderTimestamp> + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
where
    D: timely::Container + Clone + 'static,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        let activations = self.scope().activations();
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            let token = Box::new(ActivateOnDrop::new((), info.address, activations));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

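/// Delays progress messages for times not beyond `as_of`: data passes through unchanged, but
/// updates at or before the `as_of` are emitted under a retained capability that is only dropped
/// once the input frontier has advanced beyond the `as_of`.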
fn suppress_early_progress<'scope, T: Timestamp, D>(
    stream: Stream<'scope, T, D>,
    as_of: Antichain<T>,
) -> Stream<'scope, T, D>
where
    D: Data + timely::Container,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

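/// Extension trait that limits how far a stream may run ahead of a probe of the dataflow output,
/// implementing the logical backpressure used for compute source imports: pending times are
/// rounded up by `slack_ms`, held via a retained capability, and released as the probe advances,
/// with at most `limit` pending times retained.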
trait LimitProgress<T: Timestamp> {
    fn limit_progress(
        self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}

impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
    for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
where
    D: Clone + 'static,
    R: Clone + 'static,
{
    fn limit_progress(
        self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let scope = self.scope();
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
                let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;

                let activator = scope.activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time = if slack_ms == 0 {
                                time
                            } else {
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms)
                            };
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain(0));
                        }
                    });

                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}

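/// Displays a sequence of timestamps as the first value followed by the deltas between
/// consecutive entries, keeping debug output compact.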
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

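/// Helper for packing two datum sequences into a single `Row` and for splitting such a row back
/// into its two halves at a fixed arity.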
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}