1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23 ENABLE_COLUMN_PAGED_BATCHER, ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION,
24 ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Eval, Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::batcher;
34use mz_timely_util::columnar::builder::ColumnBuilder;
35use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange};
36use mz_timely_util::columnation::ColumnationChunker;
37use timely::ContainerBuilder;
38use timely::container::{CapacityContainerBuilder, PushInto};
39use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
40use timely::dataflow::operators::Capability;
41use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
42use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
43use timely::dataflow::{Scope, Stream};
44use timely::progress::operate::FrontierInterest;
45use timely::progress::{Antichain, Timestamp};
46
47use crate::compute_state::ComputeState;
48use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
49use crate::render::errors::{DataflowErrorSer, ErrorLogger};
50use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
51use crate::typedefs::{
52 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
53};
54use mz_row_spine::{DatumSeq, RowRowBuilder, RowRowColPagedBuilder};
55
/// Dataflow-wide state used while rendering the operators of a single dataflow.
///
/// Holds the scope being rendered into, the bindings from identifiers to the collection
/// bundles rendered so far, and per-dataflow parameters (frontiers, logging, join strategy,
/// and worker configuration).
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope into which operators are rendered.
    pub(crate) scope: Scope<'scope, T>,
    /// Debug name of the dataflow; also used to tag the error logger.
    pub debug_name: String,
    /// Identifier of the dataflow, taken from the first component of the scope address when
    /// the context is created.
    pub dataflow_id: usize,
    /// The IDs of the collections exported by the dataflow.
    pub export_ids: Vec<GlobalId>,
    /// The dataflow's `as_of` frontier, or the minimum timestamp if the description has none.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Frontier supplied by the caller of `for_dataflow_in`.
    /// NOTE(review): presumably the bound after which results are no longer needed — confirm.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Bindings from identifiers to the collection bundles rendered for them.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// Logger for compute events; `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins, copied from the compute state.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// Expiration frontier of the dataflow, as supplied by the caller.
    /// NOTE(review): how it is consumed is not visible in this file.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// Worker configuration, shared (via `Rc`) with the compute state.
    pub config_set: Rc<ConfigSet>,
}
94
95impl<'scope, T: RenderTimestamp> Context<'scope, T> {
96 pub fn for_dataflow_in<Plan>(
98 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
99 scope: Scope<'scope, T>,
100 compute_state: &ComputeState,
101 until: Antichain<mz_repr::Timestamp>,
102 dataflow_expiration: Antichain<mz_repr::Timestamp>,
103 ) -> Self {
104 use mz_ore::collections::CollectionExt as IteratorExt;
105 let dataflow_id = *scope.addr().into_first();
106 let as_of_frontier = dataflow
107 .as_of
108 .clone()
109 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
110
111 let export_ids = dataflow.export_ids().collect();
112
113 let compute_logger = if dataflow.is_transient() {
117 None
118 } else {
119 compute_state.compute_logger.clone()
120 };
121
122 Self {
123 scope,
124 debug_name: dataflow.debug_name.clone(),
125 dataflow_id,
126 export_ids,
127 as_of_frontier,
128 until,
129 bindings: BTreeMap::new(),
130 compute_logger,
131 linear_join_spec: compute_state.linear_join_spec,
132 dataflow_expiration,
133 config_set: Rc::clone(&compute_state.worker_config),
134 }
135 }
136}
137
138impl<'scope, T: RenderTimestamp> Context<'scope, T> {
139 pub fn insert_id(
144 &mut self,
145 id: Id,
146 collection: CollectionBundle<'scope, T>,
147 ) -> Option<CollectionBundle<'scope, T>> {
148 self.bindings.insert(id, collection)
149 }
150 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
154 self.bindings.remove(&id)
155 }
156 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
158 if !self.bindings.contains_key(&id) {
159 self.bindings.insert(id, collection);
160 } else {
161 let binding = self
162 .bindings
163 .get_mut(&id)
164 .expect("Binding verified to exist");
165 if collection.collection.is_some() {
166 binding.collection = collection.collection;
167 }
168 for (key, flavor) in collection.arranged.into_iter() {
169 binding.arranged.insert(key, flavor);
170 }
171 }
172 }
173 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
175 self.bindings.get(&id).cloned()
176 }
177
178 pub(super) fn error_logger(&self) -> ErrorLogger {
179 ErrorLogger::new(self.debug_name.clone())
180 }
181}
182
183impl<'scope, T: RenderTimestamp> Context<'scope, T> {
184 pub fn enter_region<'a>(
186 &self,
187 region: Scope<'a, T>,
188 bindings: Option<&std::collections::BTreeSet<Id>>,
189 ) -> Context<'a, T> {
190 let bindings = self
191 .bindings
192 .iter()
193 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
194 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
195 .collect();
196
197 Context {
198 scope: region,
199 debug_name: self.debug_name.clone(),
200 dataflow_id: self.dataflow_id.clone(),
201 export_ids: self.export_ids.clone(),
202 as_of_frontier: self.as_of_frontier.clone(),
203 until: self.until.clone(),
204 compute_logger: self.compute_logger.clone(),
205 linear_join_spec: self.linear_join_spec.clone(),
206 bindings,
207 dataflow_expiration: self.dataflow_expiration.clone(),
208 config_set: Rc::clone(&self.config_set),
209 }
210 }
211}
212
/// The two forms an arrangement can take: one built within this dataflow, or one read from
/// a trace and brought into the dataflow's scope.
///
/// Each variant pairs an arrangement of ok `(Row, Row)` data with an arrangement of errors.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// An arrangement formed within this dataflow (see `CollectionBundle::ensure_collections`).
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An arrangement read from a trace with `mz_repr::Timestamp` times and entered into
    /// this scope, identified by `GlobalId`.
    /// NOTE(review): presumably an imported index — confirm against callers.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
