1use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
24use mz_dyncfg::ConfigSet;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
32use timely::dataflow::operators::OkErr;
33use timely::dataflow::scopes::Child;
34use timely::dataflow::{Scope, ScopeParent};
35use timely::progress::timestamp::Refines;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
40use crate::render::join::mz_join_core::mz_join_core;
41use crate::row_spine::{RowRowBuilder, RowRowSpine};
42use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
43
/// Selects the join operator implementation used to render a linear join stage.
#[derive(Clone, Copy)]
enum LinearJoinImpl {
    /// Render through `mz_join_core`, which is driven by a yield function.
    Materialize,
    /// Render through the `join_core` method of the arrangement.
    DifferentialDataflow,
}
52
53#[derive(Clone, Copy)]
60pub struct LinearJoinSpec {
61 implementation: LinearJoinImpl,
62 yielding: YieldSpec,
63}
64
65impl Default for LinearJoinSpec {
66 fn default() -> Self {
67 Self {
68 implementation: LinearJoinImpl::Materialize,
69 yielding: Default::default(),
70 }
71 }
72}
73
74impl LinearJoinSpec {
75 pub fn from_config(config: &ConfigSet) -> Self {
77 let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
78 LinearJoinImpl::Materialize
79 } else {
80 LinearJoinImpl::DifferentialDataflow
81 };
82
83 let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
84 let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
85 tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
86 YieldSpec::default()
87 });
88
89 Self {
90 implementation,
91 yielding,
92 }
93 }
94
95 fn render<G, Tr1, Tr2, L, I>(
97 &self,
98 arranged1: &Arranged<G, Tr1>,
99 arranged2: &Arranged<G, Tr2>,
100 result: L,
101 ) -> VecCollection<G, I::Item, Diff>
102 where
103 G: Scope,
104 G::Timestamp: Lattice,
105 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
106 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
107 + Clone
108 + 'static,
109 L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
110 I: IntoIterator<Item: Data> + 'static,
111 {
112 use LinearJoinImpl::*;
113
114 match (
115 self.implementation,
116 self.yielding.after_work,
117 self.yielding.after_time,
118 ) {
119 (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
120 (Materialize, Some(work_limit), Some(time_limit)) => {
121 let yield_fn =
122 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
123 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
124 }
125 (Materialize, Some(work_limit), None) => {
126 let yield_fn = move |_start, work| work >= work_limit;
127 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
128 }
129 (Materialize, None, Some(time_limit)) => {
130 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
131 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
132 }
133 (Materialize, None, None) => {
134 let yield_fn = |_start, _work| false;
135 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
136 }
137 }
138 }
139}
140
/// Limits that decide when a join operator should yield.
///
/// A limit of `None` means yielding is never triggered by that measure.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield once the amount of work done reaches this value.
    after_work: Option<usize>,
    /// Yield once the elapsed time reaches this duration.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Yields after 1,000,000 units of work or after 100 milliseconds.
    fn default() -> Self {
        Self {
            after_work: Some(1_000_000),
            after_time: Some(Duration::from_millis(100)),
        }
    }
}

