use std::collections::BTreeMap;
use std::rc::Rc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
use mz_compute_types::plan::AvailableCollections;
use mz_dyncfg::ConfigSet;
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_ore::soft_assert_or_log;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
use mz_timely_util::operator::{CollectionExt, StreamExt};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::OutputBuilderSession;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
use crate::render::errors::ErrorLogger;
use crate::render::{LinearJoinSpec, RenderTimestamp};
use crate::row_spine::{DatumSeq, RowRowBuilder};
use crate::typedefs::{
    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
    RowRowSpine,
};
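/// Dataflow-local state available while rendering: the scope to build operators in,
/// metadata about the dataflow, and bindings from identifiers to the collections
/// and arrangements that implement them.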
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The IDs of the dataflow's exports.
    pub export_ids: Vec<GlobalId>,
    /// The as-of frontier of the dataflow, used to compact input timestamps.
    pub as_of_frontier: Antichain<T>,
    /// Frontier beyond which updates need not be produced.
    pub until: Antichain<T>,
    /// Bindings from identifiers to the collections that implement them.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// A logger for compute events, if logging is enabled.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for how linear joins should be rendered.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration frontier of this dataflow, if any.
    pub dataflow_expiration: Antichain<T>,
    /// Dynamic configuration, shared with the compute worker.
    pub config_set: Rc<ConfigSet>,
}

impl<S: Scope> Context<S>
where
    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
{
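    /// Creates a new `Context` for the given dataflow in the given scope, deriving its
    /// as-of, export IDs, logger, and configuration from the dataflow and compute state.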
    pub fn for_dataflow_in<Plan>(
        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
        scope: S,
        compute_state: &ComputeState,
        until: Antichain<mz_repr::Timestamp>,
        dataflow_expiration: Antichain<mz_repr::Timestamp>,
    ) -> Self {
        use mz_ore::collections::CollectionExt as IteratorExt;
        let dataflow_id = *scope.addr().into_first();
        let as_of_frontier = dataflow
            .as_of
            .clone()
            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));

        let export_ids = dataflow.export_ids().collect();

        // Transient dataflows are not logged.
        let compute_logger = if dataflow.is_transient() {
            None
        } else {
            compute_state.compute_logger.clone()
        };

        Self {
            scope,
            debug_name: dataflow.debug_name.clone(),
            dataflow_id,
            export_ids,
            as_of_frontier,
            until,
            bindings: BTreeMap::new(),
            compute_logger,
            linear_join_spec: compute_state.linear_join_spec,
            dataflow_expiration,
            config_set: Rc::clone(&compute_state.worker_config),
        }
    }
}

impl<S: Scope, T> Context<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
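    /// Binds `id` to `collection`, returning any previous binding for `id`.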
    pub fn insert_id(
        &mut self,
        id: Id,
        collection: CollectionBundle<S, T>,
    ) -> Option<CollectionBundle<S, T>> {
        self.bindings.insert(id, collection)
    }
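    /// Removes the binding for `id`, returning it if it existed.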
    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
        self.bindings.remove(&id)
    }
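    /// Melds a collection bundle into whatever is already bound to `id`: the unarranged
    /// collection is replaced if the new bundle has one, and arrangements are added by key.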
    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
        if !self.bindings.contains_key(&id) {
            self.bindings.insert(id, collection);
        } else {
            let binding = self
                .bindings
                .get_mut(&id)
                .expect("Binding verified to exist");
            if collection.collection.is_some() {
                binding.collection = collection.collection;
            }
            for (key, flavor) in collection.arranged.into_iter() {
                binding.arranged.insert(key, flavor);
            }
        }
    }
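    /// Looks up the binding for `id`, cloning the bundle if one exists.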
    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
        self.bindings.get(&id).cloned()
    }

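    /// Returns an error logger tagged with this dataflow's debug name.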
    pub(super) fn error_logger(&self) -> ErrorLogger {
        ErrorLogger::new(self.debug_name.clone())
    }
}

impl<S: Scope, T> Context<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Brings the bindings of this context into a nested region, restricted to the
    /// identifiers in `bindings` if provided.
    pub fn enter_region<'a>(
        &self,
        region: &Child<'a, S, S::Timestamp>,
        bindings: Option<&std::collections::BTreeSet<Id>>,
    ) -> Context<Child<'a, S, S::Timestamp>, T> {
        let bindings = self
            .bindings
            .iter()
            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
            .collect();

        Context {
            scope: region.clone(),
            debug_name: self.debug_name.clone(),
            dataflow_id: self.dataflow_id.clone(),
            export_ids: self.export_ids.clone(),
            as_of_frontier: self.as_of_frontier.clone(),
            until: self.until.clone(),
            compute_logger: self.compute_logger.clone(),
            linear_join_spec: self.linear_join_spec.clone(),
            bindings,
            dataflow_expiration: self.dataflow_expiration.clone(),
            config_set: Rc::clone(&self.config_set),
        }
    }
}

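/// A description of how a collection is arranged: either built locally within the
/// dataflow, or imported from an existing trace identified by a `GlobalId`. Each
/// flavor pairs the arrangement of ok rows with an arrangement of errors.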
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// An arrangement built within the local dataflow, plus its error arrangement.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An arrangement imported from an existing trace, identified by the `GlobalId`
    /// of the indexed collection, plus its error arrangement.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}

impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents `self` as a stream of updates, concatenating the key and value datums of
    /// each arranged record into a `Row`.
    ///
    /// Deprecated in favor of `flat_map`, which can restrict how many columns it reads.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies `logic` to elements of `self` and returns the results.
    ///
    /// If `key` is provided, only the records with that key are processed. `max_demand`
    /// bounds the number of datums (taken from the key, then the value) that are decoded
    /// and handed to `logic`.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // The number of updates to process before the operator yields.
        let refuel = 1000000;

        // Decode at most `max_demand` datums, first from the key and then from the value,
        // before handing them to `logic`.
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope containing the underlying arrangement's stream.
    pub fn scope(&self) -> S {
        match self {
            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
        }
    }

    /// Brings the ok and err arrangements into a nested region.
    pub fn enter_region<'a>(
        &self,
        region: &Child<'a, S, S::Timestamp>,
    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
        match self {
            ArrangementFlavor::Local(oks, errs) => {
                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
            }
            ArrangementFlavor::Trace(gid, oks, errs) => {
                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
            }
        }
    }
}
impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Extracts the ok and err arrangements from a nested region.
    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
        match self {
            ArrangementFlavor::Local(oks, errs) => {
                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
            }
            ArrangementFlavor::Trace(gid, oks, errs) => {
                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
            }
        }
    }
}

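/// A bundle of the various ways a collection can be represented: as an unarranged
/// stream of ok rows and errors, and/or as one or more arrangements keyed by
/// expressions evaluated against each row.
///
/// A sketch of typical construction (with hypothetical `oks` and `errs` collections
/// already rendered in scope):
///
/// ```ignore
/// let bundle = CollectionBundle::from_collections(oks, errs);
/// ```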
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged ok and err collections, if they have been formed.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions they are arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}

impl<S: Scope, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Constructs a new bundle from unarranged ok and err collections.
    pub fn from_collections(
        oks: VecCollection<S, Row, Diff>,
        errs: VecCollection<S, DataflowError, Diff>,
    ) -> Self {
        Self {
            collection: Some((oks, errs)),
            arranged: BTreeMap::default(),
        }
    }

    /// Constructs a new bundle from a single arrangement, keyed by `exprs`.
    pub fn from_expressions(
        exprs: Vec<MirScalarExpr>,
        arrangements: ArrangementFlavor<S, T>,
    ) -> Self {
        let mut arranged = BTreeMap::new();
        arranged.insert(exprs, arrangements);
        Self {
            collection: None,
            arranged,
        }
    }

    /// Constructs a new bundle from an arrangement keyed by the given column references.
    pub fn from_columns<I: IntoIterator<Item = usize>>(
        columns: I,
        arrangements: ArrangementFlavor<S, T>,
    ) -> Self {
        let mut keys = Vec::new();
        for column in columns {
            keys.push(MirScalarExpr::column(column));
        }
        Self::from_expressions(keys, arrangements)
    }

    /// The scope containing the collection bundle.
    pub fn scope(&self) -> S {
        if let Some((oks, _errs)) = &self.collection {
            oks.inner.scope()
        } else {
            self.arranged
                .values()
                .next()
                .expect("Must contain a valid collection")
                .scope()
        }
    }

    /// Brings the collection bundle into a nested region.
    pub fn enter_region<'a>(
        &self,
        region: &Child<'a, S, S::Timestamp>,
    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
        CollectionBundle {
            collection: self
                .collection
                .as_ref()
                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
            arranged: self
                .arranged
                .iter()
                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
                .collect(),
        }
    }
}

impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Extracts the collection bundle from a nested region.
    pub fn leave_region(&self) -> CollectionBundle<S, T> {
        CollectionBundle {
            collection: self
                .collection
                .as_ref()
                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
            arranged: self
                .arranged
                .iter()
                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
                .collect(),
        }
    }
}

impl<S: Scope, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
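    /// Asserts that the arrangement for a specific key (or the raw collection, for no
    /// key) exists, and returns it as a collection of ok rows and errors.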
    pub fn as_specific_collection(
        &self,
        key: Option<&[MirScalarExpr]>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        match key {
            None => self
                .collection
                .clone()
                .expect("The unarranged collection doesn't exist."),
            Some(key) => {
                let arranged = self.arranged.get(key).unwrap_or_else(|| {
                    panic!("The collection arranged by {:?} doesn't exist.", key)
                });
                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
                    // Read the arrangement through the fueled `flat_map` path, which
                    // yields periodically instead of reading each batch in one go.
                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
                        Some((SharedRow::pack(borrow.iter()), t, r))
                    });
                    (ok.as_collection(), err)
                } else {
                    #[allow(deprecated)]
                    arranged.as_collection()
                }
            }
        }
    }

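    /// Constructs and applies `logic` to elements of `self` and returns the results.
    ///
    /// If `key_val` names an arrangement key, that arrangement is used; if it also
    /// carries a literal row, only records whose key equals that row are produced.
    /// `max_demand` bounds how many leading datums are extracted and passed to `logic`.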
    pub fn flat_map<D, I, L>(
        &self,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        max_demand: usize,
        mut logic: L,
    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
    {
        if let Some((key, val)) = key_val {
            self.arrangement(&key)
                .expect("Should have ensured during planning that this arrangement exists.")
                .flat_map(val.as_ref(), max_demand, logic)
        } else {
            use timely::dataflow::operators::Map;
            let (oks, errs) = self
                .collection
                .clone()
                .expect("Invariant violated: CollectionBundle contains no collection.");
            let mut datums = DatumVec::new();
            let oks = oks.inner.flat_map(move |(v, t, d)| {
                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
            });
            (oks, errs)
        }
    }

    /// Factored out common logic for reading from arrangements: streams out the results
    /// of applying `logic` to each (key, value, time, diff), restricted to `key` if one
    /// is provided, and doing at most `refuel` units of work per activation before
    /// yielding and rescheduling itself.
    fn flat_map_core<Tr, D, I, L>(
        trace: &Arranged<S, Tr>,
        key: Option<&Tr::KeyOwn>,
        mut logic: L,
        refuel: usize,
    ) -> Stream<S, I::Item>
    where
        Tr: for<'a> TraceReader<
                Key<'a>: ToDatumIter,
                KeyOwn: PartialEq,
                Val<'a>: ToDatumIter,
                Time = S::Timestamp,
                Diff = mz_repr::Diff,
            > + Clone
            + 'static,
        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
        D: Data,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;

        let mut key_con = Tr::KeyContainer::with_capacity(1);
        if let Some(key) = &key {
            key_con.push_own(key);
        }
        let mode = if key.is_some() { "index" } else { "scan" };
        let name = format!("ArrangementFlatMap({})", mode);
        use timely::dataflow::operators::Operator;
        trace
            .stream
            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
                // An activator to reschedule the operator when it runs out of fuel.
                let activator = trace.stream.scope().activator_for(info.address);
                // Batches that have been received but not yet fully processed.
                let mut todo = std::collections::VecDeque::new();
                move |input, output| {
                    let key = key_con.get(0);
                    // Stash all received batches as pending work.
                    input.for_each(|time, data| {
                        let capability = time.retain();
                        for batch in data.iter() {
                            todo.push_back(PendingWork::new(
                                capability.clone(),
                                batch.cursor(),
                                batch.clone(),
                            ));
                        }
                    });

                    // Process pending work until the fuel is spent, then reschedule.
                    let mut fuel = refuel;
                    while !todo.is_empty() && fuel > 0 {
                        todo.front_mut().unwrap().do_work(
                            key.as_ref(),
                            &mut logic,
                            &mut fuel,
                            output,
                        );
                        if fuel > 0 {
                            todo.pop_front();
                        }
                    }
                    if !todo.is_empty() {
                        activator.activate();
                    }
                }
            })
    }

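    /// Looks up an arrangement by the expressions that describe its key, cloning it if
    /// it exists.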
    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
        self.arranged.get(key).map(|x| x.clone())
    }
}

impl<S, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S: Scope,
    S::Timestamp: Refines<T> + RenderTimestamp,
{
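    /// Presents `self` as a collection of ok rows and errors, after applying `mfp` to
    /// each record and using `key_val`, if provided, to select an arrangement and a
    /// literal key. `until` bounds the updates that need to be produced.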
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<S, mz_repr::Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        mfp.optimize();
        let mfp_plan = mfp.clone().into_plan().unwrap();

        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        // If the MFP is the identity and no literal key is involved, we can read out the
        // requested collection directly.
        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // Only the leading `max_demand` columns are needed by the MFP; avoid extracting more.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        let mut datum_vec = DatumVec::new();
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
            let mut row_builder = SharedRow::get();
            let until = std::rc::Rc::clone(&until);
            let temp_storage = RowArena::new();
            let row_iter = row_datums.iter();
            let mut datums_local = datum_vec.borrow();
            datums_local.extend(row_iter);
            let time = time.clone();
            let event_time = time.event_time();
            mfp_plan
                .evaluate(
                    &mut datums_local,
                    &temp_storage,
                    event_time,
                    diff.clone(),
                    move |time| !until.less_equal(time),
                    &mut row_builder,
                )
                .map(move |x| match x {
                    Ok((row, event_time, diff)) => {
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Ok(row), time, diff)
                    }
                    Err((e, event_time, diff)) => {
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Err(e), time, diff)
                    }
                })
        });

        use differential_dataflow::AsCollection;
        let (oks, errs) = stream
            .as_collection()
            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "OkErr",
                |x| x,
            );

        (oks, errors.concat(&errs))
    }
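    /// Ensures that the bundle provides all collections and arrangements requested in
    /// `collections`, building any that are missing from the existing representation.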
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        if collections == Default::default() {
            return self;
        }
        // The planner should not ask us to build arrangements that already exist.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // Form the raw collection if requested, or if we need it to build new arrangements.
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .clone()
                    .expect("Collection constructed above");
                let (oks, errs_keyed) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
                let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                    &format!("{}-errors", name),
                );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }

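    /// Arranges `oks` by the given key expressions, producing a `RowRow` arrangement and
    /// a collection of errors from key evaluation. `thinning` lists the value columns
    /// retained alongside the key.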
    fn arrange_collection(
        name: &String,
        oks: VecCollection<S, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        let (oks, errs) = oks
            .inner
            .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
                Pipeline,
                "FormArrangementKey",
                move |_, _| {
                    Box::new(move |input, ok, err| {
                        let mut key_buf = Row::default();
                        let mut val_buf = Row::default();
                        let mut datums = DatumVec::new();
                        let mut temp_storage = RowArena::new();
                        while let Some((time, data)) = input.next() {
                            let mut ok_session = ok.session_with_builder(&time);
                            let mut err_session = err.session(&time);
                            for (row, time, diff) in data.iter() {
                                temp_storage.clear();
                                let datums = datums.borrow_with(row);
                                let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                                match key_buf.packer().try_extend(key_iter) {
                                    Ok(()) => {
                                        let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                        val_buf.packer().extend(val_datum_iter);
                                        ok_session.give(((&*key_buf, &*val_buf), time, diff));
                                    }
                                    Err(e) => {
                                        err_session.give((e.into(), time.clone(), *diff));
                                    }
                                }
                            }
                        }
                    })
                },
            );
        let oks = oks
            .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),
                name,
            );
        (oks, errs.as_collection())
    }
}

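/// A batch of updates from an arrangement that still needs to be processed, together
/// with the capability under which its output should be produced.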
struct PendingWork<C>
where
    C: Cursor,
{
    capability: Capability<C::Time>,
    cursor: C,
    batch: C::Storage,
}

impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Creates a new bundle of pending work from a capability, cursor, and batch storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
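    /// Performs roughly `fuel` work through the cursor, applying `logic` to each
    /// (key, value, time, diff) and sending the results to `output`. If `key` is
    /// provided, only that key is visited. Sets `fuel` to zero if the fuel runs out
    /// before the batch is exhausted.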
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch, stopping once `fuel` work has been done.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        let mut buffer = Vec::new();
        if let Some(key) = key {
            // Seek to the requested key and process only its values.
            let key = C::KeyContainer::reborrow(*key);
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // Scan the entire batch, key by key.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        *fuel -= work;
    }
}