1use std::collections::BTreeMap;
14use std::rc::{Rc, Weak};
15use std::sync::mpsc;
16
17use columnar::Columnar;
18use differential_dataflow::IntoOwned;
19use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
20use differential_dataflow::containers::Columnation;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::operators::arrange::Arranged;
23use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
24use differential_dataflow::{AsCollection, Collection, Data};
25use mz_compute_types::dataflows::DataflowDescription;
26use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
27use mz_compute_types::plan::{AvailableCollections, LirId};
28use mz_dyncfg::ConfigSet;
29use mz_expr::{Id, MapFilterProject, MirScalarExpr};
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_storage_types::errors::DataflowError;
34use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange};
35use mz_timely_util::operator::{CollectionExt, StreamExt};
36use timely::Container;
37use timely::container::CapacityContainerBuilder;
38use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
39use timely::dataflow::operators::Capability;
40use timely::dataflow::operators::generic::OutputHandleCore;
41use timely::dataflow::scopes::Child;
42use timely::dataflow::{Scope, Stream};
43use timely::progress::timestamp::Refines;
44use timely::progress::{Antichain, Timestamp};
45use tracing::error;
46
47use crate::compute_state::{ComputeState, HydrationEvent};
48use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
49use crate::render::errors::ErrorLogger;
50use crate::render::{LinearJoinSpec, RenderTimestamp};
51use crate::row_spine::{DatumSeq, RowRowBuilder};
52use crate::typedefs::{
53 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
54};
55
56pub struct Context<S: Scope, T = mz_repr::Timestamp>
67where
68 T: Timestamp + Lattice + Columnation,
69 S::Timestamp: Lattice + Refines<T> + Columnation,
70{
71 pub(crate) scope: S,
75 pub debug_name: String,
77 pub dataflow_id: usize,
79 pub as_of_frontier: Antichain<T>,
84 pub until: Antichain<T>,
87 pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
89 pub(super) shutdown_token: ShutdownToken,
91 pub(super) hydration_logger: Option<HydrationLogger>,
95 pub(super) compute_logger: Option<crate::logging::compute::Logger>,
97 pub(super) linear_join_spec: LinearJoinSpec,
99 pub dataflow_expiration: Antichain<T>,
102 pub config_set: Rc<ConfigSet>,
104}
105
106impl<S: Scope> Context<S>
107where
108 S::Timestamp: Lattice + Refines<mz_repr::Timestamp> + Columnation,
109{
110 pub fn for_dataflow_in<Plan>(
112 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
113 scope: S,
114 compute_state: &ComputeState,
115 until: Antichain<mz_repr::Timestamp>,
116 dataflow_expiration: Antichain<mz_repr::Timestamp>,
117 ) -> Self {
118 use mz_ore::collections::CollectionExt as IteratorExt;
119 let dataflow_id = *scope.addr().into_first();
120 let as_of_frontier = dataflow
121 .as_of
122 .clone()
123 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
124
125 let (hydration_logger, compute_logger) = if dataflow.is_transient() {
131 (None, None)
132 } else {
133 (
134 Some(HydrationLogger {
135 export_ids: dataflow.export_ids().collect(),
136 tx: compute_state.hydration_tx.clone(),
137 }),
138 compute_state.compute_logger.clone(),
139 )
140 };
141
142 Self {
143 scope,
144 debug_name: dataflow.debug_name.clone(),
145 dataflow_id,
146 as_of_frontier,
147 until,
148 bindings: BTreeMap::new(),
149 shutdown_token: Default::default(),
150 hydration_logger,
151 compute_logger,
152 linear_join_spec: compute_state.linear_join_spec,
153 dataflow_expiration,
154 config_set: Rc::clone(&compute_state.worker_config),
155 }
156 }
157}
158
159impl<S: Scope, T> Context<S, T>
160where
161 T: Timestamp + Lattice + Columnation,
162 S::Timestamp: Lattice + Refines<T> + Columnation,
163{
164 pub fn insert_id(
169 &mut self,
170 id: Id,
171 collection: CollectionBundle<S, T>,
172 ) -> Option<CollectionBundle<S, T>> {
173 self.bindings.insert(id, collection)
174 }
175 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
179 self.bindings.remove(&id)
180 }
181 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
183 if !self.bindings.contains_key(&id) {
184 self.bindings.insert(id, collection);
185 } else {
186 let binding = self
187 .bindings
188 .get_mut(&id)
189 .expect("Binding verified to exist");
190 if collection.collection.is_some() {
191 binding.collection = collection.collection;
192 }
193 for (key, flavor) in collection.arranged.into_iter() {
194 binding.arranged.insert(key, flavor);
195 }
196 }
197 }
198 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
200 self.bindings.get(&id).cloned()
201 }
202
203 pub(super) fn error_logger(&self) -> ErrorLogger {
204 ErrorLogger::new(self.shutdown_token.clone(), self.debug_name.clone())
205 }
206}
207
208impl<S: Scope, T> Context<S, T>
209where
210 T: Timestamp + Lattice + Columnation,
211 S::Timestamp: Lattice + Refines<T> + Columnation,
212{
213 pub fn enter_region<'a>(
215 &self,
216 region: &Child<'a, S, S::Timestamp>,
217 bindings: Option<&std::collections::BTreeSet<Id>>,
218 ) -> Context<Child<'a, S, S::Timestamp>, T> {
219 let bindings = self
220 .bindings
221 .iter()
222 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
223 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
224 .collect();
225
226 Context {
227 scope: region.clone(),
228 debug_name: self.debug_name.clone(),
229 dataflow_id: self.dataflow_id.clone(),
230 as_of_frontier: self.as_of_frontier.clone(),
231 until: self.until.clone(),
232 shutdown_token: self.shutdown_token.clone(),
233 hydration_logger: self.hydration_logger.clone(),
234 compute_logger: self.compute_logger.clone(),
235 linear_join_spec: self.linear_join_spec.clone(),
236 bindings,
237 dataflow_expiration: self.dataflow_expiration.clone(),
238 config_set: Rc::clone(&self.config_set),
239 }
240 }
241}
242
243#[derive(Clone, Default)]
249pub(super) struct ShutdownToken(Option<Weak<()>>);
250
251impl ShutdownToken {
252 pub(super) fn new(token: Weak<()>) -> Self {
254 Self(Some(token))
255 }
256
257 pub(super) fn probe(&self) -> Option<()> {
262 match &self.0 {
263 Some(t) => t.upgrade().map(|_| ()),
264 None => Some(()),
265 }
266 }
267
268 pub(super) fn in_shutdown(&self) -> bool {
270 self.probe().is_none()
271 }
272
273 pub(crate) fn get_inner(&self) -> Option<&Weak<()>> {
275 self.0.as_ref()
276 }
277}
278
279#[derive(Clone)]
281pub(super) struct HydrationLogger {
282 export_ids: Vec<GlobalId>,
283 tx: mpsc::Sender<HydrationEvent>,
284}
285
286impl HydrationLogger {
287 pub fn log(&self, lir_id: LirId, hydrated: bool) {
293 for &export_id in &self.export_ids {
294 let event = HydrationEvent {
295 export_id,
296 lir_id,
297 hydrated,
298 };
299 if self.tx.send(event).is_err() {
300 error!("hydration event receiver dropped unexpectely");
301 }
302 }
303 }
304}
305
306#[derive(Clone)]
308pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
309where
310 T: Timestamp + Lattice + Columnation,
311 S::Timestamp: Lattice + Refines<T> + Columnation,
312{
313 Local(
315 Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
316 Arranged<S, ErrAgent<S::Timestamp, Diff>>,
317 ),
318 Trace(
323 GlobalId,
324 Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
325 Arranged<S, ErrEnter<T, S::Timestamp>>,
326 ),
327}
328
329impl<S: Scope, T> ArrangementFlavor<S, T>
330where
331 T: Timestamp + Lattice + Columnation,
332 S::Timestamp: Lattice + Refines<T> + Columnation,
333{
334 #[deprecated(note = "Use `flat_map` instead.")]
342 pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
343 let mut datums = DatumVec::new();
344 let logic = move |k: DatumSeq, v: DatumSeq| {
345 let mut datums_borrow = datums.borrow();
346 datums_borrow.extend(k);
347 datums_borrow.extend(v);
348 SharedRow::pack(&**datums_borrow)
349 };
350 match &self {
351 ArrangementFlavor::Local(oks, errs) => (
352 oks.as_collection(logic),
353 errs.as_collection(|k, &()| k.clone()),
354 ),
355 ArrangementFlavor::Trace(_, oks, errs) => (
356 oks.as_collection(logic),
357 errs.as_collection(|k, &()| k.clone()),
358 ),
359 }
360 }
361
362 pub fn flat_map<D, I, L>(
375 &self,
376 key: Option<Row>,
377 max_demand: usize,
378 mut logic: L,
379 ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
380 where
381 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
382 D: Data,
383 L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
384 {
385 let refuel = 1000000;
389
390 let mut datums = DatumVec::new();
391 let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
392 let mut datums_borrow = datums.borrow();
393 datums_borrow.extend(k.to_datum_iter().take(max_demand));
394 let max_demand = max_demand.saturating_sub(datums_borrow.len());
395 datums_borrow.extend(v.to_datum_iter().take(max_demand));
396 logic(&mut datums_borrow, t, d)
397 };
398
399 match &self {
400 ArrangementFlavor::Local(oks, errs) => {
401 let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
402 let errs = errs.as_collection(|k, &()| k.clone());
403 (oks, errs)
404 }
405 ArrangementFlavor::Trace(_, oks, errs) => {
406 let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
407 let errs = errs.as_collection(|k, &()| k.clone());
408 (oks, errs)
409 }
410 }
411 }
412}
413impl<S: Scope, T> ArrangementFlavor<S, T>
414where
415 T: Timestamp + Lattice + Columnation,
416 S::Timestamp: Lattice + Refines<T> + Columnation,
417{
418 pub fn scope(&self) -> S {
420 match self {
421 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
422 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
423 }
424 }
425
426 pub fn enter_region<'a>(
428 &self,
429 region: &Child<'a, S, S::Timestamp>,
430 ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
431 match self {
432 ArrangementFlavor::Local(oks, errs) => {
433 ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
434 }
435 ArrangementFlavor::Trace(gid, oks, errs) => {
436 ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
437 }
438 }
439 }
440}
441impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
442where
443 T: Timestamp + Lattice + Columnation,
444 S::Timestamp: Lattice + Refines<T> + Columnation,
445{
446 pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
448 match self {
449 ArrangementFlavor::Local(oks, errs) => {
450 ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
451 }
452 ArrangementFlavor::Trace(gid, oks, errs) => {
453 ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
454 }
455 }
456 }
457}
458
459#[derive(Clone)]
464pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
465where
466 T: Timestamp + Lattice + Columnation,
467 S::Timestamp: Lattice + Refines<T> + Columnation,
468{
469 pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
470 pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
471}
472
473impl<S: Scope, T: Lattice> CollectionBundle<S, T>
474where
475 T: Timestamp + Lattice + Columnation,
476 S::Timestamp: Lattice + Refines<T> + Columnation,
477{
478 pub fn from_collections(
480 oks: Collection<S, Row, Diff>,
481 errs: Collection<S, DataflowError, Diff>,
482 ) -> Self {
483 Self {
484 collection: Some((oks, errs)),
485 arranged: BTreeMap::default(),
486 }
487 }
488
489 pub fn from_expressions(
491 exprs: Vec<MirScalarExpr>,
492 arrangements: ArrangementFlavor<S, T>,
493 ) -> Self {
494 let mut arranged = BTreeMap::new();
495 arranged.insert(exprs, arrangements);
496 Self {
497 collection: None,
498 arranged,
499 }
500 }
501
502 pub fn from_columns<I: IntoIterator<Item = usize>>(
504 columns: I,
505 arrangements: ArrangementFlavor<S, T>,
506 ) -> Self {
507 let mut keys = Vec::new();
508 for column in columns {
509 keys.push(MirScalarExpr::Column(column));
510 }
511 Self::from_expressions(keys, arrangements)
512 }
513
514 pub fn scope(&self) -> S {
516 if let Some((oks, _errs)) = &self.collection {
517 oks.inner.scope()
518 } else {
519 self.arranged
520 .values()
521 .next()
522 .expect("Must contain a valid collection")
523 .scope()
524 }
525 }
526
527 pub fn enter_region<'a>(
529 &self,
530 region: &Child<'a, S, S::Timestamp>,
531 ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
532 CollectionBundle {
533 collection: self
534 .collection
535 .as_ref()
536 .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
537 arranged: self
538 .arranged
539 .iter()
540 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
541 .collect(),
542 }
543 }
544}
545
546impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
547where
548 T: Timestamp + Lattice + Columnation,
549 S::Timestamp: Lattice + Refines<T> + Columnation,
550{
551 pub fn leave_region(&self) -> CollectionBundle<S, T> {
553 CollectionBundle {
554 collection: self
555 .collection
556 .as_ref()
557 .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
558 arranged: self
559 .arranged
560 .iter()
561 .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
562 .collect(),
563 }
564 }
565}
566
567impl<S: Scope, T> CollectionBundle<S, T>
568where
569 T: Timestamp + Lattice + Columnation,
570 S::Timestamp: Lattice + Refines<T> + Columnation,
571{
572 pub fn as_specific_collection(
585 &self,
586 key: Option<&[MirScalarExpr]>,
587 config_set: &ConfigSet,
588 ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
589 match key {
595 None => self
596 .collection
597 .clone()
598 .expect("The unarranged collection doesn't exist."),
599 Some(key) => {
600 let arranged = self.arranged.get(key).unwrap_or_else(|| {
601 panic!("The collection arranged by {:?} doesn't exist.", key)
602 });
603 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
604 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
606 Some((SharedRow::pack(borrow.iter()), t, r))
607 });
608 (ok.as_collection(), err)
609 } else {
610 #[allow(deprecated)]
611 arranged.as_collection()
612 }
613 }
614 }
615 }
616
617 pub fn flat_map<D, I, L>(
633 &self,
634 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
635 max_demand: usize,
636 mut logic: L,
637 ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
638 where
639 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
640 D: Data,
641 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
642 {
643 if let Some((key, val)) = key_val {
647 self.arrangement(&key)
648 .expect("Should have ensured during planning that this arrangement exists.")
649 .flat_map(val, max_demand, logic)
650 } else {
651 use timely::dataflow::operators::Map;
652 let (oks, errs) = self
653 .collection
654 .clone()
655 .expect("Invariant violated: CollectionBundle contains no collection.");
656 let mut datums = DatumVec::new();
657 let oks = oks.inner.flat_map(move |(v, t, d)| {
658 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
659 });
660 (oks, errs)
661 }
662 }
663
664 fn flat_map_core<Tr, K, D, I, L>(
672 trace: &Arranged<S, Tr>,
673 key: Option<K>,
674 mut logic: L,
675 refuel: usize,
676 ) -> Stream<S, I::Item>
677 where
678 for<'a> Tr::Key<'a>: ToDatumIter + IntoOwned<'a, Owned = K>,
679 for<'a> Tr::Val<'a>: ToDatumIter,
680 Tr: TraceReader<Time = S::Timestamp, Diff = mz_repr::Diff> + Clone + 'static,
681 K: PartialEq + 'static,
682 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
683 D: Data,
684 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
685 {
686 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
687
688 let mode = if key.is_some() { "index" } else { "scan" };
689 let name = format!("ArrangementFlatMap({})", mode);
690 use timely::dataflow::operators::Operator;
691 trace
692 .stream
693 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
694 let activator = trace.stream.scope().activator_for(info.address);
696 let mut todo = std::collections::VecDeque::new();
698 move |input, output| {
699 input.for_each(|time, data| {
701 let capability = time.retain();
702 for batch in data.iter() {
703 todo.push_back(PendingWork::new(
705 capability.clone(),
706 batch.cursor(),
707 batch.clone(),
708 ));
709 }
710 });
711
712 let mut fuel = refuel;
714 while !todo.is_empty() && fuel > 0 {
715 todo.front_mut()
716 .unwrap()
717 .do_work(&key, &mut logic, &mut fuel, output);
718 if fuel > 0 {
719 todo.pop_front();
720 }
721 }
722 if !todo.is_empty() {
724 activator.activate();
725 }
726 }
727 })
728 }
729
730 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
735 self.arranged.get(key).map(|x| x.clone())
736 }
737}
738
739impl<S, T> CollectionBundle<S, T>
740where
741 T: Timestamp + Lattice + Columnation,
742 S: Scope,
743 S::Timestamp: Refines<T> + RenderTimestamp,
744 <S::Timestamp as Columnar>::Container: Clone + Send,
745 for<'a> <S::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
746{
747 pub fn as_collection_core(
756 &self,
757 mut mfp: MapFilterProject,
758 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
759 until: Antichain<mz_repr::Timestamp>,
760 config_set: &ConfigSet,
761 ) -> (
762 Collection<S, mz_repr::Row, Diff>,
763 Collection<S, DataflowError, Diff>,
764 ) {
765 mfp.optimize();
766 let mfp_plan = mfp.clone().into_plan().unwrap();
767
768 let has_key_val = if let Some((_key, Some(_val))) = &key_val {
774 true
775 } else {
776 false
777 };
778
779 if mfp_plan.is_identity() && !has_key_val {
780 let key = key_val.map(|(k, _v)| k);
781 return self.as_specific_collection(key.as_deref(), config_set);
782 }
783
784 let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
785 mfp.permute_fn(|c| c, max_demand);
786 mfp.optimize();
787 let mfp_plan = mfp.into_plan().unwrap();
788
789 let mut datum_vec = DatumVec::new();
790 let until = std::rc::Rc::new(until);
792
793 let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
794 let binding = SharedRow::get();
795 let mut row_builder = binding.borrow_mut();
796 let until = std::rc::Rc::clone(&until);
797 let temp_storage = RowArena::new();
798 let row_iter = row_datums.iter();
799 let mut datums_local = datum_vec.borrow();
800 datums_local.extend(row_iter);
801 let time = time.clone();
802 let event_time = time.event_time();
803 mfp_plan
804 .evaluate(
805 &mut datums_local,
806 &temp_storage,
807 event_time,
808 diff.clone(),
809 move |time| !until.less_equal(time),
810 &mut row_builder,
811 )
812 .map(move |x| match x {
813 Ok((row, event_time, diff)) => {
814 let mut time: S::Timestamp = time.clone();
816 *time.event_time_mut() = event_time;
817 (Ok(row), time, diff)
818 }
819 Err((e, event_time, diff)) => {
820 let mut time: S::Timestamp = time.clone();
822 *time.event_time_mut() = event_time;
823 (Err(e), time, diff)
824 }
825 })
826 });
827
828 use differential_dataflow::AsCollection;
829 let (oks, errs) = stream
830 .as_collection()
831 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
832 "OkErr",
833 |x| x,
834 );
835
836 (oks, errors.concat(&errs))
837 }
838 pub fn ensure_collections(
839 mut self,
840 collections: AvailableCollections,
841 input_key: Option<Vec<MirScalarExpr>>,
842 input_mfp: MapFilterProject,
843 until: Antichain<mz_repr::Timestamp>,
844 config_set: &ConfigSet,
845 ) -> Self {
846 if collections == Default::default() {
847 return self;
848 }
849 let form_raw_collection = collections.raw
859 || collections
860 .arranged
861 .iter()
862 .any(|(key, _, _)| !self.arranged.contains_key(key));
863 if form_raw_collection && self.collection.is_none() {
864 self.collection = Some(self.as_collection_core(
865 input_mfp,
866 input_key.map(|k| (k, None)),
867 until,
868 config_set,
869 ));
870 }
871 for (key, _, thinning) in collections.arranged {
872 if !self.arranged.contains_key(&key) {
873 let name = format!("ArrangeBy[{:?}]", key);
875
876 let (oks, errs) = self
877 .collection
878 .clone()
879 .expect("Collection constructed above");
880 let (oks, errs_keyed) =
881 Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
882 let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
883 let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
884 &format!("{}-errors", name),
885 );
886 self.arranged
887 .insert(key, ArrangementFlavor::Local(oks, errs));
888 }
889 }
890 self
891 }
892
893 fn arrange_collection(
900 name: &String,
901 oks: Collection<S, Row, Diff>,
902 key: Vec<MirScalarExpr>,
903 thinning: Vec<usize>,
904 ) -> (
905 Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
906 Collection<S, DataflowError, Diff>,
907 ) {
908 let (oks, errs) = oks
913 .inner
914 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
915 Pipeline,
916 "FormArrangementKey",
917 move |_, _| {
918 Box::new(move |input, ok, err| {
919 let mut key_buf = Row::default();
920 let mut val_buf = Row::default();
921 let mut datums = DatumVec::new();
922 let mut temp_storage = RowArena::new();
923 while let Some((time, data)) = input.next() {
924 let mut ok_session = ok.session_with_builder(&time);
925 let mut err_session = err.session(&time);
926 for (row, time, diff) in data.iter() {
927 temp_storage.clear();
928 let datums = datums.borrow_with(row);
929 let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
930 match key_buf.packer().try_extend(key_iter) {
931 Ok(()) => {
932 let val_datum_iter = thinning.iter().map(|c| datums[*c]);
933 val_buf.packer().extend(val_datum_iter);
934 ok_session.give(((&*key_buf, &*val_buf), time, diff));
935 }
936 Err(e) => {
937 err_session.give((e.into(), time.clone(), *diff));
938 }
939 }
940 }
941 }
942 })
943 },
944 );
945 let oks = oks
946 .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
947 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
948 );
949 (oks, errs.as_collection())
950 }
951}
952
953struct PendingWork<C>
954where
955 C: Cursor,
956 C::Time: Timestamp,
957{
958 capability: Capability<C::Time>,
959 cursor: C,
960 batch: C::Storage,
961}
962
963impl<C> PendingWork<C>
964where
965 C: Cursor,
966 C::Time: Timestamp,
967{
968 fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
970 Self {
971 capability,
972 cursor,
973 batch,
974 }
975 }
976 fn do_work<I, D, L, K>(
978 &mut self,
979 key: &Option<K>,
980 logic: &mut L,
981 fuel: &mut usize,
982 output: &mut OutputHandleCore<
983 '_,
984 C::Time,
985 ConsolidatingContainerBuilder<Vec<I::Item>>,
986 timely::dataflow::channels::pushers::Tee<C::Time, Vec<I::Item>>,
987 >,
988 ) where
989 I: IntoIterator<Item = (D, C::Time, C::Diff)>,
990 D: Data,
991 L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
992 K: PartialEq + Sized,
993 for<'a> C::Key<'a>: IntoOwned<'a, Owned = K>,
994 {
995 use differential_dataflow::consolidation::consolidate;
996
997 let mut work: usize = 0;
999 let mut session = output.session_with_builder(&self.capability);
1000 let mut buffer = Vec::new();
1001 if let Some(key) = key {
1002 if self
1003 .cursor
1004 .get_key(&self.batch)
1005 .map(|k| k == IntoOwned::borrow_as(key))
1006 != Some(true)
1007 {
1008 self.cursor.seek_key(&self.batch, IntoOwned::borrow_as(key));
1009 }
1010 if self
1011 .cursor
1012 .get_key(&self.batch)
1013 .map(|k| k == IntoOwned::borrow_as(key))
1014 == Some(true)
1015 {
1016 let key = self.cursor.key(&self.batch);
1017 while let Some(val) = self.cursor.get_val(&self.batch) {
1018 self.cursor.map_times(&self.batch, |time, diff| {
1019 buffer.push((time.into_owned(), diff.into_owned()));
1020 });
1021 consolidate(&mut buffer);
1022 for (time, diff) in buffer.drain(..) {
1023 for datum in logic(key, val, time, diff) {
1024 session.give(datum);
1025 work += 1;
1026 }
1027 }
1028 self.cursor.step_val(&self.batch);
1029 if work >= *fuel {
1030 *fuel = 0;
1031 return;
1032 }
1033 }
1034 }
1035 } else {
1036 while let Some(key) = self.cursor.get_key(&self.batch) {
1037 while let Some(val) = self.cursor.get_val(&self.batch) {
1038 self.cursor.map_times(&self.batch, |time, diff| {
1039 buffer.push((time.into_owned(), diff.into_owned()));
1040 });
1041 consolidate(&mut buffer);
1042 for (time, diff) in buffer.drain(..) {
1043 for datum in logic(key, val, time, diff) {
1044 session.give(datum);
1045 work += 1;
1046 }
1047 }
1048 self.cursor.step_val(&self.batch);
1049 if work >= *fuel {
1050 *fuel = 0;
1051 return;
1052 }
1053 }
1054 self.cursor.step_key(&self.batch);
1055 }
1056 }
1057 *fuel -= work;
1058 }
1059}