1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23 ENABLE_COLUMN_PAGED_BATCHER, ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION,
24 ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Eval, Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ExtendDatums;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::batcher;
34use mz_timely_util::columnar::builder::ColumnBuilder;
35use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange};
36use mz_timely_util::columnation::ColumnationChunker;
37use timely::ContainerBuilder;
38use timely::container::{CapacityContainerBuilder, PushInto};
39use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
40use timely::dataflow::operators::Capability;
41use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
42use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
43use timely::dataflow::{Scope, Stream};
44use timely::progress::operate::FrontierInterest;
45use timely::progress::{Antichain, Timestamp};
46
47use crate::compute_state::ComputeState;
48use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
49use crate::render::errors::{DataflowErrorSer, ErrorLogger};
50use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
51use crate::typedefs::{
52 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
53};
54use mz_row_spine::{DatumSeq, RowRowBuilder, RowRowColPagedBuilder};
55
/// Rendering state for a single dataflow (or a region nested within it).
///
/// Holds the scope being rendered into, identifying metadata, the frontiers
/// supplied by the caller, and the collections bound so far.
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope within which all bound collections live.
    pub(crate) scope: Scope<'scope, T>,
    /// Name of the dataflow; also used to construct the [`ErrorLogger`].
    pub debug_name: String,
    /// Identifier of the dataflow, taken from the first element of the scope's address.
    pub dataflow_id: usize,
    /// The ids exported by the dataflow.
    pub export_ids: Vec<GlobalId>,
    /// The dataflow's `as_of`, or the minimum timestamp when the description has none.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Upper frontier handed in by the caller.
    // NOTE(review): presumably updates at or beyond this frontier may be
    // suppressed by consumers — confirm against callers.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Collection bundles currently bound, by identifier.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// Compute logger; `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Linear join specification copied from the compute state.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// Expiration frontier handed in by the caller.
    // NOTE(review): semantics are not visible in this file — confirm.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// Shared handle to the worker's configuration set.
    pub config_set: Rc<ConfigSet>,
}
94
95impl<'scope, T: RenderTimestamp> Context<'scope, T> {
96 pub fn for_dataflow_in<Plan>(
98 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
99 scope: Scope<'scope, T>,
100 compute_state: &ComputeState,
101 until: Antichain<mz_repr::Timestamp>,
102 dataflow_expiration: Antichain<mz_repr::Timestamp>,
103 ) -> Self {
104 use mz_ore::collections::CollectionExt as IteratorExt;
105 let dataflow_id = *scope.addr().into_first();
106 let as_of_frontier = dataflow
107 .as_of
108 .clone()
109 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
110
111 let export_ids = dataflow.export_ids().collect();
112
113 let compute_logger = if dataflow.is_transient() {
117 None
118 } else {
119 compute_state.compute_logger.clone()
120 };
121
122 Self {
123 scope,
124 debug_name: dataflow.debug_name.clone(),
125 dataflow_id,
126 export_ids,
127 as_of_frontier,
128 until,
129 bindings: BTreeMap::new(),
130 compute_logger,
131 linear_join_spec: compute_state.linear_join_spec,
132 dataflow_expiration,
133 config_set: Rc::clone(&compute_state.worker_config),
134 }
135 }
136}
137
138impl<'scope, T: RenderTimestamp> Context<'scope, T> {
139 pub fn insert_id(
144 &mut self,
145 id: Id,
146 collection: CollectionBundle<'scope, T>,
147 ) -> Option<CollectionBundle<'scope, T>> {
148 self.bindings.insert(id, collection)
149 }
150 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
154 self.bindings.remove(&id)
155 }
156 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
158 if !self.bindings.contains_key(&id) {
159 self.bindings.insert(id, collection);
160 } else {
161 let binding = self
162 .bindings
163 .get_mut(&id)
164 .expect("Binding verified to exist");
165 if collection.collection.is_some() {
166 binding.collection = collection.collection;
167 }
168 for (key, flavor) in collection.arranged.into_iter() {
169 binding.arranged.insert(key, flavor);
170 }
171 }
172 }
173 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
175 self.bindings.get(&id).cloned()
176 }
177
178 pub(super) fn error_logger(&self) -> ErrorLogger {
179 ErrorLogger::new(self.debug_name.clone())
180 }
181}
182
183impl<'scope, T: RenderTimestamp> Context<'scope, T> {
184 pub fn enter_region<'a>(
186 &self,
187 region: Scope<'a, T>,
188 bindings: Option<&std::collections::BTreeSet<Id>>,
189 ) -> Context<'a, T> {
190 let bindings = self
191 .bindings
192 .iter()
193 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
194 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
195 .collect();
196
197 Context {
198 scope: region,
199 debug_name: self.debug_name.clone(),
200 dataflow_id: self.dataflow_id.clone(),
201 export_ids: self.export_ids.clone(),
202 as_of_frontier: self.as_of_frontier.clone(),
203 until: self.until.clone(),
204 compute_logger: self.compute_logger.clone(),
205 linear_join_spec: self.linear_join_spec.clone(),
206 bindings,
207 dataflow_expiration: self.dataflow_expiration.clone(),
208 config_set: Rc::clone(&self.config_set),
209 }
210 }
211}
212
#[derive(Clone)]
/// An arrangement of rows together with the arrangement of its errors.
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// An arrangement whose traces use the scope's own timestamp `T`:
    /// the ok arrangement followed by the error arrangement.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An arrangement over `mz_repr::Timestamp` traces that has been entered
    /// into a scope with timestamp `T`: an id, the ok arrangement, and the
    /// error arrangement.
    // NOTE(review): the `GlobalId` presumably names the imported trace's
    // source — confirm against the import path.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
