1use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::rc::Rc;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::operators::arrange::Arranged;
19use differential_dataflow::trace::implementations::BatchContainer;
20use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
21use differential_dataflow::{AsCollection, Collection, Data};
22use mz_compute_types::dataflows::DataflowDescription;
23use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
24use mz_compute_types::plan::AvailableCollections;
25use mz_dyncfg::ConfigSet;
26use mz_expr::{Id, MapFilterProject, MirScalarExpr};
27use mz_ore::soft_assert_or_log;
28use mz_repr::fixed_length::ToDatumIter;
29use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
30use mz_storage_types::controller::CollectionMetadata;
31use mz_storage_types::errors::DataflowError;
32use mz_timely_util::builder_async::{ButtonHandle, PressOnDropButton};
33use mz_timely_util::columnar::builder::ColumnBuilder;
34use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
35use mz_timely_util::operator::{CollectionExt, StreamExt};
36use timely::container::CapacityContainerBuilder;
37use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
38use timely::dataflow::operators::Capability;
39use timely::dataflow::operators::generic::OutputHandleCore;
40use timely::dataflow::scopes::Child;
41use timely::dataflow::{Scope, Stream};
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::ErrorLogger;
48use crate::render::{LinearJoinSpec, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
52 RowRowSpine,
53};
54
/// Dataflow-wide state that is threaded through the rendering of a single dataflow.
///
/// `S` is the timely scope the dataflow is rendered in. `T` is the timestamp type of the
/// frontiers stored here; the scope's own timestamp must refine it.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope in which this context's collections are rendered.
    pub(crate) scope: S,
    /// The debug name of the dataflow, copied from its description.
    pub debug_name: String,
    /// The dataflow identifier; `for_dataflow_in` takes it from the first element of the
    /// scope's address.
    pub dataflow_id: usize,
    /// The identifiers of the objects exported by the dataflow.
    pub export_ids: Vec<GlobalId>,
    /// The dataflow's `as_of` frontier; `for_dataflow_in` falls back to the minimum
    /// timestamp when the description carries none.
    pub as_of_frontier: Antichain<T>,
    /// Upper-bound frontier supplied at construction.
    /// NOTE(review): presumably updates at times beyond it may be suppressed — confirm.
    pub until: Antichain<T>,
    /// Collections currently bound to identifiers.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// Probe handed to the `ErrorLogger` created by `error_logger`.
    pub(super) shutdown_probe: ShutdownProbe,
    /// The compute logger; `for_dataflow_in` sets this to `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// The linear join specification copied from the compute state.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// Expiration frontier supplied at construction.
    /// NOTE(review): its exact semantics are not visible in this file — confirm.
    pub dataflow_expiration: Antichain<T>,
    /// Shared handle to the worker's dynamic configuration.
    pub config_set: Rc<ConfigSet>,
}
102
103impl<S: Scope> Context<S>
104where
105 S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
106{
107 pub fn for_dataflow_in<Plan>(
109 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
110 scope: S,
111 compute_state: &ComputeState,
112 until: Antichain<mz_repr::Timestamp>,
113 dataflow_expiration: Antichain<mz_repr::Timestamp>,
114 ) -> Self {
115 use mz_ore::collections::CollectionExt as IteratorExt;
116 let dataflow_id = *scope.addr().into_first();
117 let as_of_frontier = dataflow
118 .as_of
119 .clone()
120 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
121
122 let export_ids = dataflow.export_ids().collect();
123
124 let compute_logger = if dataflow.is_transient() {
128 None
129 } else {
130 compute_state.compute_logger.clone()
131 };
132
133 Self {
134 scope,
135 debug_name: dataflow.debug_name.clone(),
136 dataflow_id,
137 export_ids,
138 as_of_frontier,
139 until,
140 bindings: BTreeMap::new(),
141 shutdown_probe: Default::default(),
142 compute_logger,
143 linear_join_spec: compute_state.linear_join_spec,
144 dataflow_expiration,
145 config_set: Rc::clone(&compute_state.worker_config),
146 }
147 }
148}
149
150impl<S: Scope, T> Context<S, T>
151where
152 T: MzTimestamp,
153 S::Timestamp: MzTimestamp + Refines<T>,
154{
155 pub fn insert_id(
160 &mut self,
161 id: Id,
162 collection: CollectionBundle<S, T>,
163 ) -> Option<CollectionBundle<S, T>> {
164 self.bindings.insert(id, collection)
165 }
166 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
170 self.bindings.remove(&id)
171 }
172 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
174 if !self.bindings.contains_key(&id) {
175 self.bindings.insert(id, collection);
176 } else {
177 let binding = self
178 .bindings
179 .get_mut(&id)
180 .expect("Binding verified to exist");
181 if collection.collection.is_some() {
182 binding.collection = collection.collection;
183 }
184 for (key, flavor) in collection.arranged.into_iter() {
185 binding.arranged.insert(key, flavor);
186 }
187 }
188 }
189 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
191 self.bindings.get(&id).cloned()
192 }
193
194 pub(super) fn error_logger(&self) -> ErrorLogger {
195 ErrorLogger::new(self.shutdown_probe.clone(), self.debug_name.clone())
196 }
197}
198
199impl<S: Scope, T> Context<S, T>
200where
201 T: MzTimestamp,
202 S::Timestamp: MzTimestamp + Refines<T>,
203{
204 pub fn enter_region<'a>(
206 &self,
207 region: &Child<'a, S, S::Timestamp>,
208 bindings: Option<&std::collections::BTreeSet<Id>>,
209 ) -> Context<Child<'a, S, S::Timestamp>, T> {
210 let bindings = self
211 .bindings
212 .iter()
213 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
214 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
215 .collect();
216
217 Context {
218 scope: region.clone(),
219 debug_name: self.debug_name.clone(),
220 dataflow_id: self.dataflow_id.clone(),
221 export_ids: self.export_ids.clone(),
222 as_of_frontier: self.as_of_frontier.clone(),
223 until: self.until.clone(),
224 shutdown_probe: self.shutdown_probe.clone(),
225 compute_logger: self.compute_logger.clone(),
226 linear_join_spec: self.linear_join_spec.clone(),
227 bindings,
228 dataflow_expiration: self.dataflow_expiration.clone(),
229 config_set: Rc::clone(&self.config_set),
230 }
231 }
232}
233
234pub(super) fn shutdown_token<G: Scope>(scope: &mut G) -> (ShutdownProbe, PressOnDropButton) {
235 let (button_handle, button) = mz_timely_util::builder_async::button(scope, scope.addr());
236 let probe = ShutdownProbe::new(button_handle);
237 let token = button.press_on_drop();
238 (probe, token)
239}
240
/// A cloneable probe over a shared button handle.
///
/// The default value holds no handle; such a probe reports `false` from both
/// `in_shutdown` and `in_local_shutdown`.
#[derive(Clone, Default)]
pub(super) struct ShutdownProbe(Option<Rc<RefCell<ButtonHandle>>>);
248
249impl ShutdownProbe {
250 fn new(button: ButtonHandle) -> Self {
252 Self(Some(Rc::new(RefCell::new(button))))
253 }
254
255 pub(super) fn in_shutdown(&self) -> bool {
260 match &self.0 {
261 Some(t) => t.borrow_mut().all_pressed(),
262 None => false,
263 }
264 }
265
266 pub(super) fn in_local_shutdown(&self) -> bool {
271 match &self.0 {
272 Some(t) => t.borrow_mut().local_pressed(),
273 None => false,
274 }
275 }
276}
277
/// An arrangement of rows paired with an arrangement of errors, in one of two forms.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Arrangements whose traces are timestamped with the scope's own timestamp; this is the
    /// form `ensure_collections` produces.
    Local(
        // The arranged rows, as (key, value) pairs of rows.
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        // The arranged errors.
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// Arrangements whose underlying traces use timestamp `T` and have been entered into the
    /// scope, tagged with a `GlobalId`.
    /// NOTE(review): presumably the id names the imported index — confirm against callers.
    Trace(
        GlobalId,
        // The arranged rows, as (key, value) pairs of rows.
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        // The arranged errors.
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
300
301impl<S: Scope, T> ArrangementFlavor<S, T>
302where
303 T: MzTimestamp,
304 S::Timestamp: MzTimestamp + Refines<T>,
305{
306 #[deprecated(note = "Use `flat_map` instead.")]
314 pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
315 let mut datums = DatumVec::new();
316 let logic = move |k: DatumSeq, v: DatumSeq| {
317 let mut datums_borrow = datums.borrow();
318 datums_borrow.extend(k);
319 datums_borrow.extend(v);
320 SharedRow::pack(&**datums_borrow)
321 };
322 match &self {
323 ArrangementFlavor::Local(oks, errs) => (
324 oks.as_collection(logic),
325 errs.as_collection(|k, &()| k.clone()),
326 ),
327 ArrangementFlavor::Trace(_, oks, errs) => (
328 oks.as_collection(logic),
329 errs.as_collection(|k, &()| k.clone()),
330 ),
331 }
332 }
333
334 pub fn flat_map<D, I, L>(
347 &self,
348 key: Option<&Row>,
349 max_demand: usize,
350 mut logic: L,
351 ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
352 where
353 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
354 D: Data,
355 L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
356 {
357 let refuel = 1000000;
361
362 let mut datums = DatumVec::new();
363 let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
364 let mut datums_borrow = datums.borrow();
365 datums_borrow.extend(k.to_datum_iter().take(max_demand));
366 let max_demand = max_demand.saturating_sub(datums_borrow.len());
367 datums_borrow.extend(v.to_datum_iter().take(max_demand));
368 logic(&mut datums_borrow, t, d)
369 };
370
371 match &self {
372 ArrangementFlavor::Local(oks, errs) => {
373 let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
374 let errs = errs.as_collection(|k, &()| k.clone());
375 (oks, errs)
376 }
377 ArrangementFlavor::Trace(_, oks, errs) => {
378 let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
379 let errs = errs.as_collection(|k, &()| k.clone());
380 (oks, errs)
381 }
382 }
383 }
384}
385impl<S: Scope, T> ArrangementFlavor<S, T>
386where
387 T: MzTimestamp,
388 S::Timestamp: MzTimestamp + Refines<T>,
389{
390 pub fn scope(&self) -> S {
392 match self {
393 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
394 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
395 }
396 }
397
398 pub fn enter_region<'a>(
400 &self,
401 region: &Child<'a, S, S::Timestamp>,
402 ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
403 match self {
404 ArrangementFlavor::Local(oks, errs) => {
405 ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
406 }
407 ArrangementFlavor::Trace(gid, oks, errs) => {
408 ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
409 }
410 }
411 }
412}
413impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
414where
415 T: MzTimestamp,
416 S::Timestamp: MzTimestamp + Refines<T>,
417{
418 pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
420 match self {
421 ArrangementFlavor::Local(oks, errs) => {
422 ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
423 }
424 ArrangementFlavor::Trace(gid, oks, errs) => {
425 ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
426 }
427 }
428 }
429}
430
/// A bundle of alternative representations of one collection: an optional unarranged form
/// and any number of arrangements keyed by their key expressions.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged rows and errors, if available.
    pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
    /// The available arrangements, indexed by the expressions that form their key.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
