1use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::rc::Rc;
16use std::sync::mpsc;
17
18use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
19use differential_dataflow::operators::arrange::Arranged;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
22use differential_dataflow::{AsCollection, Collection, Data};
23use mz_compute_types::dataflows::DataflowDescription;
24use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
25use mz_compute_types::plan::{AvailableCollections, LirId};
26use mz_dyncfg::ConfigSet;
27use mz_expr::{Id, MapFilterProject, MirScalarExpr};
28use mz_ore::soft_assert_or_log;
29use mz_repr::fixed_length::ToDatumIter;
30use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
31use mz_storage_types::controller::CollectionMetadata;
32use mz_storage_types::errors::DataflowError;
33use mz_timely_util::builder_async::{ButtonHandle, PressOnDropButton};
34use mz_timely_util::columnar::builder::ColumnBuilder;
35use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
36use mz_timely_util::operator::{CollectionExt, StreamExt};
37use timely::Container;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
40use timely::dataflow::operators::Capability;
41use timely::dataflow::operators::generic::OutputHandleCore;
42use timely::dataflow::scopes::Child;
43use timely::dataflow::{Scope, Stream};
44use timely::progress::timestamp::Refines;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::compute_state::{ComputeState, HydrationEvent};
49use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
50use crate::render::errors::ErrorLogger;
51use crate::render::{LinearJoinSpec, RenderTimestamp};
52use crate::row_spine::{DatumSeq, RowRowBuilder};
53use crate::typedefs::{
54 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
55 RowRowSpine,
56};
57
/// Dataflow-local state used while rendering a dataflow into a timely scope `S`.
///
/// `T` is the timestamp type that `S::Timestamp` refines; it defaults to
/// `mz_repr::Timestamp`.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The timely scope associated with this context.
    pub(crate) scope: S,
    /// Debug name of the dataflow (`for_dataflow_in` copies it from the
    /// dataflow description).
    pub debug_name: String,
    /// Identifier of the dataflow; `for_dataflow_in` takes it from the first
    /// element of the scope's address.
    pub dataflow_id: usize,
    /// As-of frontier of the dataflow; `for_dataflow_in` falls back to the
    /// minimum timestamp when the description carries no `as_of`.
    pub as_of_frontier: Antichain<T>,
    /// Upper frontier supplied by the caller of `for_dataflow_in`.
    // NOTE(review): presumably updates at times not less than `until` may be
    // suppressed — confirm against the operators that consume this field.
    pub until: Antichain<T>,
    /// Collection bundles currently bound to each identifier.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// Probe used to query whether the dataflow is shutting down.
    pub(super) shutdown_probe: ShutdownProbe,
    /// Logger for hydration events; `None` for transient dataflows
    /// (see `for_dataflow_in`).
    pub(super) hydration_logger: Option<HydrationLogger>,
    /// Compute logger; `for_dataflow_in` leaves it `None` for transient
    /// dataflows and otherwise clones it from the compute state.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Linear join specification, copied from the compute state.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// Expiration frontier supplied by the caller of `for_dataflow_in`.
    // NOTE(review): exact expiration semantics are not visible here — confirm.
    pub dataflow_expiration: Antichain<T>,
    /// Shared handle to the worker's dynamic configuration.
    pub config_set: Rc<ConfigSet>,
}
107
108impl<S: Scope> Context<S>
109where
110 S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
111{
112 pub fn for_dataflow_in<Plan>(
114 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
115 scope: S,
116 compute_state: &ComputeState,
117 until: Antichain<mz_repr::Timestamp>,
118 dataflow_expiration: Antichain<mz_repr::Timestamp>,
119 ) -> Self {
120 use mz_ore::collections::CollectionExt as IteratorExt;
121 let dataflow_id = *scope.addr().into_first();
122 let as_of_frontier = dataflow
123 .as_of
124 .clone()
125 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
126
127 let (hydration_logger, compute_logger) = if dataflow.is_transient() {
133 (None, None)
134 } else {
135 (
136 Some(HydrationLogger {
137 export_ids: dataflow.export_ids().collect(),
138 tx: compute_state.hydration_tx.clone(),
139 }),
140 compute_state.compute_logger.clone(),
141 )
142 };
143
144 Self {
145 scope,
146 debug_name: dataflow.debug_name.clone(),
147 dataflow_id,
148 as_of_frontier,
149 until,
150 bindings: BTreeMap::new(),
151 shutdown_probe: Default::default(),
152 hydration_logger,
153 compute_logger,
154 linear_join_spec: compute_state.linear_join_spec,
155 dataflow_expiration,
156 config_set: Rc::clone(&compute_state.worker_config),
157 }
158 }
159}
160
161impl<S: Scope, T> Context<S, T>
162where
163 T: MzTimestamp,
164 S::Timestamp: MzTimestamp + Refines<T>,
165{
166 pub fn insert_id(
171 &mut self,
172 id: Id,
173 collection: CollectionBundle<S, T>,
174 ) -> Option<CollectionBundle<S, T>> {
175 self.bindings.insert(id, collection)
176 }
177 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
181 self.bindings.remove(&id)
182 }
183 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
185 if !self.bindings.contains_key(&id) {
186 self.bindings.insert(id, collection);
187 } else {
188 let binding = self
189 .bindings
190 .get_mut(&id)
191 .expect("Binding verified to exist");
192 if collection.collection.is_some() {
193 binding.collection = collection.collection;
194 }
195 for (key, flavor) in collection.arranged.into_iter() {
196 binding.arranged.insert(key, flavor);
197 }
198 }
199 }
200 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
202 self.bindings.get(&id).cloned()
203 }
204
205 pub(super) fn error_logger(&self) -> ErrorLogger {
206 ErrorLogger::new(self.shutdown_probe.clone(), self.debug_name.clone())
207 }
208}
209
210impl<S: Scope, T> Context<S, T>
211where
212 T: MzTimestamp,
213 S::Timestamp: MzTimestamp + Refines<T>,
214{
215 pub fn enter_region<'a>(
217 &self,
218 region: &Child<'a, S, S::Timestamp>,
219 bindings: Option<&std::collections::BTreeSet<Id>>,
220 ) -> Context<Child<'a, S, S::Timestamp>, T> {
221 let bindings = self
222 .bindings
223 .iter()
224 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
225 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
226 .collect();
227
228 Context {
229 scope: region.clone(),
230 debug_name: self.debug_name.clone(),
231 dataflow_id: self.dataflow_id.clone(),
232 as_of_frontier: self.as_of_frontier.clone(),
233 until: self.until.clone(),
234 shutdown_probe: self.shutdown_probe.clone(),
235 hydration_logger: self.hydration_logger.clone(),
236 compute_logger: self.compute_logger.clone(),
237 linear_join_spec: self.linear_join_spec.clone(),
238 bindings,
239 dataflow_expiration: self.dataflow_expiration.clone(),
240 config_set: Rc::clone(&self.config_set),
241 }
242 }
243}
244
245pub(super) fn shutdown_token<G: Scope>(scope: &mut G) -> (ShutdownProbe, PressOnDropButton) {
246 let (button_handle, button) = mz_timely_util::builder_async::button(scope, scope.addr());
247 let probe = ShutdownProbe::new(button_handle);
248 let token = button.press_on_drop();
249 (probe, token)
250}
251
252#[derive(Clone, Default)]
258pub(super) struct ShutdownProbe(Option<Rc<RefCell<ButtonHandle>>>);
259
260impl ShutdownProbe {
261 fn new(button: ButtonHandle) -> Self {
263 Self(Some(Rc::new(RefCell::new(button))))
264 }
265
266 pub(super) fn in_shutdown(&self) -> bool {
271 match &self.0 {
272 Some(t) => t.borrow_mut().all_pressed(),
273 None => false,
274 }
275 }
276
277 pub(super) fn in_local_shutdown(&self) -> bool {
282 match &self.0 {
283 Some(t) => t.borrow_mut().local_pressed(),
284 None => false,
285 }
286 }
287}
288
289#[derive(Clone)]
291pub(super) struct HydrationLogger {
292 export_ids: Vec<GlobalId>,
293 tx: mpsc::Sender<HydrationEvent>,
294}
295
296impl HydrationLogger {
297 pub fn log(&self, lir_id: LirId, hydrated: bool) {
303 for &export_id in &self.export_ids {
304 let event = HydrationEvent {
305 export_id,
306 lir_id,
307 hydrated,
308 };
309 if self.tx.send(event).is_err() {
310 error!("hydration event receiver dropped unexpectely");
311 }
312 }
313 }
314}
315
/// The two kinds of arrangement a [`CollectionBundle`] can hold.
///
/// Each variant pairs an arrangement of the data (key row, value row) with an
/// arrangement of the errors.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// An arrangement over the scope's own timestamp, such as the ones built
    /// by `CollectionBundle::ensure_collections`.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An arrangement over timestamp `T` entered into the scope, tagged with
    /// a `GlobalId`.
    // NOTE(review): presumably the id names the imported index/trace — confirm.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
