use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use columnar::Columnar;
use differential_dataflow::IntoOwned;
use differential_dataflow::containers::Columnation;
use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, ShutdownButton};
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use futures::FutureExt;
use futures::channel::oneshot;
use mz_compute_types::dataflows::{DataflowDescription, IndexDesc};
use mz_compute_types::dyncfgs::{
    COMPUTE_APPLY_COLUMN_DEMANDS, COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK,
    COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES, ENABLE_COMPUTE_LOGICAL_BACKPRESSURE,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::{
    self, BindStage, LetBind, LetFreePlan, RecBind, RenderPlan,
};
use mz_expr::{EvalError, Id};
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::explain::DummyHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use mz_timely_util::probe::{Handle as MzProbeHandle, ProbeNotify};
use timely::PartialOrder;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::{BranchWhen, Capability, Operator, Probe, probe};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::order::Product;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::scheduling::ActivateOnDrop;
use timely::worker::Worker as TimelyWorker;

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::logging::compute::{
    ComputeEvent, DataflowGlobal, LirMapping, LirMetadata, LogDataflowErrors,
};
use crate::render::context::{ArrangementFlavor, Context, ShutdownToken};
use crate::render::continual_task::ContinualTaskCtx;
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{ErrBatcher, ErrBuilder, ErrSpine, KeyBatcher};

pub mod context;
pub(crate) mod continual_task;
mod errors;
mod flat_map;
mod join;
mod reduce;
pub mod sinks;
mod threshold;
mod top_k;

pub use context::CollectionBundle;
pub use join::LinearJoinSpec;
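/// Assembles the compute side of the dataflow described by `dataflow` in the
/// given timely worker: imports the required sources and indexes, builds each
/// object, and exports the requested indexes and sinks. Recursive plans are
/// rendered in an iterative scope timestamped with `PointStamp<u64>`;
/// non-recursive plans are rendered in a plain region.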
pub fn build_compute_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    compute_state: &mut ComputeState,
    dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    start_signal: StartSignal,
    until: Antichain<mz_repr::Timestamp>,
    dataflow_expiration: Antichain<mz_repr::Timestamp>,
) {
    let recursive = dataflow
        .objects_to_build
        .iter()
        .any(|object| object.plan.is_recursive());

    let indexes = dataflow
        .index_exports
        .iter()
        .map(|(idx_id, (idx, _typ))| (*idx_id, dataflow.depends_on(idx.on_id), idx.clone()))
        .collect::<Vec<_>>();

    let sinks = dataflow
        .sink_exports
        .iter()
        .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
        .collect::<Vec<_>>();

    let worker_logging = timely_worker.log_register().get("timely").map(Into::into);
    let apply_demands = COMPUTE_APPLY_COLUMN_DEMANDS.get(&compute_state.worker_config);

    let name = format!("Dataflow: {}", &dataflow.debug_name);
    let input_name = format!("InputRegion: {}", &dataflow.debug_name);
    let build_name = format!("BuildRegion: {}", &dataflow.debug_name);

    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let mut ct_ctx = ContinualTaskCtx::new(&dataflow);

        let mut imported_sources = Vec::new();
        let mut tokens: BTreeMap<_, Rc<dyn Any>> = BTreeMap::new();
        let output_probe = MzProbeHandle::default();

        scope.clone().region_named(&input_name, |region| {
            for (source_id, (source, _monotonic, upper)) in dataflow.source_imports.iter() {
                region.region_named(&format!("Source({:?})", source_id), |inner| {
                    let mut read_schema = None;
                    let mut mfp = source.arguments.operators.clone().map(|mut ops| {
                        if apply_demands {
                            let demands = ops.demand();
                            let new_desc =
                                source.storage_metadata.relation_desc.apply_demand(&demands);
                            let new_arity = demands.len();
                            let remap: BTreeMap<_, _> = demands
                                .into_iter()
                                .enumerate()
                                .map(|(new, old)| (old, new))
                                .collect();
                            ops.permute_fn(|old_idx| remap[&old_idx], new_arity);
                            read_schema = Some(new_desc);
                        }

                        mz_expr::MfpPlan::create_from(ops)
                            .expect("Linear operators should always be valid")
                    });

                    let mut snapshot_mode = SnapshotMode::Include;
                    let mut suppress_early_progress_as_of = dataflow.as_of.clone();
                    let ct_source_transformer = ct_ctx.get_ct_source_transformer(*source_id);
                    if let Some(x) = ct_source_transformer.as_ref() {
                        snapshot_mode = x.snapshot_mode();
                        suppress_early_progress_as_of = suppress_early_progress_as_of
                            .map(|as_of| x.suppress_early_progress_as_of(as_of));
                    }

                    let (mut ok_stream, err_stream, token) = persist_source::persist_source(
                        inner,
                        *source_id,
                        Arc::clone(&compute_state.persist_clients),
                        &compute_state.txns_ctx,
                        &compute_state.worker_config,
                        source.storage_metadata.clone(),
                        read_schema,
                        dataflow.as_of.clone(),
                        snapshot_mode,
                        until.clone(),
                        mfp.as_mut(),
                        compute_state.dataflow_max_inflight_bytes(),
                        start_signal.clone(),
                        |error| panic!("compute_import: {error}"),
                    );

                    let mut source_tokens: Vec<Rc<dyn Any>> = vec![Rc::new(token)];

                    assert!(mfp.map(|x| x.is_identity()).unwrap_or(true));

                    if let Some(as_of) = suppress_early_progress_as_of {
                        ok_stream = suppress_early_progress(ok_stream, as_of);
                    }

                    if ENABLE_COMPUTE_LOGICAL_BACKPRESSURE.get(&compute_state.worker_config) {
                        let limit = COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES
                            .get(&compute_state.worker_config);
                        let slack = COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK
                            .get(&compute_state.worker_config)
                            .as_millis()
                            .try_into()
                            .expect("must fit");

                        let (token, stream) = ok_stream.limit_progress(
                            output_probe.clone(),
                            slack,
                            limit,
                            upper.clone(),
                            name.clone(),
                        );
                        ok_stream = stream;
                        source_tokens.push(token);
                    }

                    let input_probe =
                        compute_state.input_probe_for(*source_id, dataflow.export_ids());
                    ok_stream = ok_stream.probe_with(&input_probe);

                    let (ok_stream, err_stream) = match ct_source_transformer {
                        None => (ok_stream, err_stream),
                        Some(ct_source_transformer) => {
                            let (oks, errs, ct_times) = ct_source_transformer
                                .transform(ok_stream.as_collection(), err_stream.as_collection());
                            ct_ctx.ct_times.push(ct_times.leave_region().leave_region());
                            (oks.inner, errs.inner)
                        }
                    };

                    let (oks, errs) = (
                        ok_stream.as_collection().leave_region().leave_region(),
                        err_stream.as_collection().leave_region().leave_region(),
                    );

                    imported_sources.push((mz_expr::Id::Global(*source_id), (oks, errs)));

                    tokens.insert(*source_id, Rc::new(source_tokens));
                });
            }
        });

        if recursive {
            scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter(region),
                        errs.enter(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let object_token = Rc::new(());
                    context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token));
                    tokens.insert(object.id, object_token);

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_recursive_plan(object.id, 0, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;

                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index_iterative(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        } else {
            scope.clone().region_named(&build_name, |region| {
                let mut context = Context::for_dataflow_in(
                    &dataflow,
                    region.clone(),
                    compute_state,
                    until,
                    dataflow_expiration,
                );

                for (id, (oks, errs)) in imported_sources.into_iter() {
                    let bundle = crate::render::CollectionBundle::from_collections(
                        oks.enter_region(region),
                        errs.enter_region(region),
                    );
                    context.insert_id(id, bundle);
                }

                for (idx_id, idx) in &dataflow.index_imports {
                    let input_probe = compute_state.input_probe_for(*idx_id, dataflow.export_ids());
                    context.import_index(
                        compute_state,
                        &mut tokens,
                        input_probe,
                        *idx_id,
                        &idx.desc,
                        start_signal.clone(),
                    );
                }

                for object in dataflow.objects_to_build {
                    let object_token = Rc::new(());
                    context.shutdown_token = ShutdownToken::new(Rc::downgrade(&object_token));
                    tokens.insert(object.id, object_token);

                    let bundle = context.scope.clone().region_named(
                        &format!("BuildingObject({:?})", object.id),
                        |region| {
                            let depends = object.plan.depends();
                            context
                                .enter_region(region, Some(&depends))
                                .render_plan(object.id, object.plan)
                                .leave_region()
                        },
                    );
                    let global_id = object.id;
                    context.log_dataflow_global_id(
                        *bundle
                            .scope()
                            .addr()
                            .first()
                            .expect("Dataflow root id must exist"),
                        global_id,
                    );
                    context.insert_id(Id::Global(object.id), bundle);
                }

                for (idx_id, dependencies, idx) in indexes {
                    context.export_index(
                        compute_state,
                        &tokens,
                        dependencies,
                        idx_id,
                        &idx,
                        &output_probe,
                    );
                }

                for (sink_id, dependencies, sink) in sinks {
                    context.export_sink(
                        compute_state,
                        &tokens,
                        dependencies,
                        sink_id,
                        &sink,
                        start_signal.clone(),
                        ct_ctx.input_times(&context.scope.parent),
                        &output_probe,
                    );
                }
            });
        }
    })
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: Refines<G::Timestamp> + RenderTimestamp,
    <T as Columnar>::Container: Clone + Send,
{
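    /// Imports the index identified by `idx_id` from the compute state's trace
    /// manager into this context, probing the ok stream with `input_probe` and
    /// gating both arrangements on `start_signal`.
    ///
    /// Panics if the index's traces are absent or have been allowed to compact
    /// beyond the dataflow `as_of`.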
    pub(crate) fn import_index(
        &mut self,
        compute_state: &mut ComputeState,
        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        input_probe: probe::Handle<mz_repr::Timestamp>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        start_signal: StartSignal,
    ) {
        if let Some(traces) = compute_state.traces.get_mut(&idx_id) {
            assert!(
                PartialOrder::less_equal(&traces.compaction_frontier(), &self.as_of_frontier),
                "Index {idx_id} has been allowed to compact beyond the dataflow as_of"
            );

            let token = traces.to_drop().clone();

            let (mut oks, ok_button) = traces.oks_mut().import_frontier_core(
                &self.scope.parent,
                &format!("Index({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            oks.stream = oks.stream.probe_with(&input_probe);

            let (err_arranged, err_button) = traces.errs_mut().import_frontier_core(
                &self.scope.parent,
                &format!("ErrIndex({}, {:?})", idx.on_id, idx.key),
                self.as_of_frontier.clone(),
                self.until.clone(),
            );

            let ok_arranged = oks
                .enter(&self.scope)
                .with_start_signal(start_signal.clone());
            let err_arranged = err_arranged
                .enter(&self.scope)
                .with_start_signal(start_signal);

            self.update_id(
                Id::Global(idx.on_id),
                CollectionBundle::from_expressions(
                    idx.key.clone(),
                    ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                ),
            );
            tokens.insert(
                idx_id,
                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
            );
        } else {
            panic!(
                "import of index {} failed while building dataflow {}",
                idx_id, self.dataflow_id
            );
        }
    }
}

impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
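    /// Exports the arrangement keyed by `idx.key` as the index `idx_id`:
    /// collects the tokens of its dependencies, optionally wires up dataflow
    /// expiration and error logging, and installs the resulting trace bundle
    /// in `compute_state`.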
    pub(crate) fn export_index(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(mut oks, mut errs)) => {
                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_errs", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<'g, G, T> Context<Child<'g, G, T>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    T: RenderTimestamp,
    <T as Columnar>::Container: Clone + Send,
{
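    /// Like `export_index`, but for indexes built in an iterative scope: the
    /// arrangements are first brought back into the outer scope and
    /// re-arranged before being exported.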
    pub(crate) fn export_index_iterative(
        &self,
        compute_state: &mut ComputeState,
        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
        dependency_ids: BTreeSet<GlobalId>,
        idx_id: GlobalId,
        idx: &IndexDesc,
        output_probe: &MzProbeHandle<G::Timestamp>,
    ) {
        let mut needed_tokens = Vec::new();
        for dep_id in dependency_ids {
            if let Some(token) = tokens.get(&dep_id) {
                needed_tokens.push(Rc::clone(token));
            }
        }
        let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
            panic!(
                "Arrangement alarmingly absent! id: {:?}",
                Id::Global(idx_id)
            )
        });

        match bundle.arrangement(&idx.key) {
            Some(ArrangementFlavor::Local(oks, errs)) => {
                let mut oks = oks
                    .as_collection(|k, v| (k.into_owned(), v.into_owned()))
                    .leave()
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, _>(
                        "Arrange export iterative",
                    );

                let mut errs = errs
                    .as_collection(|k, v| (k.clone(), v.clone()))
                    .leave()
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
                        "Arrange export iterative err",
                    );

                if let Some(&expiration) = self.dataflow_expiration.as_option() {
                    let token = Rc::new(());
                    let shutdown_token = Rc::downgrade(&token);
                    oks.stream = oks.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_oks", self.debug_name),
                        expiration,
                        Weak::clone(&shutdown_token),
                    );
                    errs.stream = errs.stream.expire_stream_at(
                        &format!("{}_export_index_iterative_err", self.debug_name),
                        expiration,
                        shutdown_token,
                    );
                    needed_tokens.push(token);
                }

                oks.stream = oks.stream.probe_notify_with(vec![output_probe.clone()]);

                if let Some(logger) = compute_state.compute_logger.clone() {
                    errs.stream.log_dataflow_errors(logger, idx_id);
                }

                compute_state.traces.set(
                    idx_id,
                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                );
            }
            Some(ArrangementFlavor::Trace(gid, _, _)) => {
                let trace = compute_state.traces.get(&gid).unwrap().clone();
                compute_state.traces.set(idx_id, trace);
            }
            None => {
                println!("collection available: {:?}", bundle.collection.is_some());
                println!(
                    "keys available: {:?}",
                    bundle.arranged.keys().collect::<Vec<_>>()
                );
                panic!(
                    "Arrangement alarmingly absent! id: {:?}, keys: {:?}",
                    Id::Global(idx_id),
                    &idx.key
                );
            }
        };
    }
}

impl<G> Context<G>
where
    G: Scope<Timestamp = Product<mz_repr::Timestamp, PointStamp<u64>>>,
{
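    /// Renders a plan that may contain recursive bindings, at the given level
    /// of nesting.
    ///
    /// Let bindings are rendered in their own regions. Recursive bindings are
    /// rendered as differential `Variable`s whose feedback advances coordinate
    /// `level + 1` of the pointstamp; when a `RecBind` carries an iteration
    /// limit, iterations past the limit are either returned as-is or turned
    /// into `LetRecLimitExceeded` errors, depending on `return_at_limit`.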
    pub fn render_recursive_plan(
        &mut self,
        object_id: GlobalId,
        level: usize,
        plan: RenderPlan,
    ) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            for LetBind { id, value } in lets {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }

            let rec_ids: Vec<_> = recs.iter().map(|r| r.id).collect();

            let mut variables = BTreeMap::new();
            for id in rec_ids.iter() {
                use differential_dataflow::dynamic::feedback_summary;
                use differential_dataflow::operators::iterate::Variable;
                let inner = feedback_summary::<u64>(level + 1, 1);
                let oks_v = Variable::new(
                    &mut self.scope,
                    Product::new(Default::default(), inner.clone()),
                );
                let err_v = Variable::new(&mut self.scope, Product::new(Default::default(), inner));

                self.insert_id(
                    Id::Local(*id),
                    CollectionBundle::from_collections(oks_v.clone(), err_v.clone()),
                );
                variables.insert(Id::Local(*id), (oks_v, err_v));
            }
            for RecBind { id, value, limit } in recs {
                let bundle = self.render_recursive_plan(object_id, level + 1, value);
                let (oks, mut err) = bundle.collection.clone().unwrap();
                self.insert_id(Id::Local(id), bundle);
                let (oks_v, err_v) = variables.remove(&Id::Local(id)).unwrap();

                let mut oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("LetRecConsolidation");
                if let Some(token) = &self.shutdown_token.get_inner() {
                    oks = oks.with_token(Weak::clone(token));
                }

                if let Some(limit) = limit {
                    let (in_limit, over_limit) =
                        oks.inner.branch_when(move |Product { inner: ps, .. }| {
                            let iteration_index = *ps.get(level).unwrap_or(&0);
                            iteration_index + 1 >= limit.max_iters.into()
                        });
                    oks = Collection::new(in_limit);
                    if !limit.return_at_limit {
                        err = err.concat(&Collection::new(over_limit).map(move |_data| {
                            DataflowError::EvalError(Box::new(EvalError::LetRecLimitExceeded(
                                format!("{}", limit.max_iters.get()).into(),
                            )))
                        }));
                    }
                }

                oks_v.set(&oks);

                let err: KeyCollection<_, _, _> = err.into();
                let mut errs = err
                    .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Arrange recursive err",
                    )
                    .mz_reduce_abelian::<_, _, _, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        "Distinct recursive err",
                        move |_k, _s, t| t.push(((), Diff::ONE)),
                    )
                    .as_collection(|k, _| k.clone());
                if let Some(token) = &self.shutdown_token.get_inner() {
                    errs = errs.with_token(Weak::clone(token));
                }
                err_v.set(&errs);
            }
            for id in rec_ids.into_iter() {
                let bundle = self.remove_id(Id::Local(id)).unwrap();
                let (oks, err) = bundle.collection.unwrap();
                self.insert_id(
                    Id::Local(id),
                    CollectionBundle::from_collections(
                        oks.leave_dynamic(level + 1),
                        err.leave_dynamic(level + 1),
                    ),
                );
            }
        }

        self.render_letfree_plan(object_id, plan.body)
    }
}

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    <G::Timestamp as Columnar>::Container: Clone + Send,
    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
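    /// Renders a non-recursive plan into a `CollectionBundle`, rendering each
    /// let binding and the plan body in their own named regions. Recursive
    /// bindings are not allowed here (see the assertion below); those go
    /// through `render_recursive_plan` instead.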
    pub fn render_plan(&mut self, object_id: GlobalId, plan: RenderPlan) -> CollectionBundle<G> {
        for BindStage { lets, recs } in plan.binds {
            assert!(recs.is_empty());

            for LetBind { id, value } in lets {
                let bundle =
                    self.scope
                        .clone()
                        .region_named(&format!("Binding({:?})", id), |region| {
                            let depends = value.depends();
                            self.enter_region(region, Some(&depends))
                                .render_letfree_plan(object_id, value)
                                .leave_region()
                        });
                self.insert_id(Id::Local(id), bundle);
            }
        }

        self.scope.clone().region_named("Main Body", |region| {
            let depends = plan.body.depends();
            self.enter_region(region, Some(&depends))
                .render_letfree_plan(object_id, plan.body)
                .leave_region()
        })
    }
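    /// Renders a let-free plan by visiting its nodes in topological order,
    /// stashing each rendered bundle for use by later nodes, and returning
    /// the bundle of the root node.
    ///
    /// When a compute logger is installed, this also records for each node a
    /// humanized operator name and the span of timely operator identifiers
    /// created for it, and instruments each bundle for hydration logging.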
    fn render_letfree_plan(
        &mut self,
        object_id: GlobalId,
        plan: LetFreePlan,
    ) -> CollectionBundle<G> {
        let (mut nodes, root_id, topological_order) = plan.destruct();

        let mut collections = BTreeMap::new();

        let should_compute_lir_metadata = self.compute_logger.is_some();
        let mut lir_mapping_metadata = if should_compute_lir_metadata {
            Some(Vec::with_capacity(nodes.len()))
        } else {
            None
        };

        for lir_id in topological_order {
            let node = nodes.remove(&lir_id).unwrap();

            let metadata = if should_compute_lir_metadata {
                let operator = node.expr.humanize(&DummyHumanizer);
                let operator_id_start = self.scope.peek_identifier();
                Some((operator, operator_id_start))
            } else {
                None
            };

            let mut bundle = self.render_plan_expr(node.expr, &collections);

            if let Some((operator, operator_id_start)) = metadata {
                let operator_id_end = self.scope.peek_identifier();
                let operator_span = (operator_id_start, operator_id_end);

                if let Some(lir_mapping_metadata) = &mut lir_mapping_metadata {
                    lir_mapping_metadata.push((
                        lir_id,
                        LirMetadata::new(operator, node.parent, node.nesting, operator_span),
                    ))
                }
            }

            self.log_operator_hydration(&mut bundle, lir_id);

            collections.insert(lir_id, bundle);
        }

        if let Some(lir_mapping_metadata) = lir_mapping_metadata {
            self.log_lir_mapping(object_id, lir_mapping_metadata);
        }

        collections
            .remove(&root_id)
            .expect("LetFreePlan invariant (1)")
    }
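    /// Renders a single plan expression, looking up the bundles of its inputs
    /// in `collections`, which must already contain every input `LirId`.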
    fn render_plan_expr(
        &mut self,
        expr: render_plan::Expr,
        collections: &BTreeMap<LirId, CollectionBundle<G>>,
    ) -> CollectionBundle<G> {
        use render_plan::Expr::*;

        let expect_input = |id| {
            collections
                .get(&id)
                .cloned()
                .unwrap_or_else(|| panic!("missing input collection: {id}"))
        };

        match expr {
            Constant { rows } => {
                let (rows, errs) = match rows {
                    Ok(rows) => (rows, Vec::new()),
                    Err(e) => (Vec::new(), vec![e]),
                };

                let as_of_frontier = self.as_of_frontier.clone();
                let until = self.until.clone();
                let ok_collection = rows
                    .into_iter()
                    .filter_map(move |(row, mut time, diff)| {
                        time.advance_by(as_of_frontier.borrow());
                        if !until.less_equal(&time) {
                            Some((
                                row,
                                <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(time),
                                diff,
                            ))
                        } else {
                            None
                        }
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                let mut error_time: mz_repr::Timestamp = Timestamp::minimum();
                error_time.advance_by(self.as_of_frontier.borrow());
                let err_collection = errs
                    .into_iter()
                    .map(move |e| {
                        (
                            DataflowError::from(e),
                            <G::Timestamp as Refines<mz_repr::Timestamp>>::to_inner(error_time),
                            Diff::ONE,
                        )
                    })
                    .to_stream(&mut self.scope)
                    .as_collection();

                CollectionBundle::from_collections(ok_collection, err_collection)
            }
            Get { id, keys, plan } => {
                let mut collection = self
                    .lookup_id(id)
                    .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
                match plan {
                    mz_compute_types::plan::GetPlan::PassArrangements => {
                        assert!(
                            keys.arranged
                                .iter()
                                .all(|(key, _, _)| collection.arranged.contains_key(key))
                        );
                        assert!(keys.raw <= collection.collection.is_some());
                        collection.arranged.retain(|key, _value| {
                            keys.arranged.iter().any(|(key2, _, _)| key2 == key)
                        });
                        collection
                    }
                    mz_compute_types::plan::GetPlan::Arrangement(key, row, mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            Some((key, row)),
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                    mz_compute_types::plan::GetPlan::Collection(mfp) => {
                        let (oks, errs) = collection.as_collection_core(
                            mfp,
                            None,
                            self.until.clone(),
                            &self.config_set,
                        );
                        CollectionBundle::from_collections(oks, errs)
                    }
                }
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                let input = expect_input(input);
                if mfp.is_identity() {
                    input
                } else {
                    let (oks, errs) = input.as_collection_core(
                        mfp,
                        input_key_val,
                        self.until.clone(),
                        &self.config_set,
                    );
                    CollectionBundle::from_collections(oks, errs)
                }
            }
            FlatMap {
                input,
                func,
                exprs,
                mfp_after: mfp,
                input_key,
            } => {
                let input = expect_input(input);
                self.render_flat_map(input, func, exprs, mfp, input_key)
            }
            Join { inputs, plan } => {
                let inputs = inputs.into_iter().map(expect_input).collect();
                match plan {
                    mz_compute_types::plan::join::JoinPlan::Linear(linear_plan) => {
                        self.render_join(inputs, linear_plan)
                    }
                    mz_compute_types::plan::join::JoinPlan::Delta(delta_plan) => {
                        self.render_delta_join(inputs, delta_plan)
                    }
                }
            }
            Reduce {
                input,
                key_val_plan,
                plan,
                input_key,
                mfp_after,
            } => {
                let input = expect_input(input);
                let mfp_option = (!mfp_after.is_identity()).then_some(mfp_after);
                self.render_reduce(input, key_val_plan, plan, input_key, mfp_option)
            }
            TopK { input, top_k_plan } => {
                let input = expect_input(input);
                self.render_topk(input, top_k_plan)
            }
            Negate { input } => {
                let input = expect_input(input);
                let (oks, errs) = input.as_specific_collection(None, &self.config_set);
                CollectionBundle::from_collections(oks.negate(), errs)
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                let input = expect_input(input);
                self.render_threshold(input, threshold_plan)
            }
            Union {
                inputs,
                consolidate_output,
            } => {
                let mut oks = Vec::new();
                let mut errs = Vec::new();
                for input in inputs.into_iter() {
                    let (os, es) =
                        expect_input(input).as_specific_collection(None, &self.config_set);
                    oks.push(os);
                    errs.push(es);
                }
                let mut oks = differential_dataflow::collection::concatenate(&mut self.scope, oks);
                if consolidate_output {
                    oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("UnionConsolidation")
                }
                let errs = differential_dataflow::collection::concatenate(&mut self.scope, errs);
                CollectionBundle::from_collections(oks, errs)
            }
            ArrangeBy {
                input,
                forms: keys,
                input_key,
                input_mfp,
            } => {
                let input = expect_input(input);
                input.ensure_collections(
                    keys,
                    input_key,
                    input_mfp,
                    self.until.clone(),
                    &self.config_set,
                )
            }
        }
    }

    fn log_dataflow_global_id(&self, dataflow_index: usize, global_id: GlobalId) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::DataflowGlobal(DataflowGlobal {
                dataflow_index,
                global_id,
            }));
        }
    }

    fn log_lir_mapping(&self, global_id: GlobalId, mapping: Vec<(LirId, LirMetadata)>) {
        if let Some(logger) = &self.compute_logger {
            logger.log(&ComputeEvent::LirMapping(LirMapping { global_id, mapping }));
        }
    }
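    /// Instruments the given bundle to log hydration of the operator
    /// identified by `lir_id`, wrapping the stream of the first arrangement
    /// if one exists and the ok collection otherwise.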
    fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: LirId) {
        match bundle.arranged.values_mut().next() {
            Some(arrangement) => {
                use ArrangementFlavor::*;

                match arrangement {
                    Local(a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                    Trace(_, a, _) => {
                        a.stream = self.log_operator_hydration_inner(&a.stream, lir_id);
                    }
                }
            }
            None => {
                let (oks, _) = bundle
                    .collection
                    .as_mut()
                    .expect("CollectionBundle invariant");
                let stream = self.log_operator_hydration_inner(&oks.inner, lir_id);
                *oks = stream.as_collection();
            }
        }
    }
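    /// Wraps `stream` with an operator that reports the hydration status of
    /// `lir_id`: `false` at construction and `true` once the input frontier
    /// has reached the hydration frontier, i.e., the `as_of` stepped forward
    /// by one tick (presumably because snapshot data can still arrive at the
    /// `as_of` itself).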
    fn log_operator_hydration_inner<D>(&self, stream: &Stream<G, D>, lir_id: LirId) -> Stream<G, D>
    where
        D: Clone + 'static,
    {
        let Some(logger) = self.hydration_logger.clone() else {
            return stream.clone();
        };

        let mut hydration_frontier = Antichain::new();
        for time in self.as_of_frontier.iter() {
            if let Some(time) = time.try_step_forward() {
                hydration_frontier.insert(Refines::to_inner(time));
            }
        }

        let name = format!("LogOperatorHydration ({lir_id})");
        stream.unary_frontier(Pipeline, &name, |_cap, _info| {
            let mut hydrated = false;
            logger.log(lir_id, hydrated);

            move |input, output| {
                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });

                if hydrated {
                    return;
                }

                let frontier = input.frontier().frontier();
                if PartialOrder::less_equal(&hydration_frontier.borrow(), &frontier) {
                    hydrated = true;
                    logger.log(lir_id, hydrated);
                }
            }
        })
    }
}
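/// A timestamp type usable for rendering: it refines `mz_repr::Timestamp` and
/// exposes its "system" and "event" components, delay summaries for each, and
/// a `step_back` operation that undoes one unit of progress.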
#[allow(dead_code)]
pub trait RenderTimestamp:
    Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation + Columnar
where
    <Self as Columnar>::Container: Clone + Send,
{
    fn system_time(&mut self) -> &mut mz_repr::Timestamp;
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    fn event_time(&self) -> mz_repr::Timestamp;
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp;
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary;
    fn step_back(&self) -> Self;
}

impl RenderTimestamp for mz_repr::Timestamp {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        *self
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        self
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        delay
    }
    fn step_back(&self) -> Self {
        self.saturating_sub(1)
    }
}

impl RenderTimestamp for Product<mz_repr::Timestamp, PointStamp<u64>> {
    fn system_time(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn system_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn event_time(&self) -> mz_repr::Timestamp {
        self.outer
    }
    fn event_time_mut(&mut self) -> &mut mz_repr::Timestamp {
        &mut self.outer
    }
    fn event_delay(delay: mz_repr::Timestamp) -> <Self as Timestamp>::Summary {
        Product::new(delay, Default::default())
    }
    fn step_back(&self) -> Self {
        let inner = self.inner.clone();
        let mut vec = inner.into_vec();
        for item in vec.iter_mut() {
            *item = item.saturating_sub(1);
        }
        Product::new(self.outer.saturating_sub(1), PointStamp::new(vec))
    }
}
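/// A signal that dataflow operators can await before starting to do work.
///
/// The signal fires when its token is dropped; `drop_on_fire` lets callers
/// attach further objects to be dropped at that point. A minimal sketch of
/// the lifecycle (illustrative only; the type is crate-private, so this
/// doctest is not compiled):
///
/// ```ignore
/// let (signal, token) = StartSignal::new();
/// assert!(!signal.has_fired());
/// drop(token); // firing the signal...
/// assert!(signal.has_fired()); // ...unblocks anyone awaiting it
/// ```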
#[derive(Clone)]
pub(crate) struct StartSignal {
    fut: futures::future::Shared<oneshot::Receiver<Infallible>>,
    token_ref: Weak<RefCell<Box<dyn Any>>>,
}

impl StartSignal {
    pub fn new() -> (Self, Rc<dyn Any>) {
        let (tx, rx) = oneshot::channel::<Infallible>();
        let token: Rc<RefCell<Box<dyn Any>>> = Rc::new(RefCell::new(Box::new(tx)));
        let signal = Self {
            fut: rx.shared(),
            token_ref: Rc::downgrade(&token),
        };
        (signal, token)
    }

    pub fn has_fired(&self) -> bool {
        self.token_ref.strong_count() == 0
    }

    pub fn drop_on_fire(&self, to_drop: Box<dyn Any>) {
        if let Some(token) = self.token_ref.upgrade() {
            let mut token = token.borrow_mut();
            let inner = std::mem::replace(&mut *token, Box::new(()));
            *token = Box::new((inner, to_drop));
        }
    }
}

impl Future for StartSignal {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.fut.poll_unpin(cx).map(|_| ())
    }
}
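/// Extension trait for gating a stream or arrangement on a [`StartSignal`]:
/// incoming data is stashed until the signal has fired and only then
/// forwarded.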
pub(crate) trait WithStartSignal {
    fn with_start_signal(self, signal: StartSignal) -> Self;
}

impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
    S: Scope,
    S::Timestamp: RenderTimestamp,
    <S::Timestamp as Columnar>::Container: Clone + Send,
    Tr: TraceReader + Clone,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        Arranged {
            stream: self.stream.with_start_signal(signal),
            trace: self.trace,
        }
    }
}

impl<S, D> WithStartSignal for Stream<S, D>
where
    S: Scope,
    D: timely::Data,
{
    fn with_start_signal(self, signal: StartSignal) -> Self {
        self.unary(Pipeline, "StartSignal", |_cap, info| {
            let token = Box::new(ActivateOnDrop::new(
                (),
                info.address,
                self.scope().activations(),
            ));
            signal.drop_on_fire(token);

            let mut stash = Vec::new();

            move |input, output| {
                if !signal.has_fired() {
                    input.for_each(|cap, data| stash.push((cap, std::mem::take(data))));
                    return;
                }

                for (cap, mut data) in std::mem::take(&mut stash) {
                    output.session(&cap).give_container(&mut data);
                }

                input.for_each(|cap, data| {
                    output.session(&cap).give_container(data);
                });
            }
        })
    }
}
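/// Suppresses progress messages for times strictly before the given `as_of`.
///
/// All data is forwarded unchanged, but data at times not beyond the `as_of`
/// is emitted under a capability held at the operator's default (minimum)
/// time, so downstream operators observe no progress before the `as_of`. The
/// held capability is dropped once the input frontier has passed the `as_of`.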
fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
{
    stream.unary_frontier(Pipeline, "SuppressEarlyProgress", |default_cap, _info| {
        let mut early_cap = Some(default_cap);

        move |input, output| {
            input.for_each(|data_cap, data| {
                let mut session = if as_of.less_than(data_cap.time()) {
                    output.session(&data_cap)
                } else {
                    let cap = early_cap.as_ref().expect("early_cap can't be dropped yet");
                    output.session(cap)
                };
                session.give_container(data);
            });

            let frontier = input.frontier().frontier();
            if !PartialOrder::less_equal(&frontier, &as_of.borrow()) {
                early_cap.take();
            }
        }
    })
}
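/// Extension trait for applying logical backpressure to a stream.
///
/// The wrapped stream forwards all data immediately but withholds progress:
/// observed data times are rounded up to the next multiple of `slack_ms`, and
/// a capability is held back until the probe `handle` has caught up to each
/// rounded time. At most `limit` pending times are retained (oldest released
/// first), times at or beyond `upper` are never held back, and the returned
/// token shuts the operator down when dropped.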
trait LimitProgress<T: Timestamp> {
    fn limit_progress(
        &self,
        handle: MzProbeHandle<T>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<T>,
        name: String,
    ) -> (Rc<dyn Any>, Self);
}

impl<G, D, R> LimitProgress<mz_repr::Timestamp> for StreamCore<G, Vec<(D, mz_repr::Timestamp, R)>>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
    D: timely::Data,
    R: timely::Data,
{
    fn limit_progress(
        &self,
        handle: MzProbeHandle<mz_repr::Timestamp>,
        slack_ms: u64,
        limit: Option<usize>,
        upper: Antichain<mz_repr::Timestamp>,
        name: String,
    ) -> (Rc<dyn Any>, Self) {
        let mut button = None;

        let stream =
            self.unary_frontier(Pipeline, &format!("LimitProgress({name})"), |_cap, info| {
                let mut pending_times: BTreeSet<G::Timestamp> = BTreeSet::new();
                let mut retained_cap: Option<Capability<G::Timestamp>> = None;

                let activator = self.scope().activator_for(info.address);
                handle.activate(activator.clone());

                let shutdown = Rc::new(());
                button = Some(ShutdownButton::new(
                    Rc::new(RefCell::new(Some(Rc::clone(&shutdown)))),
                    activator,
                ));
                let shutdown = Rc::downgrade(&shutdown);

                move |input, output| {
                    if shutdown.strong_count() == 0 {
                        retained_cap = None;
                        pending_times.clear();
                        while let Some(_) = input.next() {}
                        return;
                    }

                    while let Some((cap, data)) = input.next() {
                        for time in data
                            .iter()
                            .flat_map(|(_, time, _)| u64::from(time).checked_add(slack_ms))
                        {
                            let rounded_time =
                                (time / slack_ms).saturating_add(1).saturating_mul(slack_ms);
                            if !upper.less_than(&rounded_time.into()) {
                                pending_times.insert(rounded_time.into());
                            }
                        }
                        output.session(&cap).give_container(data);
                        if retained_cap.as_ref().is_none_or(|c| {
                            !c.time().less_than(cap.time()) && !upper.less_than(cap.time())
                        }) {
                            retained_cap = Some(cap.retain());
                        }
                    }

                    handle.with_frontier(|f| {
                        while pending_times
                            .first()
                            .map_or(false, |retained_time| !f.less_than(&retained_time))
                        {
                            let _ = pending_times.pop_first();
                        }
                    });

                    while limit.map_or(false, |limit| pending_times.len() > limit) {
                        let _ = pending_times.pop_first();
                    }

                    match (retained_cap.as_mut(), pending_times.first()) {
                        (Some(cap), Some(first)) => cap.downgrade(first),
                        (_, None) => retained_cap = None,
                        _ => {}
                    }

                    if input.frontier.is_empty() {
                        retained_cap = None;
                        pending_times.clear();
                    }

                    if !pending_times.is_empty() {
                        tracing::debug!(
                            name,
                            info.global_id,
                            pending_times = %PendingTimesDisplay(pending_times.iter().cloned()),
                            frontier = ?input.frontier.frontier().get(0),
                            probe = ?handle.with_frontier(|f| f.get(0).cloned()),
                            ?upper,
                            "pending times",
                        );
                    }
                }
            });
        (Rc::new(button.unwrap().press_on_drop()), stream)
    }
}
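/// Display helper that renders a sequence of timestamps compactly: the first
/// time in full, followed by the deltas between consecutive times.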
struct PendingTimesDisplay<T>(T);

impl<T> std::fmt::Display for PendingTimesDisplay<T>
where
    T: IntoIterator<Item = mz_repr::Timestamp> + Clone,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut iter = self.0.clone().into_iter();
        write!(f, "[")?;
        if let Some(first) = iter.next() {
            write!(f, "{}", first)?;
            let mut last = u64::from(first);
            for time in iter {
                write!(f, ", +{}", u64::from(time) - last)?;
                last = u64::from(time);
            }
        }
        write!(f, "]")?;
        Ok(())
    }
}
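/// Helper for merging two row halves into a single `Row` and splitting such
/// a row back apart at a fixed arity.
///
/// A sketch of the round trip (illustrative only; `Pairer` is private to
/// this module, so this doctest is not compiled):
///
/// ```ignore
/// let pairer = Pairer::new(1);
/// let merged = pairer.merge([Datum::Int64(1)], [Datum::Int64(2)]);
/// let (first, second) = pairer.split(merged.iter());
/// assert_eq!(first, Row::pack_slice(&[Datum::Int64(1)]));
/// assert_eq!(second, Row::pack_slice(&[Datum::Int64(2)]));
/// ```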
#[derive(Clone, Copy, Debug)]
struct Pairer {
    split_arity: usize,
}

impl Pairer {
    fn new(split_arity: usize) -> Self {
        Self { split_arity }
    }

    fn merge<'a, I1, I2>(&self, first: I1, second: I2) -> Row
    where
        I1: IntoIterator<Item = Datum<'a>>,
        I2: IntoIterator<Item = Datum<'a>>,
    {
        SharedRow::pack(first.into_iter().chain(second))
    }

    fn split<'a>(&self, datum_iter: impl IntoIterator<Item = Datum<'a>>) -> (Row, Row) {
        let mut datum_iter = datum_iter.into_iter();
        let binding = SharedRow::get();
        let mut row_builder = binding.borrow_mut();
        let first = row_builder.pack_using(datum_iter.by_ref().take(self.split_arity));
        let second = row_builder.pack_using(datum_iter);
        (first, second)
    }
}