use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Data, VecCollection};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
    ENABLE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id, LocalId};
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::Product;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::extensions::temporal_bucket::TemporalBucketing;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors, OperatorHydration,
};
use crate::render::context::{ArrangementFlavor, Context};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher, MzTimestamp};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;

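/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///
/// This imports the sources and indexes the dataflow depends on, renders each
/// object in `dataflow.objects_to_build`, and exports the requested indexes
/// and sinks. Recursive plans are rendered in an iterative scope timestamped
/// with `PointStamp<u64>`; non-recursive plans use a plain region.
///
/// A minimal calling sketch (hypothetical surrounding context; the real call
/// site lives in the compute worker's command handling):
///
/// ```ignore
/// timely::execute(config, move |worker| {
///     // `compute_state`, `dataflow`, `start_signal`, `until`, and
///     // `dataflow_expiration` are assumed to be in scope here.
///     build_compute_dataflow(
///         worker,
///         &mut compute_state,
///         dataflow.clone(),
///         start_signal.clone(),
///         until.clone(),
///         dataflow_expiration.clone(),
///     );
/// });
/// ```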
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    // Whether the dataflow contains recursive bindings.
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    // Indexes to export, with their dependencies.
    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    // Sinks to export, with their dependencies.
    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        // Import declared sources into the rendering context.
        scope.clone().region_named(&input_name, |region| {
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        // If enabled, apply the MFP's column demands so we read
                        // back only the required columns, permuting the MFP to
                        // match the narrowed schema.
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        ErrorHandler::Halt("compute_import"),
                    );

                    // If `mfp` was non-identity, `persist_source` must have
                    // absorbed all of its work.
                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    // Hold back progress messages for times before the
                    // `as_of`; see `suppress_early_progress`.
                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    // Apply logical backpressure, if enabled.
                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let stream = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                    }

                    // Attach a probe reporting the input frontier.
                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            // `ct_times` is consumed in the outermost scope,
                            // so it must leave both the source region and the
                            // input region.
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    // Associate the returned token with the source identifier.
                    tokens.insert(*source_id, Rc::new(token));
                });
            }
        });

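        // Build the dataflow proper: recursive plans must live in an iterative
        // scope whose timestamp refines the outer one with an iteration counter
        // (`PointStamp<u64>`); non-recursive plans render in a plain region.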
        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            let in_let = object.plan.is_recursive();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(
                                    object.id,
                                    0,
                                    object.plan,
                                    BindingInfo::Body { in_let },
                                )
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let oks = if ENABLE_TEMPORAL_BUCKETING.get(&compute_state.worker_config) {
                        let as_of = context.as_of_frontier.clone();
                        let summary = TEMPORAL_BUCKETING_SUMMARY
                            .get(&compute_state.worker_config)
                            .try_into()
                            .expect("must fit");
                        oks.inner
                            .bucket::<CapacityContainerBuilder<_>>(as_of, summary)
                            .as_collection()
                    } else {
                        oks
                    };
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
{
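    /// Imports the trace of a maintained index into this dataflow's scope,
    /// wiring up probe and start-signal plumbing and recording the drop
    /// token for the imported trace.
    ///
    /// Panics if the index's trace is not present in `compute_state.traces`,
    /// or if it has been allowed to compact beyond the dataflow's `as_of`.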
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
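    /// Exports the arrangement for `idx_id` as a maintained trace in
    /// `compute_state.traces`, keeping the tokens of its dependencies alive.
    ///
    /// Panics if the context holds no arrangement for the index's key.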
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together the tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                // Ensure the frontier does not advance past the dataflow
                // expiration time, if one is set.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // An existing imported trace is re-exported under a new id.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
{
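    /// Exports the arrangement for `idx_id` from an iterative scope as a
    /// maintained trace, rearranging it in the outer scope first since the
    /// trace cannot carry the iterative timestamp.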
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        // Put together the tokens that belong to the export.
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                // Rearrange in the outer scope, stripping the iteration counter.
                let mut oks = oks
                    .as_collection(|k, v| (k.to_row(), v.to_row()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                // Ensure the frontier does not advance past the dataflow
                // expiration time, if one is set.
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                    );
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                // An existing imported trace is re-exported under a new id.
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_none());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

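/// Information about a let binding, used to label the final operator rendered
/// for the bound expression. The last binding in a block is prefixed with
/// `With` (or `With Recursive`), mirroring `EXPLAIN` output, e.g.
/// `With Recursive c1 = ...` for the last recursive binding.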
enum BindingInfo {
    Body { in_let: bool },
    Let { id: LocalId, last: bool },
    LetRec { id: LocalId, last: bool },
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
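    /// Renders a plan that may contain recursive bindings (`LetRec`) at
    /// iteration level `level`, starting at 0 for the outermost invocation.
    ///
    /// Each recursive binding is backed by a pair of `SemigroupVariable`s
    /// (one for ok data, one for errors) that feed the binding's collections
    /// back around the loop with an incremented iteration counter. Errors are
    /// made distinct before being fed back, so a persistent error terminates
    /// the iteration rather than accumulating. If a binding carries an
    /// iteration limit, updates from iterations at or beyond the limit are
    /// cut off, and converted into `LetRecLimitExceeded` errors unless
    /// `return_at_limit` is set.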
    fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            // Render the let bindings in order.
            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            // Define a variable for each recursive binding. The variables are
            // used only until the binding itself is rendered; all subsequent
            // uses see the bound collection instead.
            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = SemigroupVariable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner),
                );

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            let mut rec_iter = recs.into_iter().peekable();
            while let Some(RecBind { id, value, limit }) = rec_iter.next() {
                let last = rec_iter.peek().is_none();
                let binding = BindingInfo::LetRec { id, last };
                let bundle = self.render_recursive_plan(object_id, level + 1, value, binding);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                // Consolidate `oks` so that iteration ceases at fixed point.
                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");

                if let Some(limit) = limit {
                    // Swallow the results of iterations at or beyond the
                    // limit; those correspond to the limit being exceeded.
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            // The pointstamp counts from 0, so add 1.
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = VecCollection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&VecCollection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                // Feed back only the distinct errors. Otherwise a persistent
                // error would be re-added each iteration and the loop would
                // never terminate; the trade-off is that error multiplicities
                // are lost.
                let err: KeyCollection<_, _, _> = err.into();
                let errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());

                oks_v.set(&oks);
                err_v.set(&errs);
            }
            // Extract each of the rec bindings into the outer scope.
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body, binding)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
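    /// Renders a non-recursive plan to a `CollectionBundle`.
    ///
    /// All let bindings are rendered first, in order, each in its own region;
    /// the body is then rendered in a "Main Body" region. Panics if the plan
    /// contains any recursive (`LetRec`) bindings.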
    fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        let mut in_let = false;
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            let mut let_iter = lets.into_iter().peekable();
            while let Some(LetBind { id, value }) = let_iter.next() {
                // Remember that we are in a `Let` context, for labeling the body.
                in_let = true;
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            let last = let_iter.peek().is_none();
                            let binding = BindingInfo::Let { id, last };
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value, binding)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body, BindingInfo::Body { in_let })
                .leave_region()
        })
    }

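    /// Renders a let-free plan to a `CollectionBundle`, rendering each node
    /// in topological order and stitching the results together.
    ///
    /// When a compute logger is present, this also collects LIR metadata
    /// (humanized operator names and spans of timely operator ids) for each
    /// node, so that introspection can map LIR nodes to dataflow operators.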
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
        binding: BindingInfo,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        // Rendered collections by their `LirId`.
        let mut collections = BTreeMap::new();

        // LIR metadata is only computed when a logger is attached, so the
        // containers involved are all `Some` or all `None` together.
        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        let mut topo_iter = topological_order.into_iter().peekable();
        while let Some(lir_id) = topo_iter.next() {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);

                // The last node in the topological order is the plan's
                // output, so label it with its binding context.
                let operator = if topo_iter.peek().is_none() {
                    match &binding {
                        BindingInfo::Body { in_let: true } => format!("Returning {operator}"),
                        BindingInfo::Body { in_let: false } => operator,
                        BindingInfo::Let { id, last: true } => {
                            format!("With {id} = {operator}")
                        }
                        BindingInfo::Let { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: true } => {
                            format!("With Recursive {id} = {operator}")
                        }
                        BindingInfo::LetRec { id, last: false } => {
                            format!("{id} = {operator}")
                        }
                    }
                } else {
                    operator
                };

                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }

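    /// Renders a single plan expression to a `CollectionBundle`, looking up
    /// the bundles of its inputs in `collections`.
    ///
    /// Panics if an input has not been rendered yet, which would violate the
    /// topological order of the plan.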
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                // Produce both rows and errs to avoid conditional dataflow
                // construction.
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                // Advance the timestamps of the constant rows to the as_of,
                // retaining only rows not beyond the `until`.
                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                // Recover the collection from `self` and then apply `mfp` to it.
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        // Assert that each of the arrangements and collections
                        // the plan demands is present.
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        // Retain only those arrangements the plan demands.
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                // If `mfp` is the identity, we can skip applying it.
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after: mfp,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input_key, input, exprs, func, mfp)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input_key, input, key_val_plan, plan, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms: keys,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }

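    /// Attaches a hydration logging operator to one part of the bundle.
    ///
    /// Hydration is tracked on a single part only: the first arrangement if
    /// any exist, and the ok collection otherwise. The chosen part's stream
    /// is replaced by one that logs an `OperatorHydration` event once its
    /// frontier has passed the as_of.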
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }

    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.compute_logger.clone() else {
            return stream.clone();
        };

        let export_ids = self.export_ids.clone();

        // The operator is hydrated once its input frontier has passed the
        // as_of, stepped forward by one: at the as_of itself the dataflow may
        // still be processing its snapshot.
        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;

            // Log the initial `hydrated: false` state for each export.
            for &export_id in &export_ids {
                logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                    export_id,
                    lir_id,
                    hydrated,
                }));
            }

            move |(input, frontier), output| {
                // Pass data through unchanged.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier.frontier()) {
                    hydrated = true;

                    for &export_id in &export_ids {
                        logger.log(&ComputeEvent::OperatorHydration(OperatorHydration {
                            export_id,
                            lir_id,
                            hydrated,
                        }));
                    }
                }
            }
        })
    }
}

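/// A timestamp type usable for rendering, with accessors for its system and
/// event components and for the operator summaries that delay each.
///
/// A sketch of `step_back` on an iterative timestamp (hypothetical values):
///
/// ```ignore
/// let ts = Product::new(mz_repr::Timestamp::from(10), PointStamp::new(vec![3]));
/// assert_eq!(
///     ts.step_back(),
///     Product::new(mz_repr::Timestamp::from(9), PointStamp::new(vec![2])),
/// );
/// ```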
#[allow(dead_code)]
pub trait RenderTimestamp: MzTimestamp + Refines<mz_repr::Timestamp> {
    /// The system timestamp component of the timestamp, as a mutable reference.
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects a system delay in terms of the timestamp summary.
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// The event timestamp component of the timestamp.
    fn event_time(&self) -> mz_repr::Timestamp;
    /// The event timestamp component of the timestamp, as a mutable reference.
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    /// Effects an event delay in terms of the timestamp summary.
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    /// Steps the timestamp back by one unit in every component, saturating
    /// at the minimum.
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        // Step back the outer coordinate and every coordinate of the inner
        // `PointStamp`.
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}

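/// A shared signal that dataflow operators can await before starting to do
/// work. The signal fires when the token returned by [`StartSignal::new`] is
/// dropped.
///
/// A minimal usage sketch (hypothetical surrounding code):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// assert!(!signal.has_fired());
/// drop(token);
/// assert!(signal.has_fired());
/// ```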
#[derive(Clone)]
pub(crate) struct StartSignal {
    /// A future that completes when the signal fires.
    ///
    /// The inner type is `Infallible` because no value is ever expected to
    /// arrive; the signal is the sender disappearing.
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    /// A weak reference to the token, used to register drop-on-fire payloads.
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    /// Creates a new `StartSignal` and a token that fires the signal when dropped.
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}

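/// Extension trait for objects that can delay emitting data until a
/// [`StartSignal`] has fired.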
pub(crate) trait WithStartSignal {
    /// Delays data and progress updates until the start signal has fired.
    ///
    /// Note that this operator buffers all incoming data while the signal is
    /// pending, so it should only be applied to streams that are known to
    /// stay small until then, such as arrangement streams.
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            // Activate the operator when the signal fires, by dropping this
            // token together with the signal's own token.
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                // While the signal is pending, stash incoming updates along
                // with their capabilities.
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                // Release any stashed updates.
                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                // Pass through all remaining updates.
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}

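/// Suppresses progress messages for times before the given `as_of`.
///
/// Data is passed through untouched; only the capability under which early
/// data is emitted is held back, so downstream operators see no progress
/// until the input frontier has passed the `as_of`. This lets consumers such
/// as arrangements consolidate everything at or before the `as_of`, e.g. a
/// source's snapshot, in one go.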
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        // A capability for times at or before the `as_of`, dropped once the
        // input frontier has passed the `as_of`.
        let mut early_cap = Some(default_cap);

        move |(input, frontier), output| {
            input.for_each_time(|data_cap, data| {
                if as_of.less_than(data_cap.time()) {
                    let mut session = output.session(&data_cap);
                    for data in data {
                        session.give_container(data);
                    }
                } else {
                    // Emit early data under the retained early capability.
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    let mut session = output.session(&cap);
                    for data in data {
                        session.give_container(data);
                    }
                }
            });

            // Release the early capability once the input frontier has
            // advanced beyond the `as_of`.
            if !PartialOrder::less_equal(&frontier.frontier(), &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}

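/// Extension trait to limit how far a stream's progress can run ahead of a
/// downstream probe, used to exert logical backpressure on dataflow inputs.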
trait LimitProgress<T: Timestamp> {
    /// Limits the stream's progress to at most `limit` pending timestamps
    /// ahead of the frontier observed by `handle`, rounding pending
    /// timestamps up to multiples of `slack_ms`. Timestamps at or beyond
    /// `upper` are ignored, since no data is expected at those times. `name`
    /// is used for logging only.
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> Self;
}

impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> Self {
        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                // Times not yet observed as passed by the probe.
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                // A capability retained to hold back progress downstream.
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                move |(input, frontier), output| {
                    input.for_each(|cap, data| {
                        // Round each data time up to the next multiple of
                        // `slack_ms` and remember it, unless it lies at or
                        // beyond the `upper`.
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    });

                    // Discard pending times the probe has already passed.
                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    // Retain at most `limit` pending times, discarding the
                    // earliest ones first.
                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    // Downgrade the retained capability to the earliest
                    // pending time, or drop it if none remain.
                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    // If the input is shutting down, release everything.
                    if frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        stream
    }
}

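/// Helper to print a sequence of timestamps in a compact format: the first
/// time in full, subsequent times as deltas from their predecessor, e.g.
/// `[1000, +100, +200]`.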
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}

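/// Helper to merge two row halves into a single `Row`, and to split such a
/// row back into its halves at a fixed arity.
///
/// A round-trip sketch (hypothetical values; `split_arity` is the number of
/// datums in the first half):
///
/// ```ignore
/// let pairer = Pairer::new(1);
/// let merged = pairer.merge(
///     std::iter::once(Datum::Int64(1)),
///     [Datum::Int64(2), Datum::Int64(3)],
/// );
/// let (key, val) = pairer.split(merged.iter());
/// assert_eq!(key.iter().count(), 1);
/// assert_eq!(val.iter().count(), 2);
/// ```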
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    /// Creates a pairer that splits after `split_arity` datums.
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    /// Merges the two halves into a single row.
    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    /// Splits a row into its two halves at the configured arity.
    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let mut row_builder = SharedRow::get();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}