1use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::Arranged;
116use differential_dataflow::operators::arrange::ShutdownButton;
117use differential_dataflow::operators::iterate::Variable;
118use differential_dataflow::trace::{BatchReader, TraceReader};
119use differential_dataflow::{AsCollection, Data, VecCollection};
120use futures::FutureExt;
121use futures::channel::oneshot;
122use itertools::Itertools;
123use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
124use mz_compute_types::dyncfgs::{
125 COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
126 COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
127 ENABLE_COMPUTE_TEMPORAL_BUCKETING, SUBSCRIBE_SNAPSHOT_OPTIMIZATION, TEMPORAL_BUCKETING_SUMMARY,
128};
129use mz_compute_types::plan::render_plan::{
130 self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
131};
132use mz_compute_types::plan::{ArrangementStrategy, LirId};
133use mz_expr::{EvalError, Id, LocalId, permutation_for_arrangement};
134use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
135use mz_repr::explain::DummyHumanizer;
136use mz_repr::fixed_length::ExtendDatums;
137use mz_repr::{Datum, DatumVec, Diff, GlobalId, ReprRelationType, Row, RowArena, SharedRow};
138use mz_storage_operators::persist_source;
139use mz_storage_types::controller::CollectionMetadata;
140use mz_timely_util::columnation::ColumnationChunker;
141use mz_timely_util::operator::{CollectionExt, StreamExt};
142use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
143use mz_timely_util::scope_label::ScopeExt;
144use timely::PartialOrder;
145use timely::container::CapacityContainerBuilder;
146use timely::dataflow::channels::pact::Pipeline;
147use timely::dataflow::operators::vec::ToStream;
148use timely::dataflow::operators::vec::{BranchWhen, Filter};
149use timely::dataflow::operators::{Capability, Operator, Probe, probe};
150use timely::dataflow::{Scope, Stream, StreamVec};
151use timely::order::{Product, TotalOrder};
152use timely::progress::timestamp::Refines;
153use timely::progress::{Antichain, Timestamp};
154use timely::scheduling::ActivateOnDrop;
155use timely::worker::Worker as TimelyWorker;
156
157use crate::arrangement::manager::TraceBundle;
158use crate::compute_state::ComputeState;
159use crate::extensions::arrange::{KeyCollection, MzArrange};
160use crate::extensions::reduce::MzReduce;
161use crate::extensions::temporal_bucket::TemporalBucketing;
162use crate::logging::compute::{
163 ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
164};
165use crate::render::context::{ArrangementFlavor, Context};
166use crate::render::errors::DataflowErrorSer;
167use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
168use mz_row_spine::{DatumSeq, RowRowBatcher, RowRowBuilder};
169
170pub mod context;
171pub(crate) mod errors;
172mod flat_map;
173mod join;
174mod reduce;
175pub mod sinks;
176mod threshold;
177mod top_k;
178
179pub use context::CollectionBundle;
180pub use join::LinearJoinSpec;
181
182struct PressOnDrop<T>(ShutdownButton<T>);
186
187impl<T> Drop for PressOnDrop<T> {
188 fn drop(&mut self) {
189 self.0.press();
190 }
191}
192
/// Builds the timely dataflow described by `dataflow` on `timely_worker`.
///
/// Construction happens in a fixed order:
///
/// 1. Every entry of `dataflow.source_imports` is read through `persist_source` inside an
///    input region and brought back out to the dataflow's root scope.
/// 2. Inside either an iterative scope (if any object plan reports itself as recursive) or
///    a plain region (otherwise), the imported sources are entered, the indexes listed in
///    `dataflow.index_imports` are imported, and every object in
///    `dataflow.objects_to_build` is rendered and bound to its global id.
/// 3. Index exports and sink exports are installed.
///
/// `start_signal` is handed to every persist source, index import, and sink export.
/// `until` is passed to the persist sources and, together with `dataflow_expiration`, to
/// the rendering [`Context`].
///
/// # Panics
///
/// Panics if the linear operators attached to a source import cannot be turned into an
/// `MfpPlan`, if a non-identity operator remains after the source has been constructed, or
/// if the configured backpressure slack does not fit the target integer type.
pub fn build_compute_dataflow(
    timely_worker: &mut TimelyWorker,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Whether any object to build contains a recursive plan; selects the scope flavor below.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Index exports paired with their dependency ids. Collected eagerly, before
    // `dataflow.objects_to_build` is moved out of `dataflow` further down.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Sink exports paired with their dependency ids, collected eagerly for the same reason.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    // Read the relevant dynamic configuration once, up front.
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);
    let subscribe_snapshot_optimization =
        SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(&compute_state.worker_config);

    // Names of the dataflow itself and of the two top-level regions created within it.
    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        // `(id, (oks, errs))` for every imported source, to be entered into the build scope.
        let mut imported_sources = Vec::new();
        // One type-erased token per imported source or index; the export methods clone the
        // tokens of their dependencies out of this map.
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        // Probe handle shared between the exports and the inputs' `limit_progress` operator.
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            for (source_id, import) in dataflow.source_imports.iter() {
                // Each source gets its own nested region, named after its id.
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    // Schema to read with; only set when column demands are applied.
                    let mut read_schema = None;
                    let mut mfp = import.desc.arguments.operators.clone().map(|mut ops| {
                        if apply_demands {
                            // Restrict the relation description to the demanded columns and
                            // renumber the operators' column references to match: `remap`
                            // maps each old column index to its position in the demand list.
                            let demands = ops.demand();
                            let new_desc = import
                                .desc
                                .storage_metadata
                                .relation_desc
                                .apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    // Include the snapshot unless the import opted out of it AND the
                    // subscribe snapshot optimization is enabled; count each exclusion.
                    let snapshot_mode = if import.with_snapshot || !subscribe_snapshot_optimization
                    {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    let suppress_early_progress_as_of = dataflow.as_of.clone();

                    let (mut ok_stream, err_stream, token) =
                        persist_source::persist_source::<DataflowErrorSer>(
                            inner,
                            *source_id,
                            Arc::clone(&compute_state.persist_clients),
                            &compute_state.txns_ctx,
                            import.desc.storage_metadata.clone(),
                            read_schema,
                            dataflow.as_of.clone(),
                            snapshot_mode,
                            until.clone(),
                            mfp.as_mut(),
                            compute_state.dataflow_max_inflight_bytes(),
                            start_signal.clone().into_send_future(),
                            ErrorHandler::Halt("compute_import"),
                        );

                    // Whatever is left of the operators after handing them to the source
                    // (via `mfp.as_mut()`) must be the identity, or there were none at all.
                    // NOTE(review): presumably `persist_source` absorbs the operators it is
                    // given; nothing here applies a non-identity remainder.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // Only applied when the dataflow has an as-of.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        // The configured slack is converted to whole milliseconds.
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            import.upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach the input probe registered for this source and the dataflow's
                    // exports.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Leave both the per-source region and the input region, so that the
                    // collections live in the dataflow's root scope.
                    let (oks, errs) = (
                        ok_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                        err_stream
                            .as_collection()
                            .leave_region(region)
                            .leave_region(scope),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Retain the source's token under the source's id.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

        if recursive {
            // At least one plan is recursive: render everything inside an iterative scope
            // whose inner timestamp is a `PointStamp<u64>`.
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring the imported sources into the iterative scope and bind them.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import the declared indexes; same snapshot-mode rule as for sources.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Render each object in its own region and bind the result to its id.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    // Rendering starts at recursion level 0.
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;

                    // The first element of the scope address is logged as the dataflow
                    // index associated with this global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                // Index exports use the iterative variant, which is given the outer scope.
                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        scope,
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        } else {
            // No recursion: render everything inside a plain named region.
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring the imported sources into the build region and bind them.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                // Import the declared indexes; same snapshot-mode rule as for sources.
                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    let snapshot_mode = if idx.with_snapshot || !subscribe_snapshot_optimization {
                        SnapshotMode::Include
                    } else {
                        compute_state.metrics.inc_subscribe_snapshot_optimization();
                        SnapshotMode::Exclude
                    };
                    context.import_index(
                        scope,
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        &idx.typ,
                        snapshot_mode,
                        start_signal.clone(),
                    );
                }

                // Render each object in its own region and bind the result to its id.
                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region(context.scope)
                        },
                    );
                    let global_id = object.id;
                    // The first element of the scope address is logged as the dataflow
                    // index associated with this global id.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        &output_probe,
                        scope,
                    );
                }
            });
        }
    });
}
554
555impl<'g, T> Context<'g, T>
558where
559 T: Refines<mz_repr::Timestamp> + RenderTimestamp,
560{
561 fn import_filtered_index_collection<
565 'outer,
566 Tr: TraceReader<Time = mz_repr::Timestamp> + Clone,
567 V: Data,
568 >(
569 &self,
570 arranged: Arranged<'outer, Tr>,
571 start_signal: StartSignal,
572 mut logic: impl FnMut(Tr::Key<'_>, Tr::Val<'_>) -> V + 'static,
573 ) -> VecCollection<'g, T, V, Tr::Diff>
574 where
575 mz_repr::Timestamp: TotalOrder,
578 {
579 let oks = arranged.stream.with_start_signal(start_signal).filter({
580 let as_of = self.as_of_frontier.clone();
581 move |b| !<Antichain<mz_repr::Timestamp> as PartialOrder>::less_equal(b.upper(), &as_of)
582 });
583 Arranged::<'outer, Tr>::flat_map_batches(oks, move |a, b| [logic(a, b)]).enter(self.scope)
584 }
585
586 pub(crate) fn import_index<'outer>(
587 &mut self,
588 outer: Scope<'outer, mz_repr::Timestamp>,
589 compute_state: &mut ComputeState,
590 tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
591 input_probe: probe::Handle<mz_repr::Timestamp>,
592 idx_id: GlobalId,
593 idx: &IndexDesc,
594 typ: &ReprRelationType,
595 snapshot_mode: SnapshotMode,
596 start_signal: StartSignal,
597 ) {
598 if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
599 assert!(
600 PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
601 "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
602 );
603
604 let token = traces.to_drop().clone();
605
606 let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
607 outer,
608 &format!("Index({}, {:?})", idx.on_id, idx.key),
609 self.as_of_frontier.clone(),
610 self.until.clone(),
611 );
612
613 oks.stream = oks.stream.probe_with(&input_probe);
614
615 let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
616 outer,
617 &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
618 self.as_of_frontier.clone(),
619 self.until.clone(),
620 );
621
622 let bundle = match snapshot_mode {
623 SnapshotMode::Include => {
624 let ok_arranged = oks
625 .enter(self.scope)
626 .with_start_signal(start_signal.clone());
627 let err_arranged = err_arranged
628 .enter(self.scope)
629 .with_start_signal(start_signal);
630 CollectionBundle::from_expressions(
631 idx.key.clone(),
632 ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
633 )
634 }
635 SnapshotMode::Exclude => {
636 let oks = {
643 let mut datums = DatumVec::new();
644 let (permutation, _thinning) =
645 permutation_for_arrangement(&idx.key, typ.arity());
646 self.import_filtered_index_collection(
647 oks,
648 start_signal.clone(),
649 move |k: DatumSeq, v: DatumSeq| {
650 let temp_storage = RowArena::new();
651 let mut datums_borrow = datums.borrow();
652 k.extend_datums(&temp_storage, &mut datums_borrow, None);
653 v.extend_datums(&temp_storage, &mut datums_borrow, None);
654 SharedRow::pack(permutation.iter().map(|i| datums_borrow[*i]))
655 },
656 )
657 };
658 let errs = self.import_filtered_index_collection(
659 err_arranged,
660 start_signal,
661 |e, _| e.clone(),
662 );
663 CollectionBundle::from_collections(oks, errs)
664 }
665 };
666 self.update_id(Id::Global(idx.on_id), bundle);
667 tokens.insert(
668 idx_id,
669 Rc::new((PressOnDrop(ok_button), PressOnDrop(err_button), token)),
670 );
671 } else {
672 panic!(
673 "import of index {} failed while building dataflow {}",
674 idx_id, self.dataflow_id
675 );
676 }
677 }
678}
679
680impl<'g> Context<'g, mz_repr::Timestamp> {
683 pub(crate) fn export_index(
684 &self,
685 compute_state: &mut ComputeState,
686 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
687 dependency_ids: BTreeSet<GlobalId>,
688 idx_id: GlobalId,
689 idx: &IndexDesc,
690 output_probe: &MzProbeHandle<mz_repr::Timestamp>,
691 ) {
692 let mut needed_tokens = Vec::new();
694 for dep_id in dependency_ids {
695 if let Some(token) = tokens.get(&dep_id) {
696 needed_tokens.push(Rc::clone(token));
697 }
698 }
699 let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
700 panic!(
701 "Arrangement alarmingly absent! id: {:?}",
702 Id::Global(idx_id)
703 )
704 });
705
706 match bundle.arrangement(&idx.key) {
707 Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
708 if let Some(&expiration) = self.dataflow_expiration.as_option() {
711 oks.stream = oks.stream.expire_stream_at(
712 &format!("{}_export_index_oks", self.debug_name),
713 expiration,
714 );
715 errs.stream = errs.stream.expire_stream_at(
716 &format!("{}_export_index_errs", self.debug_name),
717 expiration,
718 );
719 }
720
721 oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
722
723 if let Some(logger) = compute_state.compute_logger.clone() {
725 errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
726 }
727
728 compute_state.traces.set(
729 idx_id,
730 TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
731 );
732 }
733 Some(ArrangementFlavor::Trace(gid, _, _)) => {
734 let trace = compute_state.traces.get(&gid).unwrap().clone();
737 compute_state.traces.set(idx_id, trace);
738 }
739 None => {
740 println!("collection available: {:?}", bundle.collection.is_none());
741 println!(
742 "keys available: {:?}",
743 bundle.arranged.keys().collect::<Vec<_>>()
744 );
745 panic!(
746 "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
747 Id::Global(idx_id),
748 &idx.key
749 );
750 }
751 };
752 }
753}
754
755impl<'g, T> Context<'g, T>
758where
759 T: RenderTimestamp,
760{
761 pub(crate) fn export_index_iterative<'outer>(
762 &self,
763 outer: Scope<'outer, mz_repr::Timestamp>,
764 compute_state: &mut ComputeState,
765 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
766 dependency_ids: BTreeSet<GlobalId>,
767 idx_id: GlobalId,
768 idx: &IndexDesc,
769 output_probe: &MzProbeHandle<mz_repr::Timestamp>,
770 ) {
771 let mut needed_tokens = Vec::new();
773 for dep_id in dependency_ids {
774 if let Some(token) = tokens.get(&dep_id) {
775 needed_tokens.push(Rc::clone(token));
776 }
777 }
778 let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
779 panic!(
780 "Arrangement alarmingly absent! id: {:?}",
781 Id::Global(idx_id)
782 )
783 });
784
785 match bundle.arrangement(&idx.key) {
786 Some(ArrangementFlavor::Local(oks, errs)) => {
787 let mut oks = oks
791 .as_collection(|k, v| (k.to_row(), v.to_row()))
792 .leave(outer)
793 .mz_arrange::<
794 ColumnationChunker<_>,
795 RowRowBatcher<_, _>,
796 RowRowBuilder<_, _>,
797 _,
798 >(
799 "Arrange export iterative",
800 );
801
802 let mut errs = errs
803 .as_collection(|k, v| (k.clone(), v.clone()))
804 .leave(outer)
805 .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
806 "Arrange export iterative err",
807 );
808
809 if let Some(&expiration) = self.dataflow_expiration.as_option() {
812 oks.stream = oks.stream.expire_stream_at(
813 &format!("{}_export_index_iterative_oks", self.debug_name),
814 expiration,
815 );
816 errs.stream = errs.stream.expire_stream_at(
817 &format!("{}_export_index_iterative_err", self.debug_name),
818 expiration,
819 );
820 }
821
822 oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);
823
824 if let Some(logger) = compute_state.compute_logger.clone() {
826 errs.stream = errs.stream.log_dataflow_errors(logger, idx_id);
827 }
828
829 compute_state.traces.set(
830 idx_id,
831 TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
832 );
833 }
834 Some(ArrangementFlavor::Trace(gid, _, _)) => {
835 let trace = compute_state.traces.get(&gid).unwrap().clone();
838 compute_state.traces.set(idx_id, trace);
839 }
840 None => {
841 println!("collection available: {:?}", bundle.collection.is_none());
842 println!(
843 "keys available: {:?}",
844 bundle.arranged.keys().collect::<Vec<_>>()
845 );
846 panic!(
847 "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
848 Id::Global(idx_id),
849 &idx.key
850 );
851 }
852 };
853 }
854}
855
/// Describes which part of a plan a let-free sub-plan belongs to.
///
/// `render_letfree_plan` uses this to decorate the humanized name of the sub-plan's final
/// operator when LIR metadata is logged.
enum BindingInfo {
    /// The body of a plan. When `in_let` is true the final operator is labeled
    /// `Returning <operator>`; otherwise the operator name is left as is.
    Body { in_let: bool },
    /// The value of the non-recursive binding `id`. When `last` is true the final operator
    /// is labeled `With <id> = <operator>`, otherwise `<id> = <operator>`.
    Let { id: LocalId, last: bool },
    /// The value of the recursive binding `id`. When `last` is true the final operator is
    /// labeled `With Recursive <id> = <operator>`, otherwise `<id> = <operator>`.
    LetRec { id: LocalId, last: bool },
}
866
impl<'scope> Context<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
    /// Renders a plan that may contain recursive bindings and returns the bundle of its
    /// body.
    ///
    /// For every bind stage, the non-recursive bindings are rendered first (each in its own
    /// region), then the recursive bindings are rendered using feedback variables.
    ///
    /// * `object_id` - id of the object being built; forwarded to the let-free renderer.
    /// * `level` - recursion nesting depth. Feedback variables and `leave_dynamic` use
    ///   `level + 1`, and the iteration limit reads coordinate `level` of the point stamp.
    /// * `binding` - what the body of `plan` represents; forwarded to the let-free renderer.
    ///
    /// # Panics
    ///
    /// Panics if a rendered recursive binding has no raw collection.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<'scope, Product<mz_repr::Timestamp, PointStamp<u64>>> {
        for BindStage { lets, recs } in plan.binds {
            // Non-recursive bindings: render each value in its own region and bind it.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            // Whether this is the final `let` of the stage.
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region(self.scope)
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Create a pair of feedback variables (oks, errs) for every recursive id and
            // bind the ids to the variables' collections *before* rendering any value, so
            // the values can refer to each other and to themselves.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let (oks_v, oks_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner.clone()));
                let (err_v, err_collection) =
                    Variable::new(self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_collection, err_collection),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            // Render each recursive value one level deeper and close its feedback loop.
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                // The raw collection is required here; a bundle without one panics.
                let (oks, mut err) = bundle.collection.clone().unwrap();
                // Rebind the id to the rendered value, replacing the variable's collection.
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Consolidate before feeding back.
                let mut oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                    oks,
                    "LetRecConsolidation",
                );

                if let Some(limit) = limit {
                    // Split updates by iteration count: the second output of `branch_when`
                    // receives updates for which the predicate holds.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // Coordinate `level` of the point stamp, or 0 if it is absent.
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // True once `iteration_index + 1` reaches `max_iters`.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    // Unless the limit is configured to just return, every update past the
                    // limit is turned into a `LetRecLimitExceeded` error.
                    if !limit.return_at_limit {
                        err = err.concat(VecCollection::new(over_limit).map(move |_data| {
                            DataflowErrorSer::from(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            ))
                        }));
                    }
                }

                // Make the errors distinct: arrange them by error, then reduce every key to
                // a single `((), Diff::ONE)` output.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<
                        ColumnationChunker<_>,
                        ErrBatcher<_, _>,
                        ErrBuilder<_, _>,
                        ErrSpine<_, _>,
                    >(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Close the loops by defining the variables.
                oks_v.set(oks);
                err_v.set(errs);
            }
            // Rebind every recursive id to its collections with dynamic level `level + 1`
            // left, for use by later stages and the body.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