231
232impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
233 #[deprecated(note = "Use `flat_map` instead.")]
241 pub fn as_collection(
242 &self,
243 ) -> (
244 VecCollection<'scope, T, Row, Diff>,
245 VecCollection<'scope, T, DataflowErrorSer, Diff>,
246 ) {
247 let mut datums = DatumVec::new();
248 let logic = move |k: DatumSeq, v: DatumSeq| {
249 let temp_storage = RowArena::new();
250 let mut datums_borrow = datums.borrow();
251 k.extend_datums(&temp_storage, &mut datums_borrow, None);
252 v.extend_datums(&temp_storage, &mut datums_borrow, None);
253 SharedRow::pack(&**datums_borrow)
254 };
255 match &self {
256 ArrangementFlavor::Local(oks, errs) => (
257 oks.clone().as_collection(logic),
258 errs.clone().as_collection(|k, &()| k.clone()),
259 ),
260 ArrangementFlavor::Trace(_, oks, errs) => (
261 oks.clone().as_collection(logic),
262 errs.clone().as_collection(|k, &()| k.clone()),
263 ),
264 }
265 }
266
267 pub fn flat_map<D, DCB, L>(
300 &self,
301 key: Option<&Row>,
302 max_demand: usize,
303 logic: L,
304 ) -> (
305 Stream<'scope, T, DCB::Container>,
306 VecCollection<'scope, T, DataflowErrorSer, Diff>,
307 )
308 where
309 D: Data,
310 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
311 L: for<'a, 'b> FnMut(
312 &'a mut DatumVecBorrow<'b>,
313 T,
314 Diff,
315 &mut Session<T, DCB>,
316 &mut Session<T, ECB<T>>,
317 ) -> usize
318 + 'static,
319 {
320 match &self {
323 ArrangementFlavor::Local(oks, errs) => {
324 let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
325 oks.clone(),
326 key,
327 max_demand,
328 logic,
329 REFUEL,
330 );
331 let errs = errs.clone().as_collection(|k, &()| k.clone());
332 let errs = errs.concat(mfp_errs.as_collection());
333 (oks, errs)
334 }
335 ArrangementFlavor::Trace(_, oks, errs) => {
336 let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
337 oks.clone(),
338 key,
339 max_demand,
340 logic,
341 REFUEL,
342 );
343 let errs = errs.clone().as_collection(|k, &()| k.clone());
344 let errs = errs.concat(mfp_errs.as_collection());
345 (oks, errs)
346 }
347 }
348 }
349
350 pub fn flat_map_ok<D, DCB, L>(
355 &self,
356 key: Option<&Row>,
357 max_demand: usize,
358 logic: L,
359 ) -> (
360 Stream<'scope, T, DCB::Container>,
361 VecCollection<'scope, T, DataflowErrorSer, Diff>,
362 )
363 where
364 D: Data,
365 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
366 L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session<T, DCB>) -> usize
367 + 'static,
368 {
369 match &self {
370 ArrangementFlavor::Local(oks, errs) => {
371 let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
372 oks.clone(),
373 key,
374 max_demand,
375 logic,
376 REFUEL,
377 );
378 let errs = errs.clone().as_collection(|k, &()| k.clone());
379 (oks, errs)
380 }
381 ArrangementFlavor::Trace(_, oks, errs) => {
382 let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
383 oks.clone(),
384 key,
385 max_demand,
386 logic,
387 REFUEL,
388 );
389 let errs = errs.clone().as_collection(|k, &()| k.clone());
390 (oks, errs)
391 }
392 }
393 }
394}
395impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
396 pub fn scope(&self) -> Scope<'scope, T> {
398 match self {
399 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
400 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
401 }
402 }
403
404 pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
406 match self {
407 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
408 oks.clone().enter_region(region),
409 errs.clone().enter_region(region),
410 ),
411 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
412 *gid,
413 oks.clone().enter_region(region),
414 errs.clone().enter_region(region),
415 ),
416 }
417 }
418}
419impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
420 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
422 match self {
423 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
424 oks.clone().leave_region(outer),
425 errs.clone().leave_region(outer),
426 ),
427 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
428 *gid,
429 oks.clone().leave_region(outer),
430 errs.clone().leave_region(outer),
431 ),
432 }
433 }
434}
435
#[derive(Clone)]
/// The available representations of one collection: an optional unarranged
/// form and any number of arrangements of it.
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The unarranged ok and error collections, when present.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// Arrangements of the collection, indexed by their key expressions.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
448
449impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
450 pub fn from_collections(
452 oks: VecCollection<'scope, T, Row, Diff>,
453 errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
454 ) -> Self {
455 Self {
456 collection: Some((oks, errs)),
457 arranged: BTreeMap::default(),
458 }
459 }
460
461 pub fn from_expressions(
463 exprs: Vec<MirScalarExpr>,
464 arrangements: ArrangementFlavor<'scope, T>,
465 ) -> Self {
466 let mut arranged = BTreeMap::new();
467 arranged.insert(exprs, arrangements);
468 Self {
469 collection: None,
470 arranged,
471 }
472 }
473
474 pub fn from_columns<I: IntoIterator<Item = usize>>(
476 columns: I,
477 arrangements: ArrangementFlavor<'scope, T>,
478 ) -> Self {
479 let mut keys = Vec::new();
480 for column in columns {
481 keys.push(MirScalarExpr::column(column));
482 }
483 Self::from_expressions(keys, arrangements)
484 }
485
486 pub fn scope(&self) -> Scope<'scope, T> {
488 if let Some((oks, _errs)) = &self.collection {
489 oks.inner.scope()
490 } else {
491 self.arranged
492 .values()
493 .next()
494 .expect("Must contain a valid collection")
495 .scope()
496 }
497 }
498
499 pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
501 CollectionBundle {
502 collection: self.collection.as_ref().map(|(oks, errs)| {
503 (
504 oks.clone().enter_region(region),
505 errs.clone().enter_region(region),
506 )
507 }),
508 arranged: self
509 .arranged
510 .iter()
511 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
512 .collect(),
513 }
514 }
515}
516
517impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
518 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
520 CollectionBundle {
521 collection: self.collection.as_ref().map(|(oks, errs)| {
522 (
523 oks.clone().leave_region(outer),
524 errs.clone().leave_region(outer),
525 )
526 }),
527 arranged: self
528 .arranged
529 .iter()
530 .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
531 .collect(),
532 }
533 }
534}
535
536impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
537 pub fn as_specific_collection(
550 &self,
551 key: Option<&[MirScalarExpr]>,
552 config_set: &ConfigSet,
553 ) -> (
554 VecCollection<'scope, T, Row, Diff>,
555 VecCollection<'scope, T, DataflowErrorSer, Diff>,
556 ) {
557 match key {
563 None => self
564 .collection
565 .clone()
566 .expect("The unarranged collection doesn't exist."),
567 Some(key) => {
568 let arranged = self.arranged.get(key).unwrap_or_else(|| {
569 panic!("The collection arranged by {:?} doesn't exist.", key)
570 });
571 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
572 let (ok, err) = arranged
576 .flat_map_ok::<_, CapacityContainerBuilder<Vec<(Row, T, Diff)>>, _>(
577 None,
578 usize::MAX,
579 |borrow, t, r, ok_session| {
580 ok_session.give((SharedRow::pack(borrow.iter()), t, r));
581 1
582 },
583 );
584 (ok.as_collection(), err)
585 } else {
586 #[allow(deprecated)]
587 arranged.as_collection()
588 }
589 }
590 }
591 }
592
    /// Applies `logic` to every update of one representation of the bundle.
    ///
    /// When `key_val` is `Some((key, val))`, the arrangement stored under
    /// `key` is used and `val` is forwarded to its `flat_map` as the optional
    /// key to look up. When `key_val` is `None`, the unarranged collection is
    /// read through a `CollectionFlatMap` operator built here.
    ///
    /// `logic` receives at most `max_demand` datums of each row, the update's
    /// time and diff, and sessions for the ok and error outputs. The returned
    /// error collection also contains the errors of the representation read.
    ///
    /// # Panics
    ///
    /// Panics if the requested arrangement, or the unarranged collection, is
    /// missing from the bundle.
    pub fn flat_map<D, DCB, L>(
        &self,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a> FnMut(
                &'a mut DatumVecBorrow<'_>,
                T,
                Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        if let Some((key, val)) = key_val {
            self.arrangement(&key)
                .expect("Should have ensured during planning that this arrangement exists.")
                .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic)
        } else {
            let (oks, errs) = self
                .collection
                .clone()
                .expect("Invariant violated: CollectionBundle contains no collection.");
            let scope = oks.inner.scope();
            let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope);
            // Outputs are created in order: ok first, then err.
            let (ok_output, ok_stream) = builder.new_output();
            let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
            let (err_output, err_stream) = builder.new_output();
            let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
            let mut input = builder.new_input(oks.inner, Pipeline);
            builder.build(move |_capabilities| {
                // Reused across invocations to decode rows into datums.
                let mut datums = DatumVec::new();
                move |_frontiers| {
                    let mut ok_output = ok_output.activate();
                    let mut err_output = err_output.activate();
                    input.for_each(|time, data| {
                        // One capability per output; presumably the argument
                        // is the output index (0 = ok, 1 = err) — confirm.
                        let ok_cap = time.retain(0);
                        let err_cap = time.retain(1);
                        let mut ok_session = ok_output.session_with_builder(&ok_cap);
                        let mut err_session = err_output.session_with_builder(&err_cap);
                        for (v, t, d) in data.drain(..) {
                            // The work count returned by `logic` is not used
                            // on this path.
                            logic(
                                &mut datums.borrow_with_limit(&v, max_demand),
                                t,
                                d,
                                &mut ok_session,
                                &mut err_session,
                            );
                        }
                    });
                }
            });
            // Errors already in the collection plus those emitted by `logic`.
            let errs = errs.concat(err_stream.as_collection());
            (ok_stream, errs)
        }
    }
