1use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Collection, Data};
21use mz_compute_types::dyncfgs::{
22 ENABLE_MZ_JOIN_CORE, ENABLE_MZ_JOIN_CORE_V2, LINEAR_JOIN_YIELDING,
23};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
26use mz_dyncfg::ConfigSet;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
29use mz_storage_types::errors::DataflowError;
30use mz_timely_util::columnar::builder::ColumnBuilder;
31use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
32use mz_timely_util::operator::{CollectionExt, StreamExt};
33use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
34use timely::dataflow::operators::OkErr;
35use timely::dataflow::scopes::Child;
36use timely::dataflow::{Scope, ScopeParent};
37use timely::progress::timestamp::Refines;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::render::RenderTimestamp;
41use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
42use crate::render::join::mz_join_core::mz_join_core;
43use crate::render::join::mz_join_core_v2::mz_join_core as mz_join_core_v2;
44use crate::row_spine::{RowRowBuilder, RowRowSpine};
45use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
46
/// The operator used to render the join of two arrangements in a linear join.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LinearJoinImpl {
    /// Render with `mz_join_core`.
    Materialize,
    /// Render with `mz_join_core_v2`.
    MaterializeV2,
    /// Render with Differential Dataflow's `Arranged::join_core`.
    DifferentialDataflow,
}
56
57#[derive(Clone, Copy)]
64pub struct LinearJoinSpec {
65 implementation: LinearJoinImpl,
66 yielding: YieldSpec,
67}
68
69impl Default for LinearJoinSpec {
70 fn default() -> Self {
71 Self {
72 implementation: LinearJoinImpl::Materialize,
73 yielding: Default::default(),
74 }
75 }
76}
77
78impl LinearJoinSpec {
79 pub fn from_config(config: &ConfigSet) -> Self {
81 let implementation = if ENABLE_MZ_JOIN_CORE_V2.get(config) {
82 LinearJoinImpl::MaterializeV2
83 } else if ENABLE_MZ_JOIN_CORE.get(config) {
84 LinearJoinImpl::Materialize
85 } else {
86 LinearJoinImpl::DifferentialDataflow
87 };
88
89 let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
90 let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
91 tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
92 YieldSpec::default()
93 });
94
95 Self {
96 implementation,
97 yielding,
98 }
99 }
100
101 fn render<G, Tr1, Tr2, L, I>(
103 &self,
104 arranged1: &Arranged<G, Tr1>,
105 arranged2: &Arranged<G, Tr2>,
106 shutdown_probe: ShutdownProbe,
107 result: L,
108 ) -> Collection<G, I::Item, Diff>
109 where
110 G: Scope,
111 G::Timestamp: Lattice,
112 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
113 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
114 + Clone
115 + 'static,
116 L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
117 I: IntoIterator<Item: Data> + 'static,
118 {
119 use LinearJoinImpl::*;
120
121 match (
122 self.implementation,
123 self.yielding.after_work,
124 self.yielding.after_time,
125 ) {
126 (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
127 (Materialize, Some(work_limit), Some(time_limit)) => {
128 let yield_fn =
129 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
130 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
131 }
132 (Materialize, Some(work_limit), None) => {
133 let yield_fn = move |_start, work| work >= work_limit;
134 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
135 }
136 (Materialize, None, Some(time_limit)) => {
137 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
138 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
139 }
140 (Materialize, None, None) => {
141 let yield_fn = |_start, _work| false;
142 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
143 }
144 (MaterializeV2, Some(work_limit), Some(time_limit)) => {
145 let yield_fn =
146 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
147 mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
148 .as_collection()
149 }
150 (MaterializeV2, Some(work_limit), None) => {
151 let yield_fn = move |_start, work| work >= work_limit;
152 mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
153 .as_collection()
154 }
155 (MaterializeV2, None, Some(time_limit)) => {
156 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
157 mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
158 .as_collection()
159 }
160 (MaterializeV2, None, None) => {
161 let yield_fn = |_start, _work| false;
162 mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
163 .as_collection()
164 }
165 }
166 }
167}
168
/// Limits after which a Materialize join operator should yield.
///
/// Every limit that is `Some` is an independent yield condition (the operator
/// yields as soon as any of them is reached); a limit that is `None` is not
/// checked.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct YieldSpec {
    /// Yield once the amount of performed work reaches this value.
    after_work: Option<usize>,
    /// Yield once this much time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Yield after 1,000,000 units of work or after 100 ms.
    fn default() -> Self {
        Self {
            after_work: Some(1_000_000),
            after_time: Some(Duration::from_millis(100)),
        }
    }
}

