1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::CollectionExt;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
38use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
39use timely::dataflow::scopes::Child;
40use timely::dataflow::{Scope, StreamVec};
41use timely::progress::timestamp::Refines;
42use timely::progress::{Antichain, Timestamp};
43
44use crate::compute_state::ComputeState;
45use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
46use crate::render::errors::ErrorLogger;
47use crate::render::{LinearJoinSpec, RenderTimestamp};
48use crate::row_spine::{DatumSeq, RowRowBuilder};
49use crate::typedefs::{
50 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
51 RowRowSpine,
52};
53
/// Per-dataflow state threaded through rendering: the scope to render into,
/// identifying metadata, timestamp frontiers, and the bindings of identifiers
/// to rendered collections.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// An identifier for the dataflow, taken from the first element of the
    /// scope's address.
    pub dataflow_id: usize,
    /// The ids of the dataflow's exports (from `DataflowDescription::export_ids`).
    pub export_ids: Vec<GlobalId>,
    /// Frontier derived from the dataflow's `as_of` (minimum timestamp when
    /// the `as_of` is unset).
    pub as_of_frontier: Antichain<T>,
    /// Frontier used to filter updates during MFP evaluation (see
    /// `as_collection_core`); updates not before this frontier are suppressed.
    pub until: Antichain<T>,
    /// Bindings of identifiers to collections.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// A logger for compute events; `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    // NOTE(review): presumably the frontier at which the dataflow is
    // considered expired — confirm semantics against `ComputeState`.
    pub dataflow_expiration: Antichain<T>,
    /// The worker-level dynamic configuration (`ComputeState::worker_config`).
    pub config_set: Rc<ConfigSet>,
}
99
100impl<S: Scope> Context<S>
101where
102 S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
103{
104 pub fn for_dataflow_in<Plan>(
106 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
107 scope: S,
108 compute_state: &ComputeState,
109 until: Antichain<mz_repr::Timestamp>,
110 dataflow_expiration: Antichain<mz_repr::Timestamp>,
111 ) -> Self {
112 use mz_ore::collections::CollectionExt as IteratorExt;
113 let dataflow_id = *scope.addr().into_first();
114 let as_of_frontier = dataflow
115 .as_of
116 .clone()
117 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
118
119 let export_ids = dataflow.export_ids().collect();
120
121 let compute_logger = if dataflow.is_transient() {
125 None
126 } else {
127 compute_state.compute_logger.clone()
128 };
129
130 Self {
131 scope,
132 debug_name: dataflow.debug_name.clone(),
133 dataflow_id,
134 export_ids,
135 as_of_frontier,
136 until,
137 bindings: BTreeMap::new(),
138 compute_logger,
139 linear_join_spec: compute_state.linear_join_spec,
140 dataflow_expiration,
141 config_set: Rc::clone(&compute_state.worker_config),
142 }
143 }
144}
145
146impl<S: Scope, T> Context<S, T>
147where
148 T: MzTimestamp,
149 S::Timestamp: MzTimestamp + Refines<T>,
150{
151 pub fn insert_id(
156 &mut self,
157 id: Id,
158 collection: CollectionBundle<S, T>,
159 ) -> Option<CollectionBundle<S, T>> {
160 self.bindings.insert(id, collection)
161 }
162 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
166 self.bindings.remove(&id)
167 }
168 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
170 if !self.bindings.contains_key(&id) {
171 self.bindings.insert(id, collection);
172 } else {
173 let binding = self
174 .bindings
175 .get_mut(&id)
176 .expect("Binding verified to exist");
177 if collection.collection.is_some() {
178 binding.collection = collection.collection;
179 }
180 for (key, flavor) in collection.arranged.into_iter() {
181 binding.arranged.insert(key, flavor);
182 }
183 }
184 }
185 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
187 self.bindings.get(&id).cloned()
188 }
189
190 pub(super) fn error_logger(&self) -> ErrorLogger {
191 ErrorLogger::new(self.debug_name.clone())
192 }
193}
194
195impl<S: Scope, T> Context<S, T>
196where
197 T: MzTimestamp,
198 S::Timestamp: MzTimestamp + Refines<T>,
199{
200 pub fn enter_region<'a>(
202 &self,
203 region: &Child<'a, S, S::Timestamp>,
204 bindings: Option<&std::collections::BTreeSet<Id>>,
205 ) -> Context<Child<'a, S, S::Timestamp>, T> {
206 let bindings = self
207 .bindings
208 .iter()
209 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
210 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
211 .collect();
212
213 Context {
214 scope: region.clone(),
215 debug_name: self.debug_name.clone(),
216 dataflow_id: self.dataflow_id.clone(),
217 export_ids: self.export_ids.clone(),
218 as_of_frontier: self.as_of_frontier.clone(),
219 until: self.until.clone(),
220 compute_logger: self.compute_logger.clone(),
221 linear_join_spec: self.linear_join_spec.clone(),
222 bindings,
223 dataflow_expiration: self.dataflow_expiration.clone(),
224 config_set: Rc::clone(&self.config_set),
225 }
226 }
227}
228
/// The two ways a collection's arrangement can be available: built within this
/// dataflow, or imported from a pre-existing trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// An arrangement created in this dataflow: row data plus an arrangement
    /// of errors (whose values are unit).
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An arrangement imported from an existing trace, tagged with the global
    /// id it was imported under; timestamps are entered from `T` into
    /// `S::Timestamp`.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
