1use std::time::{Duration, Instant};
15
16use columnar::Columnar;
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::containers::Columnation;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::operators::arrange::arrangement::Arranged;
21use differential_dataflow::trace::TraceReader;
22use differential_dataflow::{AsCollection, Collection, Data};
23use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
26use mz_dyncfg::ConfigSet;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
29use mz_storage_types::errors::DataflowError;
30use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange};
31use mz_timely_util::operator::{CollectionExt, StreamExt};
32use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
33use timely::dataflow::operators::OkErr;
34use timely::dataflow::scopes::Child;
35use timely::dataflow::{Scope, ScopeParent};
36use timely::progress::timestamp::{Refines, Timestamp};
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::render::RenderTimestamp;
40use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};
41use crate::render::join::mz_join_core::mz_join_core;
42use crate::row_spine::{RowRowBuilder, RowRowSpine};
43use crate::typedefs::{RowRowAgent, RowRowEnter};
44
/// Selects the join core used to render each stage of a linear join.
///
/// `Materialize` renders joins with `mz_join_core`, which honors the
/// configured yield limits and the dataflow's shutdown token.
/// `DifferentialDataflow` renders joins with differential's `join_core`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LinearJoinImpl {
    Materialize,
    DifferentialDataflow,
}
53
54#[derive(Clone, Copy)]
61pub struct LinearJoinSpec {
62 implementation: LinearJoinImpl,
63 yielding: YieldSpec,
64}
65
66impl Default for LinearJoinSpec {
67 fn default() -> Self {
68 Self {
69 implementation: LinearJoinImpl::Materialize,
70 yielding: Default::default(),
71 }
72 }
73}
74
75impl LinearJoinSpec {
76 pub fn from_config(config: &ConfigSet) -> Self {
78 let implementation = match ENABLE_MZ_JOIN_CORE.get(config) {
79 true => LinearJoinImpl::Materialize,
80 false => LinearJoinImpl::DifferentialDataflow,
81 };
82
83 let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
84 let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
85 tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
86 YieldSpec::default()
87 });
88
89 Self {
90 implementation,
91 yielding,
92 }
93 }
94
95 fn render<G, Tr1, Tr2, L, I>(
97 &self,
98 arranged1: &Arranged<G, Tr1>,
99 arranged2: &Arranged<G, Tr2>,
100 shutdown_token: ShutdownToken,
101 result: L,
102 ) -> Collection<G, I::Item, Diff>
103 where
104 G: Scope,
105 G::Timestamp: Lattice,
106 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
107 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
108 + Clone
109 + 'static,
110 L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
111 I: IntoIterator,
112 I::Item: Data,
113 {
114 use LinearJoinImpl::*;
115
116 match (
117 self.implementation,
118 self.yielding.after_work,
119 self.yielding.after_time,
120 ) {
121 (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
122 (Materialize, Some(work_limit), Some(time_limit)) => {
123 let yield_fn =
124 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
125 mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
126 }
127 (Materialize, Some(work_limit), None) => {
128 let yield_fn = move |_start, work| work >= work_limit;
129 mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
130 }
131 (Materialize, None, Some(time_limit)) => {
132 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
133 mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
134 }
135 (Materialize, None, None) => {
136 let yield_fn = |_start, _work| false;
137 mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
138 }
139 }
140 }
141}
142
/// Limits after which the Materialize join core is asked to yield.
///
/// A limit that is `None` is not checked. When both are `None`, the yield
/// predicate built from this spec never requests a yield.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield once the join core reports at least this much work.
    after_work: Option<usize>,
    /// Yield once at least this much time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Yields after 1,000,000 units of work or after 100 milliseconds,
    /// whichever limit is reached first.
    fn default() -> Self {
        let after_work = Some(1_000_000);
        let after_time = Some(Duration::from_millis(100));
        Self {
            after_work,
            after_time,
        }
    }
}