1004
1005impl<'scope, T: RenderTimestamp + MaybeBucketByTime> Context<'scope, T> {
1006 fn render_plan(
1017 &mut self,
1018 object_id: GlobalId,
1019 plan: RenderPlan,
1020 ) -> CollectionBundle<'scope, T> {
1021 let mut in_let = false;
1022 for BindStage { lets, recs } in plan.binds {
1023 assert!(recs.is_empty());
1024
1025 let mut let_iter = lets.into_iter().peekable();
1026 while let Some(LetBind { id, value }) = let_iter.next() {
1027 in_let = true;
1029 let bundle =
1030 self.scope
1031 .clone()
1032 .region_named(&format!("Binding({:?})", id), |region| {
1033 let depends = value.depends();
1034 let last = let_iter.peek().is_none();
1035 let binding = BindingInfo::Let { id, last };
1036 self.enter_region(region, Some(&depends))
1037 .render_letfree_plan(object_id, value, binding)
1038 .leave_region(self.scope)
1039 });
1040 self.insert_id(Id::Local(id), bundle);
1041 }
1042 }
1043
1044 self.scope.clone().region_named("Main Body", |region| {
1045 let depends = plan.body.depends();
1046 self.enter_region(region, Some(&depends))
1047 .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
1048 .leave_region(self.scope)
1049 })
1050 }
1051
1052 fn render_letfree_plan(
1054 &self,
1055 object_id: GlobalId,
1056 plan: LetFreePlan,
1057 binding: BindingInfo,
1058 ) -> CollectionBundle<'scope, T> {
1059 let (mut nodes, root_id, topological_order) = plan.destruct();
1060
1061 let mut collections = BTreeMap::new();
1063
1064 let should_compute_lir_metadata = self.compute_logger.is_some();
1070 let mut lir_mapping_metadata = if should_compute_lir_metadata {
1071 Some(Vec::with_capacity(nodes.len()))
1072 } else {
1073 None
1074 };
1075
1076 let mut topo_iter = topological_order.into_iter().peekable();
1077 while let Some(lir_id) = topo_iter.next() {
1078 let node = nodes.remove(&lir_id).unwrap();
1079
1080 let metadata = if should_compute_lir_metadata {
1084 let operator = node.expr.humanize(&DummyHumanizer);
1085
1086 let operator = if topo_iter.peek().is_none() {
1088 match &binding {
1089 BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
1090 BindingInfo::Body { in_let: false } => operator,
1091 BindingInfo::Let { id, last: true } => {
1092 format!("With {id} = {operator}")
1093 }
1094 BindingInfo::Let { id, last: false } => {
1095 format!("{id} = {operator}")
1096 }
1097 BindingInfo::LetRec { id, last: true } => {
1098 format!("With Recursive {id} = {operator}")
1099 }
1100 BindingInfo::LetRec { id, last: false } => {
1101 format!("{id} = {operator}")
1102 }
1103 }
1104 } else {
1105 operator
1106 };
1107
1108 let operator_id_start = self.scope.worker().peek_identifier();
1109 Some((operator, operator_id_start))
1110 } else {
1111 None
1112 };
1113
1114 let mut bundle = self.render_plan_expr(node.expr, &collections);
1115
1116 if let Some((operator, operator_id_start)) = metadata {
1117 let operator_id_end = self.scope.worker().peek_identifier();
1118 let operator_span = (operator_id_start, operator_id_end);
1119
1120 if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
1121 lir_mapping_metadata.push((
1122 lir_id,
1123 LirMetadata::new(operator, node.parent, node.nesting, operator_span),
1124 ))
1125 }
1126 }
1127
1128 self.log_operator_hydration(&mut bundle, lir_id);
1129
1130 collections.insert(lir_id, bundle);
1131 }
1132
1133 if let Some(lir_mapping_metadata) = lir_mapping_metadata {
1134 self.log_lir_mapping(object_id, lir_mapping_metadata);
1135 }
1136
1137 collections
1138 .remove(&root_id)
1139 .expect("LetFreePlan invariant (1)")
1140 }
1141
    /// Renders a single plan expression into a bundle of collections and arrangements.
    ///
    /// * `expr` - the expression to render.
    /// * `collections` - bundles of previously rendered nodes, keyed by LIR id; every
    ///   input referenced by `expr` must be present.
    ///
    /// # Panics
    ///
    /// Panics if a referenced input is missing from `collections`, if a `Get` refers to an
    /// id that is not bound in this context, or if a `Get` with `PassArrangements` requests
    /// an arrangement or raw collection the bound bundle does not have.
    fn render_plan_expr(
        &self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<'scope, T>>,
    ) -> CollectionBundle<'scope, T> {
        use render_plan::Expr::*;

        // Fetches (a clone of) the bundle of an already rendered input node.
        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Always build both an `ok` and an `err` collection; exactly one of the two
                // vectors is non-empty (or both are empty for `Ok(vec![])`).
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance row times to the as-of frontier and drop rows whose advanced time
                // is at or beyond `until`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <T as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(self.scope)
                    .as_collection();

                // Errors are emitted once each, at the minimum time advanced to the as-of.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowErrorSer::from(e),
                            <T as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Look up the bundle bound to `id`; it must exist at render time.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Every requested arrangement must be present.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        // `<=` on booleans is implication: if the raw collection is
                        // requested, it must be present.
                        assert!(keys.raw <= collection.collection.is_some());
                        // Keep only the requested arrangements.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read from the arrangement keyed by `key`, passing `(key, row)` as
                        // the key/value hint, and apply `mfp`.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        // Apply `mfp` without a key/value hint.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // An identity MFP is a no-op: pass the input bundle through untouched.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                // Dispatch on the join strategy.
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                // Only pass the trailing MFP along when it actually does something.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(
                    input_key,
                    input,
                    key_val_plan,
                    plan,
                    mfp_option,
                    temporal_bucketing_strategy,
                )
            }
            TopK {
                input,
                top_k_plan,
                temporal_bucketing_strategy,
            } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan, temporal_bucketing_strategy)
            }
            Negate { input } => {
                let input = expect_input(input);
                // Negate only the `ok` collection; errors pass through unchanged.
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
                temporal_bucketing_strategies,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                // Inputs and strategies are paired up one-to-one.
                // NOTE(review): `zip_eq` is expected to panic on a length mismatch — confirm
                // against the itertools version in use.
                for (input, strategy) in inputs.into_iter().zip_eq(temporal_bucketing_strategies) {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    // Temporal bucketing is applied only if both the plan asks for it and
                    // the feature is enabled in the configuration.
                    let os = if matches!(strategy, ArrangementStrategy::TemporalBucketing)
                        && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
                    {
                        let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                            .get(&self.config_set)
                            .try_into()
                            .expect("must fit");
                        T::maybe_apply_temporal_bucketing(
                            os.inner,
                            self.as_of_frontier.clone(),
                            summary,
                        )
                    } else {
                        os
                    };
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(self.scope, oks);
                // Optionally consolidate the concatenated `ok` output.
                if consolidate_output {
                    oks = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        oks,
                        "UnionConsolidation",
                    )
                }
                let errs = differential_dataflow::collection::concatenate(self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
                strategy,
            } => {
                let input = expect_input(input);
                // Returns the bundle produced by `ensure_collections` for the requested
                // forms.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.as_of_frontier.clone(),
                    self.until.clone(),
                    &self.config_set,
                    strategy,
                )
            }
        }
    }