676
    /// Builds a fueled operator that applies `logic` to the contents of
    /// `trace`, with separate ok and error outputs.
    ///
    /// The operator is named `ArrangementFlatMap(index)` when `key` is given
    /// and `ArrangementFlatMap(scan)` otherwise. Incoming batches are queued
    /// and processed front to back; each invocation starts with `refuel`
    /// units of fuel, and the operator re-activates itself while queued work
    /// remains.
    ///
    /// For each update, key and value datums are decoded (limited to
    /// `max_demand` datums in total) and handed to `logic` together with the
    /// time, diff, and the two output sessions. `logic` returns the amount of
    /// work done, which is charged against the fuel.
    ///
    /// Returns the ok stream and the stream of errors emitted by `logic`.
    fn flat_map_core_fallible<Tr, D, DCB, L>(
        trace: Arranged<'scope, Tr>,
        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
        max_demand: usize,
        mut logic: L,
        refuel: usize,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>,
    )
    where
        Tr: for<'a> TraceReader<
                Key<'a>: ExtendDatums,
                Val<'a>: ExtendDatums,
                Time = T,
                Diff = mz_repr::Diff,
            > + Clone
            + 'static,
        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a, 'b> FnMut(
                &'a mut DatumVecBorrow<'b>,
                T,
                mz_repr::Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        let scope = trace.stream.scope();

        // Copy the key into a container owned by the operator, so the
        // operator closure does not borrow from the caller.
        let mut key_con = Tr::KeyContainer::with_capacity(1);
        if let Some(key) = &key {
            key_con.push_own(key);
        }
        let mode = if key.is_some() { "index" } else { "scan" };
        let name = format!("ArrangementFlatMap({})", mode);

        let mut builder = OperatorBuilder::new(name, scope.clone());
        // Outputs are created in order: ok first, then err.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
        let operator_info = builder.operator_info();

        builder.build(move |_capabilities| {
            // Used to reschedule this operator when it runs out of fuel.
            let activator = scope.activator_for(operator_info.address);
            // Batches that still have updates to process, oldest first.
            let mut todo = std::collections::VecDeque::new();
            move |_frontiers| {
                // `None` when no key was pushed above.
                let key = key_con.get(0);
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();

                input.for_each(|time, data| {
                    // One capability per output; presumably the argument is
                    // the output index (0 = ok, 1 = err) — confirm.
                    let ok_cap = time.retain(0);
                    let err_cap = time.retain(1);
                    for batch in data.iter() {
                        todo.push_back(PendingWork::new(
                            ok_cap.clone(),
                            err_cap.clone(),
                            batch.cursor(),
                            batch.clone(),
                        ));
                    }
                });

                let mut temp_storage = RowArena::new();
                let mut datums = DatumVec::new();
                // Decodes key then value datums, stopping at `max_demand`
                // datums overall, before invoking the user logic.
                let mut decode_logic =
                    |k: Tr::Key<'_>,
                     v: Tr::Val<'_>,
                     t: T,
                     d: mz_repr::Diff,
                     ok_session: &mut Session<T, DCB>,
                     err_session: &mut Session<T, ECB<T>>| {
                        temp_storage.clear();
                        let mut datums_borrow = datums.borrow();
                        k.extend_datums(&temp_storage, &mut datums_borrow, Some(max_demand));
                        let remaining = max_demand.saturating_sub(datums_borrow.len());
                        v.extend_datums(&temp_storage, &mut datums_borrow, Some(remaining));
                        logic(&mut datums_borrow, t, d, ok_session, err_session)
                    };

                let mut fuel = refuel;
                while !todo.is_empty() && fuel > 0 {
                    todo.front_mut().unwrap().do_work(
                        key.as_ref(),
                        &mut decode_logic,
                        &mut fuel,
                        &mut ok_output,
                        &mut err_output,
                    );
                    // Fuel left over means the front batch was fully
                    // processed; otherwise it stays queued for the next run.
                    if fuel > 0 {
                        todo.pop_front();
                    }
                }
                if !todo.is_empty() {
                    activator.activate();
                }
            }
        });

        (ok_stream, err_stream)
    }