impl YieldSpec {
    /// Parses a comma-separated list of `work:<amount>` and `time:<milliseconds>` options.
    ///
    /// Whitespace around options, names and values is ignored. An option that is not
    /// mentioned leaves the corresponding limit as `None`; when an option is repeated, the
    /// last occurrence wins.
    ///
    /// Returns `None` if any option is not of the form `<name>:<value>`, names something
    /// other than `work` or `time`, or carries a value that does not parse as an unsigned
    /// integer. The empty string is therefore rejected as well.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            // A value that itself contains ':' cannot parse as an integer, so options with
            // more than two parts are rejected by the parse below.
            let (name, value) = option.split_once(':')?;
            let value = value.trim();
            match name.trim() {
                "work" => spec.after_work = Some(value.parse().ok()?),
                "time" => {
                    let millis = value.parse().ok()?;
                    spec.after_time = Some(Duration::from_millis(millis));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
187
188enum JoinedFlavor<G, T>
190where
191 G: Scope,
192 G::Timestamp: Refines<T> + MzTimestamp,
193 T: MzTimestamp,
194{
195 Collection(VecCollection<G, Row, Diff>),
197 Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
199 Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
201}
202
203impl<G, T> Context<G, T>
204where
205 G: Scope,
206 G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
207 T: MzTimestamp,
208{
209 pub(crate) fn render_join(
210 &self,
211 inputs: Vec<CollectionBundle<G, T>>,
212 linear_plan: LinearJoinPlan,
213 ) -> CollectionBundle<G, T> {
214 self.scope.clone().region_named("Join(Linear)", |inner| {
215 self.render_join_inner(inputs, linear_plan, inner)
216 })
217 }
218
219 fn render_join_inner(
220 &self,
221 inputs: Vec<CollectionBundle<G, T>>,
222 linear_plan: LinearJoinPlan,
223 inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
224 ) -> CollectionBundle<G, T> {
225 let mut errors = Vec::new();
227
228 let arrangement = linear_plan
233 .stage_plans
234 .get(0)
235 .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
236 let mut joined = match (arrangement, linear_plan.initial_closure) {
238 (Some(ArrangementFlavor::Local(oks, errs)), None) => {
239 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
240 JoinedFlavor::Local(oks.enter_region(inner))
241 }
242 (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
243 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
244 JoinedFlavor::Trace(oks.enter_region(inner))
245 }
246 (_, initial_closure) => {
247 let (joined, errs) = inputs[linear_plan.source_relation]
250 .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
251 errors.push(errs.enter_region(inner));
252 let mut joined = joined.enter_region(inner);
253
254 if let Some(closure) = initial_closure {
257 let name = "LinearJoinInitialization";
261 type CB<C> = ConsolidatingContainerBuilder<C>;
262 let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
263 let mut datums = DatumVec::new();
265 move |row| {
266 let mut row_builder = SharedRow::get();
267 let temp_storage = RowArena::new();
268 let mut datums_local = datums.borrow_with(&row);
269 closure
271 .apply(&mut datums_local, &temp_storage, &mut row_builder)
272 .map(|row| row.cloned())
273 .map_err(DataflowError::from)
274 .transpose()
275 }
276 });
277 joined = j;
278 errors.push(errs);
279 }
280
281 JoinedFlavor::Collection(joined)
282 }
283 };
284
285 for stage_plan in linear_plan.stage_plans.into_iter() {
287 let stream = self.differential_join(
290 joined,
291 inputs[stage_plan.lookup_relation].enter_region(inner),
292 stage_plan,
293 &mut errors,
294 );
295 joined = JoinedFlavor::Collection(stream);
297 }
298
299 let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
303 if let Some(closure) = linear_plan.final_closure {
304 let name = "LinearJoinFinalization";
305 type CB<C> = ConsolidatingContainerBuilder<C>;
306 let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
307 let mut datums = DatumVec::new();
309 move |row| {
310 let mut row_builder = SharedRow::get();
311 let temp_storage = RowArena::new();
312 let mut datums_local = datums.borrow_with(&row);
313 closure
315 .apply(&mut datums_local, &temp_storage, &mut row_builder)
316 .map(|row| row.cloned())
317 .map_err(DataflowError::from)
318 .transpose()
319 }
320 });
321
322 joined = updates;
323 errors.push(errs);
324 }
325
326 CollectionBundle::from_collections(
328 joined,
329 differential_dataflow::collection::concatenate(inner, errors),
330 )
331 } else {
332 panic!("Unexpectedly arranged join output");
333 };
334 bundle.leave_region()
335 }
336
337 fn differential_join<S>(
340 &self,
341 mut joined: JoinedFlavor<S, T>,
342 lookup_relation: CollectionBundle<S, T>,
343 LinearStagePlan {
344 stream_key,
345 stream_thinning,
346 lookup_key,
347 closure,
348 lookup_relation: _,
349 }: LinearStagePlan,
350 errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
351 ) -> VecCollection<S, Row, Diff>
352 where
353 S: Scope<Timestamp = G::Timestamp>,
354 {
355 if let JoinedFlavor::Collection(stream) = joined {
357 let name = "LinearJoinKeyPreparation";
358 let (keyed, errs) = stream
359 .inner
360 .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
361 Pipeline,
362 name,
363 |_, _| {
364 Box::new(move |input, ok, errs| {
365 let mut temp_storage = RowArena::new();
366 let mut key_buf = Row::default();
367 let mut val_buf = Row::default();
368 let mut datums = DatumVec::new();
369 input.for_each(|time, data| {
370 let mut ok_session = ok.session_with_builder(&time);
371 let mut err_session = errs.session(&time);
372 for (row, time, diff) in data.iter() {
373 temp_storage.clear();
374 let datums_local = datums.borrow_with(row);
375 let datums = stream_key
376 .iter()
377 .map(|e| e.eval(&datums_local, &temp_storage));
378 let result = key_buf.packer().try_extend(datums);
379 match result {
380 Ok(()) => {
381 val_buf.packer().extend(
382 stream_thinning.iter().map(|e| datums_local[*e]),
383 );
384 ok_session.give(((&key_buf, &val_buf), time, diff));
385 }
386 Err(e) => {
387 err_session.give((e.into(), time.clone(), *diff));
388 }
389 }
390 }
391 });
392 })
393 },
394 );
395
396 errors.push(errs.as_collection());
397
398 let arranged = keyed
399 .mz_arrange_core::<
400 _,
401 Col2ValBatcher<_, _, _, _>,
402 RowRowBuilder<_, _>,
403 RowRowSpine<_, _>,
404 >(
405 ExchangeCore::<ColumnBuilder<_>, _>::new_core(
406 columnar_exchange::<Row, Row, S::Timestamp, Diff>,
407 ),
408 "JoinStage"
409 );
410 joined = JoinedFlavor::Local(arranged);
411 }
412
413 let arrangement = lookup_relation
415 .arrangement(&lookup_key[..])
416 .expect("Arrangement absent despite explicit construction");
417
418 match joined {
419 JoinedFlavor::Collection(_) => {
420 unreachable!("JoinedFlavor::VecCollection variant avoided at top of method");
421 }
422 JoinedFlavor::Local(local) => match arrangement {
423 ArrangementFlavor::Local(oks, errs1) => {
424 let (oks, errs2) = self
425 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
426 local, oks, closure,
427 );
428
429 errors.push(errs1.as_collection(|k, _v| k.clone()));
430 errors.extend(errs2);
431 oks
432 }
433 ArrangementFlavor::Trace(_gid, oks, errs1) => {
434 let (oks, errs2) = self
435 .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
436 local, oks, closure,
437 );
438
439 errors.push(errs1.as_collection(|k, _v| k.clone()));
440 errors.extend(errs2);
441 oks
442 }
443 },
444 JoinedFlavor::Trace(trace) => match arrangement {
445 ArrangementFlavor::Local(oks, errs1) => {
446 let (oks, errs2) = self
447 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
448 trace, oks, closure,
449 );
450
451 errors.push(errs1.as_collection(|k, _v| k.clone()));
452 errors.extend(errs2);
453 oks
454 }
455 ArrangementFlavor::Trace(_gid, oks, errs1) => {
456 let (oks, errs2) = self
457 .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
458 trace, oks, closure,
459 );
460
461 errors.push(errs1.as_collection(|k, _v| k.clone()));
462 errors.extend(errs2);
463 oks
464 }
465 },
466 }
467 }
468
469 fn differential_join_inner<S, Tr1, Tr2>(
476 &self,
477 prev_keyed: Arranged<S, Tr1>,
478 next_input: Arranged<S, Tr2>,
479 closure: JoinClosure,
480 ) -> (
481 VecCollection<S, Row, Diff>,
482 Option<VecCollection<S, DataflowError, Diff>>,
483 )
484 where
485 S: Scope<Timestamp = G::Timestamp>,
486 Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
487 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
488 + Clone
489 + 'static,
490 for<'a> Tr1::Key<'a>: ToDatumIter,
491 for<'a> Tr1::Val<'a>: ToDatumIter,
492 for<'a> Tr2::Val<'a>: ToDatumIter,
493 {
494 let mut datums = DatumVec::new();
496
497 if closure.could_error() {
498 let (oks, err) = self
499 .linear_join_spec
500 .render(&prev_keyed, &next_input, move |key, old, new| {
501 let mut row_builder = SharedRow::get();
502 let temp_storage = RowArena::new();
503
504 let mut datums_local = datums.borrow();
505 datums_local.extend(key.to_datum_iter());
506 datums_local.extend(old.to_datum_iter());
507 datums_local.extend(new.to_datum_iter());
508
509 closure
510 .apply(&mut datums_local, &temp_storage, &mut row_builder)
511 .map(|row| row.cloned())
512 .map_err(DataflowError::from)
513 .transpose()
514 })
515 .inner
516 .ok_err(|(x, t, d)| {
517 match x {
519 Ok(x) => Ok((x, t, d)),
520 Err(x) => Err((x, t, d)),
521 }
522 });
523
524 (oks.as_collection(), Some(err.as_collection()))
525 } else {
526 let oks =
527 self.linear_join_spec
528 .render(&prev_keyed, &next_input, move |key, old, new| {
529 let mut row_builder = SharedRow::get();
530 let temp_storage = RowArena::new();
531
532 let mut datums_local = datums.borrow();
533 datums_local.extend(key.to_datum_iter());
534 datums_local.extend(old.to_datum_iter());
535 datums_local.extend(new.to_datum_iter());
536
537 closure
538 .apply(&mut datums_local, &temp_storage, &mut row_builder)
539 .expect("Closure claimed to never error")
540 .cloned()
541 });
542
543 (oks, None)
544 }
545 }
546}