338
339impl<S: Scope, T> ArrangementFlavor<S, T>
340where
341 T: MzTimestamp,
342 S::Timestamp: MzTimestamp + Refines<T>,
343{
344 #[deprecated(note = "Use `flat_map` instead.")]
352 pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
353 let mut datums = DatumVec::new();
354 let logic = move |k: DatumSeq, v: DatumSeq| {
355 let mut datums_borrow = datums.borrow();
356 datums_borrow.extend(k);
357 datums_borrow.extend(v);
358 SharedRow::pack(&**datums_borrow)
359 };
360 match &self {
361 ArrangementFlavor::Local(oks, errs) => (
362 oks.as_collection(logic),
363 errs.as_collection(|k, &()| k.clone()),
364 ),
365 ArrangementFlavor::Trace(_, oks, errs) => (
366 oks.as_collection(logic),
367 errs.as_collection(|k, &()| k.clone()),
368 ),
369 }
370 }
371
372 pub fn flat_map<D, I, L>(
385 &self,
386 key: Option<&Row>,
387 max_demand: usize,
388 mut logic: L,
389 ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
390 where
391 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
392 D: Data,
393 L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
394 {
395 let refuel = 1000000;
399
400 let mut datums = DatumVec::new();
401 let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
402 let mut datums_borrow = datums.borrow();
403 datums_borrow.extend(k.to_datum_iter().take(max_demand));
404 let max_demand = max_demand.saturating_sub(datums_borrow.len());
405 datums_borrow.extend(v.to_datum_iter().take(max_demand));
406 logic(&mut datums_borrow, t, d)
407 };
408
409 match &self {
410 ArrangementFlavor::Local(oks, errs) => {
411 let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
412 let errs = errs.as_collection(|k, &()| k.clone());
413 (oks, errs)
414 }
415 ArrangementFlavor::Trace(_, oks, errs) => {
416 let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
417 let errs = errs.as_collection(|k, &()| k.clone());
418 (oks, errs)
419 }
420 }
421 }
422}
423impl<S: Scope, T> ArrangementFlavor<S, T>
424where
425 T: MzTimestamp,
426 S::Timestamp: MzTimestamp + Refines<T>,
427{
428 pub fn scope(&self) -> S {
430 match self {
431 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
432 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
433 }
434 }
435
436 pub fn enter_region<'a>(
438 &self,
439 region: &Child<'a, S, S::Timestamp>,
440 ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
441 match self {
442 ArrangementFlavor::Local(oks, errs) => {
443 ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
444 }
445 ArrangementFlavor::Trace(gid, oks, errs) => {
446 ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
447 }
448 }
449 }
450}
451impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
452where
453 T: MzTimestamp,
454 S::Timestamp: MzTimestamp + Refines<T>,
455{
456 pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
458 match self {
459 ArrangementFlavor::Local(oks, errs) => {
460 ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
461 }
462 ArrangementFlavor::Trace(gid, oks, errs) => {
463 ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
464 }
465 }
466 }
467}
468
/// A bundle of the available representations of one collection: an optional
/// unarranged (data, errors) pair and any number of arrangements.
///
/// `CollectionBundle::scope` expects at least one representation to be present.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged data and error collections, if available.
    pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
    /// The available arrangements, indexed by their key expressions.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
