1use std::any::Any;
104use std::cell::RefCell;
105use std::collections::{BTreeMap, BTreeSet};
106use std::convert::Infallible;
107use std::future::Future;
108use std::pin::Pin;
109use std::rc::{Rc, Weak};
110use std::sync::Arc;
111use std::task::Poll;
112
113use differential_dataflow::dynamic::pointstamp::PointStamp;
114use differential_dataflow::lattice::Lattice;
115use differential_dataflow::operators::arrange::{Arranged, ShutdownButton};
116use differential_dataflow::operators::iterate::SemigroupVariable;
117use differential_dataflow::trace::TraceReader;
118use differential_dataflow::{AsCollection, Collection, Data};
119use futures::FutureExt;
120use futures::channel::oneshot;
121use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
122use mz_compute_types::dyncfgs::{
123 COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
124 COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
125 ENABLE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
126};
127use mz_compute_types::plan::LirId;
128use mz_compute_types::plan::render_plan::{
129 self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
130};
131use mz_expr::{EvalError, Id, LocalId};
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_repr::explain::DummyHumanizer;
134use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
135use mz_storage_operators::persist_source;
136use mz_storage_types::controller::CollectionMetadata;
137use mz_storage_types::errors::DataflowError;
138use mz_timely_util::operator::{CollectionExt, StreamExt};
139use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
140use timely::PartialOrder;
141use timely::communication::Allocate;
142use timely::container::CapacityContainerBuilder;
143use timely::dataflow::channels::pact::Pipeline;
144use timely::dataflow::operators::to_stream::ToStream;
145use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
146use timely::dataflow::scopes::Child;
147use timely::dataflow::{Scope, Stream, StreamCore};
148use timely::order::Product;
149use timely::progress::timestamp::Refines;
150use timely::progress::{Antichain, Timestamp};
151use timely::scheduling::ActivateOnDrop;
152use timely::worker::{AsWorker, Worker as TimelyWorker};
153
154use crate::arrangement::manager::TraceBundle;
155use crate::compute_state::ComputeState;
156use crate::extensions::arrange::{KeyCollection, MzArrange};
157use crate::extensions::reduce::MzReduce;
158use crate::extensions::temporal_bucket::TemporalBucketing;
159use crate::logging::compute::{
160 ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
161};
162use crate::render::context::{ArrangementFlavor, Context, ShutdownProbe, shutdown_token};
163use crate::render::continual_task::ContinualTaskCtx;
164use crate::row_spine::{RowRowBatcher, RowRowBuilder};
165use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};
166
// Submodules containing the rendering logic for specific operator categories.
pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

