1use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::CollectionExt;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
38use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
39use timely::dataflow::{Scope, StreamVec};
40use timely::progress::operate::FrontierInterest;
41use timely::progress::{Antichain, Timestamp};
42
43use crate::compute_state::ComputeState;
44use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
45use crate::render::errors::ErrorLogger;
46use crate::render::{LinearJoinSpec, RenderTimestamp};
47use crate::row_spine::{DatumSeq, RowRowBuilder};
48use crate::typedefs::{
49 ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
50};
51
52pub struct Context<'scope, T: RenderTimestamp> {
60 pub(crate) scope: Scope<'scope, T>,
64 pub debug_name: String,
66 pub dataflow_id: usize,
68 pub export_ids: Vec<GlobalId>,
70 pub as_of_frontier: Antichain<mz_repr::Timestamp>,
75 pub until: Antichain<mz_repr::Timestamp>,
78 pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
80 pub(super) compute_logger: Option<crate::logging::compute::Logger>,
82 pub(super) linear_join_spec: LinearJoinSpec,
84 pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
87 pub config_set: Rc<ConfigSet>,
89}
90
91impl<'scope, T: RenderTimestamp> Context<'scope, T> {
92 pub fn for_dataflow_in<Plan>(
94 dataflow: &DataflowDescription<Plan, CollectionMetadata>,
95 scope: Scope<'scope, T>,
96 compute_state: &ComputeState,
97 until: Antichain<mz_repr::Timestamp>,
98 dataflow_expiration: Antichain<mz_repr::Timestamp>,
99 ) -> Self {
100 use mz_ore::collections::CollectionExt as IteratorExt;
101 let dataflow_id = *scope.addr().into_first();
102 let as_of_frontier = dataflow
103 .as_of
104 .clone()
105 .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
106
107 let export_ids = dataflow.export_ids().collect();
108
109 let compute_logger = if dataflow.is_transient() {
113 None
114 } else {
115 compute_state.compute_logger.clone()
116 };
117
118 Self {
119 scope,
120 debug_name: dataflow.debug_name.clone(),
121 dataflow_id,
122 export_ids,
123 as_of_frontier,
124 until,
125 bindings: BTreeMap::new(),
126 compute_logger,
127 linear_join_spec: compute_state.linear_join_spec,
128 dataflow_expiration,
129 config_set: Rc::clone(&compute_state.worker_config),
130 }
131 }
132}
133
134impl<'scope, T: RenderTimestamp> Context<'scope, T> {
135 pub fn insert_id(
140 &mut self,
141 id: Id,
142 collection: CollectionBundle<'scope, T>,
143 ) -> Option<CollectionBundle<'scope, T>> {
144 self.bindings.insert(id, collection)
145 }
146 pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
150 self.bindings.remove(&id)
151 }
152 pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
154 if !self.bindings.contains_key(&id) {
155 self.bindings.insert(id, collection);
156 } else {
157 let binding = self
158 .bindings
159 .get_mut(&id)
160 .expect("Binding verified to exist");
161 if collection.collection.is_some() {
162 binding.collection = collection.collection;
163 }
164 for (key, flavor) in collection.arranged.into_iter() {
165 binding.arranged.insert(key, flavor);
166 }
167 }
168 }
169 pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
171 self.bindings.get(&id).cloned()
172 }
173
174 pub(super) fn error_logger(&self) -> ErrorLogger {
175 ErrorLogger::new(self.debug_name.clone())
176 }
177}
178
179impl<'scope, T: RenderTimestamp> Context<'scope, T> {
180 pub fn enter_region<'a>(
182 &self,
183 region: Scope<'a, T>,
184 bindings: Option<&std::collections::BTreeSet<Id>>,
185 ) -> Context<'a, T> {
186 let bindings = self
187 .bindings
188 .iter()
189 .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
190 .map(|(key, bundle)| (*key, bundle.enter_region(region)))
191 .collect();
192
193 Context {
194 scope: region,
195 debug_name: self.debug_name.clone(),
196 dataflow_id: self.dataflow_id.clone(),
197 export_ids: self.export_ids.clone(),
198 as_of_frontier: self.as_of_frontier.clone(),
199 until: self.until.clone(),
200 compute_logger: self.compute_logger.clone(),
201 linear_join_spec: self.linear_join_spec.clone(),
202 bindings,
203 dataflow_expiration: self.dataflow_expiration.clone(),
204 config_set: Rc::clone(&self.config_set),
205 }
206 }
207}
208
209#[derive(Clone)]
211pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
212 Local(
214 Arranged<'scope, RowRowAgent<T, Diff>>,
215 Arranged<'scope, ErrAgent<T, Diff>>,
216 ),
217 Trace(
222 GlobalId,
223 Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
224 Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
225 ),
226}
227
228impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
229 #[deprecated(note = "Use `flat_map` instead.")]
237 pub fn as_collection(
238 &self,
239 ) -> (
240 VecCollection<'scope, T, Row, Diff>,
241 VecCollection<'scope, T, DataflowError, Diff>,
242 ) {
243 let mut datums = DatumVec::new();
244 let logic = move |k: DatumSeq, v: DatumSeq| {
245 let mut datums_borrow = datums.borrow();
246 datums_borrow.extend(k);
247 datums_borrow.extend(v);
248 SharedRow::pack(&**datums_borrow)
249 };
250 match &self {
251 ArrangementFlavor::Local(oks, errs) => (
252 oks.clone().as_collection(logic),
253 errs.clone().as_collection(|k, &()| k.clone()),
254 ),
255 ArrangementFlavor::Trace(_, oks, errs) => (
256 oks.clone().as_collection(logic),
257 errs.clone().as_collection(|k, &()| k.clone()),
258 ),
259 }
260 }
261
262 pub fn flat_map<D, I, L>(
275 &self,
276 key: Option<&Row>,
277 max_demand: usize,
278 mut logic: L,
279 ) -> (
280 StreamVec<'scope, T, I::Item>,
281 VecCollection<'scope, T, DataflowError, Diff>,
282 )
283 where
284 I: IntoIterator<Item = (D, T, Diff)>,
285 D: Data,
286 L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff) -> I + 'static,
287 {
288 let refuel = 1000000;
292
293 let mut datums = DatumVec::new();
294 let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
295 let mut datums_borrow = datums.borrow();
296 datums_borrow.extend(k.to_datum_iter().take(max_demand));
297 let max_demand = max_demand.saturating_sub(datums_borrow.len());
298 datums_borrow.extend(v.to_datum_iter().take(max_demand));
299 logic(&mut datums_borrow, t, d)
300 };
301
302 match &self {
303 ArrangementFlavor::Local(oks, errs) => {
304 let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
305 let errs = errs.clone().as_collection(|k, &()| k.clone());
306 (oks, errs)
307 }
308 ArrangementFlavor::Trace(_, oks, errs) => {
309 let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
310 let errs = errs.clone().as_collection(|k, &()| k.clone());
311 (oks, errs)
312 }
313 }
314 }
315}
316impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
317 pub fn scope(&self) -> Scope<'scope, T> {
319 match self {
320 ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
321 ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
322 }
323 }
324
325 pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
327 match self {
328 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
329 oks.clone().enter_region(region),
330 errs.clone().enter_region(region),
331 ),
332 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
333 *gid,
334 oks.clone().enter_region(region),
335 errs.clone().enter_region(region),
336 ),
337 }
338 }
339}
340impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
341 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
343 match self {
344 ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
345 oks.clone().leave_region(outer),
346 errs.clone().leave_region(outer),
347 ),
348 ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
349 *gid,
350 oks.clone().leave_region(outer),
351 errs.clone().leave_region(outer),
352 ),
353 }
354 }
355}
356
357#[derive(Clone)]
362pub struct CollectionBundle<'scope, T: RenderTimestamp> {
363 pub collection: Option<(
364 VecCollection<'scope, T, Row, Diff>,
365 VecCollection<'scope, T, DataflowError, Diff>,
366 )>,
367 pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
368}
369
370impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
371 pub fn from_collections(
373 oks: VecCollection<'scope, T, Row, Diff>,
374 errs: VecCollection<'scope, T, DataflowError, Diff>,
375 ) -> Self {
376 Self {
377 collection: Some((oks, errs)),
378 arranged: BTreeMap::default(),
379 }
380 }
381
382 pub fn from_expressions(
384 exprs: Vec<MirScalarExpr>,
385 arrangements: ArrangementFlavor<'scope, T>,
386 ) -> Self {
387 let mut arranged = BTreeMap::new();
388 arranged.insert(exprs, arrangements);
389 Self {
390 collection: None,
391 arranged,
392 }
393 }
394
395 pub fn from_columns<I: IntoIterator<Item = usize>>(
397 columns: I,
398 arrangements: ArrangementFlavor<'scope, T>,
399 ) -> Self {
400 let mut keys = Vec::new();
401 for column in columns {
402 keys.push(MirScalarExpr::column(column));
403 }
404 Self::from_expressions(keys, arrangements)
405 }
406
407 pub fn scope(&self) -> Scope<'scope, T> {
409 if let Some((oks, _errs)) = &self.collection {
410 oks.inner.scope()
411 } else {
412 self.arranged
413 .values()
414 .next()
415 .expect("Must contain a valid collection")
416 .scope()
417 }
418 }
419
420 pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
422 CollectionBundle {
423 collection: self.collection.as_ref().map(|(oks, errs)| {
424 (
425 oks.clone().enter_region(region),
426 errs.clone().enter_region(region),
427 )
428 }),
429 arranged: self
430 .arranged
431 .iter()
432 .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
433 .collect(),
434 }
435 }
436}
437
438impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
439 pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
441 CollectionBundle {
442 collection: self.collection.as_ref().map(|(oks, errs)| {
443 (
444 oks.clone().leave_region(outer),
445 errs.clone().leave_region(outer),
446 )
447 }),
448 arranged: self
449 .arranged
450 .iter()
451 .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
452 .collect(),
453 }
454 }
455}
456
457impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
458 pub fn as_specific_collection(
471 &self,
472 key: Option<&[MirScalarExpr]>,
473 config_set: &ConfigSet,
474 ) -> (
475 VecCollection<'scope, T, Row, Diff>,
476 VecCollection<'scope, T, DataflowError, Diff>,
477 ) {
478 match key {
484 None => self
485 .collection
486 .clone()
487 .expect("The unarranged collection doesn't exist."),
488 Some(key) => {
489 let arranged = self.arranged.get(key).unwrap_or_else(|| {
490 panic!("The collection arranged by {:?} doesn't exist.", key)
491 });
492 if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
493 let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
495 Some((SharedRow::pack(borrow.iter()), t, r))
496 });
497 (ok.as_collection(), err)
498 } else {
499 #[allow(deprecated)]
500 arranged.as_collection()
501 }
502 }
503 }
504 }
505
506 pub fn flat_map<D, I, L>(
522 &self,
523 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
524 max_demand: usize,
525 mut logic: L,
526 ) -> (
527 StreamVec<'scope, T, I::Item>,
528 VecCollection<'scope, T, DataflowError, Diff>,
529 )
530 where
531 I: IntoIterator<Item = (D, T, Diff)>,
532 D: Data,
533 L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static,
534 {
535 if let Some((key, val)) = key_val {
539 self.arrangement(&key)
540 .expect("Should have ensured during planning that this arrangement exists.")
541 .flat_map(val.as_ref(), max_demand, logic)
542 } else {
543 use timely::dataflow::operators::vec::Map;
544 let (oks, errs) = self
545 .collection
546 .clone()
547 .expect("Invariant violated: CollectionBundle contains no collection.");
548 let mut datums = DatumVec::new();
549 let oks = oks.inner.flat_map(move |(v, t, d)| {
550 logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
551 });
552 (oks, errs)
553 }
554 }
555
556 fn flat_map_core<Tr, D, I, L>(
564 trace: Arranged<'scope, Tr>,
565 key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
566 mut logic: L,
567 refuel: usize,
568 ) -> StreamVec<'scope, T, I::Item>
569 where
570 Tr: for<'a> TraceReader<
571 Key<'a>: ToDatumIter,
572 Val<'a>: ToDatumIter,
573 Time = T,
574 Diff = mz_repr::Diff,
575 > + Clone
576 + 'static,
577 <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
578 I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
579 D: Data,
580 L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff) -> I + 'static,
581 {
582 use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
583 let scope = trace.stream.scope();
584
585 let mut key_con = Tr::KeyContainer::with_capacity(1);
586 if let Some(key) = &key {
587 key_con.push_own(key);
588 }
589 let mode = if key.is_some() { "index" } else { "scan" };
590 let name = format!("ArrangementFlatMap({})", mode);
591 use timely::dataflow::operators::Operator;
592 trace
593 .stream
594 .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
595 let activator = scope.activator_for(info.address);
597 let mut todo = std::collections::VecDeque::new();
599 move |input, output| {
600 let key = key_con.get(0);
601 input.for_each(|time, data| {
603 let capability = time.retain(0);
604 for batch in data.iter() {
605 todo.push_back(PendingWork::new(
607 capability.clone(),
608 batch.cursor(),
609 batch.clone(),
610 ));
611 }
612 });
613
614 let mut fuel = refuel;
616 while !todo.is_empty() && fuel > 0 {
617 todo.front_mut().unwrap().do_work(
618 key.as_ref(),
619 &mut logic,
620 &mut fuel,
621 output,
622 );
623 if fuel > 0 {
624 todo.pop_front();
625 }
626 }
627 if !todo.is_empty() {
629 activator.activate();
630 }
631 }
632 })
633 }
634
635 pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
640 self.arranged.get(key).map(|x| x.clone())
641 }
642}
643
644impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
645 pub fn as_collection_core(
654 &self,
655 mut mfp: MapFilterProject,
656 key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
657 until: Antichain<mz_repr::Timestamp>,
658 config_set: &ConfigSet,
659 ) -> (
660 VecCollection<'scope, T, mz_repr::Row, Diff>,
661 VecCollection<'scope, T, DataflowError, Diff>,
662 ) {
663 mfp.optimize();
664 let mfp_plan = mfp.clone().into_plan().unwrap();
665
666 let has_key_val = if let Some((_key, Some(_val))) = &key_val {
672 true
673 } else {
674 false
675 };
676
677 if mfp_plan.is_identity() && !has_key_val {
678 let key = key_val.map(|(k, _v)| k);
679 return self.as_specific_collection(key.as_deref(), config_set);
680 }
681
682 let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
683 mfp.permute_fn(|c| c, max_demand);
684 mfp.optimize();
685 let mfp_plan = mfp.into_plan().unwrap();
686
687 let mut datum_vec = DatumVec::new();
688 let until = std::rc::Rc::new(until);
690
691 let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
692 let mut row_builder = SharedRow::get();
693 let until = std::rc::Rc::clone(&until);
694 let temp_storage = RowArena::new();
695 let row_iter = row_datums.iter();
696 let mut datums_local = datum_vec.borrow();
697 datums_local.extend(row_iter);
698 let time = time.clone();
699 let event_time = time.event_time();
700 mfp_plan
701 .evaluate(
702 &mut datums_local,
703 &temp_storage,
704 event_time,
705 diff.clone(),
706 move |time| !until.less_equal(time),
707 &mut row_builder,
708 )
709 .map(move |x| match x {
710 Ok((row, event_time, diff)) => {
711 let mut time: T = time.clone();
713 *time.event_time_mut() = event_time;
714 (Ok(row), time, diff)
715 }
716 Err((e, event_time, diff)) => {
717 let mut time: T = time.clone();
719 *time.event_time_mut() = event_time;
720 (Err(e), time, diff)
721 }
722 })
723 });
724
725 use differential_dataflow::AsCollection;
726 let (oks, errs) = stream
727 .as_collection()
728 .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
729 "OkErr",
730 |x| x,
731 );
732
733 (oks, errors.concat(errs))
734 }
735 pub fn ensure_collections(
736 mut self,
737 collections: AvailableCollections,
738 input_key: Option<Vec<MirScalarExpr>>,
739 input_mfp: MapFilterProject,
740 until: Antichain<mz_repr::Timestamp>,
741 config_set: &ConfigSet,
742 ) -> Self {
743 if collections == Default::default() {
744 return self;
745 }
746 for (key, _, _) in collections.arranged.iter() {
755 soft_assert_or_log!(
756 !self.arranged.contains_key(key),
757 "LIR ArrangeBy tried to create an existing arrangement"
758 );
759 }
760
761 let form_raw_collection = collections.raw
763 || collections
764 .arranged
765 .iter()
766 .any(|(key, _, _)| !self.arranged.contains_key(key));
767 if form_raw_collection && self.collection.is_none() {
768 self.collection = Some(self.as_collection_core(
769 input_mfp,
770 input_key.map(|k| (k, None)),
771 until,
772 config_set,
773 ));
774 }
775 for (key, _, thinning) in collections.arranged {
776 if !self.arranged.contains_key(&key) {
777 let name = format!("ArrangeBy[{:?}]", key);
779
780 let (oks, errs) = self
781 .collection
782 .take()
783 .expect("Collection constructed above");
784 let (oks, errs_keyed, passthrough) =
785 Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
786 let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
787 self.collection = Some((passthrough, errs));
788 let errs =
789 errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
790 &format!("{}-errors", name),
791 );
792 self.arranged
793 .insert(key, ArrangementFlavor::Local(oks, errs));
794 }
795 }
796 self
797 }
798
799 fn arrange_collection(
810 name: &String,
811 oks: VecCollection<'scope, T, Row, Diff>,
812 key: Vec<MirScalarExpr>,
813 thinning: Vec<usize>,
814 ) -> (
815 Arranged<'scope, RowRowAgent<T, Diff>>,
816 VecCollection<'scope, T, DataflowError, Diff>,
817 VecCollection<'scope, T, Row, Diff>,
818 ) {
819 let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
824 let (ok_output, ok_stream) = builder.new_output();
825 let mut ok_output =
826 OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
827 let (err_output, err_stream) = builder.new_output();
828 let mut err_output = OutputBuilder::from(err_output);
829 let (passthrough_output, passthrough_stream) = builder.new_output();
830 let mut passthrough_output = OutputBuilder::from(passthrough_output);
831 let mut input = builder.new_input(oks.inner, Pipeline);
832 builder.set_notify_for(0, FrontierInterest::Never);
833 builder.build(move |_capabilities| {
834 let mut key_buf = Row::default();
835 let mut val_buf = Row::default();
836 let mut datums = DatumVec::new();
837 let mut temp_storage = RowArena::new();
838 move |_frontiers| {
839 let mut ok_output = ok_output.activate();
840 let mut err_output = err_output.activate();
841 let mut passthrough_output = passthrough_output.activate();
842 input.for_each(|time, data| {
843 let mut ok_session = ok_output.session_with_builder(&time);
844 let mut err_session = err_output.session(&time);
845 for (row, time, diff) in data.iter() {
846 temp_storage.clear();
847 let datums = datums.borrow_with(row);
848 let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
849 match key_buf.packer().try_extend(key_iter) {
850 Ok(()) => {
851 let val_datum_iter = thinning.iter().map(|c| datums[*c]);
852 val_buf.packer().extend(val_datum_iter);
853 ok_session.give(((&*key_buf, &*val_buf), time, diff));
854 }
855 Err(e) => {
856 err_session.give((e.into(), time.clone(), *diff));
857 }
858 }
859 }
860 passthrough_output.session(&time).give_container(data);
861 });
862 }
863 });
864
865 let oks = ok_stream
866 .mz_arrange_core::<
867 _,
868 Col2ValBatcher<_, _, _, _>,
869 RowRowBuilder<_, _>,
870 RowRowSpine<_, _>,
871 >(
872 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
873 columnar_exchange::<Row, Row, T, Diff>,
874 ),
875 name
876 );
877 (
878 oks,
879 err_stream.as_collection(),
880 passthrough_stream.as_collection(),
881 )
882 }
883}
884
885struct PendingWork<C>
886where
887 C: Cursor,
888{
889 capability: Capability<C::Time>,
890 cursor: C,
891 batch: C::Storage,
892}
893
894impl<C> PendingWork<C>
895where
896 C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
897{
898 fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
900 Self {
901 capability,
902 cursor,
903 batch,
904 }
905 }
906 fn do_work<I, D, L>(
908 &mut self,
909 key: Option<&C::Key<'_>>,
910 logic: &mut L,
911 fuel: &mut usize,
912 output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
913 ) where
914 I: IntoIterator<Item = (D, C::Time, C::Diff)>,
915 D: Data,
916 L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
917 {
918 use differential_dataflow::consolidation::consolidate;
919
920 let mut work: usize = 0;
922 let mut session = output.session_with_builder(&self.capability);
923 let mut buffer = Vec::new();
924 if let Some(key) = key {
925 let key = C::KeyContainer::reborrow(*key);
926 if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
927 self.cursor.seek_key(&self.batch, key);
928 }
929 if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
930 let key = self.cursor.key(&self.batch);
931 while let Some(val) = self.cursor.get_val(&self.batch) {
932 self.cursor.map_times(&self.batch, |time, diff| {
933 buffer.push((C::owned_time(time), C::owned_diff(diff)));
934 });
935 consolidate(&mut buffer);
936 for (time, diff) in buffer.drain(..) {
937 for datum in logic(key, val, time, diff) {
938 session.give(datum);
939 work += 1;
940 }
941 }
942 self.cursor.step_val(&self.batch);
943 if work >= *fuel {
944 *fuel = 0;
945 return;
946 }
947 }
948 }
949 } else {
950 while let Some(key) = self.cursor.get_key(&self.batch) {
951 while let Some(val) = self.cursor.get_val(&self.batch) {
952 self.cursor.map_times(&self.batch, |time, diff| {
953 buffer.push((C::owned_time(time), C::owned_diff(diff)));
954 });
955 consolidate(&mut buffer);
956 for (time, diff) in buffer.drain(..) {
957 for datum in logic(key, val, time, diff) {
958 session.give(datum);
959 work += 1;
960 }
961 }
962 self.cursor.step_val(&self.batch);
963 if work >= *fuel {
964 *fuel = 0;
965 return;
966 }
967 }
968 self.cursor.step_key(&self.batch);
969 }
970 }
971 *fuel -= work;
972 }
973}