807
    /// Builds a fueled operator that applies `logic` to the contents of
    /// `trace`, with a single ok output.
    ///
    /// This mirrors `flat_map_core_fallible` without the error output: the
    /// operator is named `ArrangementFlatMapOk(index)` or
    /// `ArrangementFlatMapOk(scan)` depending on whether `key` is given,
    /// queues incoming batches, spends at most `refuel` units of fuel per
    /// invocation, and re-activates itself while queued work remains.
    fn flat_map_core_ok<Tr, D, DCB, L>(
        trace: Arranged<'scope, Tr>,
        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
        max_demand: usize,
        mut logic: L,
        refuel: usize,
    ) -> Stream<'scope, T, DCB::Container>
    where
        Tr: for<'a> TraceReader<
                Key<'a>: ExtendDatums,
                Val<'a>: ExtendDatums,
                Time = T,
                Diff = mz_repr::Diff,
            > + Clone
            + 'static,
        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a, 'b> FnMut(
                &'a mut DatumVecBorrow<'b>,
                T,
                mz_repr::Diff,
                &mut Session<T, DCB>,
            ) -> usize
            + 'static,
    {
        let scope = trace.stream.scope();

        // Copy the key into a container owned by the operator, so the
        // operator closure does not borrow from the caller.
        let mut key_con = Tr::KeyContainer::with_capacity(1);
        if let Some(key) = &key {
            key_con.push_own(key);
        }
        let mode = if key.is_some() { "index" } else { "scan" };
        let name = format!("ArrangementFlatMapOk({})", mode);

        let mut builder = OperatorBuilder::new(name, scope.clone());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
        let operator_info = builder.operator_info();

        builder.build(move |_capabilities| {
            // Used to reschedule this operator when it runs out of fuel.
            let activator = scope.activator_for(operator_info.address);
            // Batches that still have updates to process, oldest first.
            let mut todo = std::collections::VecDeque::new();
            move |_frontiers| {
                // `None` when no key was pushed above.
                let key = key_con.get(0);
                let mut ok_output = ok_output.activate();

                input.for_each(|time, data| {
                    let cap = time.retain(0);
                    for batch in data.iter() {
                        todo.push_back(PendingWorkOk::new(
                            cap.clone(),
                            batch.cursor(),
                            batch.clone(),
                        ));
                    }
                });

                let mut temp_storage = RowArena::new();
                let mut datums = DatumVec::new();
                // Decodes key then value datums, stopping at `max_demand`
                // datums overall, before invoking the user logic.
                let mut decode_logic =
                    |k: Tr::Key<'_>,
                     v: Tr::Val<'_>,
                     t: T,
                     d: mz_repr::Diff,
                     ok_session: &mut Session<T, DCB>| {
                        temp_storage.clear();
                        let mut datums_borrow = datums.borrow();
                        k.extend_datums(&temp_storage, &mut datums_borrow, Some(max_demand));
                        let remaining = max_demand.saturating_sub(datums_borrow.len());
                        v.extend_datums(&temp_storage, &mut datums_borrow, Some(remaining));
                        logic(&mut datums_borrow, t, d, ok_session)
                    };

                let mut fuel = refuel;
                while !todo.is_empty() && fuel > 0 {
                    todo.front_mut().unwrap().do_work(
                        key.as_ref(),
                        &mut decode_logic,
                        &mut fuel,
                        &mut ok_output,
                    );
                    // Fuel left over means the front batch was fully
                    // processed; otherwise it stays queued for the next run.
                    if fuel > 0 {
                        todo.pop_front();
                    }
                }
                if !todo.is_empty() {
                    activator.activate();
                }
            }
        });

        ok_stream
    }