482
483impl<S: Scope, T> CollectionBundle<S, T>
484where
485 T: MzTimestamp,
486 S::Timestamp: MzTimestamp + Refines<T>,
487{
488 pub fn from_collections(
490 oks: Collection<S, Row, Diff>,
491 errs: Collection<S, DataflowError, Diff>,
492 ) -> Self {
493 Self {
494 collection: Some((oks, errs)),
495 arranged: BTreeMap::default(),
496 }
497 }
498
499 pub fn from_expressions(
501 exprs: Vec<MirScalarExpr>,
502 arrangements: ArrangementFlavor<S, T>,
503 ) -> Self {
504 let mut arranged = BTreeMap::new();
505 arranged.insert(exprs, arrangements);
506 Self {
507 collection: None,
508 arranged,
509 }
510 }
511
512 pub fn from_columns<I: IntoIterator<Item = usize>>(
514 columns: I,
515 arrangements: ArrangementFlavor<S, T>,
516 ) -> Self {
517 let mut keys = Vec::new();
518 for column in columns {
519 keys.push(MirScalarExpr::column(column));
520 }
521 Self::from_expressions(keys, arrangements)
522 }
523
524 pub fn scope(&self) -> S {
526 if let Some((oks, _errs)) = &self.collection {
527 oks.inner.scope()
528 } else {
529 self.arranged
530 .values()
531 .next()
532 .expect("Must contain a valid collection")
533 .scope()
534 }
535 }
536
537 pub fn enter_region<'a>(
539 &self,
540 region: &Child<'a, S, S::Timestamp>,
541 ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
542 CollectionBundle {
543 collection: self
544 .collection
545 .as_ref()
546 .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
547 arranged: self
548 .arranged
549 .iter()
550 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
551 .collect(),
552 }
553 }
554}
555
556impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
557where
558 T: MzTimestamp,
559 S::Timestamp: MzTimestamp + Refines<T>,
560{
561 pub fn leave_region(&self) -> CollectionBundle<S, T> {
563 CollectionBundle {
564 collection: self
565 .collection
566 .as_ref()
567 .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
568 arranged: self
569 .arranged
570 .iter()
571 .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
572 .collect(),
573 }
574 }
575}
576
577impl<S: Scope, T> CollectionBundle<S, T>
578where
579 T: MzTimestamp,
580 S::Timestamp: MzTimestamp + Refines<T>,
581{
582 pub fn as_specific_collection(
595 &self,
596 key: Option<&[MirScalarExpr]>,
597 config_set: &ConfigSet,
598 ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
599 match key {
605 None => self
606 .collection
607 .clone()
608 .expect("The unarranged collection doesn't exist."),
609 Some(key) => {
610 let arranged = self.arranged.get(key).unwrap_or_else(|| {
611 panic!("The collection arranged by {:?} doesn't exist.", key)
612 });
613 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
614 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
616 Some((SharedRow::pack(borrow.iter()), t, r))
617 });
618 (ok.as_collection(), err)
619 } else {
620 #[allow(deprecated)]
621 arranged.as_collection()
622 }
623 }
624 }
625 }
626
627 pub fn flat_map<D, I, L>(
643 &self,
644 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
645 max_demand: usize,
646 mut logic: L,
647 ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
648 where
649 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
650 D: Data,
651 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
652 {
653 if let Some((key, val)) = key_val {
657 self.arrangement(&key)
658 .expect("Should have ensured during planning that this arrangement exists.")
659 .flat_map(val.as_ref(), max_demand, logic)
660 } else {
661 use timely::dataflow::operators::Map;
662 let (oks, errs) = self
663 .collection
664 .clone()
665 .expect("Invariant violated: CollectionBundle contains no collection.");
666 let mut datums = DatumVec::new();
667 let oks = oks.inner.flat_map(move |(v, t, d)| {
668 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
669 });
670 (oks, errs)
671 }
672 }
673
674 fn flat_map_core<Tr, D, I, L>(
682 trace: &Arranged<S, Tr>,
683 key: Option<&Tr::KeyOwn>,
684 mut logic: L,
685 refuel: usize,
686 ) -> Stream<S, I::Item>
687 where
688 Tr: for<'a> TraceReader<
689 Key<'a>: ToDatumIter,
690 KeyOwn: PartialEq,
691 Val<'a>: ToDatumIter,
692 Time = S::Timestamp,
693 Diff = mz_repr::Diff,
694 > + Clone
695 + 'static,
696 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
697 D: Data,
698 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
699 {
700 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
701
702 let mut key_con = Tr::KeyContainer::with_capacity(1);
703 if let Some(key) = &key {
704 key_con.push_own(key);
705 }
706 let mode = if key.is_some() { "index" } else { "scan" };
707 let name = format!("ArrangementFlatMap({})", mode);
708 use timely::dataflow::operators::Operator;
709 trace
710 .stream
711 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
712 let activator = trace.stream.scope().activator_for(info.address);
714 let mut todo = std::collections::VecDeque::new();
716 move |input, output| {
717 let key = key_con.get(0);
718 input.for_each(|time, data| {
720 let capability = time.retain();
721 for batch in data.iter() {
722 todo.push_back(PendingWork::new(
724 capability.clone(),
725 batch.cursor(),
726 batch.clone(),
727 ));
728 }
729 });
730
731 let mut fuel = refuel;
733 while !todo.is_empty() && fuel > 0 {
734 todo.front_mut().unwrap().do_work(
735 key.as_ref(),
736 &mut logic,
737 &mut fuel,
738 output,
739 );
740 if fuel > 0 {
741 todo.pop_front();
742 }
743 }
744 if !todo.is_empty() {
746 activator.activate();
747 }
748 }
749 })
750 }
751
752 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
757 self.arranged.get(key).map(|x| x.clone())
758 }
759}
760
761impl<S, T> CollectionBundle<S, T>
762where
763 T: MzTimestamp,
764 S: Scope,
765 S::Timestamp: Refines<T> + RenderTimestamp,
766{
767 pub fn as_collection_core(
776 &self,
777 mut mfp: MapFilterProject,
778 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
779 until: Antichain<mz_repr::Timestamp>,
780 config_set: &ConfigSet,
781 ) -> (
782 Collection<S, mz_repr::Row, Diff>,
783 Collection<S, DataflowError, Diff>,
784 ) {
785 mfp.optimize();
786 let mfp_plan = mfp.clone().into_plan().unwrap();
787
788 let has_key_val = if let Some((_key, Some(_val))) = &key_val {
794 true
795 } else {
796 false
797 };
798
799 if mfp_plan.is_identity() && !has_key_val {
800 let key = key_val.map(|(k, _v)| k);
801 return self.as_specific_collection(key.as_deref(), config_set);
802 }
803
804 let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
805 mfp.permute_fn(|c| c, max_demand);
806 mfp.optimize();
807 let mfp_plan = mfp.into_plan().unwrap();
808
809 let mut datum_vec = DatumVec::new();
810 let until = std::rc::Rc::new(until);
812
813 let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
814 let mut row_builder = SharedRow::get();
815 let until = std::rc::Rc::clone(&until);
816 let temp_storage = RowArena::new();
817 let row_iter = row_datums.iter();
818 let mut datums_local = datum_vec.borrow();
819 datums_local.extend(row_iter);
820 let time = time.clone();
821 let event_time = time.event_time();
822 mfp_plan
823 .evaluate(
824 &mut datums_local,
825 &temp_storage,
826 event_time,
827 diff.clone(),
828 move |time| !until.less_equal(time),
829 &mut row_builder,
830 )
831 .map(move |x| match x {
832 Ok((row, event_time, diff)) => {
833 let mut time: S::Timestamp = time.clone();
835 *time.event_time_mut() = event_time;
836 (Ok(row), time, diff)
837 }
838 Err((e, event_time, diff)) => {
839 let mut time: S::Timestamp = time.clone();
841 *time.event_time_mut() = event_time;
842 (Err(e), time, diff)
843 }
844 })
845 });
846
847 use differential_dataflow::AsCollection;
848 let (oks, errs) = stream
849 .as_collection()
850 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
851 "OkErr",
852 |x| x,
853 );
854
855 (oks, errors.concat(&errs))
856 }
857 pub fn ensure_collections(
858 mut self,
859 collections: AvailableCollections,
860 input_key: Option<Vec<MirScalarExpr>>,
861 input_mfp: MapFilterProject,
862 until: Antichain<mz_repr::Timestamp>,
863 config_set: &ConfigSet,
864 ) -> Self {
865 if collections == Default::default() {
866 return self;
867 }
868 for (key, _, _) in collections.arranged.iter() {
877 soft_assert_or_log!(
878 !self.arranged.contains_key(key),
879 "LIR ArrangeBy tried to create an existing arrangement"
880 );
881 }
882
883 let form_raw_collection = collections.raw
885 || collections
886 .arranged
887 .iter()
888 .any(|(key, _, _)| !self.arranged.contains_key(key));
889 if form_raw_collection && self.collection.is_none() {
890 self.collection = Some(self.as_collection_core(
891 input_mfp,
892 input_key.map(|k| (k, None)),
893 until,
894 config_set,
895 ));
896 }
897 for (key, _, thinning) in collections.arranged {
898 if !self.arranged.contains_key(&key) {
899 let name = format!("ArrangeBy[{:?}]", key);
901
902 let (oks, errs) = self
903 .collection
904 .clone()
905 .expect("Collection constructed above");
906 let (oks, errs_keyed) =
907 Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
908 let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
909 let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
910 &format!("{}-errors", name),
911 );
912 self.arranged
913 .insert(key, ArrangementFlavor::Local(oks, errs));
914 }
915 }
916 self
917 }
918
919 fn arrange_collection(
926 name: &String,
927 oks: Collection<S, Row, Diff>,
928 key: Vec<MirScalarExpr>,
929 thinning: Vec<usize>,
930 ) -> (
931 Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
932 Collection<S, DataflowError, Diff>,
933 ) {
934 let (oks, errs) = oks
939 .inner
940 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
941 Pipeline,
942 "FormArrangementKey",
943 move |_, _| {
944 Box::new(move |input, ok, err| {
945 let mut key_buf = Row::default();
946 let mut val_buf = Row::default();
947 let mut datums = DatumVec::new();
948 let mut temp_storage = RowArena::new();
949 while let Some((time, data)) = input.next() {
950 let mut ok_session = ok.session_with_builder(&time);
951 let mut err_session = err.session(&time);
952 for (row, time, diff) in data.iter() {
953 temp_storage.clear();
954 let datums = datums.borrow_with(row);
955 let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
956 match key_buf.packer().try_extend(key_iter) {
957 Ok(()) => {
958 let val_datum_iter = thinning.iter().map(|c| datums[*c]);
959 val_buf.packer().extend(val_datum_iter);
960 ok_session.give(((&*key_buf, &*val_buf), time, diff));
961 }
962 Err(e) => {
963 err_session.give((e.into(), time.clone(), *diff));
964 }
965 }
966 }
967 }
968 })
969 },
970 );
971 let oks = oks
972 .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
973 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
974 );
975 (oks, errs.as_collection())
976 }
977}
978
/// A batch that has been received but not yet fully processed, together with
/// the state needed to resume processing it.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability used to open output sessions for this batch's results.
    capability: Capability<C::Time>,
    /// Cursor holding the current position within `batch`.
    cursor: C,
    /// The storage that `cursor` navigates.
    batch: C::Storage,
}
987
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Creates a new unit of pending work from a capability, a cursor and the
    /// storage the cursor navigates.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }

    /// Applies `logic` to the updates reachable from the cursor's current
    /// position and gives the results to `output`.
    ///
    /// * `key` — when `Some`, only the values of exactly this key are visited;
    ///   when `None`, all remaining keys are scanned.
    /// * `logic` — called once per consolidated `(time, diff)` of each value.
    /// * `fuel` — work budget, counted in records given to `output`. On return
    ///   it is set to zero if the budget was reached (the cursor position is
    ///   kept, so a later call resumes), and is otherwise reduced by the
    ///   amount of work done.
    ///
    /// The budget is only checked after a whole value has been processed, so
    /// a single call may produce more than `*fuel` records.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputHandleCore<
            '_,
            C::Time,
            ConsolidatingContainerBuilder<Vec<I::Item>>,
            timely::dataflow::channels::pushers::Tee<C::Time, Vec<I::Item>>,
        >,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Number of records produced by this call.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for the (time, diff) pairs of the current value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek only if the cursor is not already positioned on the key,
            // which is the case when resuming after running out of fuel.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // The seek may not have found the key; only proceed on an exact match.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Advance before the fuel check so that a resumed call
                    // starts at the next value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Advance before the fuel check so that a resumed call
                    // starts at the next value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Reaching this point means every in-loop check saw `work < *fuel`,
        // so the subtraction cannot underflow.
        *fuel -= work;
    }
}