1use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Collection, Data};
21use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
24use mz_dyncfg::ConfigSet;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
32use timely::dataflow::operators::OkErr;
33use timely::dataflow::scopes::Child;
34use timely::dataflow::{Scope, ScopeParent};
35use timely::progress::timestamp::Refines;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
40use crate::render::join::mz_join_core::mz_join_core;
41use crate::row_spine::{RowRowBuilder, RowRowSpine};
42use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
43
/// Which join operator implementation is used to render linear join stages.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LinearJoinImpl {
    /// Materialize's own `mz_join_core`, which honors the configured yield limits.
    Materialize,
    /// Differential dataflow's `join_core`.
    DifferentialDataflow,
}
52
53#[derive(Clone, Copy)]
60pub struct LinearJoinSpec {
61 implementation: LinearJoinImpl,
62 yielding: YieldSpec,
63}
64
65impl Default for LinearJoinSpec {
66 fn default() -> Self {
67 Self {
68 implementation: LinearJoinImpl::Materialize,
69 yielding: Default::default(),
70 }
71 }
72}
73
74impl LinearJoinSpec {
75 pub fn from_config(config: &ConfigSet) -> Self {
77 let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
78 LinearJoinImpl::Materialize
79 } else {
80 LinearJoinImpl::DifferentialDataflow
81 };
82
83 let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
84 let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
85 tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
86 YieldSpec::default()
87 });
88
89 Self {
90 implementation,
91 yielding,
92 }
93 }
94
95 fn render<G, Tr1, Tr2, L, I>(
97 &self,
98 arranged1: &Arranged<G, Tr1>,
99 arranged2: &Arranged<G, Tr2>,
100 shutdown_probe: ShutdownProbe,
101 result: L,
102 ) -> Collection<G, I::Item, Diff>
103 where
104 G: Scope,
105 G::Timestamp: Lattice,
106 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
107 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
108 + Clone
109 + 'static,
110 L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
111 I: IntoIterator<Item: Data> + 'static,
112 {
113 use LinearJoinImpl::*;
114
115 match (
116 self.implementation,
117 self.yielding.after_work,
118 self.yielding.after_time,
119 ) {
120 (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
121 (Materialize, Some(work_limit), Some(time_limit)) => {
122 let yield_fn =
123 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
124 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
125 }
126 (Materialize, Some(work_limit), None) => {
127 let yield_fn = move |_start, work| work >= work_limit;
128 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
129 }
130 (Materialize, None, Some(time_limit)) => {
131 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
132 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
133 }
134 (Materialize, None, None) => {
135 let yield_fn = |_start, _work| false;
136 mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
137 }
138 }
139 }
140}
141
/// Limits after which Materialize's join operator yields control.
///
/// Each limit is optional; an absent limit never triggers a yield. When both are set,
/// reaching either one is enough to yield.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct YieldSpec {
    /// Yield once the amount of work reported by the join reaches this value, if set.
    after_work: Option<usize>,
    /// Yield once this much time has elapsed since the join started its current run, if set.
    after_time: Option<Duration>,
}
150
151impl Default for YieldSpec {
152 fn default() -> Self {
153 Self {
154 after_work: Some(1_000_000),
155 after_time: Some(Duration::from_millis(100)),
156 }
157 }
158}
159
160impl YieldSpec {
161 fn try_from_str(s: &str) -> Option<Self> {
162 let mut after_work = None;
163 let mut after_time = None;
164
165 let options = s.split(',').map(|o| o.trim());
166 for option in options {
167 let mut iter = option.split(':').map(|p| p.trim());
168 match std::array::from_fn(|_| iter.next()) {
169 [Some("work"), Some(amount), None] => {
170 let amount = amount.parse().ok()?;
171 after_work = Some(amount);
172 }
173 [Some("time"), Some(millis), None] => {
174 let millis = millis.parse().ok()?;
175 let duration = Duration::from_millis(millis);
176 after_time = Some(duration);
177 }
178 _ => return None,
179 }
180 }
181
182 Some(Self {
183 after_work,
184 after_time,
185 })
186 }
187}
188
189enum JoinedFlavor<G, T>
191where
192 G: Scope,
193 G::Timestamp: Refines<T> + MzTimestamp,
194 T: MzTimestamp,
195{
196 Collection(Collection<G, Row, Diff>),
198 Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
200 Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
202}
203
204impl<G, T> Context<G, T>
205where
206 G: Scope,
207 G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
208 T: MzTimestamp,
209{
210 pub(crate) fn render_join(
211 &self,
212 inputs: Vec<CollectionBundle<G, T>>,
213 linear_plan: LinearJoinPlan,
214 ) -> CollectionBundle<G, T> {
215 self.scope.clone().region_named("Join(Linear)", |inner| {
216 self.render_join_inner(inputs, linear_plan, inner)
217 })
218 }
219
220 fn render_join_inner(
221 &self,
222 inputs: Vec<CollectionBundle<G, T>>,
223 linear_plan: LinearJoinPlan,
224 inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
225 ) -> CollectionBundle<G, T> {
226 let mut errors = Vec::new();
228
229 let arrangement = linear_plan
234 .stage_plans
235 .get(0)
236 .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
237 let mut joined = match (arrangement, linear_plan.initial_closure) {
239 (Some(ArrangementFlavor::Local(oks, errs)), None) => {
240 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
241 JoinedFlavor::Local(oks.enter_region(inner))
242 }
243 (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
244 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
245 JoinedFlavor::Trace(oks.enter_region(inner))
246 }
247 (_, initial_closure) => {
248 let (joined, errs) = inputs[linear_plan.source_relation]
251 .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
252 errors.push(errs.enter_region(inner));
253 let mut joined = joined.enter_region(inner);
254
255 if let Some(closure) = initial_closure {
258 let name = "LinearJoinInitialization";
262 type CB<C> = ConsolidatingContainerBuilder<C>;
263 let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
264 let mut datums = DatumVec::new();
266 move |row| {
267 let mut row_builder = SharedRow::get();
268 let temp_storage = RowArena::new();
269 let mut datums_local = datums.borrow_with(&row);
270 closure
272 .apply(&mut datums_local, &temp_storage, &mut row_builder)
273 .map(|row| row.cloned())
274 .map_err(DataflowError::from)
275 .transpose()
276 }
277 });
278 joined = j;
279 errors.push(errs);
280 }
281
282 JoinedFlavor::Collection(joined)
283 }
284 };
285
286 for stage_plan in linear_plan.stage_plans.into_iter() {
288 let stream = self.differential_join(
291 joined,
292 inputs[stage_plan.lookup_relation].enter_region(inner),
293 stage_plan,
294 &mut errors,
295 );
296 joined = JoinedFlavor::Collection(stream);
298 }
299
300 let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
304 if let Some(closure) = linear_plan.final_closure {
305 let name = "LinearJoinFinalization";
306 type CB<C> = ConsolidatingContainerBuilder<C>;
307 let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
308 let mut datums = DatumVec::new();
310 move |row| {
311 let mut row_builder = SharedRow::get();
312 let temp_storage = RowArena::new();
313 let mut datums_local = datums.borrow_with(&row);
314 closure
316 .apply(&mut datums_local, &temp_storage, &mut row_builder)
317 .map(|row| row.cloned())
318 .map_err(DataflowError::from)
319 .transpose()
320 }
321 });
322
323 joined = updates;
324 errors.push(errs);
325 }
326
327 CollectionBundle::from_collections(
329 joined,
330 differential_dataflow::collection::concatenate(inner, errors),
331 )
332 } else {
333 panic!("Unexpectedly arranged join output");
334 };
335 bundle.leave_region()
336 }
337
338 fn differential_join<S>(
341 &self,
342 mut joined: JoinedFlavor<S, T>,
343 lookup_relation: CollectionBundle<S, T>,
344 LinearStagePlan {
345 stream_key,
346 stream_thinning,
347 lookup_key,
348 closure,
349 lookup_relation: _,
350 }: LinearStagePlan,
351 errors: &mut Vec<Collection<S, DataflowError, Diff>>,
352 ) -> Collection<S, Row, Diff>
353 where
354 S: Scope<Timestamp = G::Timestamp>,
355 {
356 if let JoinedFlavor::Collection(stream) = joined {
358 let name = "LinearJoinKeyPreparation";
359 let (keyed, errs) = stream
360 .inner
361 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
362 Pipeline,
363 name,
364 |_, _| {
365 Box::new(move |input, ok, errs| {
366 let mut temp_storage = RowArena::new();
367 let mut key_buf = Row::default();
368 let mut val_buf = Row::default();
369 let mut datums = DatumVec::new();
370 while let Some((time, data)) = input.next() {
371 let mut ok_session = ok.session_with_builder(&time);
372 let mut err_session = errs.session(&time);
373 for (row, time, diff) in data.iter() {
374 temp_storage.clear();
375 let datums_local = datums.borrow_with(row);
376 let datums = stream_key
377 .iter()
378 .map(|e| e.eval(&datums_local, &temp_storage));
379 let result = key_buf.packer().try_extend(datums);
380 match result {
381 Ok(()) => {
382 val_buf.packer().extend(
383 stream_thinning.iter().map(|e| datums_local[*e]),
384 );
385 ok_session.give(((&key_buf, &val_buf), time, diff));
386 }
387 Err(e) => {
388 err_session.give((e.into(), time.clone(), *diff));
389 }
390 }
391 }
392 }
393 })
394 },
395 );
396
397 errors.push(errs.as_collection());
398
399 let arranged = keyed
400 .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
401 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
402 );
403 joined = JoinedFlavor::Local(arranged);
404 }
405
406 let arrangement = lookup_relation
408 .arrangement(&lookup_key[..])
409 .expect("Arrangement absent despite explicit construction");
410
411 match joined {
412 JoinedFlavor::Collection(_) => {
413 unreachable!("JoinedFlavor::Collection variant avoided at top of method");
414 }
415 JoinedFlavor::Local(local) => match arrangement {
416 ArrangementFlavor::Local(oks, errs1) => {
417 let (oks, errs2) = self
418 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
419 local, oks, closure,
420 );
421
422 errors.push(errs1.as_collection(|k, _v| k.clone()));
423 errors.extend(errs2);
424 oks
425 }
426 ArrangementFlavor::Trace(_gid, oks, errs1) => {
427 let (oks, errs2) = self
428 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
429 local, oks, closure,
430 );
431
432 errors.push(errs1.as_collection(|k, _v| k.clone()));
433 errors.extend(errs2);
434 oks
435 }
436 },
437 JoinedFlavor::Trace(trace) => match arrangement {
438 ArrangementFlavor::Local(oks, errs1) => {
439 let (oks, errs2) = self
440 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
441 trace, oks, closure,
442 );
443
444 errors.push(errs1.as_collection(|k, _v| k.clone()));
445 errors.extend(errs2);
446 oks
447 }
448 ArrangementFlavor::Trace(_gid, oks, errs1) => {
449 let (oks, errs2) = self
450 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
451 trace, oks, closure,
452 );
453
454 errors.push(errs1.as_collection(|k, _v| k.clone()));
455 errors.extend(errs2);
456 oks
457 }
458 },
459 }
460 }
461
462 fn differential_join_inner<S, Tr1, Tr2>(
469 &self,
470 prev_keyed: Arranged<S, Tr1>,
471 next_input: Arranged<S, Tr2>,
472 closure: JoinClosure,
473 ) -> (
474 Collection<S, Row, Diff>,
475 Option<Collection<S, DataflowError, Diff>>,
476 )
477 where
478 S: Scope<Timestamp = G::Timestamp>,
479 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
480 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
481 + Clone
482 + 'static,
483 for<'a> Tr1::Key<'a>: ToDatumIter,
484 for<'a> Tr1::Val<'a>: ToDatumIter,
485 for<'a> Tr2::Val<'a>: ToDatumIter,
486 {
487 let mut datums = DatumVec::new();
489
490 if closure.could_error() {
491 let (oks, err) = self
492 .linear_join_spec
493 .render(
494 &prev_keyed,
495 &next_input,
496 self.shutdown_probe.clone(),
497 move |key, old, new| {
498 let mut row_builder = SharedRow::get();
499 let temp_storage = RowArena::new();
500
501 let mut datums_local = datums.borrow();
502 datums_local.extend(key.to_datum_iter());
503 datums_local.extend(old.to_datum_iter());
504 datums_local.extend(new.to_datum_iter());
505
506 closure
507 .apply(&mut datums_local, &temp_storage, &mut row_builder)
508 .map(|row| row.cloned())
509 .map_err(DataflowError::from)
510 .transpose()
511 },
512 )
513 .inner
514 .ok_err(|(x, t, d)| {
515 match x {
517 Ok(x) => Ok((x, t, d)),
518 Err(x) => Err((x, t, d)),
519 }
520 });
521
522 (oks.as_collection(), Some(err.as_collection()))
523 } else {
524 let oks = self.linear_join_spec.render(
525 &prev_keyed,
526 &next_input,
527 self.shutdown_probe.clone(),
528 move |key, old, new| {
529 let mut row_builder = SharedRow::get();
530 let temp_storage = RowArena::new();
531
532 let mut datums_local = datums.borrow();
533 datums_local.extend(key.to_datum_iter());
534 datums_local.extend(old.to_datum_iter());
535 datums_local.extend(new.to_datum_iter());
536
537 closure
538 .apply(&mut datums_local, &temp_storage, &mut row_builder)
539 .expect("Closure claimed to never error")
540 .cloned()
541 },
542 );
543
544 (oks, None)
545 }
546 }
547}