911
912 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
917 self.arranged.get(key).map(|x| x.clone())
918 }
919}
920
921impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
922 pub fn as_collection_core(
931 &self,
932 mut mfp: MapFilterProject,
933 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
934 until: Antichain<mz_repr::Timestamp>,
935 config_set: &ConfigSet,
936 ) -> (
937 VecCollection<'scope, T, mz_repr::Row, Diff>,
938 VecCollection<'scope, T, DataflowErrorSer, Diff>,
939 ) {
940 mfp.optimize();
941 let mfp_plan = mfp.clone().into_plan().unwrap();
942
943 let has_key_val = if let Some((_key, Some(_val))) = &key_val {
949 true
950 } else {
951 false
952 };
953
954 if mfp_plan.is_identity() && !has_key_val {
955 let key = key_val.map(|(k, _v)| k);
956 return self.as_specific_collection(key.as_deref(), config_set);
957 }
958
959 let max_demand = mfp.demand().last().map(|x| *x + 1).unwrap_or(0);
960 mfp.permute_fn(|c| c, max_demand);
961 mfp.optimize();
962 let mfp_plan = mfp.into_plan().unwrap();
963
964 let mut datum_vec = DatumVec::new();
965 let until = std::rc::Rc::new(until);
967
968 let (stream, errors) = self
969 .flat_map::<_, ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>, _>(
970 key_val,
971 max_demand,
972 move |row_datums, time, diff, ok_session, err_session| {
973 let mut row_builder = SharedRow::get();
974 let until = std::rc::Rc::clone(&until);
975 let temp_storage = RowArena::new();
976 let row_iter = row_datums.iter();
977 let mut datums_local = datum_vec.borrow();
978 datums_local.extend(row_iter);
979 let event_time = time.event_time();
980 let mut work: usize = 0;
981 for result in mfp_plan.evaluate(
982 &mut datums_local,
983 &temp_storage,
984 event_time,
985 diff.clone(),
986 move |time| !until.less_equal(time),
987 &mut row_builder,
988 ) {
989 work += 1;
990 match result {
991 Ok((row, event_time, diff)) => {
992 let mut time: T = time.clone();
994 *time.event_time_mut() = event_time;
995 ok_session.give((row, time, diff));
996 }
997 Err((e, event_time, diff)) => {
998 let mut time: T = time.clone();
1000 *time.event_time_mut() = event_time;
1001 err_session.give((e, time, diff));
1002 }
1003 }
1004 }
1005 work
1006 },
1007 );
1008
1009 (stream.as_collection(), errors)
1010 }
1011 pub fn ensure_collections(
1012 mut self,
1013 collections: AvailableCollections,
1014 input_key: Option<Vec<MirScalarExpr>>,
1015 input_mfp: MapFilterProject,
1016 as_of: Antichain<mz_repr::Timestamp>,
1017 until: Antichain<mz_repr::Timestamp>,
1018 config_set: &ConfigSet,
1019 strategy: ArrangementStrategy,
1020 ) -> Self
1021 where
1022 T: MaybeBucketByTime,
1023 {
1024 if collections == Default::default() {
1025 return self;
1026 }
1027 for (key, _, _) in collections.arranged.iter() {
1036 soft_assert_or_log!(
1037 !self.arranged.contains_key(key),
1038 "LIR ArrangeBy tried to create an existing arrangement"
1039 );
1040 }
1041
1042 let mut bucketed = false;
1045
1046 let will_create_arrangement = collections
1050 .arranged
1051 .iter()
1052 .any(|(key, _, _)| !self.arranged.contains_key(key));
1053
1054 let form_raw_collection = collections.raw || will_create_arrangement;
1056 if form_raw_collection && self.collection.is_none() {
1057 let (oks, errs) =
1058 self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
1059 let effective_strategy = if will_create_arrangement {
1063 strategy
1064 } else {
1065 ArrangementStrategy::Direct
1066 };
1067 let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1068 && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1069 {
1070 let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1071 .get(config_set)
1072 .try_into()
1073 .expect("must fit");
1074 bucketed = true;
1075 T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1076 } else {
1077 oks
1078 };
1079 self.collection = Some((oks, errs));
1080 }
1081 for (key, _, thinning) in collections.arranged {
1082 if !self.arranged.contains_key(&key) {
1083 let name = format!("ArrangeBy[{:?}]", key);
1085
1086 let (oks, errs) = self
1087 .collection
1088 .take()
1089 .expect("Collection constructed above");
1090 let effective_strategy = if bucketed {
1095 ArrangementStrategy::Direct
1096 } else {
1097 strategy
1098 };
1099 let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1100 && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1101 {
1102 let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1103 .get(config_set)
1104 .try_into()
1105 .expect("must fit");
1106 bucketed = true;
1107 T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1108 } else {
1109 oks
1110 };
1111 let use_paged_path = ENABLE_COLUMN_PAGED_BATCHER.get(config_set);
1112 let (oks, errs_keyed, passthrough) = Self::arrange_collection(
1113 &name,
1114 oks,
1115 key.clone(),
1116 thinning.clone(),
1117 use_paged_path,
1118 );
1119 let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
1120 self.collection = Some((passthrough, errs));
1121 let errs =
1122 errs_concat.mz_arrange::<
1123 ColumnationChunker<_>,
1124 ErrBatcher<_, _>,
1125 ErrBuilder<_, _>,
1126 ErrSpine<_, _>,
1127 >(
1128 &format!("{}-errors", name),
1129 );
1130 self.arranged
1131 .insert(key, ArrangementFlavor::Local(oks, errs));
1132 }
1133 }
1134 self
1135 }
1136
    /// Arranges `oks` by the key expressions in `key`.
    ///
    /// A `FormArrangementKey` operator evaluates `key` against each row. On
    /// success it emits the packed key together with a value packed from the
    /// columns listed in `thinning`; on failure it emits the evaluation error
    /// with the update's time and diff. Every input container is also
    /// forwarded unchanged on a third output.
    ///
    /// The keyed stream is then arranged under `name`, using the paged
    /// batcher and builder when `use_paged_path` is set.
    ///
    /// Returns the arrangement, the key-evaluation errors, and the
    /// passthrough of the input.
    fn arrange_collection(
        name: &String,
        oks: VecCollection<'scope, T, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
        use_paged_path: bool,
    ) -> (
        Arranged<'scope, RowRowAgent<T, Diff>>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
        VecCollection<'scope, T, Row, Diff>,
    ) {
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        // Outputs are created in order: keyed oks, errors, passthrough.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(oks.inner, Pipeline);
        // NOTE(review): presumably opts input 0 out of frontier
        // notifications, as the operator ignores frontiers — confirm.
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_capabilities| {
            // Buffers reused across rows and invocations.
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            move |_frontiers| {
                let mut temp_storage = RowArena::new();
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                // Only the thinned columns go into the value.
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    // Forward the whole input container after reading it.
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        let exchange =
            ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, T, Diff>);
        let oks = if use_paged_path {
            ok_stream.mz_arrange_core::<
                _,
                batcher::ColumnChunker<_>,
                Col2ValPagedBatcher<_, _, _, _>,
                RowRowColPagedBuilder<_, _>,
                RowRowSpine<_, _>,
            >(exchange, name)
        } else {
            ok_stream.mz_arrange_core::<
                _,
                batcher::Chunker<_>,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(exchange, name)
        };
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
1231}
1232
/// Shorthand for a timely output session over container builder `CB` whose
/// capability type is `Capability<T>`.
type Session<'a, 'b, T, CB> =
    timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability<T>>;

/// Container builder used for the error outputs in this file: a
/// consolidating builder over vectors of `(DataflowErrorSer, T, Diff)` updates.
type ECB<T> = ConsolidatingContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>;

/// Fuel granted to the arrangement flat-map operators on each invocation.
///
/// Fuel is consumed by the work counts the per-update logic reports; an
/// operator that exhausts it re-activates itself to continue later.
const REFUEL: usize = 1_000_000;
1249
/// A batch that still has updates to be processed by an operator with ok and
/// error outputs.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability used to open sessions on the ok output.
    ok_capability: Capability<C::Time>,
    /// Capability used to open sessions on the error output.
    err_capability: Capability<C::Time>,
    /// Cursor into `batch`; its position records how far processing has got.
    cursor: C,
    /// The storage the cursor navigates.
    batch: C::Storage,
}
1261
1262impl<C> PendingWork<C>
1263where
1264 C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1265{
1266 fn new(
1269 ok_capability: Capability<C::Time>,
1270 err_capability: Capability<C::Time>,
1271 cursor: C,
1272 batch: C::Storage,
1273 ) -> Self {
1274 Self {
1275 ok_capability,
1276 err_capability,
1277 cursor,
1278 batch,
1279 }
1280 }
1281 fn do_work<D, DCB, L>(
1284 &mut self,
1285 key: Option<&C::Key<'_>>,
1286 logic: &mut L,
1287 fuel: &mut usize,
1288 ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1289 err_output: &mut OutputBuilderSession<'_, C::Time, ECB<C::Time>>,
1290 ) where
1291 D: Data,
1292 DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1293 L: FnMut(
1294 C::Key<'_>,
1295 C::Val<'_>,
1296 C::Time,
1297 C::Diff,
1298 &mut Session<C::Time, DCB>,
1299 &mut Session<C::Time, ECB<C::Time>>,
1300 ) -> usize,
1301 {
1302 let mut ok_session = ok_output.session_with_builder(&self.ok_capability);
1303 let mut err_session = err_output.session_with_builder(&self.err_capability);
1304 walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1305 logic(k, v, t, d, &mut ok_session, &mut err_session)
1306 });
1307 }
1308}
1309
/// A batch that still has updates to be processed by an operator with only
/// an ok output.
struct PendingWorkOk<C>
where
    C: Cursor,
{
    /// Capability used to open sessions on the output.
    capability: Capability<C::Time>,
    /// Cursor into `batch`; its position records how far processing has got.
    cursor: C,
    /// The storage the cursor navigates.
    batch: C::Storage,
}
1320
1321impl<C> PendingWorkOk<C>
1322where
1323 C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1324{
1325 fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
1326 Self {
1327 capability,
1328 cursor,
1329 batch,
1330 }
1331 }
1332
1333 fn do_work<D, DCB, L>(
1336 &mut self,
1337 key: Option<&C::Key<'_>>,
1338 logic: &mut L,
1339 fuel: &mut usize,
1340 ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1341 ) where
1342 D: Data,
1343 DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1344 L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session<C::Time, DCB>) -> usize,
1345 {
1346 let mut ok_session = ok_output.session_with_builder(&self.capability);
1347 walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1348 logic(k, v, t, d, &mut ok_session)
1349 });
1350 }
1351}
1352
/// Advances `cursor` through `batch`, calling `emit` for each update, until
/// the cursor is exhausted or the fuel runs out.
///
/// For every (key, value) pair visited, the `(time, diff)` pairs are
/// collected and consolidated, and `emit` is called once per remaining pair.
/// `emit` returns the amount of work it did, which is accumulated.
///
/// When `key` is `Some`, only the values of that key are visited: the cursor
/// is sought to the key unless it is already positioned on it, and nothing is
/// emitted if the key is absent. When `key` is `None`, all keys are visited
/// in cursor order.
///
/// Fuel is checked after each value. If the accumulated work has reached
/// `*fuel`, `*fuel` is set to zero and the function returns with the cursor
/// already stepped past the value just processed, so a later call resumes
/// from there. Otherwise, on completion, the work is subtracted from `*fuel`.
/// Callers in this file treat a non-zero `*fuel` on return as "batch
/// finished".
fn walk_cursor<C, F>(
    cursor: &mut C,
    batch: &C::Storage,
    key: Option<&C::Key<'_>>,
    fuel: &mut usize,
    mut emit: F,
) where
    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
    F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize,
{
    use differential_dataflow::consolidation::consolidate;

    // Work reported by `emit` during this call.
    let mut work: usize = 0;
    // Scratch space for the (time, diff) pairs of one (key, value).
    let mut buffer = Vec::new();
    if let Some(key) = key {
        let key = C::KeyContainer::reborrow(*key);
        // Seek only when not already on the key, so that a resumed call
        // continues from the current value.
        if cursor.get_key(batch).map(|k| k == key) != Some(true) {
            cursor.seek_key(batch, key);
        }
        if cursor.get_key(batch).map(|k| k == key) == Some(true) {
            let key = cursor.key(batch);
            while let Some(val) = cursor.get_val(batch) {
                cursor.map_times(batch, |time, diff| {
                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
                });
                consolidate(&mut buffer);
                for (time, diff) in buffer.drain(..) {
                    work += emit(key, val, time, diff);
                }
                cursor.step_val(batch);
                // Out of fuel: report exhaustion and leave the cursor in place.
                if work >= *fuel {
                    *fuel = 0;
                    return;
                }
            }
        }
    } else {
        while let Some(key) = cursor.get_key(batch) {
            while let Some(val) = cursor.get_val(batch) {
                cursor.map_times(batch, |time, diff| {
                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
                });
                consolidate(&mut buffer);
                for (time, diff) in buffer.drain(..) {
                    work += emit(key, val, time, diff);
                }
                cursor.step_val(batch);
                // Out of fuel: report exhaustion and leave the cursor in place.
                if work >= *fuel {
                    *fuel = 0;
                    return;
                }
            }
            cursor.step_key(batch);
        }
    }
    // Reaching here means `work < *fuel` held after the last value processed
    // (or no value was processed), so this leaves the remaining fuel.
    *fuel -= work;
}