1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::CollectionExt;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
38use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
39use timely::dataflow::scopes::Child;
40use timely::dataflow::{Scope, StreamVec};
41use timely::progress::operate::FrontierInterest;
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::ErrorLogger;
48use crate::render::{LinearJoinSpec, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
52 RowRowSpine,
53};
54
/// Per-dataflow rendering context, shared by the operators rendered for a
/// single dataflow.
///
/// `S` is the timely scope the dataflow is rendered in; `T` is the outer
/// timestamp that `S::Timestamp` refines (defaults to `mz_repr::Timestamp`).
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The timely ID of the dataflow (first element of the scope address).
    pub dataflow_id: usize,
    /// The IDs exported by this dataflow.
    pub export_ids: Vec<GlobalId>,
    /// The as-of frontier of the dataflow (minimum timestamp if unset).
    pub as_of_frontier: Antichain<T>,
    /// Frontier used to suppress updates; handed to MFP evaluation in
    /// `as_collection_core` (NOTE(review): presumably updates beyond `until`
    /// need not be produced — confirm against dataflow semantics).
    pub until: Antichain<T>,
    /// Bindings from identifiers to the collection bundles rendered so far.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// Logger for compute events; `None` for transient dataflows or when
    /// logging is disabled (see `for_dataflow_in`).
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for how linear joins should be rendered.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// Frontier at which the dataflow is considered expired — TODO confirm
    /// exact semantics with the producer of this value.
    pub dataflow_expiration: Antichain<T>,
    /// The worker-level dynamic configuration.
    pub config_set: Rc<ConfigSet>,
}
100
101impl<S: Scope> Context<S>
102where
103 S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
104{
105 pub fn for_dataflow_in<Plan>(
107 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
108 scope: S,
109 compute_state: &ComputeState,
110 until: Antichain<mz_repr::Timestamp>,
111 dataflow_expiration: Antichain<mz_repr::Timestamp>,
112 ) -> Self {
113 use mz_ore::collections::CollectionExt as IteratorExt;
114 let dataflow_id = *scope.addr().into_first();
115 let as_of_frontier = dataflow
116 .as_of
117 .clone()
118 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
119
120 let export_ids = dataflow.export_ids().collect();
121
122 let compute_logger = if dataflow.is_transient() {
126 None
127 } else {
128 compute_state.compute_logger.clone()
129 };
130
131 Self {
132 scope,
133 debug_name: dataflow.debug_name.clone(),
134 dataflow_id,
135 export_ids,
136 as_of_frontier,
137 until,
138 bindings: BTreeMap::new(),
139 compute_logger,
140 linear_join_spec: compute_state.linear_join_spec,
141 dataflow_expiration,
142 config_set: Rc::clone(&compute_state.worker_config),
143 }
144 }
145}
146
147impl<S: Scope, T> Context<S, T>
148where
149 T: MzTimestamp,
150 S::Timestamp: MzTimestamp + Refines<T>,
151{
152 pub fn insert_id(
157 &mut self,
158 id: Id,
159 collection: CollectionBundle<S, T>,
160 ) -> Option<CollectionBundle<S, T>> {
161 self.bindings.insert(id, collection)
162 }
163 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
167 self.bindings.remove(&id)
168 }
169 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
171 if !self.bindings.contains_key(&id) {
172 self.bindings.insert(id, collection);
173 } else {
174 let binding = self
175 .bindings
176 .get_mut(&id)
177 .expect("Binding verified to exist");
178 if collection.collection.is_some() {
179 binding.collection = collection.collection;
180 }
181 for (key, flavor) in collection.arranged.into_iter() {
182 binding.arranged.insert(key, flavor);
183 }
184 }
185 }
186 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
188 self.bindings.get(&id).cloned()
189 }
190
191 pub(super) fn error_logger(&self) -> ErrorLogger {
192 ErrorLogger::new(self.debug_name.clone())
193 }
194}
195
196impl<S: Scope, T> Context<S, T>
197where
198 T: MzTimestamp,
199 S::Timestamp: MzTimestamp + Refines<T>,
200{
201 pub fn enter_region<'a>(
203 &self,
204 region: &Child<'a, S, S::Timestamp>,
205 bindings: Option<&std::collections::BTreeSet<Id>>,
206 ) -> Context<Child<'a, S, S::Timestamp>, T> {
207 let bindings = self
208 .bindings
209 .iter()
210 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
211 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
212 .collect();
213
214 Context {
215 scope: region.clone(),
216 debug_name: self.debug_name.clone(),
217 dataflow_id: self.dataflow_id.clone(),
218 export_ids: self.export_ids.clone(),
219 as_of_frontier: self.as_of_frontier.clone(),
220 until: self.until.clone(),
221 compute_logger: self.compute_logger.clone(),
222 linear_join_spec: self.linear_join_spec.clone(),
223 bindings,
224 dataflow_expiration: self.dataflow_expiration.clone(),
225 config_set: Rc::clone(&self.config_set),
226 }
227 }
228}
229
/// The flavor of an arrangement: built locally within this dataflow, or
/// imported from a pre-existing trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// An arrangement built within this dataflow: an `ok` arrangement of
    /// rows and an `err` arrangement, both over the scope's timestamp.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An arrangement imported from an existing trace, tagged with the
    /// `GlobalId` it was imported under; the outer timestamp `T` has been
    /// entered into the scope's timestamp.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
