use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, ShutdownButton};
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    ENABLE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::Product;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors,
};
use crate::render::context::{ArrangementFlavor, Context, ShutdownProbe, shutdown_token};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

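/// Assembles a compute dataflow from a description: imports the involved
/// sources and indexes, builds the objects to construct, and exports the
/// requested indexes and sinks.
///
/// Recursive plans are rendered in an iterative scope with an additional
/// `PointStamp` timestamp coordinate; non-recursive plans are rendered in an
/// ordinary region.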
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

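    // Collect the exported indexes and sinks up front, each paired with the
    // ids of the objects it depends on, so exports can be wired up after the
    // objects are built.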
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

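    // Construct the dataflow: sources are imported in a dedicated input
    // region, while objects are built and exported in a separate (possibly
    // iterative) region below.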
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
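                    // If column demands are applied, narrow the read schema to
                    // the demanded columns and permute the MFP's column
                    // references to the narrowed arity.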
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }
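                    // Import the source from persist, pushing the MFP into the
                    // source operator as far as possible. The returned token
                    // must be held to keep the source alive.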

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        &compute_state.worker_config,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    let mut source_tokens: Vec<Rc<dyn Any>> = vec![Rc::new(token)];

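                    // The MFP should have been fully absorbed by the persist
                    // source; anything left behind must be the identity.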
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

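                    // Hold back progress messages until the as_of, so that
                    // downstream consumers observe the snapshot as a single
                    // batch rather than a trickle of early frontiers.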
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

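                    // If logical backpressure is enabled, limit how far the
                    // source may run ahead of the dataflow's output probe.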
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let (token, stream) = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                        source_tokens.push(token);
                    }

                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    tokens.insert(*source_id, Rc::new(source_tokens));
                });
            }
        });

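        // Build and export the dataflow's objects. Recursive plans need an
        // iterative scope whose timestamps carry an extra `PointStamp`
        // coordinate; non-recursive plans use a plain region.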
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

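                // Import the source collections, optionally routing them
                // through temporal bucketing.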
                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let (probe, token) = shutdown_token(region);
                    context.shutdown_probe = probe;
                    tokens.insert(object.id, Rc::new(token));

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
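    /// Imports the index identified by `idx_id` from the compute state's
    /// trace manager and binds it under the id of the indexed collection.
    ///
    /// Panics if the index is unknown or has been allowed to compact beyond
    /// the dataflow's `as_of`.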
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
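    /// Exports the arrangement behind `idx_id` to the compute state's trace
    /// manager, attaching the tokens of all dependencies so the export keeps
    /// its inputs alive.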
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // If the dataflow has an expiration time, expire the streams
                // at that time; dropping the token disables expiry.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // The arrangement is already managed as a trace of `gid`, so
                // the same trace can be registered under the new id.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
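    /// Like `export_index`, but for indexes built in an iterative scope: the
    /// arrangements are extracted as collections, brought into the outer
    /// scope, and re-arranged there before being exported.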
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // Extract the collections, leave the iterative scope, and
                // re-arrange in the outer scope.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

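/// Tracks how a rendered plan fragment is bound: as the body of a dataflow or
/// as a (recursive) let binding, and whether it is the last binding of its
/// stage. Used to attach readable names to each fragment's final operator.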
enum BindingInfo {
    Body { in_let: bool },
    Let { id: LocalId, last: bool },
    LetRec { id: LocalId, last: bool },
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
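    /// Renders a plan that may contain recursive bindings, at the given level
    /// of nesting.
    ///
    /// Each recursive binding becomes a differential `Variable` whose feedback
    /// summary advances this level's `PointStamp` coordinate by one per
    /// iteration; its definition is rendered (possibly recursing into deeper
    /// levels) and then fed back into the variable.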
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the non-recursive bindings of this stage, each in its
            // own region.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Introduce a `Variable` for each recursive binding, with a
            // feedback summary that advances this level's `PointStamp`
            // coordinate by one.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                use differential_dataflow::operators::iterate::Variable;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = Variable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = Variable::new(&mut self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

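                // If the binding has an iteration limit, split off updates
                // beyond the limit; unless `return_at_limit` is set, exceeding
                // the limit also produces an error.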
                if let Some(limit) = limit {
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            // The iteration index for this level, or zero if
                            // trailing zeros have been truncated.
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp starts counting at 0, so add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = Collection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&Collection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

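                // Arrange and distinct the error stream, so errors do not
                // accumulate across iterations.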
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                // Fuse the feedback streams to the dataflow's shutdown probe,
                // so iteration stops promptly on shutdown.
                let oks = render_shutdown_fuse(oks, self.shutdown_probe.clone());
                let errs = render_shutdown_fuse(errs, self.shutdown_probe.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Re-bind the results for the remainder of the plan, leaving the
            // dynamic scope level.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
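    /// Renders a non-recursive plan: each let binding is rendered in its own
    /// region and bound under its local id, then the body is rendered in a
    /// final "Main Body" region.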
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }

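    /// Renders a let-free plan by visiting its nodes in topological order,
    /// optionally recording LIR metadata (operator names and operator id
    /// spans) for logging.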
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // The rendered collections for each node, keyed by its `LirId`.
        let mut collections = BTreeMap::new();

        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // Annotate the fragment's last node with its binding context,
                // so the operator name reflects how the fragment is bound.
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

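    /// Renders a single plan expression, looking up its inputs in
    /// `collections`, which must already contain all of them.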
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow
                // construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance row timestamps to the as_of, and discard rows that
                // are not strictly before the until.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection or arrangements bound to `id`.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Forward the bundle, retaining only the requested
                        // forms.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // An identity MFP can reuse the input bundle unchanged.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

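    /// Attaches a hydration logging operator to the given bundle: to its first
    /// arrangement if one exists, otherwise to its raw collection.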
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.hydration_logger.clone() else {
            return stream.clone();
        };

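        // The operator is considered hydrated once its input frontier has
        // passed the as_of stepped forward by one, i.e., once it has fully
        // processed its snapshot.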
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;
            logger.log(lir_id, hydrated);

            move |input, output| {
                // Pass through all inputs.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                let frontier = input.frontier().frontier();
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier) {
                    hydrated = true;
                    logger.log(lir_id, hydrated);
                }
            }
        })
    }
}

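/// A timestamp type that can be used as the timestamp of rendered dataflows,
/// exposing its system and event time components.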
#[allow(dead_code)]
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back, saturating at the minimum.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // Step back the outer coordinate and every inner coordinate,
        // saturating at zero.
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

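/// A signal that can be awaited by operators to delay their startup.
///
/// The signal fires when its token is dropped; it holds only a weak reference
/// to the token, so it can observe the drop without keeping the token alive.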
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak handle to the token whose drop fires the signal.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Creates a new `StartSignal` and a token that, when dropped, fires the
    /// signal.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    /// Reports whether the signal has fired, i.e., its token was dropped.
    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    /// Stashes `to_drop` inside the token, so it is dropped when the signal
    /// fires.
    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

/// Extension trait to attach a [`StartSignal`] to operator outputs.
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // Stash incoming updates as long as the start signal has not
                // fired.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release any updates stashed previously.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                // Pass through all remaining input data.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

/// Wraps the given collection with an operator that drops all updates once
/// the dataflow is shutting down, as reported by `probe`.
fn render_shutdown_fuse<G, D>(
    collection: Collection<G, D, Diff>,
    probe: ShutdownProbe,
) -> Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    let stream = collection.inner;
    let stream = stream.unary(Pipeline, "ShutdownFuse", move |_cap, _info| {
        move |input, output| {
            input.for_each(|cap, data| {
                if !probe.in_shutdown() {
                    output.session(&cap).give_container(data);
                }
            });
        }
    });
    stream.as_collection()
}

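/// Suppresses progress messages for times before the given `as_of`.
///
/// Data is not delayed: updates at times not beyond the `as_of` are emitted
/// under a capability held at the operator's initial time, which is dropped
/// once the input frontier passes the `as_of`. Downstream consumers thus
/// observe the snapshot at the `as_of` all at once, instead of through a
/// sequence of early frontier advancements.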
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |input, output| {
            input.for_each(|data_cap, data| {
                // Updates beyond the as_of flow through under their own
                // capability; earlier updates are emitted under the retained
                // early capability.
                let mut session = if as_of.less_than(data_cap.time()) {
                    output.session(&data_cap)
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    output.session(cap)
                };
                session.give_container(data);
            });

            // Drop the early capability once the input frontier has passed
            // the as_of.
            let frontier = input.frontier().frontier();
            if !PartialOrder::less_equal(&frontier, &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

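/// Extension trait to limit how far a stream's frontier can run ahead of a
/// probe, as a form of logical backpressure.
///
/// The operator tracks the times it has observed, rounded up to multiples of
/// `slack_ms`, and holds a capability at the earliest time the probe has not
/// yet passed. At most `limit` pending times are retained (excess ones are
/// discharged from the earliest end), and times at or beyond `upper` are
/// never held back, so a dataflow that is merely catching up to its upper is
/// not throttled.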
trait LimitProgress<T: Timestamp> {
    /// Limits the stream's progress relative to `handle`, returning a token
    /// that shuts down the operator when dropped, plus the pass-through
    /// stream.
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> (Rc<dyn Any>, Self);
}

impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> (Rc<dyn Any>, Self) {
        let mut button = None;

        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times observed on the input but not yet passed by the
                // probe's frontier, rounded up to multiples of `slack_ms`.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // A capability retained at the earliest pending time.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                let shutdown = Rc::new(());
                button = Some(ShutdownButton::new(
                    Rc::new(RefCell::new(Some(Rc::clone(&shutdown)))),
                    activator,
                ));
                let shutdown = Rc::downgrade(&shutdown);

                move |input, output| {
                    // Once the button has been pressed, release all state and
                    // drain the input.
                    if shutdown.strong_count() == 0 {
                        retained_cap = None;
                        pending_times.clear();
                        while let Some(_) = input.next() {}
                        return;
                    }

                    while let Some((cap, data)) = input.next() {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    }

                    // Discharge pending times the probe has moved past.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Retain at most `limit` pending times, dropping from the
                    // earliest end.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the earliest
                    // pending time, or drop it if none remain.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if input.frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?input.frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        (Rc::new(button.unwrap().press_on_drop()), stream)
    }
}

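/// Displays a sequence of timestamps as the first time followed by deltas, to
/// keep log lines short. For example, the times `[1000, 2000, 3000]` are
/// rendered as `[1000, +1000, +1000]`.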
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

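/// A helper for merging and splitting rows at a fixed column boundary.
///
/// For example, with `split_arity = 2`, merging `[a, b]` and `[c]` yields the
/// row `[a, b, c]`, and splitting `[a, b, c]` recovers `([a, b], [c])`.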
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer that splits after the given number of leading columns.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges two datum iterators into a single row.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a datum iterator into two rows at the split arity.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}