1387
1388 fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1389 if let Some(logger) = &self.compute_logger {
1390 logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1391 dataflow_index,
1392 global_id,
1393 }));
1394 }
1395 }
1396
1397 fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1398 if let Some(logger) = &self.compute_logger {
1399 logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1400 }
1401 }
1402
1403 fn log_operator_hydration(&self, bundle: &mut CollectionBundle<'scope, T>, lir_id: LirId) {
1404 match bundle.arranged.values_mut().next() {
1424 Some(arrangement) => {
1425 use ArrangementFlavor::*;
1426
1427 match arrangement {
1428 Local(a, _) => {
1429 a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1430 }
1431 Trace(_, a, _) => {
1432 a.stream = self.log_operator_hydration_inner(a.stream.clone(), lir_id);
1433 }
1434 }
1435 }
1436 None => {
1437 let (oks, _) = bundle
1438 .collection
1439 .as_mut()
1440 .expect("CollectionBundle invariant");
1441 let stream = self.log_operator_hydration_inner(oks.inner.clone(), lir_id);
1442 *oks = stream.as_collection();
1443 }
1444 }
1445 }
1446
1447 fn log_operator_hydration_inner<D>(
1448 &self,
1449 stream: Stream<'scope, T, D>,
1450 lir_id: LirId,
1451 ) -> Stream<'scope, T, D>
1452 where
1453 D: timely::Container + Clone + 'static,
1454 {
1455 let Some(logger) = self.compute_logger.clone() else {
1456 return stream.clone(); };
1458
1459 let export_ids = self.export_ids.clone();
1460
1461 let mut hydration_frontier = Antichain::new();
1469 for time in self.as_of_frontier.iter() {
1470 if let Some(time) = time.try_step_forward() {
1471 hydration_frontier.insert(Refines::to_inner(time));
1472 }
1473 }
1474
1475 let name = format!("LogOperatorHydration ({lir_id})");
1476 stream.unary_frontier(Pipeline, &name, |_cap, _info| {
1477 let mut hydrated = false;
1478
1479 for &export_id in &export_ids {
1480 logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1481 export_id,
1482 lir_id,
1483 hydrated,
1484 }));
1485 }
1486
1487 move |(input, frontier), output| {
1488 input.for_each(|cap, data| {
1490 output.session(&cap).give_container(data);
1491 });
1492
1493 if hydrated {
1494 return;
1495 }
1496
1497 if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
1498 hydrated = true;
1499
1500 for &export_id in &export_ids {
1501 logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
1502 export_id,
1503 lir_id,
1504 hydrated,
1505 }));
1506 }
1507 }
1508 }
1509 })
1510 }
1511}
1512
1513#[allow(dead_code)] pub trait RenderTimestamp: MzTimestamp + Default + Refines<mz_repr::Timestamp> {
1516 fn system_time(&mut self) -> &mut mz_repr::Timestamp;
1521 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1523 fn event_time(&self) -> mz_repr::Timestamp;
1525 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
1527 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
1529 fn step_back(&self) -> Self;
1532}
1533
1534pub trait MaybeBucketByTime: Timestamp {
1541 fn maybe_apply_temporal_bucketing<'scope, D>(
1542 stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1543 as_of: Antichain<mz_repr::Timestamp>,
1544 summary: mz_repr::Timestamp,
1545 ) -> VecCollection<'scope, Self, D, Diff>
1546 where
1547 D: differential_dataflow::ExchangeData
1548 + crate::typedefs::MzData
1549 + differential_dataflow::Hashable;
1550}
1551
1552impl RenderTimestamp for mz_repr::Timestamp {
1553 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1554 self
1555 }
1556 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1557 delay
1558 }
1559 fn event_time(&self) -> mz_repr::Timestamp {
1560 *self
1561 }
1562 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1563 self
1564 }
1565 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1566 delay
1567 }
1568 fn step_back(&self) -> Self {
1569 self.saturating_sub(1)
1570 }
1571}
1572
1573impl MaybeBucketByTime for mz_repr::Timestamp {
1574 fn maybe_apply_temporal_bucketing<'scope, D>(
1575 stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1576 as_of: Antichain<mz_repr::Timestamp>,
1577 summary: mz_repr::Timestamp,
1578 ) -> VecCollection<'scope, Self, D, Diff>
1579 where
1580 D: differential_dataflow::ExchangeData
1581 + crate::typedefs::MzData
1582 + differential_dataflow::Hashable,
1583 {
1584 stream
1585 .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
1586 .as_collection()
1587 }
1588}
1589
1590impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1591 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1592 &mut self.outer
1593 }
1594 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1595 Product::new(delay, Default::default())
1596 }
1597 fn event_time(&self) -> mz_repr::Timestamp {
1598 self.outer
1599 }
1600 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1601 &mut self.outer
1602 }
1603 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1604 Product::new(delay, Default::default())
1605 }
1606 fn step_back(&self) -> Self {
1607 let inner = self.inner.clone();
1611 let mut vec = inner.into_inner();
1612 for item in vec.iter_mut() {
1613 *item = item.saturating_sub(1);
1614 }
1615 Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1616 }
1617}
1618
1619impl MaybeBucketByTime for Product<mz_repr::Timestamp, PointStamp<u64>> {
1620 fn maybe_apply_temporal_bucketing<'scope, D>(
1621 stream: StreamVec<'scope, Self, (D, Self, Diff)>,
1622 _as_of: Antichain<mz_repr::Timestamp>,
1623 _summary: mz_repr::Timestamp,
1624 ) -> VecCollection<'scope, Self, D, Diff>
1625 where
1626 D: differential_dataflow::ExchangeData
1627 + crate::typedefs::MzData
1628 + differential_dataflow::Hashable,
1629 {
1630 stream.as_collection()
1632 }
1633}
1634
1635#[derive(Clone)]
1645pub(crate) struct StartSignal {
1646 fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
1651 token_ref: Weak<RefCell<Box<dyn Any>>>,
1653}
1654
1655impl StartSignal {
1656 pub fn new() -> (Self, Rc<dyn Any>) {
1659 let (tx, rx) = oneshot::channel::<Infallible>();
1660 let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1661 let signal = Self {
1662 fut: rx.shared(),
1663 token_ref: Rc::downgrade(&token),
1664 };
1665 (signal, token)
1666 }
1667
1668 pub fn has_fired(&self) -> bool {
1669 self.token_ref.strong_count() == 0
1670 }
1671
1672 pub fn into_send_future(self) -> impl Future<Output = ()> + Send {
1677 use futures::FutureExt;
1678 self.fut.map(|_| ())
1679 }
1680
1681 pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1682 if let Some(token) = self.token_ref.upgrade() {
1683 let mut token = token.borrow_mut();
1684 let inner = std::mem::replace(&mut *token, Box::new(()));
1685 *token = Box::new((inner, to_drop));
1686 }
1687 }
1688}
1689
1690impl Future for StartSignal {
1691 type Output = ();
1692
1693 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1694 self.fut.poll_unpin(cx).map(|_| ())
1695 }
1696}
1697
1698pub(crate) trait WithStartSignal {
1700 fn with_start_signal(self, signal: StartSignal) -> Self;
1705}
1706
1707impl<'scope, Tr> WithStartSignal for Arranged<'scope, Tr>
1708where
1709 Tr: TraceReader<Time: RenderTimestamp> + Clone,
1710{
1711 fn with_start_signal(self, signal: StartSignal) -> Self {
1712 Arranged {
1713 stream: self.stream.with_start_signal(signal),
1714 trace: self.trace,
1715 }
1716 }
1717}
1718
1719impl<'scope, T: Timestamp, D> WithStartSignal for Stream<'scope, T, D>
1720where
1721 D: timely::Container + Clone + 'static,
1722{
1723 fn with_start_signal(self, signal: StartSignal) -> Self {
1724 let activations = self.scope().activations();
1725 self.unary(Pipeline, "StartSignal", |_cap, info| {
1726 let token = Box::new(ActivateOnDrop::new((), info.address, activations));
1727 signal.drop_on_fire(token);
1728
1729 let mut stash = Vec::new();
1730
1731 move |input, output| {
1732 if !signal.has_fired() {
1734 input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
1735 return;
1736 }
1737
1738 for (cap, mut data) in std::mem::take(&mut stash) {
1740 output.session(&cap).give_container(&mut data);
1741 }
1742
1743 input.for_each(|cap, data| {
1745 output.session(&cap).give_container(data);
1746 });
1747 }
1748 })
1749 }
1750}
1751
1752fn suppress_early_progress<'scope, T: Timestamp, D>(
1774 stream: Stream<'scope, T, D>,
1775 as_of: Antichain<T>,
1776) -> Stream<'scope, T, D>
1777where
1778 D: Data + timely::Container,
1779{
1780 stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
1781 let mut early_cap = Some(default_cap);
1782
1783 move |(input, frontier), output| {
1784 input.for_each_time(|data_cap, data| {
1785 if as_of.less_than(data_cap.time()) {
1786 let mut session = output.session(&data_cap);
1787 for data in data {
1788 session.give_container(data);
1789 }
1790 } else {
1791 let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
1792 let mut session = output.session(&cap);
1793 for data in data {
1794 session.give_container(data);
1795 }
1796 }
1797 });
1798
1799 if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
1800 early_cap.take();
1801 }
1802 }
1803 })
1804}
1805
1806trait LimitProgress<T: Timestamp> {
1808 fn limit_progress(
1836 self,
1837 handle: MzProbeHandle<T>,
1838 slack_ms: u64,
1839 limit: Option<usize>,
1840 upper: Antichain<T>,
1841 name: String,
1842 ) -> Self;
1843}
1844
1845impl<'scope, D, R> LimitProgress<mz_repr::Timestamp>
1848 for StreamVec<'scope, mz_repr::Timestamp, (D, mz_repr::Timestamp, R)>
1849where
1850 D: Clone + 'static,
1851 R: Clone + 'static,
1852{
1853 fn limit_progress(
1854 self,
1855 handle: MzProbeHandle<mz_repr::Timestamp>,
1856 slack_ms: u64,
1857 limit: Option<usize>,
1858 upper: Antichain<mz_repr::Timestamp>,
1859 name: String,
1860 ) -> Self {
1861 let scope = self.scope();
1862 let stream =
1863 self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
1864 let mut pending_times: BTreeSet<mz_repr::Timestamp> = BTreeSet::new();
1866 let mut retained_cap: Option<Capability<mz_repr::Timestamp>> = None;
1868
1869 let activator = scope.activator_for(info.address);
1870 handle.activate(activator.clone());
1871
1872 move |(input, frontier), output| {
1873 input.for_each(|cap, data| {
1874 for time in data
1875 .iter()
1876 .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
1877 {
1878 let rounded_time = if slack_ms == 0 {
1882 time
1883 } else {
1884 (time / slack_ms).saturating_add(1).saturating_mul(slack_ms)
1885 };
1886 if !upper.less_than(&rounded_time.into()) {
1887 pending_times.insert(rounded_time.into());
1888 }
1889 }
1890 output.session(&cap).give_container(data);
1891 if retained_cap.as_ref().is_none_or(|c| {
1892 !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
1893 }) {
1894 retained_cap = Some(cap.retain(0));
1895 }
1896 });
1897
1898 handle.with_frontier(|f| {
1899 while pending_times
1900 .first()
1901 .map_or(false, |retained_time| !f.less_than(&retained_time))
1902 {
1903 let _ = pending_times.pop_first();
1904 }
1905 });
1906
1907 while limit.map_or(false, |limit| pending_times.len() > limit) {
1908 let _ = pending_times.pop_first();
1909 }
1910
1911 match (retained_cap.as_mut(), pending_times.first()) {
1912 (Some(cap), Some(first)) => cap.downgrade(first),
1913 (_, None) => retained_cap = None,
1914 _ => {}
1915 }
1916
1917 if frontier.is_empty() {
1918 retained_cap = None;
1919 pending_times.clear();
1920 }
1921
1922 if !pending_times.is_empty() {
1923 tracing::debug!(
1924 name,
1925 info.global_id,
1926 pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
1927 frontier = ?frontier.frontier().get(0),
1928 probe = ?handle.with_frontier(|f| f.get(0).cloned()),
1929 ?upper,
1930 "pending times",
1931 );
1932 }
1933 }
1934 });
1935 stream
1936 }
1937}
1938
1939struct PendingTimesDisplay<T>(T);
1942
1943impl<T> std::fmt::Display for PendingTimesDisplay<T>
1944where
1945 T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1946{
1947 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1948 let mut iter = self.0.clone().into_iter();
1949 write!(f, "[")?;
1950 if let Some(first) = iter.next() {
1951 write!(f, "{}", first)?;
1952 let mut last = u64::from(first);
1953 for time in iter {
1954 write!(f, ", +{}", u64::from(time) - last)?;
1955 last = u64::from(time);
1956 }
1957 }
1958 write!(f, "]")?;
1959 Ok(())
1960 }
1961}
1962
1963#[derive(Clone, Copy, Debug)]
1966struct Pairer {
1967 split_arity: usize,
1968}
1969
1970impl Pairer {
1971 fn new(split_arity: usize) -> Self {
1973 Self { split_arity }
1974 }
1975
1976 fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1978 where
1979 I1: IntoIterator<Item = Datum<'a>>,
1980 I2: IntoIterator<Item = Datum<'a>>,
1981 {
1982 SharedRow::pack(first.into_iter().chain(second))
1983 }
1984
1985 fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1987 let mut datum_iter = datum_iter.into_iter();
1988 let mut row_builder = SharedRow::get();
1989 let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1990 let second = row_builder.pack_using(datum_iter);
1991 (first, second)
1992 }
1993}