//! Management of dataflow-local state, like arrangements, while building a
//! dataflow.

use std::collections::BTreeMap;
use std::rc::Rc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
use mz_compute_types::plan::AvailableCollections;
use mz_dyncfg::ConfigSet;
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_ore::soft_assert_or_log;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
use mz_timely_util::operator::CollectionExt;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
use crate::render::errors::ErrorLogger;
use crate::render::{LinearJoinSpec, RenderTimestamp};
use crate::row_spine::{DatumSeq, RowRowBuilder};
use crate::typedefs::{
    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
    RowRowSpine,
};

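/// Dataflow-local collections and arrangements, along with the metadata needed
/// to render a dataflow.
///
/// A `Context` tracks the scope in which collections are built, identifying
/// information about the dataflow, and the `CollectionBundle`s bound to `Id`s
/// so far during rendering.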
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The `GlobalId`s of the exports of this dataflow.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    pub as_of_frontier: Antichain<T>,
    /// Frontier after which updates should not be emitted.
    pub until: Antichain<T>,
    /// Bindings of identifiers to collections.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// A logger for compute introspection events, if logging is enabled.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration frontier of this dataflow, if replica expiration is enabled.
    pub dataflow_expiration: Antichain<T>,
    /// Dynamic configuration parameters for this dataflow.
    pub config_set: Rc<ConfigSet>,
}

impl<S: Scope> Context<S>
where
    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
{
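    /// Creates a new `Context` for the given dataflow description in the given
    /// scope, deriving the dataflow ID, as-of frontier, export IDs, and
    /// compute logger from the description and the compute state.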
    pub fn for_dataflow_in<Plan>(
        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
        scope: S,
        compute_state: &ComputeState,
        until: Antichain<mz_repr::Timestamp>,
        dataflow_expiration: Antichain<mz_repr::Timestamp>,
    ) -> Self {
        use mz_ore::collections::CollectionExt as IteratorExt;
        let dataflow_id = *scope.addr().into_first();
        let as_of_frontier = dataflow
            .as_of
            .clone()
            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));

        let export_ids = dataflow.export_ids().collect();

        // Skip compute event logging for transient dataflows.
        let compute_logger = if dataflow.is_transient() {
            None
        } else {
            compute_state.compute_logger.clone()
        };

        Self {
            scope,
            debug_name: dataflow.debug_name.clone(),
            dataflow_id,
            export_ids,
            as_of_frontier,
            until,
            bindings: BTreeMap::new(),
            compute_logger,
            linear_join_spec: compute_state.linear_join_spec,
            dataflow_expiration,
            config_set: Rc::clone(&compute_state.worker_config),
        }
    }
}

impl<S: Scope, T> Context<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
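    /// Insert a collection bundle under the given identifier, returning the
    /// previous binding for that identifier, if any.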
    pub fn insert_id(
        &mut self,
        id: Id,
        collection: CollectionBundle<S, T>,
    ) -> Option<CollectionBundle<S, T>> {
        self.bindings.insert(id, collection)
    }
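    /// Remove the binding for the given identifier, returning it if present.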
    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
        self.bindings.remove(&id)
    }
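    /// Merge the given collection bundle into the binding for `id`, or install
    /// it if no binding exists: a provided raw collection replaces the
    /// existing one, and provided arrangements are added to the existing set.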
    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
        if !self.bindings.contains_key(&id) {
            self.bindings.insert(id, collection);
        } else {
            let binding = self
                .bindings
                .get_mut(&id)
                .expect("Binding verified to exist");
            if collection.collection.is_some() {
                binding.collection = collection.collection;
            }
            for (key, flavor) in collection.arranged.into_iter() {
                binding.arranged.insert(key, flavor);
            }
        }
    }
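    /// Look up the binding for the given identifier, returning a clone of the
    /// bundle if one exists.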
    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
        self.bindings.get(&id).cloned()
    }

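    /// Returns an error logger scoped to this dataflow's debug name.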
    pub(super) fn error_logger(&self) -> ErrorLogger {
        ErrorLogger::new(self.debug_name.clone())
    }
}

impl<S: Scope, T> Context<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
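    /// Brings the bound collections and arrangements into a nested child
    /// region of the scope, optionally restricting the bindings carried over
    /// to the provided set of identifiers.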
    pub fn enter_region<'a>(
        &self,
        region: &Child<'a, S, S::Timestamp>,
        bindings: Option<&std::collections::BTreeSet<Id>>,
    ) -> Context<Child<'a, S, S::Timestamp>, T> {
        let bindings = self
            .bindings
            .iter()
            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
            .collect();

        Context {
            scope: region.clone(),
            debug_name: self.debug_name.clone(),
            dataflow_id: self.dataflow_id,
            export_ids: self.export_ids.clone(),
            as_of_frontier: self.as_of_frontier.clone(),
            until: self.until.clone(),
            compute_logger: self.compute_logger.clone(),
            linear_join_spec: self.linear_join_spec.clone(),
            bindings,
            dataflow_expiration: self.dataflow_expiration.clone(),
            config_set: Rc::clone(&self.config_set),
        }
    }
}

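/// An arrangement of okay rows paired with an arrangement of errors, either
/// built and maintained within this dataflow (`Local`) or imported from an
/// existing trace (`Trace`).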
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// An arrangement and error arrangement managed by this dataflow.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An arrangement and error arrangement imported from an existing trace,
    /// identified by the `GlobalId` of the collection it represents.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}

impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
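    /// Presents `self` as a stream of updates, reassembling each key/value
    /// pair into an owned `Row` by concatenating their datums.
    ///
    /// This allocates a new `Row` for every record; prefer `flat_map`, which
    /// can bound the number of columns it decodes.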
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
        }
    }

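    /// Constructs and applies `logic` to elements of `self`, returning a
    /// stream of results alongside the error collection.
    ///
    /// Only the first `max_demand` columns, taken across key and then value,
    /// are decoded into the `DatumVecBorrow` handed to `logic`. If `key` is
    /// provided, the arrangement is consulted only at that key. The output is
    /// produced with a per-invocation fuel limit, so large batches are spread
    /// across multiple operator activations.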
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Bound the amount of work done per operator invocation, so that we
        // regularly yield control back to Timely.
        let refuel = 1_000_000;

        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}

impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
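    /// The scope containing the arranged collections.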
    pub fn scope(&self) -> S {
        match self {
            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
        }
    }

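    /// Brings the arrangements (both okay and error) into a nested child
    /// region of the scope.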
    pub fn enter_region<'a>(
        &self,
        region: &Child<'a, S, S::Timestamp>,
    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
        match self {
            ArrangementFlavor::Local(oks, errs) => {
                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
            }
            ArrangementFlavor::Trace(gid, oks, errs) => {
                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
            }
        }
    }
}

impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
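    /// Extracts the arrangements (both okay and error) from a nested child
    /// region back into the enclosing scope.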
    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
        match self {
            ArrangementFlavor::Local(oks, errs) => {
                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
            }
            ArrangementFlavor::Trace(gid, oks, errs) => {
                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
            }
        }
    }
}

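/// A bundle of the various ways a collection can be represented during
/// rendering.
///
/// A bundle may carry an unarranged collection of `Row`s and errors, any
/// number of arrangements keyed by expressions, or both. At least one
/// representation is expected to be present.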
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged collection of okay rows and its errors, if present.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions that form
    /// each arrangement's key.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}

impl<S: Scope, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
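    /// Constructs a new collection bundle from an unarranged collection of
    /// okay rows and its error collection.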
    pub fn from_collections(
        oks: VecCollection<S, Row, Diff>,
        errs: VecCollection<S, DataflowError, Diff>,
    ) -> Self {
        Self {
            collection: Some((oks, errs)),
            arranged: BTreeMap::default(),
        }
    }

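    /// Constructs a new collection bundle from an arrangement keyed by the
    /// given expressions.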
    pub fn from_expressions(
        exprs: Vec<MirScalarExpr>,
        arrangements: ArrangementFlavor<S, T>,
    ) -> Self {
        let mut arranged = BTreeMap::new();
        arranged.insert(exprs, arrangements);
        Self {
            collection: None,
            arranged,
        }
    }

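    /// Constructs a new collection bundle from an arrangement keyed by the
    /// given columns, expressed as column references.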
    pub fn from_columns<I: IntoIterator<Item = usize>>(
        columns: I,
        arrangements: ArrangementFlavor<S, T>,
    ) -> Self {
        let mut keys = Vec::new();
        for column in columns {
            keys.push(MirScalarExpr::column(column));
        }
        Self::from_expressions(keys, arrangements)
    }

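    /// The scope containing the collection bundle.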
    pub fn scope(&self) -> S {
        if let Some((oks, _errs)) = &self.collection {
            oks.inner.scope()
        } else {
            self.arranged
                .values()
                .next()
                .expect("Must contain a valid collection")
                .scope()
        }
    }

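    /// Brings the collection bundle into a nested child region of the scope.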
    pub fn enter_region<'a>(
        &self,
        region: &Child<'a, S, S::Timestamp>,
    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
        CollectionBundle {
            collection: self
                .collection
                .as_ref()
                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
            arranged: self
                .arranged
                .iter()
                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
                .collect(),
        }
    }
}

impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
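    /// Extracts the collection bundle from a nested child region back into the
    /// enclosing scope.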
    pub fn leave_region(&self) -> CollectionBundle<S, T> {
        CollectionBundle {
            collection: self
                .collection
                .as_ref()
                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
            arranged: self
                .arranged
                .iter()
                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
                .collect(),
        }
    }
}

impl<S: Scope, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
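    /// Presents `self` as a stream of updates, either from the unarranged
    /// collection (when `key` is `None`) or by reading the arrangement keyed
    /// by `key`.
    ///
    /// Panics if the requested representation does not exist; planning is
    /// expected to have ensured its presence.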
    pub fn as_specific_collection(
        &self,
        key: Option<&[MirScalarExpr]>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        match key {
            None => self
                .collection
                .clone()
                .expect("The unarranged collection doesn't exist."),
            Some(key) => {
                let arranged = self.arranged.get(key).unwrap_or_else(|| {
                    panic!("The collection arranged by {:?} doesn't exist.", key)
                });
                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
                    // Use the fueled `flat_map` path: decode all columns and
                    // re-pack each record into an owned row.
                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
                        Some((SharedRow::pack(borrow.iter()), t, r))
                    });
                    (ok.as_collection(), err)
                } else {
                    #[allow(deprecated)]
                    arranged.as_collection()
                }
            }
        }
    }

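    /// Constructs and applies `logic` to elements of `self`, returning a
    /// stream of results alongside the error collection.
    ///
    /// If `key_val` names an arrangement key (and optionally a literal key
    /// value), that arrangement is used; otherwise the unarranged collection
    /// is used. Only the first `max_demand` columns are decoded into the
    /// `DatumVecBorrow` handed to `logic`.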
    pub fn flat_map<D, I, L>(
        &self,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        max_demand: usize,
        mut logic: L,
    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
    {
        if let Some((key, val)) = key_val {
            self.arrangement(&key)
                .expect("Should have ensured during planning that this arrangement exists.")
                .flat_map(val.as_ref(), max_demand, logic)
        } else {
            use timely::dataflow::operators::Map;
            let (oks, errs) = self
                .collection
                .clone()
                .expect("Invariant violated: CollectionBundle contains no collection.");
            let mut datums = DatumVec::new();
            let oks = oks.inner.flat_map(move |(v, t, d)| {
                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
            });
            (oks, errs)
        }
    }

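    /// Factored out common logic for using literal keys in general traces.
    ///
    /// Reads the supplied arrangement with a fueled operator: for each input
    /// batch it applies `logic` to consolidated (key, value, time, diff)
    /// tuples, optionally restricted to a literal `key`, and yields after
    /// roughly `refuel` output records by re-activating itself.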
    fn flat_map_core<Tr, D, I, L>(
        trace: &Arranged<S, Tr>,
        key: Option<&Tr::KeyOwn>,
        mut logic: L,
        refuel: usize,
    ) -> Stream<S, I::Item>
    where
        Tr: for<'a> TraceReader<
                Key<'a>: ToDatumIter,
                KeyOwn: PartialEq,
                Val<'a>: ToDatumIter,
                Time = S::Timestamp,
                Diff = mz_repr::Diff,
            > + Clone
            + 'static,
        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
        D: Data,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;

        let mut key_con = Tr::KeyContainer::with_capacity(1);
        if let Some(key) = &key {
            key_con.push_own(key);
        }
        let mode = if key.is_some() { "index" } else { "scan" };
        let name = format!("ArrangementFlatMap({})", mode);
        use timely::dataflow::operators::Operator;
        trace
            .stream
            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
                // An activator to reschedule the operator when fuel runs out.
                let activator = trace.stream.scope().activator_for(info.address);
                // Batches that still require processing, in arrival order.
                let mut todo = std::collections::VecDeque::new();
                move |input, output| {
                    let key = key_con.get(0);
                    // Stash all received batches, retaining their capabilities.
                    input.for_each(|time, data| {
                        let capability = time.retain();
                        for batch in data.iter() {
                            todo.push_back(PendingWork::new(
                                capability.clone(),
                                batch.cursor(),
                                batch.clone(),
                            ));
                        }
                    });

                    // Process pending batches until the fuel is exhausted.
                    let mut fuel = refuel;
                    while !todo.is_empty() && fuel > 0 {
                        todo.front_mut().unwrap().do_work(
                            key.as_ref(),
                            &mut logic,
                            &mut fuel,
                            output,
                        );
                        if fuel > 0 {
                            todo.pop_front();
                        }
                    }
                    // If work remains, ask to be rescheduled.
                    if !todo.is_empty() {
                        activator.activate();
                    }
                }
            })
    }

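    /// Look up an arrangement by the expressions that form its key, returning
    /// a clone of the arrangement flavor if one exists.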
    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
        self.arranged.get(key).cloned()
    }
}

impl<S, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S: Scope,
    S::Timestamp: Refines<T> + RenderTimestamp,
{
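    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
    ///
    /// This operator is able to apply the logic of `mfp` early, which can
    /// substantially reduce the number of records produced when `mfp` filters
    /// or projects away columns. The `key_val` argument, when present, names
    /// an arrangement and optionally a literal key at which to read it, and
    /// `until` bounds the times for which updates are produced.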
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<S, mz_repr::Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        mfp.optimize();
        let mfp_plan = mfp.clone().into_plan().unwrap();

        // If a specific key value was requested, we cannot shortcut to the
        // underlying collection even when `mfp` is the identity.
        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // Restrict decoding to the columns that `mfp` actually demands.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        let mut datum_vec = DatumVec::new();
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
            let mut row_builder = SharedRow::get();
            let until = std::rc::Rc::clone(&until);
            let temp_storage = RowArena::new();
            let row_iter = row_datums.iter();
            let mut datums_local = datum_vec.borrow();
            datums_local.extend(row_iter);
            let time = time.clone();
            let event_time = time.event_time();
            mfp_plan
                .evaluate(
                    &mut datums_local,
                    &temp_storage,
                    event_time,
                    diff.clone(),
                    move |time| !until.less_equal(time),
                    &mut row_builder,
                )
                .map(move |x| match x {
                    Ok((row, event_time, diff)) => {
                        // Copy the whole time, and re-install the event time.
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Ok(row), time, diff)
                    }
                    Err((e, event_time, diff)) => {
                        // Copy the whole time, and re-install the event time.
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Err(e), time, diff)
                    }
                })
        });

        use differential_dataflow::AsCollection;
        let (oks, errs) = stream
            .as_collection()
            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "OkErr",
                |x| x,
            );

        (oks, errors.concat(&errs))
    }
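
    /// Ensures that the requested collections and arrangements are available,
    /// constructing any that are missing from the representations already in
    /// the bundle.
    ///
    /// The `input_key` and `input_mfp` describe how to obtain a raw collection
    /// from the existing representations when one must be formed, and `until`
    /// bounds the times for which updates are produced.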
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        if collections == Default::default() {
            return self;
        }
        // Planning should not request arrangements we already have.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // We need the raw collection if it was requested directly, or if we
        // must form any arrangement that does not already exist.
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                let errs_concat: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }

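    /// Builds an arrangement of `oks` keyed by the expressions in `key`, with
    /// the value containing the columns listed in `thinning`.
    ///
    /// Returns the arrangement, a collection of errors produced while
    /// evaluating the key expressions, and a passthrough copy of the input
    /// collection for reuse by the caller.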
    fn arrange_collection(
        name: &String,
        oks: VecCollection<S, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        VecCollection<S, DataflowError, Diff>,
        VecCollection<S, Row, Diff>,
    ) {
        // A hand-rolled operator with three outputs: keyed ok updates in
        // columnar form, key evaluation errors, and a passthrough copy of the
        // input collection.
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(&oks.inner, Pipeline);
        builder.set_notify(false);
        builder.build(move |_capabilities| {
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut temp_storage = RowArena::new();
            move |_frontiers| {
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        // Evaluate the key expressions; any evaluation error is
                        // routed to the error output.
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        // Arrange the keyed stream, exchanging updates by the key columns.
        let oks = ok_stream
            .mz_arrange_core::<
                _,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, S::Timestamp, Diff>,
                ),
                name
            );
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
}

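/// A batch of an arrangement that still needs to be processed, along with the
/// capability under which its outputs should be produced.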
struct PendingWork<C>
where
    C: Cursor,
{
    capability: Capability<C::Time>,
    cursor: C,
    batch: C::Storage,
}

impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
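    /// Create a new bundle of pending work from a capability, a cursor, and
    /// the batch the cursor navigates.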
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
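    /// Perform roughly `fuel` work through the cursor, applying `logic` to
    /// each (key, value, time, diff) tuple and sending the results to
    /// `output`.
    ///
    /// If `key` is provided, only that key is visited. `fuel` is decremented
    /// by the amount of work done; it is set to zero if the batch was not
    /// exhausted, signalling that more work remains.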
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch, tracking the work done.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        let mut buffer = Vec::new();
        if let Some(key) = key {
            // Visit only the requested key, seeking to it if necessary.
            let key = C::KeyContainer::reborrow(*key);
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // Scan all keys in the batch.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        *fuel -= work;
    }
}