1use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
24use mz_dyncfg::ConfigSet;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::dataflow::Scope;
32use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
33use timely::dataflow::operators::OkErr;
34
35use crate::extensions::arrange::MzArrangeCore;
36use crate::render::RenderTimestamp;
37use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
38use crate::render::join::mz_join_core::mz_join_core;
39use crate::row_spine::{RowRowBuilder, RowRowSpine};
40use crate::typedefs::{RowRowAgent, RowRowEnter};
41
/// Selects which join operator renders the stages of a linear join.
#[derive(Copy, Clone)]
enum LinearJoinImpl {
    /// Materialize's own join operator (`mz_join_core`), whose yielding behavior is governed
    /// by a `YieldSpec`.
    Materialize,
    /// Differential Dataflow's `join_core` operator; yielding limits are not consulted.
    DifferentialDataflow,
}
50
51#[derive(Clone, Copy)]
58pub struct LinearJoinSpec {
59 implementation: LinearJoinImpl,
60 yielding: YieldSpec,
61}
62
63impl Default for LinearJoinSpec {
64 fn default() -> Self {
65 Self {
66 implementation: LinearJoinImpl::Materialize,
67 yielding: Default::default(),
68 }
69 }
70}
71
72impl LinearJoinSpec {
73 pub fn from_config(config: &ConfigSet) -> Self {
75 let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
76 LinearJoinImpl::Materialize
77 } else {
78 LinearJoinImpl::DifferentialDataflow
79 };
80
81 let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
82 let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
83 tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
84 YieldSpec::default()
85 });
86
87 Self {
88 implementation,
89 yielding,
90 }
91 }
92
93 fn render<'s, T, Tr1, Tr2, L, I>(
95 &self,
96 arranged1: Arranged<'s, Tr1>,
97 arranged2: Arranged<'s, Tr2>,
98 result: L,
99 ) -> VecCollection<'s, T, I::Item, Diff>
100 where
101 T: Lattice + timely::progress::Timestamp,
102 Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
103 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
104 L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
105 I: IntoIterator<Item: Data> + 'static,
106 {
107 use LinearJoinImpl::*;
108
109 match (
110 self.implementation,
111 self.yielding.after_work,
112 self.yielding.after_time,
113 ) {
114 (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
115 (Materialize, Some(work_limit), Some(time_limit)) => {
116 let yield_fn =
117 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
118 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
119 }
120 (Materialize, Some(work_limit), None) => {
121 let yield_fn = move |_start, work| work >= work_limit;
122 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
123 }
124 (Materialize, None, Some(time_limit)) => {
125 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
126 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
127 }
128 (Materialize, None, None) => {
129 let yield_fn = |_start, _work| false;
130 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
131 }
132 }
133 }
134}
135
/// Limits after which the Materialize join operator is asked to yield.
///
/// A limit of `None` disables yielding on that criterion.
#[derive(Copy, Clone)]
struct YieldSpec {
    /// Yield once the amount of work performed reaches this value.
    after_work: Option<usize>,
    /// Yield once the elapsed time reaches this duration.
    after_time: Option<Duration>,
}
144
145impl Default for YieldSpec {
146 fn default() -> Self {
147 Self {
148 after_work: Some(1_000_000),
149 after_time: Some(Duration::from_millis(100)),
150 }
151 }
152}
153
154impl YieldSpec {
155 fn try_from_str(s: &str) -> Option<Self> {
156 let mut after_work = None;
157 let mut after_time = None;
158
159 let options = s.split(',').map(|o| o.trim());
160 for option in options {
161 let mut iter = option.split(':').map(|p| p.trim());
162 match std::array::from_fn(|_| iter.next()) {
163 [Some("work"), Some(amount), None] => {
164 let amount = amount.parse().ok()?;
165 after_work = Some(amount);
166 }
167 [Some("time"), Some(millis), None] => {
168 let millis = millis.parse().ok()?;
169 let duration = Duration::from_millis(millis);
170 after_time = Some(duration);
171 }
172 _ => return None,
173 }
174 }
175
176 Some(Self {
177 after_work,
178 after_time,
179 })
180 }
181}
182
183enum JoinedFlavor<'scope, T: RenderTimestamp> {
185 Collection(VecCollection<'scope, T, Row, Diff>),
187 Local(Arranged<'scope, RowRowAgent<T, Diff>>),
189 Trace(Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>),
191}
192
193impl<'scope, T> Context<'scope, T>
194where
195 T: Lattice + RenderTimestamp,
196{
197 pub(crate) fn render_join(
198 &self,
199 inputs: Vec<CollectionBundle<'scope, T>>,
200 linear_plan: LinearJoinPlan,
201 ) -> CollectionBundle<'scope, T> {
202 self.scope.clone().region_named("Join(Linear)", |inner| {
203 self.render_join_inner(inputs, linear_plan, inner)
204 })
205 }
206
207 fn render_join_inner(
208 &self,
209 inputs: Vec<CollectionBundle<'scope, T>>,
210 linear_plan: LinearJoinPlan,
211 inner: Scope<'_, T>,
212 ) -> CollectionBundle<'scope, T> {
213 let mut errors = Vec::new();
215
216 let arrangement = linear_plan
221 .stage_plans
222 .get(0)
223 .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
224 let mut joined = match (arrangement, linear_plan.initial_closure) {
226 (Some(ArrangementFlavor::Local(oks, errs)), None) => {
227 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
228 JoinedFlavor::Local(oks.enter_region(inner))
229 }
230 (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
231 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
232 JoinedFlavor::Trace(oks.enter_region(inner))
233 }
234 (_, initial_closure) => {
235 let (joined, errs) = inputs[linear_plan.source_relation]
238 .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
239 errors.push(errs.enter_region(inner));
240 let mut joined = joined.enter_region(inner);
241
242 if let Some(closure) = initial_closure {
245 let name = "LinearJoinInitialization";
249 type CB<C> = ConsolidatingContainerBuilder<C>;
250 let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
251 let mut datums = DatumVec::new();
253 move |row| {
254 let mut row_builder = SharedRow::get();
255 let temp_storage = RowArena::new();
256 let mut datums_local = datums.borrow_with(&row);
257 closure
259 .apply(&mut datums_local, &temp_storage, &mut row_builder)
260 .map(|row| row.cloned())
261 .map_err(DataflowError::from)
262 .transpose()
263 }
264 });
265 joined = j;
266 errors.push(errs);
267 }
268
269 JoinedFlavor::Collection(joined)
270 }
271 };
272
273 for stage_plan in linear_plan.stage_plans.into_iter() {
275 let stream = self.differential_join(
278 joined,
279 inputs[stage_plan.lookup_relation].enter_region(inner),
280 stage_plan,
281 &mut errors,
282 );
283 joined = JoinedFlavor::Collection(stream);
285 }
286
287 let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
291 if let Some(closure) = linear_plan.final_closure {
292 let name = "LinearJoinFinalization";
293 type CB<C> = ConsolidatingContainerBuilder<C>;
294 let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
295 let mut datums = DatumVec::new();
297 move |row| {
298 let mut row_builder = SharedRow::get();
299 let temp_storage = RowArena::new();
300 let mut datums_local = datums.borrow_with(&row);
301 closure
303 .apply(&mut datums_local, &temp_storage, &mut row_builder)
304 .map(|row| row.cloned())
305 .map_err(DataflowError::from)
306 .transpose()
307 }
308 });
309
310 joined = updates;
311 errors.push(errs);
312 }
313
314 CollectionBundle::from_collections(
316 joined,
317 differential_dataflow::collection::concatenate(inner, errors),
318 )
319 } else {
320 panic!("Unexpectedly arranged join output");
321 };
322 bundle.leave_region(self.scope)
323 }
324
325 fn differential_join<'s>(
328 &self,
329 mut joined: JoinedFlavor<'s, T>,
330 lookup_relation: CollectionBundle<'s, T>,
331 LinearStagePlan {
332 stream_key,
333 stream_thinning,
334 lookup_key,
335 closure,
336 lookup_relation: _,
337 }: LinearStagePlan,
338 errors: &mut Vec<VecCollection<'s, T, DataflowError, Diff>>,
339 ) -> VecCollection<'s, T, Row, Diff> {
340 if let JoinedFlavor::Collection(stream) = joined {
342 let name = "LinearJoinKeyPreparation";
343 let (keyed, errs) = stream
344 .inner
345 .unary_fallible::<ColumnBuilder<((Row, Row), T, Diff)>, _, _, _>(
346 Pipeline,
347 name,
348 |_, _| {
349 Box::new(move |input, ok, errs| {
350 let mut temp_storage = RowArena::new();
351 let mut key_buf = Row::default();
352 let mut val_buf = Row::default();
353 let mut datums = DatumVec::new();
354 input.for_each(|time, data| {
355 let mut ok_session = ok.session_with_builder(&time);
356 let mut err_session = errs.session(&time);
357 for (row, time, diff) in data.iter() {
358 temp_storage.clear();
359 let datums_local = datums.borrow_with(row);
360 let datums = stream_key
361 .iter()
362 .map(|e| e.eval(&datums_local, &temp_storage));
363 let result = key_buf.packer().try_extend(datums);
364 match result {
365 Ok(()) => {
366 val_buf.packer().extend(
367 stream_thinning.iter().map(|e| datums_local[*e]),
368 );
369 ok_session.give(((&key_buf, &val_buf), time, diff));
370 }
371 Err(e) => {
372 err_session.give((e.into(), time.clone(), *diff));
373 }
374 }
375 }
376 });
377 })
378 },
379 );
380
381 errors.push(errs.as_collection());
382
383 let arranged = keyed
384 .mz_arrange_core::<
385 _,
386 Col2ValBatcher<_, _, _, _>,
387 RowRowBuilder<_, _>,
388 RowRowSpine<_, _>,
389 >(
390 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
391 columnar_exchange::<Row, Row, T, Diff>,
392 ),
393 "JoinStage"
394 );
395 joined = JoinedFlavor::Local(arranged);
396 }
397
398 let arrangement = lookup_relation
400 .arrangement(&lookup_key[..])
401 .expect("Arrangement absent despite explicit construction");
402
403 match joined {
404 JoinedFlavor::Collection(_) => {
405 unreachable!("JoinedFlavor::VecCollection variant avoided at top of method");
406 }
407 JoinedFlavor::Local(local) => match arrangement {
408 ArrangementFlavor::Local(oks, errs1) => {
409 let (oks, errs2) = self
410 .differential_join_inner::<RowRowAgent<_, _>, RowRowAgent<_, _>>(
411 local, oks, closure,
412 );
413
414 errors.push(errs1.as_collection(|k, _v| k.clone()));
415 errors.extend(errs2);
416 oks
417 }
418 ArrangementFlavor::Trace(_gid, oks, errs1) => {
419 let (oks, errs2) = self
420 .differential_join_inner::<RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
421 local, oks, closure,
422 );
423
424 errors.push(errs1.as_collection(|k, _v| k.clone()));
425 errors.extend(errs2);
426 oks
427 }
428 },
429 JoinedFlavor::Trace(trace) => match arrangement {
430 ArrangementFlavor::Local(oks, errs1) => {
431 let (oks, errs2) = self
432 .differential_join_inner::<RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
433 trace, oks, closure,
434 );
435
436 errors.push(errs1.as_collection(|k, _v| k.clone()));
437 errors.extend(errs2);
438 oks
439 }
440 ArrangementFlavor::Trace(_gid, oks, errs1) => {
441 let (oks, errs2) = self
442 .differential_join_inner::<RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
443 trace, oks, closure,
444 );
445
446 errors.push(errs1.as_collection(|k, _v| k.clone()));
447 errors.extend(errs2);
448 oks
449 }
450 },
451 }
452 }
453
454 fn differential_join_inner<'s, Tr1, Tr2>(
461 &self,
462 prev_keyed: Arranged<'s, Tr1>,
463 next_input: Arranged<'s, Tr2>,
464 closure: JoinClosure,
465 ) -> (
466 VecCollection<'s, T, Row, Diff>,
467 Option<VecCollection<'s, T, DataflowError, Diff>>,
468 )
469 where
470 Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
471 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
472 for<'a> Tr1::Key<'a>: ToDatumIter,
473 for<'a> Tr1::Val<'a>: ToDatumIter,
474 for<'a> Tr2::Val<'a>: ToDatumIter,
475 {
476 let mut datums = DatumVec::new();
478
479 if closure.could_error() {
480 let (oks, err) = self
481 .linear_join_spec
482 .render(prev_keyed, next_input, move |key, old, new| {
483 let mut row_builder = SharedRow::get();
484 let temp_storage = RowArena::new();
485
486 let mut datums_local = datums.borrow();
487 datums_local.extend(key.to_datum_iter());
488 datums_local.extend(old.to_datum_iter());
489 datums_local.extend(new.to_datum_iter());
490
491 closure
492 .apply(&mut datums_local, &temp_storage, &mut row_builder)
493 .map(|row| row.cloned())
494 .map_err(DataflowError::from)
495 .transpose()
496 })
497 .inner
498 .ok_err(|(x, t, d)| {
499 match x {
501 Ok(x) => Ok((x, t, d)),
502 Err(x) => Err((x, t, d)),
503 }
504 });
505
506 (oks.as_collection(), Some(err.as_collection()))
507 } else {
508 let oks = self
509 .linear_join_spec
510 .render(prev_keyed, next_input, move |key, old, new| {
511 let mut row_builder = SharedRow::get();
512 let temp_storage = RowArena::new();
513
514 let mut datums_local = datums.borrow();
515 datums_local.extend(key.to_datum_iter());
516 datums_local.extend(old.to_datum_iter());
517 datums_local.extend(new.to_datum_iter());
518
519 closure
520 .apply(&mut datums_local, &temp_storage, &mut row_builder)
521 .expect("Closure claimed to never error")
522 .cloned()
523 });
524
525 (oks, None)
526 }
527 }
528}