// Re-export commonly used types at the module root.
pub use context::CollectionBundle;
pub use join::LinearJoinSpec;
179
/// Assembles a compute dataflow on the given Timely worker.
///
/// Imports persist sources and existing indexes, renders every object in
/// `dataflow.objects_to_build` (inside an iterative scope when any plan is
/// recursive), and exports the dataflow's indexes and sinks.
///
/// * `start_signal` gates imports so the dataflow does not run before it fires.
/// * `until` bounds the times at which updates are produced.
/// * `dataflow_expiration` optionally forces output streams to shut down at a
///   fixed time (replica expiration).
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Any recursive plan forces the whole build into an iterative
    // (`PointStamp`) scope; see the `recursive` branch below.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Collect index exports with their dependency id sets up front, so
    // ownership can move into the dataflow closure.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Likewise for sink exports.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    // Names for the dataflow and its top-level regions, for introspection.
    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        // Source collections imported below, to be brought into the build
        // scope later.
        let mut imported_sources = Vec::new();
        // Keepalive/shutdown tokens per imported or built object.
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        // Import all sources in a dedicated region, with a nested region per
        // source so each shows up separately in introspection.
        scope.clone().region_named(&input_name, |region| {
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        // If enabled, project away columns the MFP does not
                        // demand and remap its column references to the
                        // narrowed schema.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        // Continual-task inputs may skip the snapshot and
                        // adjust the progress-suppression frontier.
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        &compute_state.worker_config,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    let mut source_tokens: Vec<Rc<dyn Any>> = vec![Rc::new(token)];

                    // `persist_source` is expected to have fully applied the
                    // MFP; only an identity may remain.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    // Optionally apply logical backpressure, limiting how far
                    // the source may run ahead of the output frontier.
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let (token, stream) = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                        source_tokens.push(token);
                    }

                    // Attach the per-source input probe for frontier
                    // introspection.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    // Continual tasks rewrite their input streams and record
                    // the observed input times in the CT context.
                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    // Leave both nested regions to land in the root scope.
                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Retain the source tokens to keep the import alive.
                    tokens.insert(*source_id, Rc::new(source_tokens));
                });
            }
        });

        if recursive {
            // Recursive plans render inside an iterative scope whose inner
            // timestamp coordinate counts iterations via `PointStamp`.
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                // Bring the imported source collections into the iterative
                // scope and register them by id.
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Render each object in its own region, with a fresh shutdown
                // probe/token pair per object.
                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    // Associate this Timely dataflow's root address with the
                    // object's global id in introspection.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            // Non-recursive dataflows render in a plain named region.
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    // Optionally route source updates through temporal
                    // bucketing before use.
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                // Render each object in its own region, with a fresh shutdown
                // probe/token pair per object.
                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    // Associate this Timely dataflow's root address with the
                    // object's global id in introspection.
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}
551
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
    /// Imports a previously-built index (its ok/err arrangement traces) into
    /// this dataflow and registers it under the id of the indexed collection.
    ///
    /// Panics if the traces are absent from `compute_state` or have been
    /// allowed to compact beyond the dataflow's `as_of`.
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            // Whatever the trace bundle keeps alive must stay alive as long
            // as we hold the imported arrangements.
            let token = traces.to_drop().clone();

            // Import the ok trace into the parent scope, advanced to our
            // as_of and bounded by `until`.
            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            // Attach the per-input frontier probe for introspection.
            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            // Bring both arrangements into the build scope, gated on the
            // start signal.
            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            // Register the arrangement under the id of the collection the
            // index is on, keyed by the index's key expressions.
            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            // Pressing the buttons on drop shuts down the trace imports.
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}
618
impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    /// Exports the arrangement for `idx_id` to the compute state's trace
    /// manager, keeping the tokens of all dependencies alive alongside it.
    ///
    /// Panics if no arrangement keyed by `idx.key` is available.
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Collect the tokens of everything this index depends on, so the
        // dependencies stay alive while the exported trace does.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // If replica expiration is enabled, force both streams to
                // shut down at the expiration time. NOTE(review): presumably
                // dropping the (weakly held) token disarms the expiration
                // check — confirm against `expire_stream_at`.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                // Report output frontier progress through the shared probe.
                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // An existing arrangement serves this index: re-register the
                // imported traces under the new index id.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                // Dump diagnostic state before panicking.
                // NOTE(review): this prints `is_none()` under an "available"
                // label, which reads inverted — confirm intent.
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}
701
impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
    /// Exports the arrangement for `idx_id`, built inside a refined-timestamp
    /// (iterative) scope, to the compute state's trace manager.
    ///
    /// Unlike `export_index`, the arrangement cannot be exported directly: it
    /// is first flattened to collections, moved to the parent scope with
    /// `leave`, and re-arranged there.
    ///
    /// Panics if no arrangement keyed by `idx.key` is available.
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Keep the tokens of all dependencies alive with the exported trace.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // Leave the iterative scope and rebuild both arrangements in
                // the parent timestamp.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // If replica expiration is enabled, force both streams to
                // shut down at the expiration time. NOTE(review): presumably
                // dropping the (weakly held) token disarms the expiration
                // check — confirm against `expire_stream_at`.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                // Report output frontier progress through the shared probe.
                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // An existing arrangement serves this index: re-register the
                // imported traces under the new index id.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                // Dump diagnostic state before panicking.
                // NOTE(review): this prints `is_none()` under an "available"
                // label, which reads inverted — confirm intent.
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}
802
/// The binding context of a rendered (sub)plan, used to decorate the name of
/// a plan's final LIR operator in introspection output
/// (see `render_letfree_plan`).
enum BindingInfo {
    /// The plan is the body of an object; `in_let` records whether any `Let`
    /// bindings were rendered before it.
    Body { in_let: bool },
    /// The plan is the value of a `Let` binding; `last` marks the final
    /// binding of its stage.
    Let { id: LocalId, last: bool },
    /// The plan is the value of a `LetRec` binding; `last` marks the final
    /// binding of its stage.
    LetRec { id: LocalId, last: bool },
}
813
impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
    /// Renders a plan that may contain recursive (`LetRec`) bindings, at the
    /// given iteration nesting `level`.
    ///
    /// `Let` bindings render as usual. Each `LetRec` binding is wired through
    /// a pair of `SemigroupVariable`s (ok/err) so its definition can refer to
    /// itself and to the other recursive bindings of its stage.
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the non-recursive bindings first, each in its own
            // region for introspection.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Create feedback variables for all recursive bindings up front,
            // so each binding's body can reference every other one.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                // Each feedback round advances the iteration coordinate at
                // `level + 1` by one.
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner),
                );

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                // The binding's value may itself be recursive: render it one
                // nesting level deeper.
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Consolidate before feeding back, to avoid circulating
                // redundant updates round after round.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                // Enforce an optional iteration limit by branching off the
                // updates at or beyond the limit.
                if let Some(limit) = limit {
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // A missing coordinate at `level` means
                            // iteration 0.
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = Collection::new(in_limit);
                    if !limit.return_at_limit {
                        // Unless the plan opted into returning the value at
                        // the limit, reaching it is surfaced as an error.
                        err = err.concat(&Collection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Distinct the errors so the error stream stays small even if
                // every iteration regenerates the same errors.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Cut the feedback loops when the dataflow shuts down.
                let oks = render_shutdown_fuse(oks, self.shutdown_probe.clone());
                let errs = render_shutdown_fuse(errs, self.shutdown_probe.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Rebind the recursive results outside the iteration coordinate
            // introduced at this level, so downstream consumers observe them
            // at the enclosing level.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}
955
956impl<G> Context<G>
957where
958 G: Scope,
959 G::Timestamp: RenderTimestamp,
960{
    /// Renders a non-recursive plan to a collection bundle.
    ///
    /// Panics if the plan contains any recursive (`LetRec`) bindings; those
    /// must be rendered through `render_recursive_plan` instead.
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        // Tracks whether any `Let` binding was rendered, so the body's final
        // operator can be labeled accordingly in introspection.
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            // Recursive bindings are not valid here.
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                in_let = true;
                // Render each binding in its own region for introspection.
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        // Render the plan body in its own region as well.
        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }
1002
    /// Renders a let-free plan by rendering each node in topological order,
    /// stitching results together through the LIR-id-keyed `collections` map,
    /// and returning the root node's bundle.
    ///
    /// When compute logging is enabled, also records per-node metadata
    /// (humanized operator name, parent, nesting, and the span of Timely
    /// operator ids created while rendering the node) and emits it as a
    /// `LirMapping` event.
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered bundles, keyed by LIR id, for consumption by later nodes.
        let mut collections = BTreeMap::new();

        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // Decorate the plan's final operator with its binding context
                // ("With ... =", "Returning ...", etc.).
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                // Timely operator ids minted while rendering this node fall
                // in `operator_id_start..operator_id_end` (captured below).
                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            // Instrument the bundle to report hydration for this node.
            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
1092
    /// Renders a single plan expression to a `CollectionBundle`, resolving
    /// its input references through the already-rendered `collections`.
    ///
    /// Panics if a referenced input LIR id is missing from `collections`.
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // An `Err` constant yields an empty ok collection and a
                // single error.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance row times to the as_of, drop anything at or beyond
                // `until`, and refine into this scope's timestamp.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                // Errors are emitted at the minimum time advanced to the
                // as_of.
                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // The referenced bundle must have been installed earlier,
                // either as an import or as a prior binding.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // The plan promises all requested forms exist:
                        // verify, then strip any forms it did not ask for.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        // Read from an arrangement, optionally seeking to a
                        // literal key, while applying the MFP.
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // An identity MFP passes the input through untouched.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                // Only pass the fused MFP along when it does any work.
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                // Only the ok stream is negated; errors pass through as-is.
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                // Concatenate the oks and errs of all inputs, optionally
                // consolidating the ok output.
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                // Ensure all requested arrangement forms exist, building any
                // that are missing.
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }
1301
1302 fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
1303 if let Some(logger) = &self.compute_logger {
1304 logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
1305 dataflow_index,
1306 global_id,
1307 }));
1308 }
1309 }
1310
1311 fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
1312 if let Some(logger) = &self.compute_logger {
1313 logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
1314 }
1315 }
1316
    /// Instruments `bundle` to emit hydration events for the given LIR node.
    ///
    /// The logging operator is attached to the first arranged stream if any
    /// exists, otherwise to the ok collection's stream.
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                // No arrangement: instrument the ok collection instead, which
                // must exist by the `CollectionBundle` invariant.
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }
1360
    /// Attach a pass-through operator to `stream` that emits `OperatorHydration`
    /// log events for `lir_id`: one "not hydrated" event per export at
    /// construction time, and one "hydrated" event per export once the input
    /// frontier has advanced beyond the dataflow's as-of.
    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        // Without a logger there is nothing to report; pass the stream through.
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone(); };

        let export_ids = self.export_ids.clone();

        // The operator counts as hydrated once its frontier has reached
        // `as_of + 1` (in the outer dimension) for every as-of element.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            // Announce the operator as not-yet-hydrated for each export.
            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |input, output| {
                // Forward data unchanged; this operator only observes frontiers.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                // Events are emitted at most once; nothing to do afterwards.
                if hydrated {
                    return;
                }

                let frontier = input.frontier().frontier();
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier) {
                    hydrated = true;

                    // Flip the hydration status for each export.
                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
1422}
1423
/// A timestamp type usable for rendering compute dataflows, refining
/// `mz_repr::Timestamp`.
#[allow(dead_code)] pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// A mutable reference to the system timestamp component of the timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Promote a system-time delay into a timestamp summary for this type.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// A mutable reference to the event timestamp component of the timestamp.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Promote an event-time delay into a timestamp summary for this type.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back by one in every coordinate, saturating at zero
    /// (see the impls below).
    fn step_back(&self) -> Self;
}
1444
/// `mz_repr::Timestamp` is one-dimensional: the single time coordinate serves
/// as both the system time and the event time.
impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        // Saturating: stepping back from time zero stays at zero.
        self.saturating_sub(1)
    }
}
1465
1466impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
1467 fn system_time(&mut self) -> &mut mz_repr::Timestamp {
1468 &mut self.outer
1469 }
1470 fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1471 Product::new(delay, Default::default())
1472 }
1473 fn event_time(&self) -> mz_repr::Timestamp {
1474 self.outer
1475 }
1476 fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
1477 &mut self.outer
1478 }
1479 fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
1480 Product::new(delay, Default::default())
1481 }
1482 fn step_back(&self) -> Self {
1483 let inner = self.inner.clone();
1487 let mut vec = inner.into_vec();
1488 for item in vec.iter_mut() {
1489 *item = item.saturating_sub(1);
1490 }
1491 Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
1492 }
1493}
1494
/// A signal that can be awaited to learn whether dataflow rendering should
/// proceed. Cloneable; all clones observe the same signal.
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A shared future that resolves when the signal fires.
    ///
    /// The receiver resolves with a `Canceled` error once the sender (stored
    /// in the token) is dropped; no value is ever sent (`Infallible` has no
    /// values).
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak handle to the token whose drop fires the signal; also used to
    /// stash additional objects to drop on fire (see `drop_on_fire`).
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}
1514
1515impl StartSignal {
1516 pub fn new() -> (Self, Rc<dyn Any>) {
1519 let (tx, rx) = oneshot::channel::<Infallible>();
1520 let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
1521 let signal = Self {
1522 fut: rx.shared(),
1523 token_ref: Rc::downgrade(&token),
1524 };
1525 (signal, token)
1526 }
1527
1528 pub fn has_fired(&self) -> bool {
1529 self.token_ref.strong_count() == 0
1530 }
1531
1532 pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
1533 if let Some(token) = self.token_ref.upgrade() {
1534 let mut token = token.borrow_mut();
1535 let inner = std::mem::replace(&mut *token, Box::new(()));
1536 *token = Box::new((inner, to_drop));
1537 }
1538 }
1539}
1540
impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // The shared receiver resolves with a `Canceled` error when the sender
        // is dropped (no value is ever sent), so any resolution means the
        // signal has fired.
        self.fut.poll_unpin(cx).map(|_| ())
    }
}
1548
/// Extension trait for attaching a [`StartSignal`] to dataflow fragments.
pub(crate) trait WithStartSignal {
    /// Stashes input data (retaining its capabilities) until the given signal
    /// fires, then releases it.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}
1557
1558impl<S, Tr> WithStartSignal for Arranged<S, Tr>
1559where
1560 S: Scope,
1561 S::Timestamp: RenderTimestamp,
1562 Tr: TraceReader + Clone,
1563{
1564 fn with_start_signal(self, signal: StartSignal) -> Self {
1565 Arranged {
1566 stream: self.stream.with_start_signal(signal),
1567 trace: self.trace,
1568 }
1569 }
1570}
1571
impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            // Arrange for the operator to be re-activated when the signal
            // fires, so the stash is flushed promptly rather than waiting for
            // the next scheduling.
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            // Input received before the signal fires, stashed together with
            // its capability so downstream frontiers do not advance early.
            let mut stash = Vec::new();