231
232impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
233 #[deprecated(note = "Use `flat_map` instead.")]
241 pub fn as_collection(
242 &self,
243 ) -> (
244 VecCollection<'scope, T, Row, Diff>,
245 VecCollection<'scope, T, DataflowErrorSer, Diff>,
246 ) {
247 let mut datums = DatumVec::new();
248 let logic = move |k: DatumSeq, v: DatumSeq| {
249 let mut datums_borrow = datums.borrow();
250 datums_borrow.extend(k);
251 datums_borrow.extend(v);
252 SharedRow::pack(&**datums_borrow)
253 };
254 match &self {
255 ArrangementFlavor::Local(oks, errs) => (
256 oks.clone().as_collection(logic),
257 errs.clone().as_collection(|k, &()| k.clone()),
258 ),
259 ArrangementFlavor::Trace(_, oks, errs) => (
260 oks.clone().as_collection(logic),
261 errs.clone().as_collection(|k, &()| k.clone()),
262 ),
263 }
264 }
265
266 pub fn flat_map<D, DCB, L>(
299 &self,
300 key: Option<&Row>,
301 max_demand: usize,
302 mut logic: L,
303 ) -> (
304 Stream<'scope, T, DCB::Container>,
305 VecCollection<'scope, T, DataflowErrorSer, Diff>,
306 )
307 where
308 D: Data,
309 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
310 L: for<'a, 'b> FnMut(
311 &'a mut DatumVecBorrow<'b>,
312 T,
313 Diff,
314 &mut Session<T, DCB>,
315 &mut Session<T, ECB<T>>,
316 ) -> usize
317 + 'static,
318 {
319 let mut datums = DatumVec::new();
320 let logic = move |k: DatumSeq,
321 v: DatumSeq,
322 t,
323 d,
324 ok_session: &mut Session<T, DCB>,
325 err_session: &mut Session<T, ECB<T>>| {
326 let mut datums_borrow = datums.borrow();
327 datums_borrow.extend(k.to_datum_iter().take(max_demand));
328 let max_demand = max_demand.saturating_sub(datums_borrow.len());
329 datums_borrow.extend(v.to_datum_iter().take(max_demand));
330 logic(&mut datums_borrow, t, d, ok_session, err_session)
331 };
332
333 match &self {
334 ArrangementFlavor::Local(oks, errs) => {
335 let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
336 oks.clone(),
337 key,
338 logic,
339 REFUEL,
340 );
341 let errs = errs.clone().as_collection(|k, &()| k.clone());
342 let errs = errs.concat(mfp_errs.as_collection());
343 (oks, errs)
344 }
345 ArrangementFlavor::Trace(_, oks, errs) => {
346 let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
347 oks.clone(),
348 key,
349 logic,
350 REFUEL,
351 );
352 let errs = errs.clone().as_collection(|k, &()| k.clone());
353 let errs = errs.concat(mfp_errs.as_collection());
354 (oks, errs)
355 }
356 }
357 }
358
359 pub fn flat_map_ok<D, DCB, L>(
364 &self,
365 key: Option<&Row>,
366 max_demand: usize,
367 mut logic: L,
368 ) -> (
369 Stream<'scope, T, DCB::Container>,
370 VecCollection<'scope, T, DataflowErrorSer, Diff>,
371 )
372 where
373 D: Data,
374 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
375 L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session<T, DCB>) -> usize
376 + 'static,
377 {
378 let mut datums = DatumVec::new();
379 let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session<T, DCB>| {
380 let mut datums_borrow = datums.borrow();
381 datums_borrow.extend(k.to_datum_iter().take(max_demand));
382 let max_demand = max_demand.saturating_sub(datums_borrow.len());
383 datums_borrow.extend(v.to_datum_iter().take(max_demand));
384 logic(&mut datums_borrow, t, d, ok_session)
385 };
386
387 match &self {
388 ArrangementFlavor::Local(oks, errs) => {
389 let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
390 oks.clone(),
391 key,
392 logic,
393 REFUEL,
394 );
395 let errs = errs.clone().as_collection(|k, &()| k.clone());
396 (oks, errs)
397 }
398 ArrangementFlavor::Trace(_, oks, errs) => {
399 let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
400 oks.clone(),
401 key,
402 logic,
403 REFUEL,
404 );
405 let errs = errs.clone().as_collection(|k, &()| k.clone());
406 (oks, errs)
407 }
408 }
409 }
410}
411impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
412 pub fn scope(&self) -> Scope<'scope, T> {
414 match self {
415 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
416 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
417 }
418 }
419
420 pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
422 match self {
423 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
424 oks.clone().enter_region(region),
425 errs.clone().enter_region(region),
426 ),
427 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
428 *gid,
429 oks.clone().enter_region(region),
430 errs.clone().enter_region(region),
431 ),
432 }
433 }
434}
435impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
436 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
438 match self {
439 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
440 oks.clone().leave_region(outer),
441 errs.clone().leave_region(outer),
442 ),
443 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
444 *gid,
445 oks.clone().leave_region(outer),
446 errs.clone().leave_region(outer),
447 ),
448 }
449 }
450}
451
/// A collection that may be present in several forms: an optional unarranged pair of ok and
/// error collections, plus any number of arrangements keyed by lists of scalar expressions.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The unarranged ok and error collections, if they have been formed.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// Arrangements of the collection, indexed by the key expressions they are arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
464
465impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
466 pub fn from_collections(
468 oks: VecCollection<'scope, T, Row, Diff>,
469 errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
470 ) -> Self {
471 Self {
472 collection: Some((oks, errs)),
473 arranged: BTreeMap::default(),
474 }
475 }
476
477 pub fn from_expressions(
479 exprs: Vec<MirScalarExpr>,
480 arrangements: ArrangementFlavor<'scope, T>,
481 ) -> Self {
482 let mut arranged = BTreeMap::new();
483 arranged.insert(exprs, arrangements);
484 Self {
485 collection: None,
486 arranged,
487 }
488 }
489
490 pub fn from_columns<I: IntoIterator<Item = usize>>(
492 columns: I,
493 arrangements: ArrangementFlavor<'scope, T>,
494 ) -> Self {
495 let mut keys = Vec::new();
496 for column in columns {
497 keys.push(MirScalarExpr::column(column));
498 }
499 Self::from_expressions(keys, arrangements)
500 }
501
502 pub fn scope(&self) -> Scope<'scope, T> {
504 if let Some((oks, _errs)) = &self.collection {
505 oks.inner.scope()
506 } else {
507 self.arranged
508 .values()
509 .next()
510 .expect("Must contain a valid collection")
511 .scope()
512 }
513 }
514
515 pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
517 CollectionBundle {
518 collection: self.collection.as_ref().map(|(oks, errs)| {
519 (
520 oks.clone().enter_region(region),
521 errs.clone().enter_region(region),
522 )
523 }),
524 arranged: self
525 .arranged
526 .iter()
527 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
528 .collect(),
529 }
530 }
531}
532
533impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
534 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
536 CollectionBundle {
537 collection: self.collection.as_ref().map(|(oks, errs)| {
538 (
539 oks.clone().leave_region(outer),
540 errs.clone().leave_region(outer),
541 )
542 }),
543 arranged: self
544 .arranged
545 .iter()
546 .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
547 .collect(),
548 }
549 }
550}
551
552impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
553 pub fn as_specific_collection(
566 &self,
567 key: Option<&[MirScalarExpr]>,
568 config_set: &ConfigSet,
569 ) -> (
570 VecCollection<'scope, T, Row, Diff>,
571 VecCollection<'scope, T, DataflowErrorSer, Diff>,
572 ) {
573 match key {
579 None => self
580 .collection
581 .clone()
582 .expect("The unarranged collection doesn't exist."),
583 Some(key) => {
584 let arranged = self.arranged.get(key).unwrap_or_else(|| {
585 panic!("The collection arranged by {:?} doesn't exist.", key)
586 });
587 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
588 let (ok, err) = arranged
592 .flat_map_ok::<_, CapacityContainerBuilder<Vec<(Row, T, Diff)>>, _>(
593 None,
594 usize::MAX,
595 |borrow, t, r, ok_session| {
596 ok_session.give((SharedRow::pack(borrow.iter()), t, r));
597 1
598 },
599 );
600 (ok.as_collection(), err)
601 } else {
602 #[allow(deprecated)]
603 arranged.as_collection()
604 }
605 }
606 }
607 }
608
609 pub fn flat_map<D, DCB, L>(
625 &self,
626 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
627 max_demand: usize,
628 mut logic: L,
629 ) -> (
630 Stream<'scope, T, DCB::Container>,
631 VecCollection<'scope, T, DataflowErrorSer, Diff>,
632 )
633 where
634 D: Data,
635 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
636 L: for<'a> FnMut(
637 &'a mut DatumVecBorrow<'_>,
638 T,
639 Diff,
640 &mut Session<T, DCB>,
641 &mut Session<T, ECB<T>>,
642 ) -> usize
643 + 'static,
644 {
645 if let Some((key, val)) = key_val {
649 self.arrangement(&key)
650 .expect("Should have ensured during planning that this arrangement exists.")
651 .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic)
652 } else {
653 let (oks, errs) = self
654 .collection
655 .clone()
656 .expect("Invariant violated: CollectionBundle contains no collection.");
657 let scope = oks.inner.scope();
658 let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope);
659 let (ok_output, ok_stream) = builder.new_output();
660 let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
661 let (err_output, err_stream) = builder.new_output();
662 let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
663 let mut input = builder.new_input(oks.inner, Pipeline);
664 builder.build(move |_capabilities| {
665 let mut datums = DatumVec::new();
666 move |_frontiers| {
667 let mut ok_output = ok_output.activate();
668 let mut err_output = err_output.activate();
669 input.for_each(|time, data| {
670 let ok_cap = time.retain(0);
673 let err_cap = time.retain(1);
674 let mut ok_session = ok_output.session_with_builder(&ok_cap);
675 let mut err_session = err_output.session_with_builder(&err_cap);
676 for (v, t, d) in data.drain(..) {
677 logic(
678 &mut datums.borrow_with_limit(&v, max_demand),
679 t,
680 d,
681 &mut ok_session,
682 &mut err_session,
683 );
684 }
685 });
686 }
687 });
688 let errs = errs.concat(err_stream.as_collection());
689 (ok_stream, errs)
690 }
691 }
692
693 fn flat_map_core_fallible<Tr, D, DCB, L>(
704 trace: Arranged<'scope, Tr>,
705 key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
706 mut logic: L,
707 refuel: usize,
708 ) -> (
709 Stream<'scope, T, DCB::Container>,
710 Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>,
711 )
712 where
713 Tr: for<'a> TraceReader<
714 Key<'a>: ToDatumIter,
715 Val<'a>: ToDatumIter,
716 Time = T,
717 Diff = mz_repr::Diff,
718 > + Clone
719 + 'static,
720 <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
721 D: Data,
722 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
723 L: FnMut(
724 Tr::Key<'_>,
725 Tr::Val<'_>,
726 T,
727 mz_repr::Diff,
728 &mut Session<T, DCB>,
729 &mut Session<T, ECB<T>>,
730 ) -> usize
731 + 'static,
732 {
733 let scope = trace.stream.scope();
734
735 let mut key_con = Tr::KeyContainer::with_capacity(1);
736 if let Some(key) = &key {
737 key_con.push_own(key);
738 }
739 let mode = if key.is_some() { "index" } else { "scan" };
740 let name = format!("ArrangementFlatMap({})", mode);
741
742 let mut builder = OperatorBuilder::new(name, scope.clone());
743 let (ok_output, ok_stream) = builder.new_output();
744 let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
745 let (err_output, err_stream) = builder.new_output();
746 let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
747 let mut input = builder.new_input(trace.stream.clone(), Pipeline);
748 let operator_info = builder.operator_info();
749
750 builder.build(move |_capabilities| {
751 let activator = scope.activator_for(operator_info.address);
753 let mut todo = std::collections::VecDeque::new();
755 move |_frontiers| {
756 let key = key_con.get(0);
757 let mut ok_output = ok_output.activate();
758 let mut err_output = err_output.activate();
759
760 input.for_each(|time, data| {
762 let ok_cap = time.retain(0);
765 let err_cap = time.retain(1);
766 for batch in data.iter() {
767 todo.push_back(PendingWork::new(
768 ok_cap.clone(),
769 err_cap.clone(),
770 batch.cursor(),
771 batch.clone(),
772 ));
773 }
774 });
775
776 let mut fuel = refuel;
778 while !todo.is_empty() && fuel > 0 {
779 todo.front_mut().unwrap().do_work(
780 key.as_ref(),
781 &mut logic,
782 &mut fuel,
783 &mut ok_output,
784 &mut err_output,
785 );
786 if fuel > 0 {
787 todo.pop_front();
788 }
789 }
790 if !todo.is_empty() {
792 activator.activate();
793 }
794 }
795 });
796
797 (ok_stream, err_stream)
798 }
799
800 fn flat_map_core_ok<Tr, D, DCB, L>(
806 trace: Arranged<'scope, Tr>,
807 key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
808 mut logic: L,
809 refuel: usize,
810 ) -> Stream<'scope, T, DCB::Container>
811 where
812 Tr: for<'a> TraceReader<
813 Key<'a>: ToDatumIter,
814 Val<'a>: ToDatumIter,
815 Time = T,
816 Diff = mz_repr::Diff,
817 > + Clone
818 + 'static,
819 <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
820 D: Data,
821 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
822 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session<T, DCB>) -> usize
823 + 'static,
824 {
825 let scope = trace.stream.scope();
826
827 let mut key_con = Tr::KeyContainer::with_capacity(1);
828 if let Some(key) = &key {
829 key_con.push_own(key);
830 }
831 let mode = if key.is_some() { "index" } else { "scan" };
832 let name = format!("ArrangementFlatMapOk({})", mode);
833
834 let mut builder = OperatorBuilder::new(name, scope.clone());
835 let (ok_output, ok_stream) = builder.new_output();
836 let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
837 let mut input = builder.new_input(trace.stream.clone(), Pipeline);
838 let operator_info = builder.operator_info();
839
840 builder.build(move |_capabilities| {
841 let activator = scope.activator_for(operator_info.address);
842 let mut todo = std::collections::VecDeque::new();
843 move |_frontiers| {
844 let key = key_con.get(0);
845 let mut ok_output = ok_output.activate();
846
847 input.for_each(|time, data| {
848 let cap = time.retain(0);
849 for batch in data.iter() {
850 todo.push_back(PendingWorkOk::new(
851 cap.clone(),
852 batch.cursor(),
853 batch.clone(),
854 ));
855 }
856 });
857
858 let mut fuel = refuel;
859 while !todo.is_empty() && fuel > 0 {
860 todo.front_mut().unwrap().do_work(
861 key.as_ref(),
862 &mut logic,
863 &mut fuel,
864 &mut ok_output,
865 );
866 if fuel > 0 {
867 todo.pop_front();
868 }
869 }
870 if !todo.is_empty() {
871 activator.activate();
872 }
873 }
874 });
875
876 ok_stream
877 }
878
879 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
884 self.arranged.get(key).map(|x| x.clone())
885 }
886}
887
888impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
889 pub fn as_collection_core(
898 &self,
899 mut mfp: MapFilterProject,
900 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
901 until: Antichain<mz_repr::Timestamp>,
902 config_set: &ConfigSet,
903 ) -> (
904 VecCollection<'scope, T, mz_repr::Row, Diff>,
905 VecCollection<'scope, T, DataflowErrorSer, Diff>,
906 ) {
907 mfp.optimize();
908 let mfp_plan = mfp.clone().into_plan().unwrap();
909
910 let has_key_val = if let Some((_key, Some(_val))) = &key_val {
916 true
917 } else {
918 false
919 };
920
921 if mfp_plan.is_identity() && !has_key_val {
922 let key = key_val.map(|(k, _v)| k);
923 return self.as_specific_collection(key.as_deref(), config_set);
924 }
925
926 let max_demand = mfp.demand().last().map(|x| *x + 1).unwrap_or(0);
927 mfp.permute_fn(|c| c, max_demand);
928 mfp.optimize();
929 let mfp_plan = mfp.into_plan().unwrap();
930
931 let mut datum_vec = DatumVec::new();
932 let until = std::rc::Rc::new(until);
934
935 let (stream, errors) = self
936 .flat_map::<_, ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>, _>(
937 key_val,
938 max_demand,
939 move |row_datums, time, diff, ok_session, err_session| {
940 let mut row_builder = SharedRow::get();
941 let until = std::rc::Rc::clone(&until);
942 let temp_storage = RowArena::new();
943 let row_iter = row_datums.iter();
944 let mut datums_local = datum_vec.borrow();
945 datums_local.extend(row_iter);
946 let event_time = time.event_time();
947 let mut work: usize = 0;
948 for result in mfp_plan.evaluate(
949 &mut datums_local,
950 &temp_storage,
951 event_time,
952 diff.clone(),
953 move |time| !until.less_equal(time),
954 &mut row_builder,
955 ) {
956 work += 1;
957 match result {
958 Ok((row, event_time, diff)) => {
959 let mut time: T = time.clone();
961 *time.event_time_mut() = event_time;
962 ok_session.give((row, time, diff));
963 }
964 Err((e, event_time, diff)) => {
965 let mut time: T = time.clone();
967 *time.event_time_mut() = event_time;
968 err_session.give((e, time, diff));
969 }
970 }
971 }
972 work
973 },
974 );
975
976 (stream.as_collection(), errors)
977 }
978 pub fn ensure_collections(
979 mut self,
980 collections: AvailableCollections,
981 input_key: Option<Vec<MirScalarExpr>>,
982 input_mfp: MapFilterProject,
983 as_of: Antichain<mz_repr::Timestamp>,
984 until: Antichain<mz_repr::Timestamp>,
985 config_set: &ConfigSet,
986 strategy: ArrangementStrategy,
987 ) -> Self
988 where
989 T: MaybeBucketByTime,
990 {
991 if collections == Default::default() {
992 return self;
993 }
994 for (key, _, _) in collections.arranged.iter() {
1003 soft_assert_or_log!(
1004 !self.arranged.contains_key(key),
1005 "LIR ArrangeBy tried to create an existing arrangement"
1006 );
1007 }
1008
1009 let mut bucketed = false;
1012
1013 let will_create_arrangement = collections
1017 .arranged
1018 .iter()
1019 .any(|(key, _, _)| !self.arranged.contains_key(key));
1020
1021 let form_raw_collection = collections.raw || will_create_arrangement;
1023 if form_raw_collection && self.collection.is_none() {
1024 let (oks, errs) =
1025 self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
1026 let effective_strategy = if will_create_arrangement {
1030 strategy
1031 } else {
1032 ArrangementStrategy::Direct
1033 };
1034 let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1035 && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1036 {
1037 let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1038 .get(config_set)
1039 .try_into()
1040 .expect("must fit");
1041 bucketed = true;
1042 T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1043 } else {
1044 oks
1045 };
1046 self.collection = Some((oks, errs));
1047 }
1048 for (key, _, thinning) in collections.arranged {
1049 if !self.arranged.contains_key(&key) {
1050 let name = format!("ArrangeBy[{:?}]", key);
1052
1053 let (oks, errs) = self
1054 .collection
1055 .take()
1056 .expect("Collection constructed above");
1057 let effective_strategy = if bucketed {
1062 ArrangementStrategy::Direct
1063 } else {
1064 strategy
1065 };
1066 let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1067 && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1068 {
1069 let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1070 .get(config_set)
1071 .try_into()
1072 .expect("must fit");
1073 bucketed = true;
1074 T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1075 } else {
1076 oks
1077 };
1078 let use_paged_path = ENABLE_COLUMN_PAGED_BATCHER.get(config_set);
1079 let (oks, errs_keyed, passthrough) = Self::arrange_collection(
1080 &name,
1081 oks,
1082 key.clone(),
1083 thinning.clone(),
1084 use_paged_path,
1085 );
1086 let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
1087 self.collection = Some((passthrough, errs));
1088 let errs =
1089 errs_concat.mz_arrange::<
1090 ColumnationChunker<_>,
1091 ErrBatcher<_, _>,
1092 ErrBuilder<_, _>,
1093 ErrSpine<_, _>,
1094 >(
1095 &format!("{}-errors", name),
1096 );
1097 self.arranged
1098 .insert(key, ArrangementFlavor::Local(oks, errs));
1099 }
1100 }
1101 self
1102 }
1103
1104 fn arrange_collection(
1115 name: &String,
1116 oks: VecCollection<'scope, T, Row, Diff>,
1117 key: Vec<MirScalarExpr>,
1118 thinning: Vec<usize>,
1119 use_paged_path: bool,
1120 ) -> (
1121 Arranged<'scope, RowRowAgent<T, Diff>>,
1122 VecCollection<'scope, T, DataflowErrorSer, Diff>,
1123 VecCollection<'scope, T, Row, Diff>,
1124 ) {
1125 let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
1130 let (ok_output, ok_stream) = builder.new_output();
1131 let mut ok_output =
1132 OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
1133 let (err_output, err_stream) = builder.new_output();
1134 let mut err_output = OutputBuilder::from(err_output);
1135 let (passthrough_output, passthrough_stream) = builder.new_output();
1136 let mut passthrough_output = OutputBuilder::from(passthrough_output);
1137 let mut input = builder.new_input(oks.inner, Pipeline);
1138 builder.set_notify_for(0, FrontierInterest::Never);
1139 builder.build(move |_capabilities| {
1140 let mut key_buf = Row::default();
1141 let mut val_buf = Row::default();
1142 let mut datums = DatumVec::new();
1143 let mut temp_storage = RowArena::new();
1144 move |_frontiers| {
1145 let mut ok_output = ok_output.activate();
1146 let mut err_output = err_output.activate();
1147 let mut passthrough_output = passthrough_output.activate();
1148 input.for_each(|time, data| {
1149 let mut ok_session = ok_output.session_with_builder(&time);
1150 let mut err_session = err_output.session(&time);
1151 for (row, time, diff) in data.iter() {
1152 temp_storage.clear();
1153 let datums = datums.borrow_with(row);
1154 let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
1155 match key_buf.packer().try_extend(key_iter) {
1156 Ok(()) => {
1157 let val_datum_iter = thinning.iter().map(|c| datums[*c]);
1158 val_buf.packer().extend(val_datum_iter);
1159 ok_session.give(((&*key_buf, &*val_buf), time, diff));
1160 }
1161 Err(e) => {
1162 err_session.give((e.into(), time.clone(), *diff));
1163 }
1164 }
1165 }
1166 passthrough_output.session(&time).give_container(data);
1167 });
1168 }
1169 });
1170
1171 let exchange =
1172 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, T, Diff>);
1173 let oks = if use_paged_path {
1174 ok_stream.mz_arrange_core::<
1175 _,
1176 batcher::ColumnChunker<_>,
1177 Col2ValPagedBatcher<_, _, _, _>,
1178 RowRowColPagedBuilder<_, _>,
1179 RowRowSpine<_, _>,
1180 >(exchange, name)
1181 } else {
1182 ok_stream.mz_arrange_core::<
1183 _,
1184 batcher::Chunker<_>,
1185 Col2ValBatcher<_, _, _, _>,
1186 RowRowBuilder<_, _>,
1187 RowRowSpine<_, _>,
1188 >(exchange, name)
1189 };
1190 (
1191 oks,
1192 err_stream.as_collection(),
1193 passthrough_stream.as_collection(),
1194 )
1195 }
1196}
1197
/// Shorthand for a timely output session that uses container builder `CB` and is opened at a
/// `Capability<T>`.
type Session<'a, 'b, T, CB> =
    timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability<T>>;