impl YieldSpec {
    /// Parses a yield specification from a comma-separated list of options.
    ///
    /// Recognized options are `work:<amount>` (sets `after_work`) and
    /// `time:<millis>` (sets `after_time`, in milliseconds). Whitespace around
    /// options, keys and values is ignored. If an option is repeated, the last
    /// occurrence wins. A limit that is not mentioned stays unset (`None`).
    ///
    /// Returns `None` if any option is malformed. This includes unknown keys,
    /// unparsable or negative numbers, and extra `:`-separated components. It also
    /// includes empty options, so an empty string or a trailing comma is rejected.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut after_work = None;
        let mut after_time = None;

        for option in s.split(',').map(str::trim) {
            let mut parts = option.split(':').map(str::trim);
            // Take up to three parts: a present third part means the option had
            // more than one `:` and is therefore malformed.
            match std::array::from_fn(|_| parts.next()) {
                [Some("work"), Some(amount), None] => {
                    after_work = Some(amount.parse().ok()?);
                }
                [Some("time"), Some(millis), None] => {
                    after_time = Some(Duration::from_millis(millis.parse().ok()?));
                }
                _ => return None,
            }
        }

        Some(Self {
            after_work,
            after_time,
        })
    }
}
215
/// The running result of a linear join as it flows from one stage to the next.
///
/// A join stage consumes an arranged variant directly. The `Collection` variant
/// is first arranged by the stage's stream key.
enum JoinedFlavor<G, T>
where
    G: Scope,
    G::Timestamp: Refines<T> + MzTimestamp,
    T: MzTimestamp,
{
    /// Rows that have not (yet) been arranged.
    Collection(Collection<G, Row, Diff>),
    /// A `RowRowAgent` arrangement. It is either the source input's
    /// `ArrangementFlavor::Local`, or one built by a join stage from a collection.
    Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
    /// A `RowRowEnter` arrangement taken from the source input's
    /// `ArrangementFlavor::Trace`.
    Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
}
230
231impl<G, T> Context<G, T>
232where
233 G: Scope,
234 G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
235 T: MzTimestamp,
236{
237 pub(crate) fn render_join(
238 &self,
239 inputs: Vec<CollectionBundle<G, T>>,
240 linear_plan: LinearJoinPlan,
241 ) -> CollectionBundle<G, T> {
242 self.scope.clone().region_named("Join(Linear)", |inner| {
243 self.render_join_inner(inputs, linear_plan, inner)
244 })
245 }
246
247 fn render_join_inner(
248 &self,
249 inputs: Vec<CollectionBundle<G, T>>,
250 linear_plan: LinearJoinPlan,
251 inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
252 ) -> CollectionBundle<G, T> {
253 let mut errors = Vec::new();
255
256 let arrangement = linear_plan
261 .stage_plans
262 .get(0)
263 .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
264 let mut joined = match (arrangement, linear_plan.initial_closure) {
266 (Some(ArrangementFlavor::Local(oks, errs)), None) => {
267 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
268 JoinedFlavor::Local(oks.enter_region(inner))
269 }
270 (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
271 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
272 JoinedFlavor::Trace(oks.enter_region(inner))
273 }
274 (_, initial_closure) => {
275 let (joined, errs) = inputs[linear_plan.source_relation]
278 .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
279 errors.push(errs.enter_region(inner));
280 let mut joined = joined.enter_region(inner);
281
282 if let Some(closure) = initial_closure {
285 let name = "LinearJoinInitialization";
289 type CB<C> = ConsolidatingContainerBuilder<C>;
290 let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
291 let mut datums = DatumVec::new();
293 move |row| {
294 let mut row_builder = SharedRow::get();
295 let temp_storage = RowArena::new();
296 let mut datums_local = datums.borrow_with(&row);
297 closure
299 .apply(&mut datums_local, &temp_storage, &mut row_builder)
300 .map(|row| row.cloned())
301 .map_err(DataflowError::from)
302 .transpose()
303 }
304 });
305 joined = j;
306 errors.push(errs);
307 }
308
309 JoinedFlavor::Collection(joined)
310 }
311 };
312
313 for stage_plan in linear_plan.stage_plans.into_iter() {
315 let stream = self.differential_join(
318 joined,
319 inputs[stage_plan.lookup_relation].enter_region(inner),
320 stage_plan,
321 &mut errors,
322 );
323 joined = JoinedFlavor::Collection(stream);
325 }
326
327 let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
331 if let Some(closure) = linear_plan.final_closure {
332 let name = "LinearJoinFinalization";
333 type CB<C> = ConsolidatingContainerBuilder<C>;
334 let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
335 let mut datums = DatumVec::new();
337 move |row| {
338 let mut row_builder = SharedRow::get();
339 let temp_storage = RowArena::new();
340 let mut datums_local = datums.borrow_with(&row);
341 closure
343 .apply(&mut datums_local, &temp_storage, &mut row_builder)
344 .map(|row| row.cloned())
345 .map_err(DataflowError::from)
346 .transpose()
347 }
348 });
349
350 joined = updates;
351 errors.push(errs);
352 }
353
354 CollectionBundle::from_collections(
356 joined,
357 differential_dataflow::collection::concatenate(inner, errors),
358 )
359 } else {
360 panic!("Unexpectedly arranged join output");
361 };
362 bundle.leave_region()
363 }
364
365 fn differential_join<S>(
368 &self,
369 mut joined: JoinedFlavor<S, T>,
370 lookup_relation: CollectionBundle<S, T>,
371 LinearStagePlan {
372 stream_key,
373 stream_thinning,
374 lookup_key,
375 closure,
376 lookup_relation: _,
377 }: LinearStagePlan,
378 errors: &mut Vec<Collection<S, DataflowError, Diff>>,
379 ) -> Collection<S, Row, Diff>
380 where
381 S: Scope<Timestamp = G::Timestamp>,
382 {
383 if let JoinedFlavor::Collection(stream) = joined {
385 let name = "LinearJoinKeyPreparation";
386 let (keyed, errs) = stream
387 .inner
388 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
389 Pipeline,
390 name,
391 |_, _| {
392 Box::new(move |input, ok, errs| {
393 let mut temp_storage = RowArena::new();
394 let mut key_buf = Row::default();
395 let mut val_buf = Row::default();
396 let mut datums = DatumVec::new();
397 while let Some((time, data)) = input.next() {
398 let mut ok_session = ok.session_with_builder(&time);
399 let mut err_session = errs.session(&time);
400 for (row, time, diff) in data.iter() {
401 temp_storage.clear();
402 let datums_local = datums.borrow_with(row);
403 let datums = stream_key
404 .iter()
405 .map(|e| e.eval(&datums_local, &temp_storage));
406 let result = key_buf.packer().try_extend(datums);
407 match result {
408 Ok(()) => {
409 val_buf.packer().extend(
410 stream_thinning.iter().map(|e| datums_local[*e]),
411 );
412 ok_session.give(((&key_buf, &val_buf), time, diff));
413 }
414 Err(e) => {
415 err_session.give((e.into(), time.clone(), *diff));
416 }
417 }
418 }
419 }
420 })
421 },
422 );
423
424 errors.push(errs.as_collection());
425
426 let arranged = keyed
427 .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
428 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
429 );
430 joined = JoinedFlavor::Local(arranged);
431 }
432
433 let arrangement = lookup_relation
435 .arrangement(&lookup_key[..])
436 .expect("Arrangement absent despite explicit construction");
437
438 match joined {
439 JoinedFlavor::Collection(_) => {
440 unreachable!("JoinedFlavor::Collection variant avoided at top of method");
441 }
442 JoinedFlavor::Local(local) => match arrangement {
443 ArrangementFlavor::Local(oks, errs1) => {
444 let (oks, errs2) = self
445 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
446 local, oks, closure,
447 );
448
449 errors.push(errs1.as_collection(|k, _v| k.clone()));
450 errors.extend(errs2);
451 oks
452 }
453 ArrangementFlavor::Trace(_gid, oks, errs1) => {
454 let (oks, errs2) = self
455 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
456 local, oks, closure,
457 );
458
459 errors.push(errs1.as_collection(|k, _v| k.clone()));
460 errors.extend(errs2);
461 oks
462 }
463 },
464 JoinedFlavor::Trace(trace) => match arrangement {
465 ArrangementFlavor::Local(oks, errs1) => {
466 let (oks, errs2) = self
467 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
468 trace, oks, closure,
469 );
470
471 errors.push(errs1.as_collection(|k, _v| k.clone()));
472 errors.extend(errs2);
473 oks
474 }
475 ArrangementFlavor::Trace(_gid, oks, errs1) => {
476 let (oks, errs2) = self
477 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
478 trace, oks, closure,
479 );
480
481 errors.push(errs1.as_collection(|k, _v| k.clone()));
482 errors.extend(errs2);
483 oks
484 }
485 },
486 }
487 }
488
489 fn differential_join_inner<S, Tr1, Tr2>(
496 &self,
497 prev_keyed: Arranged<S, Tr1>,
498 next_input: Arranged<S, Tr2>,
499 closure: JoinClosure,
500 ) -> (
501 Collection<S, Row, Diff>,
502 Option<Collection<S, DataflowError, Diff>>,
503 )
504 where
505 S: Scope<Timestamp = G::Timestamp>,
506 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
507 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
508 + Clone
509 + 'static,
510 for<'a> Tr1::Key<'a>: ToDatumIter,
511 for<'a> Tr1::Val<'a>: ToDatumIter,
512 for<'a> Tr2::Val<'a>: ToDatumIter,
513 {
514 let mut datums = DatumVec::new();
516
517 if closure.could_error() {
518 let (oks, err) = self
519 .linear_join_spec
520 .render(
521 &prev_keyed,
522 &next_input,
523 self.shutdown_probe.clone(),
524 move |key, old, new| {
525 let mut row_builder = SharedRow::get();
526 let temp_storage = RowArena::new();
527
528 let mut datums_local = datums.borrow();
529 datums_local.extend(key.to_datum_iter());
530 datums_local.extend(old.to_datum_iter());
531 datums_local.extend(new.to_datum_iter());
532
533 closure
534 .apply(&mut datums_local, &temp_storage, &mut row_builder)
535 .map(|row| row.cloned())
536 .map_err(DataflowError::from)
537 .transpose()
538 },
539 )
540 .inner
541 .ok_err(|(x, t, d)| {
542 match x {
544 Ok(x) => Ok((x, t, d)),
545 Err(x) => Err((x, t, d)),
546 }
547 });
548
549 (oks.as_collection(), Some(err.as_collection()))
550 } else {
551 let oks = self.linear_join_spec.render(
552 &prev_keyed,
553 &next_input,
554 self.shutdown_probe.clone(),
555 move |key, old, new| {
556 let mut row_builder = SharedRow::get();
557 let temp_storage = RowArena::new();
558
559 let mut datums_local = datums.borrow();
560 datums_local.extend(key.to_datum_iter());
561 datums_local.extend(old.to_datum_iter());
562 datums_local.extend(new.to_datum_iter());
563
564 closure
565 .apply(&mut datums_local, &temp_storage, &mut row_builder)
566 .expect("Closure claimed to never error")
567 .cloned()
568 },
569 );
570
571 (oks, None)
572 }
573 }
574}