            move |input, output| {
                // Until the signal fires, absorb input without producing any
                // output.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release stashed updates first, then pass new input through.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}
1608
1609fn render_shutdown_fuse<G, D>(
1613 collection: Collection<G, D, Diff>,
1614 probe: ShutdownProbe,
1615) -> Collection<G, D, Diff>
1616where
1617 G: Scope,
1618 D: Data,
1619{
1620 let stream = collection.inner;
1621 let stream = stream.unary(Pipeline, "ShutdownFuse", move |_cap, _info| {
1622 move |input, output| {
1623 input.for_each(|cap, data| {
1624 if !probe.in_shutdown() {
1625 output.session(&cap).give_container(data);
1626 }
1627 });
1628 }
1629 });
1630 stream.as_collection()
1631}
1632
/// Delay the downstream frontier of `stream` until the input has advanced
/// beyond the given `as_of`: data at times not after the `as_of` is emitted
/// through a retained initial capability, which is only dropped once the input
/// frontier has passed the `as_of`.
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        // The operator's initial capability, held until the input frontier has
        // passed the `as_of`.
        let mut early_cap = Some(default_cap);

        move |input, output| {
            input.for_each(|data_cap, data| {
                // Data beyond the `as_of` flows out at its own capability;
                // earlier data is emitted at the retained early capability.
                let mut session = if as_of.less_than(data_cap.time()) {
                    output.session(&data_cap)
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    output.session(cap)
                };
                session.give_container(data);
            });

            // Once the input frontier is no longer <= the `as_of`, release the
            // early capability so downstream frontiers can advance.
            let frontier = input.frontier().frontier();
            if !PartialOrder::less_equal(&frontier, &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}
1683
/// Extension trait to limit timestamp progress through a stream, used to
/// implement logical backpressure.
trait LimitProgress<T: Timestamp> {
    /// Limit the progress of `self` based on the frontier observed through
    /// `handle`, by retaining a capability for pending (rounded-up) times the
    /// probe has not yet passed.
    ///
    /// * `handle` - probe whose frontier gates the release of pending times.
    /// * `slack_ms` - granularity, in milliseconds, to which observed times
    ///   are rounded up before being tracked.
    /// * `limit` - optional cap on the number of retained pending times;
    ///   the earliest ones are dropped when the cap is exceeded.
    /// * `upper` - times at or beyond this frontier are not held back.
    /// * `name` - used in the operator name and in debug logging.
    ///
    /// Returns a shutdown token (dropping it releases the operator's state)
    /// and the progress-limited stream.
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> (Rc<dyn Any>, Self);
}
1724
impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> (Rc<dyn Any>, Self) {
        // Filled in by the operator constructor below; wrapped into the
        // returned shutdown token.
        let mut button = None;

        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Rounded-up times we have seen but the probe has not passed.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // Capability held at the earliest pending time to hold back
                // downstream progress.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                // Re-activate this operator whenever the probe's frontier
                // changes, so pending times can be released.
                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                // Shutdown is signaled by dropping the strong `Rc` inside the
                // button; the operator observes it through the weak handle.
                let shutdown = Rc::new(());
                button = Some(ShutdownButton::new(
                    Rc::new(RefCell::new(Some(Rc::clone(&shutdown)))),
                    activator,
                ));
                let shutdown = Rc::downgrade(&shutdown);

                move |input, output| {
                    // On shutdown: drop all state and drain the input.
                    if shutdown.strong_count() == 0 {
                        retained_cap = None;
                        pending_times.clear();
                        while let Some(_) = input.next() {}
                        return;
                    }

                    while let Some((cap, data)) = input.next() {
                        // Add `slack_ms` to each observed time and round up to
                        // the next multiple of `slack_ms`; track the result
                        // unless it is beyond `upper`.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        // Data itself is passed through undelayed.
                        output.session(&cap).give_container(data);
                        // Hold a capability at this time if we don't already
                        // hold an earlier one, and the time is not beyond
                        // `upper`.
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    }

                    // Release pending times the probe frontier has passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Enforce the cap on retained times by dropping the
                    // earliest ones.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the earliest still
                    // pending time, or release it if none remain.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // The input is complete; nothing left to hold back.
                    if input.frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?input.frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        (Rc::new(button.unwrap().press_on_drop()), stream)
    }
}
1828
/// Helper that displays a sequence of timestamps as the first value followed
/// by deltas (`[first, +d1, +d2, ...]`), to keep debug output compact.
struct PendingTimesDisplay<T>(T);
1832
1833impl<T> std::fmt::Display for PendingTimesDisplay<T>
1834where
1835 T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
1836{
1837 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1838 let mut iter = self.0.clone().into_iter();
1839 write!(f, "[")?;
1840 if let Some(first) = iter.next() {
1841 write!(f, "{}", first)?;
1842 let mut last = u64::from(first);
1843 for time in iter {
1844 write!(f, ", +{}", u64::from(time) - last)?;
1845 last = u64::from(time);
1846 }
1847 }
1848 write!(f, "]")?;
1849 Ok(())
1850 }
1851}
1852
/// Helper that merges two row halves into one row and splits such a row back
/// apart at a fixed position.
#[derive(Clone, Copy, Debug)]
struct Pairer {
    // Number of datums belonging to the first half of a paired row.
    split_arity: usize,
}
1859
1860impl Pairer {
1861 fn new(split_arity: usize) -> Self {
1863 Self { split_arity }
1864 }
1865
1866 fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
1868 where
1869 I1: IntoIterator<Item = Datum<'a>>,
1870 I2: IntoIterator<Item = Datum<'a>>,
1871 {
1872 SharedRow::pack(first.into_iter().chain(second))
1873 }
1874
1875 fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
1877 let mut datum_iter = datum_iter.into_iter();
1878 let mut row_builder = SharedRow::get();
1879 let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
1880 let second = row_builder.pack_using(datum_iter);
1881 (first, second)
1882 }
1883}