1203
/// Container builder for error outputs; consolidates `(error, time, diff)` updates.
type ECB<T> = ConsolidatingContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>;
1209
/// Work budget per activation of the arrangement flat-map operators.
///
/// Work is counted by summing the return values of the per-record logic. When the budget
/// runs out with batches still pending, the operator re-activates itself to continue.
const REFUEL: usize = 1_000_000;
1214
/// A batch still to be fed through a fallible arrangement flat-map operator.
///
/// Holds the capabilities at which the batch's outputs are emitted and the cursor position
/// reached so far.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability for the ok output (output 0).
    ok_capability: Capability<C::Time>,
    /// Capability for the error output (output 1).
    err_capability: Capability<C::Time>,
    /// Position within `batch`; preserved across fuel-limited calls to `do_work`.
    cursor: C,
    /// The batch being walked.
    batch: C::Storage,
}
1226
1227impl<C> PendingWork<C>
1228where
1229 C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1230{
1231 fn new(
1234 ok_capability: Capability<C::Time>,
1235 err_capability: Capability<C::Time>,
1236 cursor: C,
1237 batch: C::Storage,
1238 ) -> Self {
1239 Self {
1240 ok_capability,
1241 err_capability,
1242 cursor,
1243 batch,
1244 }
1245 }
1246 fn do_work<D, DCB, L>(
1249 &mut self,
1250 key: Option<&C::Key<'_>>,
1251 logic: &mut L,
1252 fuel: &mut usize,
1253 ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1254 err_output: &mut OutputBuilderSession<'_, C::Time, ECB<C::Time>>,
1255 ) where
1256 D: Data,
1257 DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1258 L: FnMut(
1259 C::Key<'_>,
1260 C::Val<'_>,
1261 C::Time,
1262 C::Diff,
1263 &mut Session<C::Time, DCB>,
1264 &mut Session<C::Time, ECB<C::Time>>,
1265 ) -> usize
1266 + 'static,
1267 {
1268 let mut ok_session = ok_output.session_with_builder(&self.ok_capability);
1269 let mut err_session = err_output.session_with_builder(&self.err_capability);
1270 walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1271 logic(k, v, t, d, &mut ok_session, &mut err_session)
1272 });
1273 }
1274}
1275
/// A batch still to be fed through an ok-only arrangement flat-map operator.
///
/// This is like `PendingWork`, but has a single output capability.
struct PendingWorkOk<C>
where
    C: Cursor,
{
    /// Capability for the (only) ok output.
    capability: Capability<C::Time>,
    /// Position within `batch`; preserved across fuel-limited calls to `do_work`.
    cursor: C,
    /// The batch being walked.
    batch: C::Storage,
}
1286
1287impl<C> PendingWorkOk<C>
1288where
1289 C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1290{
1291 fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
1292 Self {
1293 capability,
1294 cursor,
1295 batch,
1296 }
1297 }
1298
1299 fn do_work<D, DCB, L>(
1302 &mut self,
1303 key: Option<&C::Key<'_>>,
1304 logic: &mut L,
1305 fuel: &mut usize,
1306 ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1307 ) where
1308 D: Data,
1309 DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1310 L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session<C::Time, DCB>) -> usize
1311 + 'static,
1312 {
1313 let mut ok_session = ok_output.session_with_builder(&self.capability);
1314 walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1315 logic(k, v, t, d, &mut ok_session)
1316 });
1317 }
1318}
1319
/// Walks `cursor` over `batch`, calling `emit` for each `(key, val, time, diff)` update.
///
/// If `key` is `Some`, only that key's updates are visited, and the cursor seeks to it unless
/// it is already positioned there. Otherwise every key is visited in cursor order.
///
/// For each `(key, val)` pair, the `(time, diff)` updates are gathered and consolidated with
/// differential's `consolidate` before being passed to `emit`.
///
/// Fuel accounting: `emit` returns the work it performed. After each value, if the running
/// total has reached `*fuel`, `*fuel` is set to 0 and the function returns early, with the
/// cursor already stepped past that value so a later call resumes at the next one. Otherwise,
/// when the walk completes, `*fuel` is reduced by the work done. Work may overshoot `*fuel` by
/// up to one value's worth of updates. `*fuel > 0` on return implies the walk finished;
/// `*fuel == 0` means it may or may not have.
fn walk_cursor<C, F>(
    cursor: &mut C,
    batch: &C::Storage,
    key: Option<&C::Key<'_>>,
    fuel: &mut usize,
    mut emit: F,
) where
    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
    F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize,
{
    use differential_dataflow::consolidation::consolidate;

    let mut work: usize = 0;
    // Reused across values to gather and consolidate each value's `(time, diff)` updates.
    let mut buffer = Vec::new();
    if let Some(key) = key {
        let key = C::KeyContainer::reborrow(*key);
        // Only seek when not already on `key`. After an out-of-fuel return the cursor is
        // still on `key`. NOTE(review): presumably re-seeking could lose the value position.
        if cursor.get_key(batch).map(|k| k == key) != Some(true) {
            cursor.seek_key(batch, key);
        }
        // The seek may land on a different key if `key` is absent from this batch.
        if cursor.get_key(batch).map(|k| k == key) == Some(true) {
            let key = cursor.key(batch);
            while let Some(val) = cursor.get_val(batch) {
                cursor.map_times(batch, |time, diff| {
                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
                });
                consolidate(&mut buffer);
                for (time, diff) in buffer.drain(..) {
                    work += emit(key, val, time, diff);
                }
                // Step before the fuel check so a resumed call starts at the next value.
                cursor.step_val(batch);
                if work >= *fuel {
                    *fuel = 0;
                    return;
                }
            }
        }
    } else {
        while let Some(key) = cursor.get_key(batch) {
            while let Some(val) = cursor.get_val(batch) {
                cursor.map_times(batch, |time, diff| {
                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
                });
                consolidate(&mut buffer);
                for (time, diff) in buffer.drain(..) {
                    work += emit(key, val, time, diff);
                }
                // Step before the fuel check so a resumed call starts at the next value.
                cursor.step_val(batch);
                if work >= *fuel {
                    *fuel = 0;
                    return;
                }
            }
            cursor.step_key(batch);
        }
    }
    // Cannot underflow: any value after which `work >= *fuel` returned early above.
    *fuel -= work;
}