252
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents `self` as a pair of `(ok, err)` update collections, fully
    /// decoding every key and value datum into owned `Row`s.
    ///
    /// Deprecated because it decodes all columns regardless of demand;
    /// prefer `flat_map`, which bounds decoding via `max_demand`.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        let mut datums = DatumVec::new();
        // Concatenate key and value datums into a single owned row.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Applies `logic` to the elements of `self` and returns the results as
    /// a stream, alongside the error collection.
    ///
    /// `key` optionally restricts the read-out to a single literal key.
    /// `max_demand` bounds how many leading datums (key datums first, then
    /// value datums) are decoded and handed to `logic`.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Fuel budget per operator invocation: `flat_map_core` yields and
        // re-activates itself once this many records have been produced.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        // Decode at most `max_demand` leading datums across key then value
        // before invoking the caller's logic.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
342impl<S: Scope, T> ArrangementFlavor<S, T>
343where
344 T: MzTimestamp,
345 S::Timestamp: MzTimestamp + Refines<T>,
346{
347 pub fn scope(&self) -> S {
349 match self {
350 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
351 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
352 }
353 }
354
355 pub fn enter_region<'a>(
357 &self,
358 region: &Child<'a, S, S::Timestamp>,
359 ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
360 match self {
361 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
362 oks.clone().enter_region(region),
363 errs.clone().enter_region(region),
364 ),
365 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
366 *gid,
367 oks.clone().enter_region(region),
368 errs.clone().enter_region(region),
369 ),
370 }
371 }
372}
373impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
374where
375 T: MzTimestamp,
376 S::Timestamp: MzTimestamp + Refines<T>,
377{
378 pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
380 match self {
381 ArrangementFlavor::Local(oks, errs) => {
382 ArrangementFlavor::Local(oks.clone().leave_region(), errs.clone().leave_region())
383 }
384 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
385 *gid,
386 oks.clone().leave_region(),
387 errs.clone().leave_region(),
388 ),
389 }
390 }
391}
392
/// A bundle of the various representations a collection may have: an
/// unarranged update stream and/or one or more arrangements.
///
/// Callers such as `scope` assume at least one representation is present
/// (NOTE(review): this invariant is enforced by construction, not by type).
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged `(ok, err)` collection pair, if maintained.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions that form
    /// each arrangement's key.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
