1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23 ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_COMPUTE_TEMPORAL_BUCKETING,
24 TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::builder::ColumnBuilder;
34use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
35use mz_timely_util::operator::CollectionExt;
36use timely::container::CapacityContainerBuilder;
37use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
38use timely::dataflow::operators::Capability;
39use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
40use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
41use timely::dataflow::{Scope, StreamVec};
42use timely::progress::operate::FrontierInterest;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::{DataflowErrorSer, ErrorLogger};
48use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
52};
53
/// Per-dataflow rendering context.
///
/// Holds the timely scope in which collections are rendered, identifying
/// metadata about the dataflow, the frontiers that bound its computation, and
/// the bindings from `Id`s to already-rendered [`CollectionBundle`]s.
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope within which all managed collections exist.
    pub(crate) scope: Scope<'scope, T>,
    /// The debug name of the dataflow (copied from its description).
    pub debug_name: String,
    /// Identifier of this dataflow, derived from the scope's address.
    pub dataflow_id: usize,
    /// The `GlobalId`s exported by this dataflow.
    pub export_ids: Vec<GlobalId>,
    /// The as-of frontier of the dataflow; `Timestamp::minimum()` if the
    /// description carried no as-of.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Frontier beyond which updates need not be reported.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Bindings from identifiers to the collections rendered for them.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// Logger for compute events; `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for how linear joins are to be rendered.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// Frontier at which the dataflow is considered expired.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// Worker-level dynamic configuration.
    pub config_set: Rc<ConfigSet>,
}
92
93impl<'scope, T: RenderTimestamp> Context<'scope, T> {
94 pub fn for_dataflow_in<Plan>(
96 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
97 scope: Scope<'scope, T>,
98 compute_state: &ComputeState,
99 until: Antichain<mz_repr::Timestamp>,
100 dataflow_expiration: Antichain<mz_repr::Timestamp>,
101 ) -> Self {
102 use mz_ore::collections::CollectionExt as IteratorExt;
103 let dataflow_id = *scope.addr().into_first();
104 let as_of_frontier = dataflow
105 .as_of
106 .clone()
107 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
108
109 let export_ids = dataflow.export_ids().collect();
110
111 let compute_logger = if dataflow.is_transient() {
115 None
116 } else {
117 compute_state.compute_logger.clone()
118 };
119
120 Self {
121 scope,
122 debug_name: dataflow.debug_name.clone(),
123 dataflow_id,
124 export_ids,
125 as_of_frontier,
126 until,
127 bindings: BTreeMap::new(),
128 compute_logger,
129 linear_join_spec: compute_state.linear_join_spec,
130 dataflow_expiration,
131 config_set: Rc::clone(&compute_state.worker_config),
132 }
133 }
134}
135
136impl<'scope, T: RenderTimestamp> Context<'scope, T> {
137 pub fn insert_id(
142 &mut self,
143 id: Id,
144 collection: CollectionBundle<'scope, T>,
145 ) -> Option<CollectionBundle<'scope, T>> {
146 self.bindings.insert(id, collection)
147 }
148 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
152 self.bindings.remove(&id)
153 }
154 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
156 if !self.bindings.contains_key(&id) {
157 self.bindings.insert(id, collection);
158 } else {
159 let binding = self
160 .bindings
161 .get_mut(&id)
162 .expect("Binding verified to exist");
163 if collection.collection.is_some() {
164 binding.collection = collection.collection;
165 }
166 for (key, flavor) in collection.arranged.into_iter() {
167 binding.arranged.insert(key, flavor);
168 }
169 }
170 }
171 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
173 self.bindings.get(&id).cloned()
174 }
175
176 pub(super) fn error_logger(&self) -> ErrorLogger {
177 ErrorLogger::new(self.debug_name.clone())
178 }
179}
180
181impl<'scope, T: RenderTimestamp> Context<'scope, T> {
182 pub fn enter_region<'a>(
184 &self,
185 region: Scope<'a, T>,
186 bindings: Option<&std::collections::BTreeSet<Id>>,
187 ) -> Context<'a, T> {
188 let bindings = self
189 .bindings
190 .iter()
191 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
192 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
193 .collect();
194
195 Context {
196 scope: region,
197 debug_name: self.debug_name.clone(),
198 dataflow_id: self.dataflow_id.clone(),
199 export_ids: self.export_ids.clone(),
200 as_of_frontier: self.as_of_frontier.clone(),
201 until: self.until.clone(),
202 compute_logger: self.compute_logger.clone(),
203 linear_join_spec: self.linear_join_spec.clone(),
204 bindings,
205 dataflow_expiration: self.dataflow_expiration.clone(),
206 config_set: Rc::clone(&self.config_set),
207 }
208 }
209}
210
/// The two ways an arrangement can be available to this dataflow: built
/// locally within it, or imported from a pre-existing trace.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// An arrangement created within this dataflow: ok rows and errors.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An arrangement imported from an existing trace, identified by the
    /// `GlobalId` of its source. Its native times are `mz_repr::Timestamp`,
    /// entered into the scope's time `T`.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
