1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23 ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_COMPUTE_TEMPORAL_BUCKETING,
24 TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::builder::ColumnBuilder;
34use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
35use timely::ContainerBuilder;
36use timely::container::{CapacityContainerBuilder, PushInto};
37use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
38use timely::dataflow::operators::Capability;
39use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
40use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
41use timely::dataflow::{Scope, Stream};
42use timely::progress::operate::FrontierInterest;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::{DataflowErrorSer, ErrorLogger};
48use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
52};
53
/// Context spanning the rendering of a single dataflow.
///
/// Collects the identifier-to-collection bindings built up while rendering,
/// together with per-dataflow metadata (frontiers, logging, configuration).
pub struct Context<'scope, T: RenderTimestamp> {
    // The scope in which the dataflow's collections are rendered.
    pub(crate) scope: Scope<'scope, T>,
    // Human-readable name of the dataflow; used when constructing error loggers.
    pub debug_name: String,
    // The dataflow's id, derived from the first element of its scope address.
    pub dataflow_id: usize,
    // Ids of the objects exported by this dataflow.
    pub export_ids: Vec<GlobalId>,
    // The `as_of` of the dataflow; the minimum frontier if none was specified.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    // Frontier at or beyond which updates are suppressed; threaded into MFP
    // evaluation (see `as_collection_core`).
    pub until: Antichain<mz_repr::Timestamp>,
    // Bindings from identifiers to the collection bundles they name.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    // Logger for compute events; `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    // The implementation to use when rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    // NOTE(review): presumably the frontier at which the dataflow is considered
    // expired — confirm semantics with the producer of this value.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    // Per-worker dynamic configuration, shared by cloning the `Rc`.
    pub config_set: Rc<ConfigSet>,
}
92
93impl<'scope, T: RenderTimestamp> Context<'scope, T> {
94 pub fn for_dataflow_in<Plan>(
96 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
97 scope: Scope<'scope, T>,
98 compute_state: &ComputeState,
99 until: Antichain<mz_repr::Timestamp>,
100 dataflow_expiration: Antichain<mz_repr::Timestamp>,
101 ) -> Self {
102 use mz_ore::collections::CollectionExt as IteratorExt;
103 let dataflow_id = *scope.addr().into_first();
104 let as_of_frontier = dataflow
105 .as_of
106 .clone()
107 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
108
109 let export_ids = dataflow.export_ids().collect();
110
111 let compute_logger = if dataflow.is_transient() {
115 None
116 } else {
117 compute_state.compute_logger.clone()
118 };
119
120 Self {
121 scope,
122 debug_name: dataflow.debug_name.clone(),
123 dataflow_id,
124 export_ids,
125 as_of_frontier,
126 until,
127 bindings: BTreeMap::new(),
128 compute_logger,
129 linear_join_spec: compute_state.linear_join_spec,
130 dataflow_expiration,
131 config_set: Rc::clone(&compute_state.worker_config),
132 }
133 }
134}
135
136impl<'scope, T: RenderTimestamp> Context<'scope, T> {
137 pub fn insert_id(
142 &mut self,
143 id: Id,
144 collection: CollectionBundle<'scope, T>,
145 ) -> Option<CollectionBundle<'scope, T>> {
146 self.bindings.insert(id, collection)
147 }
148 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
152 self.bindings.remove(&id)
153 }
154 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
156 if !self.bindings.contains_key(&id) {
157 self.bindings.insert(id, collection);
158 } else {
159 let binding = self
160 .bindings
161 .get_mut(&id)
162 .expect("Binding verified to exist");
163 if collection.collection.is_some() {
164 binding.collection = collection.collection;
165 }
166 for (key, flavor) in collection.arranged.into_iter() {
167 binding.arranged.insert(key, flavor);
168 }
169 }
170 }
171 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
173 self.bindings.get(&id).cloned()
174 }
175
176 pub(super) fn error_logger(&self) -> ErrorLogger {
177 ErrorLogger::new(self.debug_name.clone())
178 }
179}
180
181impl<'scope, T: RenderTimestamp> Context<'scope, T> {
182 pub fn enter_region<'a>(
184 &self,
185 region: Scope<'a, T>,
186 bindings: Option<&std::collections::BTreeSet<Id>>,
187 ) -> Context<'a, T> {
188 let bindings = self
189 .bindings
190 .iter()
191 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
192 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
193 .collect();
194
195 Context {
196 scope: region,
197 debug_name: self.debug_name.clone(),
198 dataflow_id: self.dataflow_id.clone(),
199 export_ids: self.export_ids.clone(),
200 as_of_frontier: self.as_of_frontier.clone(),
201 until: self.until.clone(),
202 compute_logger: self.compute_logger.clone(),
203 linear_join_spec: self.linear_join_spec.clone(),
204 bindings,
205 dataflow_expiration: self.dataflow_expiration.clone(),
206 config_set: Rc::clone(&self.config_set),
207 }
208 }
209}
210
/// The ways in which an arrangement of a collection can be presented.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// An arrangement of ok and error updates built within this dataflow.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An arrangement imported from a shared trace, identified by the
    /// `GlobalId` of the object it arranges, entered into this timestamp.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