251
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents the arrangement as unarranged ok/err collections, packing each
    /// (key, value) pair back into a single `Row`.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        let mut datums = DatumVec::new();
        // Concatenate key and value datums into one packed row.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Applies `logic` to each update in the arrangement, optionally
    /// restricted to updates whose key equals `key`.
    ///
    /// At most `max_demand` leading datums (key datums first, then value
    /// datums) are made visible to `logic`; the rest are not decoded.
    /// Work is performed in fueled chunks so a single batch cannot
    /// monopolize the worker (see `flat_map_core`).
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Fuel budget per operator activation; once exhausted, remaining work
        // is deferred to a re-activation.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        // Adapt the row-pair signature of `flat_map_core` to `logic`'s datum
        // slice, honoring `max_demand` across key then value datums.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            // Remaining demand after the key datums have been consumed.
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
341impl<S: Scope, T> ArrangementFlavor<S, T>
342where
343 T: MzTimestamp,
344 S::Timestamp: MzTimestamp + Refines<T>,
345{
346 pub fn scope(&self) -> S {
348 match self {
349 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
350 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
351 }
352 }
353
354 pub fn enter_region<'a>(
356 &self,
357 region: &Child<'a, S, S::Timestamp>,
358 ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
359 match self {
360 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
361 oks.clone().enter_region(region),
362 errs.clone().enter_region(region),
363 ),
364 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
365 *gid,
366 oks.clone().enter_region(region),
367 errs.clone().enter_region(region),
368 ),
369 }
370 }
371}
372impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
373where
374 T: MzTimestamp,
375 S::Timestamp: MzTimestamp + Refines<T>,
376{
377 pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
379 match self {
380 ArrangementFlavor::Local(oks, errs) => {
381 ArrangementFlavor::Local(oks.clone().leave_region(), errs.clone().leave_region())
382 }
383 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
384 *gid,
385 oks.clone().leave_region(),
386 errs.clone().leave_region(),
387 ),
388 }
389 }
390}
391
/// A bundle of the ways a collection can be represented.
///
/// A collection may be available "unarranged" and/or as one or more
/// arrangements, each indexed by the key expressions it is arranged by.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged ok/err collection pair, if maintained.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions they are
    /// arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