impl YieldSpec {
    /// Parses a yield specification such as `"work:1000000,time:100"`.
    ///
    /// The input is a comma-separated list of `work:<amount>` and
    /// `time:<milliseconds>` entries. Whitespace around entries, keys and
    /// values is ignored. A kind that does not appear stays `None`
    /// (unlimited). If a kind appears more than once, the last entry wins.
    ///
    /// Returns `None` if any entry is malformed: an unknown key, a missing or
    /// non-numeric value, or more than one `:` in an entry. The empty string
    /// is rejected as well.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for entry in s.split(',').map(str::trim) {
            // A second `:` would stay in `value`, and an integer parse always
            // rejects it, so such entries still fail below.
            let (key, value) = entry.split_once(':')?;
            let value = value.trim();
            match key.trim() {
                "work" => spec.after_work = Some(value.parse().ok()?),
                "time" => {
                    let millis: u64 = value.parse().ok()?;
                    spec.after_time = Some(Duration::from_millis(millis));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
189
190enum JoinedFlavor<G, T>
192where
193 G: Scope,
194 G::Timestamp: Lattice + Refines<T> + Columnation,
195 T: Timestamp + Lattice + Columnation,
196{
197 Collection(Collection<G, Row, Diff>),
199 Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
201 Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
203}
204
205impl<G, T> Context<G, T>
206where
207 G: Scope,
208 G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
209 <G::Timestamp as Columnar>::Container: Clone + Send,
210 for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
211 T: Timestamp + Lattice + Columnation,
212{
213 pub(crate) fn render_join(
214 &self,
215 inputs: Vec<CollectionBundle<G, T>>,
216 linear_plan: LinearJoinPlan,
217 ) -> CollectionBundle<G, T> {
218 self.scope.clone().region_named("Join(Linear)", |inner| {
219 self.render_join_inner(inputs, linear_plan, inner)
220 })
221 }
222
223 fn render_join_inner(
224 &self,
225 inputs: Vec<CollectionBundle<G, T>>,
226 linear_plan: LinearJoinPlan,
227 inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
228 ) -> CollectionBundle<G, T> {
229 let mut errors = Vec::new();
231
232 let arrangement = linear_plan
237 .stage_plans
238 .get(0)
239 .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
240 let mut joined = match (arrangement, linear_plan.initial_closure) {
242 (Some(ArrangementFlavor::Local(oks, errs)), None) => {
243 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
244 JoinedFlavor::Local(oks.enter_region(inner))
245 }
246 (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
247 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
248 JoinedFlavor::Trace(oks.enter_region(inner))
249 }
250 (_, initial_closure) => {
251 let (joined, errs) = inputs[linear_plan.source_relation]
254 .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
255 errors.push(errs.enter_region(inner));
256 let mut joined = joined.enter_region(inner);
257
258 if let Some(closure) = initial_closure {
261 let name = "LinearJoinInitialization";
265 type CB<C> = ConsolidatingContainerBuilder<C>;
266 let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
267 let mut datums = DatumVec::new();
269 move |row| {
270 let binding = SharedRow::get();
271 let mut row_builder = binding.borrow_mut();
272 let temp_storage = RowArena::new();
273 let mut datums_local = datums.borrow_with(&row);
274 closure
276 .apply(&mut datums_local, &temp_storage, &mut row_builder)
277 .map(|row| row.cloned())
278 .map_err(DataflowError::from)
279 .transpose()
280 }
281 });
282 joined = j;
283 errors.push(errs);
284 }
285
286 JoinedFlavor::Collection(joined)
287 }
288 };
289
290 for stage_plan in linear_plan.stage_plans.into_iter() {
292 let stream = self.differential_join(
295 joined,
296 inputs[stage_plan.lookup_relation].enter_region(inner),
297 stage_plan,
298 &mut errors,
299 );
300 joined = JoinedFlavor::Collection(stream);
302 }
303
304 let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
308 if let Some(closure) = linear_plan.final_closure {
309 let name = "LinearJoinFinalization";
310 type CB<C> = ConsolidatingContainerBuilder<C>;
311 let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
312 let mut datums = DatumVec::new();
314 move |row| {
315 let binding = SharedRow::get();
316 let mut row_builder = binding.borrow_mut();
317 let temp_storage = RowArena::new();
318 let mut datums_local = datums.borrow_with(&row);
319 closure
321 .apply(&mut datums_local, &temp_storage, &mut row_builder)
322 .map(|row| row.cloned())
323 .map_err(DataflowError::from)
324 .transpose()
325 }
326 });
327
328 joined = updates;
329 errors.push(errs);
330 }
331
332 CollectionBundle::from_collections(
334 joined,
335 differential_dataflow::collection::concatenate(inner, errors),
336 )
337 } else {
338 panic!("Unexpectedly arranged join output");
339 };
340 bundle.leave_region()
341 }
342
343 fn differential_join<S>(
346 &self,
347 mut joined: JoinedFlavor<S, T>,
348 lookup_relation: CollectionBundle<S, T>,
349 LinearStagePlan {
350 stream_key,
351 stream_thinning,
352 lookup_key,
353 closure,
354 lookup_relation: _,
355 }: LinearStagePlan,
356 errors: &mut Vec<Collection<S, DataflowError, Diff>>,
357 ) -> Collection<S, Row, Diff>
358 where
359 S: Scope<Timestamp = G::Timestamp>,
360 {
361 if let JoinedFlavor::Collection(stream) = joined {
363 let name = "LinearJoinKeyPreparation";
364 let (keyed, errs) = stream
365 .inner
366 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
367 Pipeline,
368 name,
369 |_, _| {
370 Box::new(move |input, ok, errs| {
371 let mut temp_storage = RowArena::new();
372 let mut key_buf = Row::default();
373 let mut val_buf = Row::default();
374 let mut datums = DatumVec::new();
375 while let Some((time, data)) = input.next() {
376 let mut ok_session = ok.session_with_builder(&time);
377 let mut err_session = errs.session(&time);
378 for (row, time, diff) in data.iter() {
379 temp_storage.clear();
380 let datums_local = datums.borrow_with(row);
381 let datums = stream_key
382 .iter()
383 .map(|e| e.eval(&datums_local, &temp_storage));
384 let result = key_buf.packer().try_extend(datums);
385 match result {
386 Ok(()) => {
387 val_buf.packer().extend(
388 stream_thinning.iter().map(|e| datums_local[*e]),
389 );
390 ok_session.give(((&key_buf, &val_buf), time, diff));
391 }
392 Err(e) => {
393 err_session.give((e.into(), time.clone(), *diff));
394 }
395 }
396 }
397 }
398 })
399 },
400 );
401
402 errors.push(errs.as_collection());
403
404 let arranged = keyed
405 .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
406 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
407 );
408 joined = JoinedFlavor::Local(arranged);
409 }
410
411 let arrangement = lookup_relation
413 .arrangement(&lookup_key[..])
414 .expect("Arrangement absent despite explicit construction");
415
416 match joined {
417 JoinedFlavor::Collection(_) => {
418 unreachable!("JoinedFlavor::Collection variant avoided at top of method");
419 }
420 JoinedFlavor::Local(local) => match arrangement {
421 ArrangementFlavor::Local(oks, errs1) => {
422 let (oks, errs2) = self
423 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
424 local, oks, closure,
425 );
426
427 errors.push(errs1.as_collection(|k, _v| k.clone()));
428 errors.extend(errs2);
429 oks
430 }
431 ArrangementFlavor::Trace(_gid, oks, errs1) => {
432 let (oks, errs2) = self
433 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
434 local, oks, closure,
435 );
436
437 errors.push(errs1.as_collection(|k, _v| k.clone()));
438 errors.extend(errs2);
439 oks
440 }
441 },
442 JoinedFlavor::Trace(trace) => match arrangement {
443 ArrangementFlavor::Local(oks, errs1) => {
444 let (oks, errs2) = self
445 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
446 trace, oks, closure,
447 );
448
449 errors.push(errs1.as_collection(|k, _v| k.clone()));
450 errors.extend(errs2);
451 oks
452 }
453 ArrangementFlavor::Trace(_gid, oks, errs1) => {
454 let (oks, errs2) = self
455 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
456 trace, oks, closure,
457 );
458
459 errors.push(errs1.as_collection(|k, _v| k.clone()));
460 errors.extend(errs2);
461 oks
462 }
463 },
464 }
465 }
466
467 fn differential_join_inner<S, Tr1, Tr2>(
474 &self,
475 prev_keyed: Arranged<S, Tr1>,
476 next_input: Arranged<S, Tr2>,
477 closure: JoinClosure,
478 ) -> (
479 Collection<S, Row, Diff>,
480 Option<Collection<S, DataflowError, Diff>>,
481 )
482 where
483 S: Scope<Timestamp = G::Timestamp>,
484 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
485 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
486 + Clone
487 + 'static,
488 for<'a> Tr1::Key<'a>: ToDatumIter,
489 for<'a> Tr1::Val<'a>: ToDatumIter,
490 for<'a> Tr2::Val<'a>: ToDatumIter,
491 {
492 let mut datums = DatumVec::new();
494
495 if closure.could_error() {
496 let (oks, err) = self
497 .linear_join_spec
498 .render(
499 &prev_keyed,
500 &next_input,
501 self.shutdown_token.clone(),
502 move |key, old, new| {
503 let binding = SharedRow::get();
504 let mut row_builder = binding.borrow_mut();
505 let temp_storage = RowArena::new();
506
507 let mut datums_local = datums.borrow();
508 datums_local.extend(key.to_datum_iter());
509 datums_local.extend(old.to_datum_iter());
510 datums_local.extend(new.to_datum_iter());
511
512 closure
513 .apply(&mut datums_local, &temp_storage, &mut row_builder)
514 .map(|row| row.cloned())
515 .map_err(DataflowError::from)
516 .transpose()
517 },
518 )
519 .inner
520 .ok_err(|(x, t, d)| {
521 match x {
523 Ok(x) => Ok((x, t, d)),
524 Err(x) => Err((x, t, d)),
525 }
526 });
527
528 (oks.as_collection(), Some(err.as_collection()))
529 } else {
530 let oks = self.linear_join_spec.render(
531 &prev_keyed,
532 &next_input,
533 self.shutdown_token.clone(),
534 move |key, old, new| {
535 let binding = SharedRow::get();
536 let mut row_builder = binding.borrow_mut();
537 let temp_storage = RowArena::new();
538
539 let mut datums_local = datums.borrow();
540 datums_local.extend(key.to_datum_iter());
541 datums_local.extend(old.to_datum_iter());
542 datums_local.extend(new.to_datum_iter());
543
544 closure
545 .apply(&mut datums_local, &temp_storage, &mut row_builder)
546 .expect("Closure claimed to never error")
547 .cloned()
548 },
549 );
550
551 (oks, None)
552 }
553 }
554}