444
445impl<S: Scope, T> CollectionBundle<S, T>
446where
447 T: MzTimestamp,
448 S::Timestamp: MzTimestamp + Refines<T>,
449{
450 pub fn from_collections(
452 oks: Collection<S, Row, Diff>,
453 errs: Collection<S, DataflowError, Diff>,
454 ) -> Self {
455 Self {
456 collection: Some((oks, errs)),
457 arranged: BTreeMap::default(),
458 }
459 }
460
461 pub fn from_expressions(
463 exprs: Vec<MirScalarExpr>,
464 arrangements: ArrangementFlavor<S, T>,
465 ) -> Self {
466 let mut arranged = BTreeMap::new();
467 arranged.insert(exprs, arrangements);
468 Self {
469 collection: None,
470 arranged,
471 }
472 }
473
474 pub fn from_columns<I: IntoIterator<Item = usize>>(
476 columns: I,
477 arrangements: ArrangementFlavor<S, T>,
478 ) -> Self {
479 let mut keys = Vec::new();
480 for column in columns {
481 keys.push(MirScalarExpr::column(column));
482 }
483 Self::from_expressions(keys, arrangements)
484 }
485
486 pub fn scope(&self) -> S {
488 if let Some((oks, _errs)) = &self.collection {
489 oks.inner.scope()
490 } else {
491 self.arranged
492 .values()
493 .next()
494 .expect("Must contain a valid collection")
495 .scope()
496 }
497 }
498
499 pub fn enter_region<'a>(
501 &self,
502 region: &Child<'a, S, S::Timestamp>,
503 ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
504 CollectionBundle {
505 collection: self
506 .collection
507 .as_ref()
508 .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
509 arranged: self
510 .arranged
511 .iter()
512 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
513 .collect(),
514 }
515 }
516}
517
518impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
519where
520 T: MzTimestamp,
521 S::Timestamp: MzTimestamp + Refines<T>,
522{
523 pub fn leave_region(&self) -> CollectionBundle<S, T> {
525 CollectionBundle {
526 collection: self
527 .collection
528 .as_ref()
529 .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
530 arranged: self
531 .arranged
532 .iter()
533 .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
534 .collect(),
535 }
536 }
537}
538
539impl<S: Scope, T> CollectionBundle<S, T>
540where
541 T: MzTimestamp,
542 S::Timestamp: MzTimestamp + Refines<T>,
543{
544 pub fn as_specific_collection(
557 &self,
558 key: Option<&[MirScalarExpr]>,
559 config_set: &ConfigSet,
560 ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
561 match key {
567 None => self
568 .collection
569 .clone()
570 .expect("The unarranged collection doesn't exist."),
571 Some(key) => {
572 let arranged = self.arranged.get(key).unwrap_or_else(|| {
573 panic!("The collection arranged by {:?} doesn't exist.", key)
574 });
575 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
576 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
578 Some((SharedRow::pack(borrow.iter()), t, r))
579 });
580 (ok.as_collection(), err)
581 } else {
582 #[allow(deprecated)]
583 arranged.as_collection()
584 }
585 }
586 }
587 }
588
589 pub fn flat_map<D, I, L>(
605 &self,
606 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
607 max_demand: usize,
608 mut logic: L,
609 ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
610 where
611 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
612 D: Data,
613 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
614 {
615 if let Some((key, val)) = key_val {
619 self.arrangement(&key)
620 .expect("Should have ensured during planning that this arrangement exists.")
621 .flat_map(val.as_ref(), max_demand, logic)
622 } else {
623 use timely::dataflow::operators::Map;
624 let (oks, errs) = self
625 .collection
626 .clone()
627 .expect("Invariant violated: CollectionBundle contains no collection.");
628 let mut datums = DatumVec::new();
629 let oks = oks.inner.flat_map(move |(v, t, d)| {
630 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
631 });
632 (oks, errs)
633 }
634 }
635
636 fn flat_map_core<Tr, D, I, L>(
644 trace: &Arranged<S, Tr>,
645 key: Option<&Tr::KeyOwn>,
646 mut logic: L,
647 refuel: usize,
648 ) -> Stream<S, I::Item>
649 where
650 Tr: for<'a> TraceReader<
651 Key<'a>: ToDatumIter,
652 KeyOwn: PartialEq,
653 Val<'a>: ToDatumIter,
654 Time = S::Timestamp,
655 Diff = mz_repr::Diff,
656 > + Clone
657 + 'static,
658 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
659 D: Data,
660 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
661 {
662 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
663
664 let mut key_con = Tr::KeyContainer::with_capacity(1);
665 if let Some(key) = &key {
666 key_con.push_own(key);
667 }
668 let mode = if key.is_some() { "index" } else { "scan" };
669 let name = format!("ArrangementFlatMap({})", mode);
670 use timely::dataflow::operators::Operator;
671 trace
672 .stream
673 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
674 let activator = trace.stream.scope().activator_for(info.address);
676 let mut todo = std::collections::VecDeque::new();
678 move |input, output| {
679 let key = key_con.get(0);
680 input.for_each(|time, data| {
682 let capability = time.retain();
683 for batch in data.iter() {
684 todo.push_back(PendingWork::new(
686 capability.clone(),
687 batch.cursor(),
688 batch.clone(),
689 ));
690 }
691 });
692
693 let mut fuel = refuel;
695 while !todo.is_empty() && fuel > 0 {
696 todo.front_mut().unwrap().do_work(
697 key.as_ref(),
698 &mut logic,
699 &mut fuel,
700 output,
701 );
702 if fuel > 0 {
703 todo.pop_front();
704 }
705 }
706 if !todo.is_empty() {
708 activator.activate();
709 }
710 }
711 })
712 }
713
714 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
719 self.arranged.get(key).map(|x| x.clone())
720 }
721}
722
723impl<S, T> CollectionBundle<S, T>
724where
725 T: MzTimestamp,
726 S: Scope,
727 S::Timestamp: Refines<T> + RenderTimestamp,
728{
729 pub fn as_collection_core(
738 &self,
739 mut mfp: MapFilterProject,
740 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
741 until: Antichain<mz_repr::Timestamp>,
742 config_set: &ConfigSet,
743 ) -> (
744 Collection<S, mz_repr::Row, Diff>,
745 Collection<S, DataflowError, Diff>,
746 ) {
747 mfp.optimize();
748 let mfp_plan = mfp.clone().into_plan().unwrap();
749
750 let has_key_val = if let Some((_key, Some(_val))) = &key_val {
756 true
757 } else {
758 false
759 };
760
761 if mfp_plan.is_identity() && !has_key_val {
762 let key = key_val.map(|(k, _v)| k);
763 return self.as_specific_collection(key.as_deref(), config_set);
764 }
765
766 let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
767 mfp.permute_fn(|c| c, max_demand);
768 mfp.optimize();
769 let mfp_plan = mfp.into_plan().unwrap();
770
771 let mut datum_vec = DatumVec::new();
772 let until = std::rc::Rc::new(until);
774
775 let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
776 let mut row_builder = SharedRow::get();
777 let until = std::rc::Rc::clone(&until);
778 let temp_storage = RowArena::new();
779 let row_iter = row_datums.iter();
780 let mut datums_local = datum_vec.borrow();
781 datums_local.extend(row_iter);
782 let time = time.clone();
783 let event_time = time.event_time();
784 mfp_plan
785 .evaluate(
786 &mut datums_local,
787 &temp_storage,
788 event_time,
789 diff.clone(),
790 move |time| !until.less_equal(time),
791 &mut row_builder,
792 )
793 .map(move |x| match x {
794 Ok((row, event_time, diff)) => {
795 let mut time: S::Timestamp = time.clone();
797 *time.event_time_mut() = event_time;
798 (Ok(row), time, diff)
799 }
800 Err((e, event_time, diff)) => {
801 let mut time: S::Timestamp = time.clone();
803 *time.event_time_mut() = event_time;
804 (Err(e), time, diff)
805 }
806 })
807 });
808
809 use differential_dataflow::AsCollection;
810 let (oks, errs) = stream
811 .as_collection()
812 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
813 "OkErr",
814 |x| x,
815 );
816
817 (oks, errors.concat(&errs))
818 }
819 pub fn ensure_collections(
820 mut self,
821 collections: AvailableCollections,
822 input_key: Option<Vec<MirScalarExpr>>,
823 input_mfp: MapFilterProject,
824 until: Antichain<mz_repr::Timestamp>,
825 config_set: &ConfigSet,
826 ) -> Self {
827 if collections == Default::default() {
828 return self;
829 }
830 for (key, _, _) in collections.arranged.iter() {
839 soft_assert_or_log!(
840 !self.arranged.contains_key(key),
841 "LIR ArrangeBy tried to create an existing arrangement"
842 );
843 }
844
845 let form_raw_collection = collections.raw
847 || collections
848 .arranged
849 .iter()
850 .any(|(key, _, _)| !self.arranged.contains_key(key));
851 if form_raw_collection && self.collection.is_none() {
852 self.collection = Some(self.as_collection_core(
853 input_mfp,
854 input_key.map(|k| (k, None)),
855 until,
856 config_set,
857 ));
858 }
859 for (key, _, thinning) in collections.arranged {
860 if !self.arranged.contains_key(&key) {
861 let name = format!("ArrangeBy[{:?}]", key);
863
864 let (oks, errs) = self
865 .collection
866 .clone()
867 .expect("Collection constructed above");
868 let (oks, errs_keyed) =
869 Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
870 let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
871 let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
872 &format!("{}-errors", name),
873 );
874 self.arranged
875 .insert(key, ArrangementFlavor::Local(oks, errs));
876 }
877 }
878 self
879 }
880
881 fn arrange_collection(
888 name: &String,
889 oks: Collection<S, Row, Diff>,
890 key: Vec<MirScalarExpr>,
891 thinning: Vec<usize>,
892 ) -> (
893 Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
894 Collection<S, DataflowError, Diff>,
895 ) {
896 let (oks, errs) = oks
901 .inner
902 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
903 Pipeline,
904 "FormArrangementKey",
905 move |_, _| {
906 Box::new(move |input, ok, err| {
907 let mut key_buf = Row::default();
908 let mut val_buf = Row::default();
909 let mut datums = DatumVec::new();
910 let mut temp_storage = RowArena::new();
911 while let Some((time, data)) = input.next() {
912 let mut ok_session = ok.session_with_builder(&time);
913 let mut err_session = err.session(&time);
914 for (row, time, diff) in data.iter() {
915 temp_storage.clear();
916 let datums = datums.borrow_with(row);
917 let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
918 match key_buf.packer().try_extend(key_iter) {
919 Ok(()) => {
920 let val_datum_iter = thinning.iter().map(|c| datums[*c]);
921 val_buf.packer().extend(val_datum_iter);
922 ok_session.give(((&*key_buf, &*val_buf), time, diff));
923 }
924 Err(e) => {
925 err_session.give((e.into(), time.clone(), *diff));
926 }
927 }
928 }
929 }
930 })
931 },
932 );
933 let oks = oks
934 .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
935 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
936 );
937 (oks, errs.as_collection())
938 }
939}
940
/// A batch that has been received but not yet fully processed by a fueled flat-map operator.
struct PendingWork<C>
where
    C: Cursor,
{
    /// The capability used to open output sessions for this batch's updates.
    capability: Capability<C::Time>,
    /// The cursor tracking how far into `batch` processing has progressed.
    cursor: C,
    /// The batch the cursor navigates.
    batch: C::Storage,
}
949
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Creates a new unit of pending work from a capability, a cursor, and the cursor's batch.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }

    /// Applies `logic` to the remaining contents of the batch, within the limits of `fuel`.
    ///
    /// If `key` is `Some`, only values under that key are visited; otherwise all keys are
    /// visited. Every item produced by `logic` is sent to `output` and counts as one unit of
    /// work. Fuel is checked after each value, so a single value may overshoot the budget.
    ///
    /// On return, `fuel` is zero if processing stopped because the budget was reached (the
    /// cursor keeps its position for the next call); otherwise the batch has been processed
    /// to completion and `fuel` has been reduced by the work performed.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputHandleCore<
            '_,
            C::Time,
            ConsolidatingContainerBuilder<Vec<I::Item>>,
            timely::dataflow::channels::pushers::Tee<C::Time, Vec<I::Item>>,
        >,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Number of output items produced during this call.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for the (time, diff) pairs of the current value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek only if the cursor is not already positioned at the key, which it will be
            // when resuming after an earlier call ran out of fuel.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // Proceed only if the batch actually contains the key.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Combine the value's updates before handing them to `logic`.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Step before the fuel check so that a resumed call starts at the next
                    // value rather than repeating this one.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Combine the value's updates before handing them to `logic`.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Step before the fuel check so that a resumed call starts at the next
                    // value rather than repeating this one.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Reached only when the batch is exhausted with `work < *fuel`, so this cannot
        // underflow and leaves the fuel positive.
        *fuel -= work;
    }
}