408
409impl<S: Scope, T> CollectionBundle<S, T>
410where
411 T: MzTimestamp,
412 S::Timestamp: MzTimestamp + Refines<T>,
413{
414 pub fn from_collections(
416 oks: VecCollection<S, Row, Diff>,
417 errs: VecCollection<S, DataflowError, Diff>,
418 ) -> Self {
419 Self {
420 collection: Some((oks, errs)),
421 arranged: BTreeMap::default(),
422 }
423 }
424
425 pub fn from_expressions(
427 exprs: Vec<MirScalarExpr>,
428 arrangements: ArrangementFlavor<S, T>,
429 ) -> Self {
430 let mut arranged = BTreeMap::new();
431 arranged.insert(exprs, arrangements);
432 Self {
433 collection: None,
434 arranged,
435 }
436 }
437
438 pub fn from_columns<I: IntoIterator<Item = usize>>(
440 columns: I,
441 arrangements: ArrangementFlavor<S, T>,
442 ) -> Self {
443 let mut keys = Vec::new();
444 for column in columns {
445 keys.push(MirScalarExpr::column(column));
446 }
447 Self::from_expressions(keys, arrangements)
448 }
449
450 pub fn scope(&self) -> S {
452 if let Some((oks, _errs)) = &self.collection {
453 oks.inner.scope()
454 } else {
455 self.arranged
456 .values()
457 .next()
458 .expect("Must contain a valid collection")
459 .scope()
460 }
461 }
462
463 pub fn enter_region<'a>(
465 &self,
466 region: &Child<'a, S, S::Timestamp>,
467 ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
468 CollectionBundle {
469 collection: self.collection.as_ref().map(|(oks, errs)| {
470 (
471 oks.clone().enter_region(region),
472 errs.clone().enter_region(region),
473 )
474 }),
475 arranged: self
476 .arranged
477 .iter()
478 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
479 .collect(),
480 }
481 }
482}
483
484impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
485where
486 T: MzTimestamp,
487 S::Timestamp: MzTimestamp + Refines<T>,
488{
489 pub fn leave_region(&self) -> CollectionBundle<S, T> {
491 CollectionBundle {
492 collection: self
493 .collection
494 .as_ref()
495 .map(|(oks, errs)| (oks.clone().leave_region(), errs.clone().leave_region())),
496 arranged: self
497 .arranged
498 .iter()
499 .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
500 .collect(),
501 }
502 }
503}
504
505impl<S: Scope, T> CollectionBundle<S, T>
506where
507 T: MzTimestamp,
508 S::Timestamp: MzTimestamp + Refines<T>,
509{
510 pub fn as_specific_collection(
523 &self,
524 key: Option<&[MirScalarExpr]>,
525 config_set: &ConfigSet,
526 ) -> (
527 VecCollection<S, Row, Diff>,
528 VecCollection<S, DataflowError, Diff>,
529 ) {
530 match key {
536 None => self
537 .collection
538 .clone()
539 .expect("The unarranged collection doesn't exist."),
540 Some(key) => {
541 let arranged = self.arranged.get(key).unwrap_or_else(|| {
542 panic!("The collection arranged by {:?} doesn't exist.", key)
543 });
544 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
545 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
547 Some((SharedRow::pack(borrow.iter()), t, r))
548 });
549 (ok.as_collection(), err)
550 } else {
551 #[allow(deprecated)]
552 arranged.as_collection()
553 }
554 }
555 }
556 }
557
558 pub fn flat_map<D, I, L>(
574 &self,
575 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
576 max_demand: usize,
577 mut logic: L,
578 ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
579 where
580 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
581 D: Data,
582 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
583 {
584 if let Some((key, val)) = key_val {
588 self.arrangement(&key)
589 .expect("Should have ensured during planning that this arrangement exists.")
590 .flat_map(val.as_ref(), max_demand, logic)
591 } else {
592 use timely::dataflow::operators::vec::Map;
593 let (oks, errs) = self
594 .collection
595 .clone()
596 .expect("Invariant violated: CollectionBundle contains no collection.");
597 let mut datums = DatumVec::new();
598 let oks = oks.inner.flat_map(move |(v, t, d)| {
599 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
600 });
601 (oks, errs)
602 }
603 }
604
605 fn flat_map_core<Tr, D, I, L>(
613 trace: Arranged<S, Tr>,
614 key: Option<&Tr::KeyOwn>,
615 mut logic: L,
616 refuel: usize,
617 ) -> StreamVec<S, I::Item>
618 where
619 Tr: for<'a> TraceReader<
620 Key<'a>: ToDatumIter,
621 KeyOwn: PartialEq,
622 Val<'a>: ToDatumIter,
623 Time = S::Timestamp,
624 Diff = mz_repr::Diff,
625 > + Clone
626 + 'static,
627 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
628 D: Data,
629 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
630 {
631 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
632 let scope = trace.stream.scope();
633
634 let mut key_con = Tr::KeyContainer::with_capacity(1);
635 if let Some(key) = &key {
636 key_con.push_own(key);
637 }
638 let mode = if key.is_some() { "index" } else { "scan" };
639 let name = format!("ArrangementFlatMap({})", mode);
640 use timely::dataflow::operators::Operator;
641 trace
642 .stream
643 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
644 let activator = scope.activator_for(info.address);
646 let mut todo = std::collections::VecDeque::new();
648 move |input, output| {
649 let key = key_con.get(0);
650 input.for_each(|time, data| {
652 let capability = time.retain(0);
653 for batch in data.iter() {
654 todo.push_back(PendingWork::new(
656 capability.clone(),
657 batch.cursor(),
658 batch.clone(),
659 ));
660 }
661 });
662
663 let mut fuel = refuel;
665 while !todo.is_empty() && fuel > 0 {
666 todo.front_mut().unwrap().do_work(
667 key.as_ref(),
668 &mut logic,
669 &mut fuel,
670 output,
671 );
672 if fuel > 0 {
673 todo.pop_front();
674 }
675 }
676 if !todo.is_empty() {
678 activator.activate();
679 }
680 }
681 })
682 }
683
684 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
689 self.arranged.get(key).map(|x| x.clone())
690 }
691}
692
impl<S, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S: Scope,
    S::Timestamp: Refines<T> + RenderTimestamp,
{
    /// Presents the bundle as unarranged ok/err collections after applying
    /// `mfp`, reading from the representation selected by `key_val`.
    ///
    /// Updates whose time is not before `until` are suppressed during MFP
    /// evaluation. When `mfp` is the identity and no key value is supplied,
    /// this short-circuits to `as_specific_collection`.
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<S, mz_repr::Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        mfp.optimize();
        let mfp_plan = mfp.clone().into_plan().unwrap();

        // A specific key value forces per-record evaluation even for an
        // identity MFP.
        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // Only the first `max_demand` columns are needed; restrict the MFP
        // accordingly and re-plan.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        let mut datum_vec = DatumVec::new();
        // Shared so the closure below can capture a clone per invocation.
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
            let mut row_builder = SharedRow::get();
            let until = std::rc::Rc::clone(&until);
            let temp_storage = RowArena::new();
            let row_iter = row_datums.iter();
            let mut datums_local = datum_vec.borrow();
            datums_local.extend(row_iter);
            let time = time.clone();
            let event_time = time.event_time();
            mfp_plan
                .evaluate(
                    &mut datums_local,
                    &temp_storage,
                    event_time,
                    diff.clone(),
                    // Suppress updates at or beyond `until`.
                    move |time| !until.less_equal(time),
                    &mut row_builder,
                )
                .map(move |x| match x {
                    Ok((row, event_time, diff)) => {
                        // Re-attach the (possibly advanced) event time to the
                        // outer timestamp.
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Ok(row), time, diff)
                    }
                    Err((e, event_time, diff)) => {
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Err(e), time, diff)
                    }
                })
        });

        // Split the mixed Ok/Err stream into separate collections and fold
        // evaluation errors into the bundle's error collection.
        use differential_dataflow::AsCollection;
        let (oks, errs) = stream
            .as_collection()
            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "OkErr",
                |x| x,
            );

        (oks, errors.concat(errs))
    }
    /// Ensures that the bundle provides every representation requested by
    /// `collections`, building the raw collection (via `input_key`/`input_mfp`)
    /// and any missing arrangements as needed.
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        if collections == Default::default() {
            return self;
        }
        // Planning should never request an arrangement we already have.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // The raw collection is needed either directly or as the source from
        // which new arrangements are formed.
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                // Form the keyed arrangement; key-evaluation errors join the
                // error collection, and the input rows pass through untouched
                // to restore `self.collection`.
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }

    /// Arranges `oks` by evaluating `key` against each row, with `thinning`
    /// selecting the columns kept as the value.
    ///
    /// Returns the arrangement, a collection of key-evaluation errors, and a
    /// passthrough copy of the input rows.
    fn arrange_collection(
        name: &String,
        oks: VecCollection<S, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        VecCollection<S, DataflowError, Diff>,
        VecCollection<S, Row, Diff>,
    ) {
        // A three-output operator: keyed (key, value) pairs, key-evaluation
        // errors, and a passthrough of the input rows.
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(oks.inner, Pipeline);
        builder.set_notify(false);
        builder.build(move |_capabilities| {
            // Buffers reused across all invocations to avoid per-row allocation.
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut temp_storage = RowArena::new();
            move |_frontiers| {
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        // Evaluate each key expression; any evaluation error
                        // diverts the update to the error output.
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                // The value keeps only the thinned columns.
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    // Forward the input container unchanged.
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        // Arrange the keyed stream, exchanging by a columnar key hash.
        let oks = ok_stream
            .mz_arrange_core::<
                _,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, S::Timestamp, Diff>,
                ),
                name
            );
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
}
938
/// A batch of updates awaiting (possibly resumed) processing: a capability at
/// which results may be emitted, a cursor into the batch, and the storage the
/// cursor navigates.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability at which output may be sent for this batch.
    capability: Capability<C::Time>,
    /// Cursor positioned at the point where work should resume.
    cursor: C,
    /// The batch storage that `cursor` reads from.
    batch: C::Storage,
}
947
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Creates a new bundle of pending work from a capability, a cursor, and
    /// the batch storage the cursor reads.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Performs roughly `fuel` units of work (counted in emitted records),
    /// applying `logic` to updates and sending results to `output`.
    ///
    /// If `key` is `Some`, only that key's updates are visited; otherwise the
    /// whole batch is scanned. When fuel is exhausted, `*fuel` is set to 0 and
    /// the cursor retains its position so work can resume on a later call;
    /// otherwise `*fuel` is decremented by the work performed.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Records emitted so far in this call; compared against `fuel`.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for (time, diff) pairs, consolidated per value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek to the target key unless the cursor is already on it
            // (it may be, when resuming after fuel exhaustion).
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Consolidate (time, diff) pairs before invoking `logic`.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: leave the cursor here and report exhaustion.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // Scan mode: visit every key and value in the batch.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Batch complete: charge the work performed against the fuel budget.
        *fuel -= work;
    }
}