409
410impl<S: Scope, T> CollectionBundle<S, T>
411where
412 T: MzTimestamp,
413 S::Timestamp: MzTimestamp + Refines<T>,
414{
415 pub fn from_collections(
417 oks: VecCollection<S, Row, Diff>,
418 errs: VecCollection<S, DataflowError, Diff>,
419 ) -> Self {
420 Self {
421 collection: Some((oks, errs)),
422 arranged: BTreeMap::default(),
423 }
424 }
425
426 pub fn from_expressions(
428 exprs: Vec<MirScalarExpr>,
429 arrangements: ArrangementFlavor<S, T>,
430 ) -> Self {
431 let mut arranged = BTreeMap::new();
432 arranged.insert(exprs, arrangements);
433 Self {
434 collection: None,
435 arranged,
436 }
437 }
438
439 pub fn from_columns<I: IntoIterator<Item = usize>>(
441 columns: I,
442 arrangements: ArrangementFlavor<S, T>,
443 ) -> Self {
444 let mut keys = Vec::new();
445 for column in columns {
446 keys.push(MirScalarExpr::column(column));
447 }
448 Self::from_expressions(keys, arrangements)
449 }
450
451 pub fn scope(&self) -> S {
453 if let Some((oks, _errs)) = &self.collection {
454 oks.inner.scope()
455 } else {
456 self.arranged
457 .values()
458 .next()
459 .expect("Must contain a valid collection")
460 .scope()
461 }
462 }
463
464 pub fn enter_region<'a>(
466 &self,
467 region: &Child<'a, S, S::Timestamp>,
468 ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
469 CollectionBundle {
470 collection: self.collection.as_ref().map(|(oks, errs)| {
471 (
472 oks.clone().enter_region(region),
473 errs.clone().enter_region(region),
474 )
475 }),
476 arranged: self
477 .arranged
478 .iter()
479 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
480 .collect(),
481 }
482 }
483}
484
485impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
486where
487 T: MzTimestamp,
488 S::Timestamp: MzTimestamp + Refines<T>,
489{
490 pub fn leave_region(&self) -> CollectionBundle<S, T> {
492 CollectionBundle {
493 collection: self
494 .collection
495 .as_ref()
496 .map(|(oks, errs)| (oks.clone().leave_region(), errs.clone().leave_region())),
497 arranged: self
498 .arranged
499 .iter()
500 .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
501 .collect(),
502 }
503 }
504}
505
506impl<S: Scope, T> CollectionBundle<S, T>
507where
508 T: MzTimestamp,
509 S::Timestamp: MzTimestamp + Refines<T>,
510{
511 pub fn as_specific_collection(
524 &self,
525 key: Option<&[MirScalarExpr]>,
526 config_set: &ConfigSet,
527 ) -> (
528 VecCollection<S, Row, Diff>,
529 VecCollection<S, DataflowError, Diff>,
530 ) {
531 match key {
537 None => self
538 .collection
539 .clone()
540 .expect("The unarranged collection doesn't exist."),
541 Some(key) => {
542 let arranged = self.arranged.get(key).unwrap_or_else(|| {
543 panic!("The collection arranged by {:?} doesn't exist.", key)
544 });
545 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
546 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
548 Some((SharedRow::pack(borrow.iter()), t, r))
549 });
550 (ok.as_collection(), err)
551 } else {
552 #[allow(deprecated)]
553 arranged.as_collection()
554 }
555 }
556 }
557 }
558
559 pub fn flat_map<D, I, L>(
575 &self,
576 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
577 max_demand: usize,
578 mut logic: L,
579 ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
580 where
581 I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
582 D: Data,
583 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
584 {
585 if let Some((key, val)) = key_val {
589 self.arrangement(&key)
590 .expect("Should have ensured during planning that this arrangement exists.")
591 .flat_map(val.as_ref(), max_demand, logic)
592 } else {
593 use timely::dataflow::operators::vec::Map;
594 let (oks, errs) = self
595 .collection
596 .clone()
597 .expect("Invariant violated: CollectionBundle contains no collection.");
598 let mut datums = DatumVec::new();
599 let oks = oks.inner.flat_map(move |(v, t, d)| {
600 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
601 });
602 (oks, errs)
603 }
604 }
605
606 fn flat_map_core<Tr, D, I, L>(
614 trace: Arranged<S, Tr>,
615 key: Option<&Tr::KeyOwn>,
616 mut logic: L,
617 refuel: usize,
618 ) -> StreamVec<S, I::Item>
619 where
620 Tr: for<'a> TraceReader<
621 Key<'a>: ToDatumIter,
622 KeyOwn: PartialEq,
623 Val<'a>: ToDatumIter,
624 Time = S::Timestamp,
625 Diff = mz_repr::Diff,
626 > + Clone
627 + 'static,
628 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
629 D: Data,
630 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
631 {
632 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
633 let scope = trace.stream.scope();
634
635 let mut key_con = Tr::KeyContainer::with_capacity(1);
636 if let Some(key) = &key {
637 key_con.push_own(key);
638 }
639 let mode = if key.is_some() { "index" } else { "scan" };
640 let name = format!("ArrangementFlatMap({})", mode);
641 use timely::dataflow::operators::Operator;
642 trace
643 .stream
644 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
645 let activator = scope.activator_for(info.address);
647 let mut todo = std::collections::VecDeque::new();
649 move |input, output| {
650 let key = key_con.get(0);
651 input.for_each(|time, data| {
653 let capability = time.retain(0);
654 for batch in data.iter() {
655 todo.push_back(PendingWork::new(
657 capability.clone(),
658 batch.cursor(),
659 batch.clone(),
660 ));
661 }
662 });
663
664 let mut fuel = refuel;
666 while !todo.is_empty() && fuel > 0 {
667 todo.front_mut().unwrap().do_work(
668 key.as_ref(),
669 &mut logic,
670 &mut fuel,
671 output,
672 );
673 if fuel > 0 {
674 todo.pop_front();
675 }
676 }
677 if !todo.is_empty() {
679 activator.activate();
680 }
681 }
682 })
683 }
684
685 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
690 self.arranged.get(key).map(|x| x.clone())
691 }
692}
693
impl<S, T> CollectionBundle<S, T>
where
    T: MzTimestamp,
    S: Scope,
    S::Timestamp: Refines<T> + RenderTimestamp,
{
    /// Presents `self` as a pair of `(ok, err)` collections after applying
    /// `mfp` to each record.
    ///
    /// `key_val` optionally names an arrangement to read (and a literal key
    /// value to seek to); `until` allows MFP evaluation to suppress updates
    /// that happen strictly after it.
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<S, mz_repr::Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        mfp.optimize();
        let mfp_plan = mfp.clone().into_plan().unwrap();

        // A literal key value means we must seek the arrangement even if
        // the MFP itself is the identity.
        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        // Identity MFP with no literal key: read out the representation
        // directly, without an extra operator.
        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // Decode only the columns the MFP demands, then re-plan against the
        // (identity) permutation restricted to that prefix.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        let mut datum_vec = DatumVec::new();
        // Wrap `until` in an `Rc` so the per-record closure can clone it
        // cheaply into the inner `evaluate` callback.
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
            let mut row_builder = SharedRow::get();
            let until = std::rc::Rc::clone(&until);
            let temp_storage = RowArena::new();
            let row_iter = row_datums.iter();
            let mut datums_local = datum_vec.borrow();
            datums_local.extend(row_iter);
            let time = time.clone();
            let event_time = time.event_time();
            mfp_plan
                .evaluate(
                    &mut datums_local,
                    &temp_storage,
                    event_time,
                    diff.clone(),
                    // Suppress updates not at or before `until`.
                    move |time| !until.less_equal(time),
                    &mut row_builder,
                )
                .map(move |x| match x {
                    // Re-wrap the (possibly advanced) event time into the
                    // scope's full timestamp.
                    Ok((row, event_time, diff)) => {
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Ok(row), time, diff)
                    }
                    Err((e, event_time, diff)) => {
                        let mut time: S::Timestamp = time.clone();
                        *time.event_time_mut() = event_time;
                        (Err(e), time, diff)
                    }
                })
        });

        // Split the `Result` stream into ok and err collections, and fold
        // the MFP evaluation errors into the pre-existing error stream.
        use differential_dataflow::AsCollection;
        let (oks, errs) = stream
            .as_collection()
            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "OkErr",
                |x| x,
            );

        (oks, errors.concat(errs))
    }
    /// Ensures that the bundle contains all representations requested by
    /// `collections`, rendering any missing arrangements (and, if needed,
    /// the raw collection) from the existing ones.
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        if collections == Default::default() {
            return self;
        }
        // Planning should not request arrangements we already have.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // We need the raw collection if it was requested explicitly, or to
        // serve as the input for building any missing arrangement.
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                // Arrange the oks by `key`; key-evaluation errors join the
                // error collection, and the raw oks pass through unchanged.
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
                // Restore the raw collection; its error stream keeps only
                // the pre-existing errors.
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }

    /// Builds an arrangement of `oks` keyed by the evaluated `key`
    /// expressions, with values thinned to the columns in `thinning`.
    ///
    /// Returns the arrangement, the collection of key-evaluation errors,
    /// and a passthrough copy of the unmodified input collection.
    fn arrange_collection(
        name: &String,
        oks: VecCollection<S, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        VecCollection<S, DataflowError, Diff>,
        VecCollection<S, Row, Diff>,
    ) {
        // Custom operator with three outputs: keyed (key, value) pairs,
        // key-evaluation errors, and a passthrough of the input.
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(oks.inner, Pipeline);
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_capabilities| {
            // Buffers reused across invocations to avoid re-allocation.
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut temp_storage = RowArena::new();
            move |_frontiers| {
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        // Evaluate the key expressions; a failure routes the
                        // record to the error output instead.
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    // Forward the input batch unchanged on the third output.
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        // Arrange the keyed output, exchanging columnar data by key.
        let oks = ok_stream
            .mz_arrange_core::<
                _,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, S::Timestamp, Diff>,
                ),
                name
            );
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
}
939
/// An in-progress scan of one arrangement batch: a cursor, its backing
/// storage, and the capability at which results are emitted.
struct PendingWork<C>
where
    C: Cursor,
{
    // Capability retained for the output; all produced updates use it.
    capability: Capability<C::Time>,
    // Cursor into `batch`, positioned at the next unprocessed tuple.
    cursor: C,
    // The storage the cursor reads from.
    batch: C::Storage,
}
948
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Creates a new bundle of pending work from a capability, a cursor,
    /// and the cursor's backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Performs roughly `fuel` records of work through the cursor, applying
    /// `logic` and sending results to `output`.
    ///
    /// If `key` is provided, only that key's values are visited; otherwise
    /// the whole batch is scanned. Sets `*fuel` to zero and returns early
    /// when the budget is exhausted, leaving the cursor positioned so a
    /// later call resumes where this one stopped.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Records produced so far in this invocation.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for consolidating (time, diff) pairs per value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek to the literal key unless the cursor is already on it.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    // Consolidate this value's updates before applying logic.
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: stop here; the cursor resumes later.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // Full scan: visit every key and value in the batch.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: stop here; the cursor resumes later.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Charge the completed work against the remaining fuel.
        *fuel -= work;
    }
}
1028}