229
impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
    /// Presents `self` as streams of ok and error updates.
    ///
    /// Packs the datums of each key followed by those of its value into one
    /// `Row` per update. Deprecated because it always extracts every datum;
    /// prefer [`Self::flat_map`], which can bound extraction via `max_demand`.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        let mut datums = DatumVec::new();
        // Concatenate key datums then value datums into a single packed row.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Applies `logic` to the updates of `self` and returns the resulting
    /// ok stream and error collection.
    ///
    /// * `key` — if provided, restricts the traversal to that key's updates.
    /// * `max_demand` — bounds the number of datums extracted per update;
    ///   datums are drawn from the key first, then from the value.
    /// * `logic` — receives the extracted datums, time, and diff, plus sessions
    ///   for the ok and error outputs; its return value is the amount of work
    ///   performed, which counts against the operator's fuel (`REFUEL`).
    ///
    /// Errors produced by `logic` are concatenated with the arrangement's own
    /// error stream.
    pub fn flat_map<D, DCB, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a, 'b> FnMut(
                &'a mut DatumVecBorrow<'b>,
                T,
                Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        let mut datums = DatumVec::new();
        // Adapt the datum-oriented `logic` to key/value cursor output,
        // honoring `max_demand` across key then value datums.
        let logic = move |k: DatumSeq,
                          v: DatumSeq,
                          t,
                          d,
                          ok_session: &mut Session<T, DCB>,
                          err_session: &mut Session<T, ECB<T>>| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d, ok_session, err_session)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                let errs = errs.concat(mfp_errs.as_collection());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                let errs = errs.concat(mfp_errs.as_collection());
                (oks, errs)
            }
        }
    }

    /// Like [`Self::flat_map`], but `logic` is infallible: it only writes to
    /// the ok output, and the arrangement's existing error stream is returned
    /// unchanged.
    pub fn flat_map_ok<D, DCB, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session<T, DCB>) -> usize
            + 'static,
    {
        let mut datums = DatumVec::new();
        // Adapt `logic` to key/value cursor output, honoring `max_demand`.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session<T, DCB>| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d, ok_session)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
409impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
410 pub fn scope(&self) -> Scope<'scope, T> {
412 match self {
413 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
414 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
415 }
416 }
417
418 pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
420 match self {
421 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
422 oks.clone().enter_region(region),
423 errs.clone().enter_region(region),
424 ),
425 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
426 *gid,
427 oks.clone().enter_region(region),
428 errs.clone().enter_region(region),
429 ),
430 }
431 }
432}
433impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
434 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
436 match self {
437 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
438 oks.clone().leave_region(outer),
439 errs.clone().leave_region(outer),
440 ),
441 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
442 *gid,
443 oks.clone().leave_region(outer),
444 errs.clone().leave_region(outer),
445 ),
446 }
447 }
448}
449
/// A bundle of the various ways a collection can be represented.
///
/// Either or both of `collection` and `arranged` may be populated; helpers
/// panic when a representation they require is absent.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    // The underlying ok and error update streams, if maintained in raw form.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    // Arrangements of the collection, keyed by the expressions they arrange by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
462
463impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
464 pub fn from_collections(
466 oks: VecCollection<'scope, T, Row, Diff>,
467 errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
468 ) -> Self {
469 Self {
470 collection: Some((oks, errs)),
471 arranged: BTreeMap::default(),
472 }
473 }
474
475 pub fn from_expressions(
477 exprs: Vec<MirScalarExpr>,
478 arrangements: ArrangementFlavor<'scope, T>,
479 ) -> Self {
480 let mut arranged = BTreeMap::new();
481 arranged.insert(exprs, arrangements);
482 Self {
483 collection: None,
484 arranged,
485 }
486 }
487
488 pub fn from_columns<I: IntoIterator<Item = usize>>(
490 columns: I,
491 arrangements: ArrangementFlavor<'scope, T>,
492 ) -> Self {
493 let mut keys = Vec::new();
494 for column in columns {
495 keys.push(MirScalarExpr::column(column));
496 }
497 Self::from_expressions(keys, arrangements)
498 }
499
500 pub fn scope(&self) -> Scope<'scope, T> {
502 if let Some((oks, _errs)) = &self.collection {
503 oks.inner.scope()
504 } else {
505 self.arranged
506 .values()
507 .next()
508 .expect("Must contain a valid collection")
509 .scope()
510 }
511 }
512
513 pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
515 CollectionBundle {
516 collection: self.collection.as_ref().map(|(oks, errs)| {
517 (
518 oks.clone().enter_region(region),
519 errs.clone().enter_region(region),
520 )
521 }),
522 arranged: self
523 .arranged
524 .iter()
525 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
526 .collect(),
527 }
528 }
529}
530
531impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
532 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
534 CollectionBundle {
535 collection: self.collection.as_ref().map(|(oks, errs)| {
536 (
537 oks.clone().leave_region(outer),
538 errs.clone().leave_region(outer),
539 )
540 }),
541 arranged: self
542 .arranged
543 .iter()
544 .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
545 .collect(),
546 }
547 }
548}
549
550impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
551 pub fn as_specific_collection(
564 &self,
565 key: Option<&[MirScalarExpr]>,
566 config_set: &ConfigSet,
567 ) -> (
568 VecCollection<'scope, T, Row, Diff>,
569 VecCollection<'scope, T, DataflowErrorSer, Diff>,
570 ) {
571 match key {
577 None => self
578 .collection
579 .clone()
580 .expect("The unarranged collection doesn't exist."),
581 Some(key) => {
582 let arranged = self.arranged.get(key).unwrap_or_else(|| {
583 panic!("The collection arranged by {:?} doesn't exist.", key)
584 });
585 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
586 let (ok, err) = arranged
590 .flat_map_ok::<_, CapacityContainerBuilder<Vec<(Row, T, Diff)>>, _>(
591 None,
592 usize::MAX,
593 |borrow, t, r, ok_session| {
594 ok_session.give((SharedRow::pack(borrow.iter()), t, r));
595 1
596 },
597 );
598 (ok.as_collection(), err)
599 } else {
600 #[allow(deprecated)]
601 arranged.as_collection()
602 }
603 }
604 }
605 }
606
607 pub fn flat_map<D, DCB, L>(
623 &self,
624 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
625 max_demand: usize,
626 mut logic: L,
627 ) -> (
628 Stream<'scope, T, DCB::Container>,
629 VecCollection<'scope, T, DataflowErrorSer, Diff>,
630 )
631 where
632 D: Data,
633 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
634 L: for<'a> FnMut(
635 &'a mut DatumVecBorrow<'_>,
636 T,
637 Diff,
638 &mut Session<T, DCB>,
639 &mut Session<T, ECB<T>>,
640 ) -> usize
641 + 'static,
642 {
643 if let Some((key, val)) = key_val {
647 self.arrangement(&key)
648 .expect("Should have ensured during planning that this arrangement exists.")
649 .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic)
650 } else {
651 let (oks, errs) = self
652 .collection
653 .clone()
654 .expect("Invariant violated: CollectionBundle contains no collection.");
655 let scope = oks.inner.scope();
656 let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope);
657 let (ok_output, ok_stream) = builder.new_output();
658 let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
659 let (err_output, err_stream) = builder.new_output();
660 let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
661 let mut input = builder.new_input(oks.inner, Pipeline);
662 builder.build(move |_capabilities| {
663 let mut datums = DatumVec::new();
664 move |_frontiers| {
665 let mut ok_output = ok_output.activate();
666 let mut err_output = err_output.activate();
667 input.for_each(|time, data| {
668 let ok_cap = time.retain(0);
671 let err_cap = time.retain(1);
672 let mut ok_session = ok_output.session_with_builder(&ok_cap);
673 let mut err_session = err_output.session_with_builder(&err_cap);
674 for (v, t, d) in data.drain(..) {
675 logic(
676 &mut datums.borrow_with_limit(&v, max_demand),
677 t,
678 d,
679 &mut ok_session,
680 &mut err_session,
681 );
682 }
683 });
684 }
685 });
686 let errs = errs.concat(err_stream.as_collection());
687 (ok_stream, errs)
688 }
689 }
690
691 fn flat_map_core_fallible<Tr, D, DCB, L>(
702 trace: Arranged<'scope, Tr>,
703 key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
704 mut logic: L,
705 refuel: usize,
706 ) -> (
707 Stream<'scope, T, DCB::Container>,
708 Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>,
709 )
710 where
711 Tr: for<'a> TraceReader<
712 Key<'a>: ToDatumIter,
713 Val<'a>: ToDatumIter,
714 Time = T,
715 Diff = mz_repr::Diff,
716 > + Clone
717 + 'static,
718 <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
719 D: Data,
720 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
721 L: FnMut(
722 Tr::Key<'_>,
723 Tr::Val<'_>,
724 T,
725 mz_repr::Diff,
726 &mut Session<T, DCB>,
727 &mut Session<T, ECB<T>>,
728 ) -> usize
729 + 'static,
730 {
731 let scope = trace.stream.scope();
732
733 let mut key_con = Tr::KeyContainer::with_capacity(1);
734 if let Some(key) = &key {
735 key_con.push_own(key);
736 }
737 let mode = if key.is_some() { "index" } else { "scan" };
738 let name = format!("ArrangementFlatMap({})", mode);
739
740 let mut builder = OperatorBuilder::new(name, scope.clone());
741 let (ok_output, ok_stream) = builder.new_output();
742 let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
743 let (err_output, err_stream) = builder.new_output();
744 let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
745 let mut input = builder.new_input(trace.stream.clone(), Pipeline);
746 let operator_info = builder.operator_info();
747
748 builder.build(move |_capabilities| {
749 let activator = scope.activator_for(operator_info.address);
751 let mut todo = std::collections::VecDeque::new();
753 move |_frontiers| {
754 let key = key_con.get(0);
755 let mut ok_output = ok_output.activate();
756 let mut err_output = err_output.activate();
757
758 input.for_each(|time, data| {
760 let ok_cap = time.retain(0);
763 let err_cap = time.retain(1);
764 for batch in data.iter() {
765 todo.push_back(PendingWork::new(
766 ok_cap.clone(),
767 err_cap.clone(),
768 batch.cursor(),
769 batch.clone(),
770 ));
771 }
772 });
773
774 let mut fuel = refuel;
776 while !todo.is_empty() && fuel > 0 {
777 todo.front_mut().unwrap().do_work(
778 key.as_ref(),
779 &mut logic,
780 &mut fuel,
781 &mut ok_output,
782 &mut err_output,
783 );
784 if fuel > 0 {
785 todo.pop_front();
786 }
787 }
788 if !todo.is_empty() {
790 activator.activate();
791 }
792 }
793 });
794
795 (ok_stream, err_stream)
796 }
797
798 fn flat_map_core_ok<Tr, D, DCB, L>(
804 trace: Arranged<'scope, Tr>,
805 key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
806 mut logic: L,
807 refuel: usize,
808 ) -> Stream<'scope, T, DCB::Container>
809 where
810 Tr: for<'a> TraceReader<
811 Key<'a>: ToDatumIter,
812 Val<'a>: ToDatumIter,
813 Time = T,
814 Diff = mz_repr::Diff,
815 > + Clone
816 + 'static,
817 <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
818 D: Data,
819 DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
820 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session<T, DCB>) -> usize
821 + 'static,
822 {
823 let scope = trace.stream.scope();
824
825 let mut key_con = Tr::KeyContainer::with_capacity(1);
826 if let Some(key) = &key {
827 key_con.push_own(key);
828 }
829 let mode = if key.is_some() { "index" } else { "scan" };
830 let name = format!("ArrangementFlatMapOk({})", mode);
831
832 let mut builder = OperatorBuilder::new(name, scope.clone());
833 let (ok_output, ok_stream) = builder.new_output();
834 let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
835 let mut input = builder.new_input(trace.stream.clone(), Pipeline);
836 let operator_info = builder.operator_info();
837
838 builder.build(move |_capabilities| {
839 let activator = scope.activator_for(operator_info.address);
840 let mut todo = std::collections::VecDeque::new();
841 move |_frontiers| {
842 let key = key_con.get(0);
843 let mut ok_output = ok_output.activate();
844
845 input.for_each(|time, data| {
846 let cap = time.retain(0);
847 for batch in data.iter() {
848 todo.push_back(PendingWorkOk::new(
849 cap.clone(),
850 batch.cursor(),
851 batch.clone(),
852 ));
853 }
854 });
855
856 let mut fuel = refuel;
857 while !todo.is_empty() && fuel > 0 {
858 todo.front_mut().unwrap().do_work(
859 key.as_ref(),
860 &mut logic,
861 &mut fuel,
862 &mut ok_output,
863 );
864 if fuel > 0 {
865 todo.pop_front();
866 }
867 }
868 if !todo.is_empty() {
869 activator.activate();
870 }
871 }
872 });
873
874 ok_stream
875 }
876
877 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
882 self.arranged.get(key).map(|x| x.clone())
883 }
884}
885
impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
    /// Presents `self` as a collection after applying `mfp`, suppressing
    /// updates at or beyond `until`.
    ///
    /// When `mfp` optimizes to the identity and no specific key value is
    /// requested, this falls through to `as_specific_collection`. Otherwise the
    /// MFP is evaluated per update via `flat_map`, with demand limited to the
    /// columns the MFP actually reads.
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<'scope, T, mz_repr::Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        mfp.optimize();
        // Plan once only to test for identity; re-planned below after the
        // demand-based permutation.
        let mfp_plan = mfp.clone().into_plan().unwrap();

        // A specific key value means we must filter to that key, even if the
        // MFP itself is the identity.
        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // Demand: only the first `max_demand` columns are ever read by `mfp`.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        let mut datum_vec = DatumVec::new();
        // Share `until` with the per-update closure below.
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self
            .flat_map::<_, ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>, _>(
                key_val,
                max_demand,
                move |row_datums, time, diff, ok_session, err_session| {
                    let mut row_builder = SharedRow::get();
                    let until = std::rc::Rc::clone(&until);
                    let temp_storage = RowArena::new();
                    let row_iter = row_datums.iter();
                    let mut datums_local = datum_vec.borrow();
                    datums_local.extend(row_iter);
                    let event_time = time.event_time();
                    // Count of updates produced; reported as work for fueling.
                    let mut work: usize = 0;
                    for result in mfp_plan.evaluate(
                        &mut datums_local,
                        &temp_storage,
                        event_time,
                        diff.clone(),
                        // Suppress updates at or beyond the `until` frontier.
                        move |time| !until.less_equal(time),
                        &mut row_builder,
                    ) {
                        work += 1;
                        match result {
                            Ok((row, event_time, diff)) => {
                                // Copy the whole time, then update the event
                                // portion that the MFP may have advanced.
                                let mut time: T = time.clone();
                                *time.event_time_mut() = event_time;
                                ok_session.give((row, time, diff));
                            }
                            Err((e, event_time, diff)) => {
                                let mut time: T = time.clone();
                                *time.event_time_mut() = event_time;
                                err_session.give((e, time, diff));
                            }
                        }
                    }
                    work
                },
            );

        (stream.as_collection(), errors)
    }
    /// Ensures that the bundle provides all representations named by
    /// `collections`, building missing raw collections and arrangements.
    ///
    /// `input_key`/`input_mfp` describe how to derive the raw collection if it
    /// must be formed; `strategy` and the dyncfg flags control whether newly
    /// arranged data is routed through temporal bucketing (applied at most
    /// once, tracked by `bucketed`).
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        as_of: Antichain<mz_repr::Timestamp>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
        strategy: ArrangementStrategy,
    ) -> Self
    where
        T: MaybeBucketByTime,
    {
        if collections == Default::default() {
            return self;
        }
        // Creating an arrangement that already exists indicates a planning bug.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // Whether temporal bucketing has already been applied to the data.
        let mut bucketed = false;

        let will_create_arrangement = collections
            .arranged
            .iter()
            .any(|(key, _, _)| !self.arranged.contains_key(key));

        // A raw collection is needed either because it was requested or as the
        // input from which new arrangements are formed.
        let form_raw_collection = collections.raw || will_create_arrangement;
        if form_raw_collection && self.collection.is_none() {
            let (oks, errs) =
                self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
            let oks = if will_create_arrangement
                && matches!(strategy, ArrangementStrategy::TemporalBucketing)
                && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
            {
                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                    .get(config_set)
                    .try_into()
                    .expect("must fit");
                bucketed = true;
                T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
            } else {
                oks
            };
            self.collection = Some((oks, errs));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                // Apply bucketing here only if it was not already applied when
                // the raw collection was formed.
                let oks = if !bucketed
                    && matches!(strategy, ArrangementStrategy::TemporalBucketing)
                    && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
                {
                    let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                        .get(config_set)
                        .try_into()
                        .expect("must fit");
                    bucketed = true;
                    T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
                } else {
                    oks
                };
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                // Key-evaluation errors join the existing error stream for the
                // arrangement; the raw collection keeps the original errors.
                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }

    /// Arranges `oks` by `key`, thinning the values to the columns named in
    /// `thinning`.
    ///
    /// Returns the arrangement, a collection of errors raised while evaluating
    /// the key expressions, and a passthrough copy of the input collection.
    fn arrange_collection(
        name: &String,
        oks: VecCollection<'scope, T, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<'scope, RowRowAgent<T, Diff>>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
        VecCollection<'scope, T, Row, Diff>,
    ) {
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(oks.inner, Pipeline);
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_capabilities| {
            // Buffers reused across invocations to avoid per-row allocation.
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut temp_storage = RowArena::new();
            move |_frontiers| {
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                // Value holds only the thinned columns.
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    // Forward the unmodified input as the passthrough output.
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        // Arrange the keyed stream, exchanging by the new key.
        let oks = ok_stream
            .mz_arrange_core::<
                _,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, T, Diff>,
                ),
                name
            );
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
}
1167
// Shorthand for the output session type used by the operators in this module.
type Session<'a, 'b, T, CB> =
    timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability<T>>;

// Container builder for error outputs: consolidates `(error, time, diff)`
// updates before emission.
type ECB<T> = ConsolidatingContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>;

// Units of work a fueled flat-map operator performs per activation before
// yielding and rescheduling itself.
const REFUEL: usize = 1_000_000;
1184
/// A batch of an arrangement awaiting fueled processing, with separate
/// capabilities for the ok and error outputs.
struct PendingWork<C>
where
    C: Cursor,
{
    // Capability at which ok results are emitted.
    ok_capability: Capability<C::Time>,
    // Capability at which errors are emitted.
    err_capability: Capability<C::Time>,
    // Cursor positioned within `batch`, retaining progress across activations.
    cursor: C,
    // The batch the cursor reads from.
    batch: C::Storage,
}
1196
impl<C> PendingWork<C>
where
    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
{
    /// Creates a pending-work record from output capabilities, a cursor, and
    /// the batch it reads.
    fn new(
        ok_capability: Capability<C::Time>,
        err_capability: Capability<C::Time>,
        cursor: C,
        batch: C::Storage,
    ) -> Self {
        Self {
            ok_capability,
            err_capability,
            cursor,
            batch,
        }
    }
    /// Performs up to `fuel` units of work through the cursor, applying
    /// `logic` and routing its output into the ok and error sessions.
    ///
    /// If `key` is provided, only that key's updates are visited. `fuel` is
    /// decremented by the work performed and zeroed when the batch is not yet
    /// exhausted (see `walk_cursor`).
    fn do_work<D, DCB, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
        err_output: &mut OutputBuilderSession<'_, C::Time, ECB<C::Time>>,
    ) where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
        L: FnMut(
                C::Key<'_>,
                C::Val<'_>,
                C::Time,
                C::Diff,
                &mut Session<C::Time, DCB>,
                &mut Session<C::Time, ECB<C::Time>>,
            ) -> usize
            + 'static,
    {
        let mut ok_session = ok_output.session_with_builder(&self.ok_capability);
        let mut err_session = err_output.session_with_builder(&self.err_capability);
        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
            logic(k, v, t, d, &mut ok_session, &mut err_session)
        });
    }
}
1245
/// A batch of an arrangement awaiting fueled processing; ok output only.
struct PendingWorkOk<C>
where
    C: Cursor,
{
    // Capability at which results are emitted.
    capability: Capability<C::Time>,
    // Cursor positioned within `batch`, retaining progress across activations.
    cursor: C,
    // The batch the cursor reads from.
    batch: C::Storage,
}
1256
impl<C> PendingWorkOk<C>
where
    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
{
    /// Creates a pending-work record from a capability, a cursor, and the
    /// batch it reads.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }

    /// Performs up to `fuel` units of work through the cursor, applying
    /// `logic` and routing its output into the ok session.
    ///
    /// If `key` is provided, only that key's updates are visited. `fuel` is
    /// decremented by the work performed and zeroed when the batch is not yet
    /// exhausted (see `walk_cursor`).
    fn do_work<D, DCB, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
    ) where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session<C::Time, DCB>) -> usize
            + 'static,
    {
        let mut ok_session = ok_output.session_with_builder(&self.capability);
        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
            logic(k, v, t, d, &mut ok_session)
        });
    }
}
1289
1290fn walk_cursor<C, F>(
1300 cursor: &mut C,
1301 batch: &C::Storage,
1302 key: Option<&C::Key<'_>>,
1303 fuel: &mut usize,
1304 mut emit: F,
1305) where
1306 C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1307 F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize,
1308{
1309 use differential_dataflow::consolidation::consolidate;
1310
1311 let mut work: usize = 0;
1312 let mut buffer = Vec::new();
1313 if let Some(key) = key {
1314 let key = C::KeyContainer::reborrow(*key);
1315 if cursor.get_key(batch).map(|k| k == key) != Some(true) {
1316 cursor.seek_key(batch, key);
1317 }
1318 if cursor.get_key(batch).map(|k| k == key) == Some(true) {
1319 let key = cursor.key(batch);
1320 while let Some(val) = cursor.get_val(batch) {
1321 cursor.map_times(batch, |time, diff| {
1322 buffer.push((C::owned_time(time), C::owned_diff(diff)));
1323 });
1324 consolidate(&mut buffer);
1325 for (time, diff) in buffer.drain(..) {
1326 work += emit(key, val, time, diff);
1327 }
1328 cursor.step_val(batch);
1329 if work >= *fuel {
1330 *fuel = 0;
1331 return;
1332 }
1333 }
1334 }
1335 } else {
1336 while let Some(key) = cursor.get_key(batch) {
1337 while let Some(val) = cursor.get_val(batch) {
1338 cursor.map_times(batch, |time, diff| {
1339 buffer.push((C::owned_time(time), C::owned_diff(diff)));
1340 });
1341 consolidate(&mut buffer);
1342 for (time, diff) in buffer.drain(..) {
1343 work += emit(key, val, time, diff);
1344 }
1345 cursor.step_val(batch);
1346 if work >= *fuel {
1347 *fuel = 0;
1348 return;
1349 }
1350 }
1351 cursor.step_key(batch);
1352 }
1353 }
1354 *fuel -= work;
1355}