229
impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
    /// Presents the arrangement as collections of owned rows and errors.
    ///
    /// Each output row is the key's datums followed by the value's datums,
    /// re-packed into a single `Row`.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        // Scratch space for decoded datums, reused across records.
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Applies `logic` to each update in the arrangement and streams the
    /// results, alongside the arrangement's error collection.
    ///
    /// If `key` is `Some`, processing is restricted to records whose key
    /// equals it. At most `max_demand` leading datums — taken from the key
    /// first, then the value — are decoded and presented to `logic`; pass
    /// `usize::MAX` to decode everything.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        StreamVec<'scope, T, I::Item>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        I: IntoIterator<Item = (D, T, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff) -> I + 'static,
    {
        // Fuel budget per operator activation: bounds the number of produced
        // records before the operator yields and reactivates itself.
        let refuel = 1000000;

        // Scratch space for decoded datums, reused across records.
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            // Whatever demand the key's datums did not satisfy is taken from
            // the value's datums.
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
318impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
319 pub fn scope(&self) -> Scope<'scope, T> {
321 match self {
322 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
323 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
324 }
325 }
326
327 pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
329 match self {
330 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
331 oks.clone().enter_region(region),
332 errs.clone().enter_region(region),
333 ),
334 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
335 *gid,
336 oks.clone().enter_region(region),
337 errs.clone().enter_region(region),
338 ),
339 }
340 }
341}
342impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
343 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
345 match self {
346 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
347 oks.clone().leave_region(outer),
348 errs.clone().leave_region(outer),
349 ),
350 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
351 *gid,
352 oks.clone().leave_region(outer),
353 errs.clone().leave_region(outer),
354 ),
355 }
356 }
357}
358
/// A bundle of the ways a collection can be represented during rendering:
/// optionally as an unarranged collection of rows (plus its errors), and as
/// any number of arrangements, each keyed by a sequence of key expressions.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The unarranged ok and error collections, if formed.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// Arrangements of the collection, indexed by their key expressions.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
371
372impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
373 pub fn from_collections(
375 oks: VecCollection<'scope, T, Row, Diff>,
376 errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
377 ) -> Self {
378 Self {
379 collection: Some((oks, errs)),
380 arranged: BTreeMap::default(),
381 }
382 }
383
384 pub fn from_expressions(
386 exprs: Vec<MirScalarExpr>,
387 arrangements: ArrangementFlavor<'scope, T>,
388 ) -> Self {
389 let mut arranged = BTreeMap::new();
390 arranged.insert(exprs, arrangements);
391 Self {
392 collection: None,
393 arranged,
394 }
395 }
396
397 pub fn from_columns<I: IntoIterator<Item = usize>>(
399 columns: I,
400 arrangements: ArrangementFlavor<'scope, T>,
401 ) -> Self {
402 let mut keys = Vec::new();
403 for column in columns {
404 keys.push(MirScalarExpr::column(column));
405 }
406 Self::from_expressions(keys, arrangements)
407 }
408
409 pub fn scope(&self) -> Scope<'scope, T> {
411 if let Some((oks, _errs)) = &self.collection {
412 oks.inner.scope()
413 } else {
414 self.arranged
415 .values()
416 .next()
417 .expect("Must contain a valid collection")
418 .scope()
419 }
420 }
421
422 pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
424 CollectionBundle {
425 collection: self.collection.as_ref().map(|(oks, errs)| {
426 (
427 oks.clone().enter_region(region),
428 errs.clone().enter_region(region),
429 )
430 }),
431 arranged: self
432 .arranged
433 .iter()
434 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
435 .collect(),
436 }
437 }
438}
439
440impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
441 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
443 CollectionBundle {
444 collection: self.collection.as_ref().map(|(oks, errs)| {
445 (
446 oks.clone().leave_region(outer),
447 errs.clone().leave_region(outer),
448 )
449 }),
450 arranged: self
451 .arranged
452 .iter()
453 .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
454 .collect(),
455 }
456 }
457}
458
459impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
460 pub fn as_specific_collection(
473 &self,
474 key: Option<&[MirScalarExpr]>,
475 config_set: &ConfigSet,
476 ) -> (
477 VecCollection<'scope, T, Row, Diff>,
478 VecCollection<'scope, T, DataflowErrorSer, Diff>,
479 ) {
480 match key {
486 None => self
487 .collection
488 .clone()
489 .expect("The unarranged collection doesn't exist."),
490 Some(key) => {
491 let arranged = self.arranged.get(key).unwrap_or_else(|| {
492 panic!("The collection arranged by {:?} doesn't exist.", key)
493 });
494 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
495 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
497 Some((SharedRow::pack(borrow.iter()), t, r))
498 });
499 (ok.as_collection(), err)
500 } else {
501 #[allow(deprecated)]
502 arranged.as_collection()
503 }
504 }
505 }
506 }
507
508 pub fn flat_map<D, I, L>(
524 &self,
525 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
526 max_demand: usize,
527 mut logic: L,
528 ) -> (
529 StreamVec<'scope, T, I::Item>,
530 VecCollection<'scope, T, DataflowErrorSer, Diff>,
531 )
532 where
533 I: IntoIterator<Item = (D, T, Diff)>,
534 D: Data,
535 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static,
536 {
537 if let Some((key, val)) = key_val {
541 self.arrangement(&key)
542 .expect("Should have ensured during planning that this arrangement exists.")
543 .flat_map(val.as_ref(), max_demand, logic)
544 } else {
545 use timely::dataflow::operators::vec::Map;
546 let (oks, errs) = self
547 .collection
548 .clone()
549 .expect("Invariant violated: CollectionBundle contains no collection.");
550 let mut datums = DatumVec::new();
551 let oks = oks.inner.flat_map(move |(v, t, d)| {
552 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
553 });
554 (oks, errs)
555 }
556 }
557
558 fn flat_map_core<Tr, D, I, L>(
566 trace: Arranged<'scope, Tr>,
567 key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
568 mut logic: L,
569 refuel: usize,
570 ) -> StreamVec<'scope, T, I::Item>
571 where
572 Tr: for<'a> TraceReader<
573 Key<'a>: ToDatumIter,
574 Val<'a>: ToDatumIter,
575 Time = T,
576 Diff = mz_repr::Diff,
577 > + Clone
578 + 'static,
579 <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
580 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
581 D: Data,
582 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff) -> I + 'static,
583 {
584 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
585 let scope = trace.stream.scope();
586
587 let mut key_con = Tr::KeyContainer::with_capacity(1);
588 if let Some(key) = &key {
589 key_con.push_own(key);
590 }
591 let mode = if key.is_some() { "index" } else { "scan" };
592 let name = format!("ArrangementFlatMap({})", mode);
593 use timely::dataflow::operators::Operator;
594 trace
595 .stream
596 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
597 let activator = scope.activator_for(info.address);
599 let mut todo = std::collections::VecDeque::new();
601 move |input, output| {
602 let key = key_con.get(0);
603 input.for_each(|time, data| {
605 let capability = time.retain(0);
606 for batch in data.iter() {
607 todo.push_back(PendingWork::new(
609 capability.clone(),
610 batch.cursor(),
611 batch.clone(),
612 ));
613 }
614 });
615
616 let mut fuel = refuel;
618 while !todo.is_empty() && fuel > 0 {
619 todo.front_mut().unwrap().do_work(
620 key.as_ref(),
621 &mut logic,
622 &mut fuel,
623 output,
624 );
625 if fuel > 0 {
626 todo.pop_front();
627 }
628 }
629 if !todo.is_empty() {
631 activator.activate();
632 }
633 }
634 })
635 }
636
637 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
642 self.arranged.get(key).map(|x| x.clone())
643 }
644}
645
impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
    /// Presents `self` as collections of ok rows and errors, after applying
    /// `mfp` and optionally seeking the key value in `key_val`.
    ///
    /// Errors raised while evaluating `mfp` are merged into the returned
    /// error collection. Updates at times not before `until` are suppressed
    /// during MFP evaluation.
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<'scope, T, mz_repr::Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        mfp.optimize();
        let mfp_plan = mfp.clone().into_plan().unwrap();

        // Whether the caller requests a specific key value; if so, we cannot
        // take the identity fast path below.
        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        // Fast path: an identity MFP with no key-value restriction requires
        // no per-record evaluation.
        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // The MFP only reads columns up to its maximum demanded index; limit
        // datum decoding to that prefix.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        // Scratch space for datums, reused across records.
        let mut datum_vec = DatumVec::new();
        // Shared so the per-record closure can cheaply clone a handle.
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
            let mut row_builder = SharedRow::get();
            let until = std::rc::Rc::clone(&until);
            let temp_storage = RowArena::new();
            let row_iter = row_datums.iter();
            let mut datums_local = datum_vec.borrow();
            datums_local.extend(row_iter);
            let time = time.clone();
            let event_time = time.event_time();
            // Evaluate the MFP; the predicate suppresses updates whose event
            // time is not before `until`. Each produced ok/err update carries
            // its own event time, which we splice back into a copy of `time`.
            mfp_plan
                .evaluate(
                    &mut datums_local,
                    &temp_storage,
                    event_time,
                    diff.clone(),
                    move |time| !until.less_equal(time),
                    &mut row_builder,
                )
                .map(move |x| match x {
                    Ok((row, event_time, diff)) => {
                        let mut time: T = time.clone();
                        *time.event_time_mut() = event_time;
                        (Ok(row), time, diff)
                    }
                    Err((e, event_time, diff)) => {
                        let mut time: T = time.clone();
                        *time.event_time_mut() = event_time;
                        (Err(e), time, diff)
                    }
                })
        });

        // Split the mixed Ok/Err stream into separate collections.
        use differential_dataflow::AsCollection;
        let (oks, errs) = stream
            .as_collection()
            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "OkErr",
                |x| x,
            );

        (oks, errors.concat(errs))
    }
    /// Ensures that `self` contains all representations requested by
    /// `collections`, forming the raw collection and any missing
    /// arrangements as needed, and returns the augmented bundle.
    ///
    /// Temporal bucketing (when enabled and selected by `strategy`) is
    /// applied at most once to the ok collection before arranging.
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        as_of: Antichain<mz_repr::Timestamp>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
        strategy: ArrangementStrategy,
    ) -> Self
    where
        T: MaybeBucketByTime,
    {
        // Nothing requested: nothing to do.
        if collections == Default::default() {
            return self;
        }
        // Planning should not request arrangements we already have.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // Tracks whether temporal bucketing has already been applied, so it
        // is applied at most once.
        let mut bucketed = false;

        let will_create_arrangement = collections
            .arranged
            .iter()
            .any(|(key, _, _)| !self.arranged.contains_key(key));

        // A raw collection is needed either because it was requested or as
        // input for forming new arrangements.
        let form_raw_collection = collections.raw || will_create_arrangement;
        if form_raw_collection && self.collection.is_none() {
            let (oks, errs) =
                self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
            let oks = if will_create_arrangement
                && matches!(strategy, ArrangementStrategy::TemporalBucketing)
                && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
            {
                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                    .get(config_set)
                    .try_into()
                    .expect("must fit");
                bucketed = true;
                T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
            } else {
                oks
            };
            self.collection = Some((oks, errs));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                // Apply temporal bucketing here only if it was not already
                // applied when the raw collection was formed above.
                let oks = if !bucketed
                    && matches!(strategy, ArrangementStrategy::TemporalBucketing)
                    && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
                {
                    let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                        .get(config_set)
                        .try_into()
                        .expect("must fit");
                    bucketed = true;
                    T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
                } else {
                    oks
                };
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                // The arrangement's errors include key-evaluation errors;
                // NOTE(review): the retained raw collection keeps only the
                // pre-existing errors, not `errs_keyed` — presumably
                // intentional, since key errors only affect the arrangement.
                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }

    /// Arranges `oks` by evaluating `key` expressions on each row, keeping
    /// the `thinning` columns as the value.
    ///
    /// Returns the arranged (key, value) pairs, a collection of errors raised
    /// while evaluating the key expressions, and an unchanged passthrough of
    /// the input rows.
    fn arrange_collection(
        name: &String,
        oks: VecCollection<'scope, T, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<'scope, RowRowAgent<T, Diff>>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
        VecCollection<'scope, T, Row, Diff>,
    ) {
        // A three-output operator: keyed ok pairs, key-evaluation errors, and
        // a passthrough copy of the input rows.
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(oks.inner, Pipeline);
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_capabilities| {
            // Buffers reused across records to avoid per-record allocation.
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut temp_storage = RowArena::new();
            move |_frontiers| {
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        // Evaluate the key expressions; a failure routes the
                        // record to the error output instead.
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                // The value keeps only the `thinning` columns.
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        // Arrange the keyed pairs, exchanging records by columnar key hash.
        let oks = ok_stream
            .mz_arrange_core::<
                _,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, T, Diff>,
                ),
                name
            );
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
}
928
/// A batch with outstanding work for the fueled `flat_map_core` operator:
/// a cursor positioned within the batch, plus the capability under which its
/// outputs are produced.
struct PendingWork<C>
where
    C: Cursor,
{
    // Capability used to produce the batch's outputs.
    capability: Capability<C::Time>,
    // Cursor into `batch`, retaining its position across activations.
    cursor: C,
    // The batch storage the cursor navigates.
    batch: C::Storage,
}
937
impl<C> PendingWork<C>
where
    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
{
    /// Creates pending work from a capability, a cursor, and its batch.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Performs up to `fuel` work on the batch, applying `logic` to each
    /// (key, value, time, diff) and emitting results to `output`.
    ///
    /// If `key` is `Some`, the cursor seeks that key and processes only its
    /// values; otherwise it walks every key. Times and diffs per value are
    /// consolidated before `logic` runs. When the budget is exhausted, `fuel`
    /// is set to `0` and the method returns with the cursor mid-batch, to be
    /// resumed on a later call; otherwise the work done is subtracted from
    /// `fuel`.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Number of records produced so far in this call.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch buffer of (time, diff) pairs, consolidated per value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek only if the cursor is not already positioned at the key.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of budget: signal exhaustion and yield mid-batch.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // Full scan: walk every key and all of its values.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of budget: signal exhaustion and yield mid-batch.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        *fuel -= work;
    }
}
1017}