1use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dyncfgs::{
22 ENABLE_COLUMN_PAGED_BATCHER, ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING,
23};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
26use mz_dyncfg::ConfigSet;
27use mz_expr::Eval;
28use mz_repr::fixed_length::ToDatumIter;
29use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
30use mz_timely_util::columnar::batcher;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange};
33use mz_timely_util::operator::{CollectionExt, StreamExt};
34use timely::dataflow::Scope;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::OkErr;
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::render::RenderTimestamp;
40use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
41use crate::render::errors::DataflowErrorSer;
42use crate::render::join::mz_join_core::mz_join_core;
43use crate::typedefs::{RowRowAgent, RowRowEnter};
44use mz_row_spine::{RowRowBuilder, RowRowColPagedBuilder, RowRowSpine};
45
/// The join implementations a linear join can be rendered with.
///
/// The variant is chosen in `LinearJoinSpec::from_config` and dispatched on in
/// `LinearJoinSpec::render`.
#[derive(Clone, Copy)]
enum LinearJoinImpl {
    /// Render through `mz_join_core`, which is handed a yield function.
    Materialize,
    /// Render through the `join_core` method of the first arrangement.
    DifferentialDataflow,
}
54
55#[derive(Clone, Copy)]
62pub struct LinearJoinSpec {
63 implementation: LinearJoinImpl,
64 yielding: YieldSpec,
65}
66
67impl Default for LinearJoinSpec {
68 fn default() -> Self {
69 Self {
70 implementation: LinearJoinImpl::Materialize,
71 yielding: Default::default(),
72 }
73 }
74}
75
76impl LinearJoinSpec {
77 pub fn from_config(config: &ConfigSet) -> Self {
79 let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
80 LinearJoinImpl::Materialize
81 } else {
82 LinearJoinImpl::DifferentialDataflow
83 };
84
85 let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
86 let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
87 tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
88 YieldSpec::default()
89 });
90
91 Self {
92 implementation,
93 yielding,
94 }
95 }
96
97 fn render<'s, T, Tr1, Tr2, L, I>(
99 &self,
100 arranged1: Arranged<'s, Tr1>,
101 arranged2: Arranged<'s, Tr2>,
102 result: L,
103 ) -> VecCollection<'s, T, I::Item, Diff>
104 where
105 T: Lattice + timely::progress::Timestamp,
106 Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
107 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
108 L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
109 I: IntoIterator<Item: Data> + 'static,
110 {
111 use LinearJoinImpl::*;
112
113 match (
114 self.implementation,
115 self.yielding.after_work,
116 self.yielding.after_time,
117 ) {
118 (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
119 (Materialize, Some(work_limit), Some(time_limit)) => {
120 let yield_fn =
121 move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
122 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
123 }
124 (Materialize, Some(work_limit), None) => {
125 let yield_fn = move |_start, work| work >= work_limit;
126 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
127 }
128 (Materialize, None, Some(time_limit)) => {
129 let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
130 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
131 }
132 (Materialize, None, None) => {
133 let yield_fn = |_start, _work| false;
134 mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
135 }
136 }
137 }
138}
139
/// Limits after which a linear join's yield function reports that it is time to yield.
///
/// Both limits are optional. A limit that is `None` is never checked.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield once the reported amount of work reaches this value.
    after_work: Option<usize>,
    /// Yield once this much time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Returns limits of 1,000,000 units of work and 100 milliseconds.
    fn default() -> Self {
        let work_limit = 1_000_000;
        let time_limit = Duration::from_millis(100);
        Self {
            after_work: Some(work_limit),
            after_time: Some(time_limit),
        }
    }
}

impl YieldSpec {
    /// Parses a `YieldSpec` from a comma-separated list of `<name>:<value>` options.
    ///
    /// Recognized options are `work:<amount>` and `time:<milliseconds>`. Whitespace around
    /// options, names and values is ignored. When an option is given more than once, the last
    /// occurrence wins. Options that are not given stay `None`.
    ///
    /// Returns `None` if any option has an unknown name, does not consist of exactly two
    /// `:`-separated parts, or has a value that does not parse as an unsigned integer. This
    /// includes the empty string, which is read as a single, empty option.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            let mut parts = option.trim().split(':').map(str::trim);
            // Pull three parts so that options with too few or too many parts are rejected.
            let (Some(name), Some(value), None) = (parts.next(), parts.next(), parts.next())
            else {
                return None;
            };

            match name {
                "work" => {
                    let amount = value.parse::<usize>().ok()?;
                    spec.after_work = Some(amount);
                }
                "time" => {
                    let millis = value.parse::<u64>().ok()?;
                    spec.after_time = Some(Duration::from_millis(millis));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
186
187enum JoinedFlavor<'scope, T: RenderTimestamp> {
189 Collection(VecCollection<'scope, T, Row, Diff>),
191 Local(Arranged<'scope, RowRowAgent<T, Diff>>),
193 Trace(Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>),
195}
196
197impl<'scope, T> Context<'scope, T>
198where
199 T: Lattice + RenderTimestamp,
200{
201 pub(crate) fn render_join(
202 &self,
203 inputs: Vec<CollectionBundle<'scope, T>>,
204 linear_plan: LinearJoinPlan,
205 ) -> CollectionBundle<'scope, T> {
206 self.scope.clone().region_named("Join(Linear)", |inner| {
207 self.render_join_inner(inputs, linear_plan, inner)
208 })
209 }
210
211 fn render_join_inner(
212 &self,
213 inputs: Vec<CollectionBundle<'scope, T>>,
214 linear_plan: LinearJoinPlan,
215 inner: Scope<'_, T>,
216 ) -> CollectionBundle<'scope, T> {
217 let mut errors = Vec::new();
219
220 let arrangement = linear_plan
225 .stage_plans
226 .get(0)
227 .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
228 let mut joined = match (arrangement, linear_plan.initial_closure) {
230 (Some(ArrangementFlavor::Local(oks, errs)), None) => {
231 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
232 JoinedFlavor::Local(oks.enter_region(inner))
233 }
234 (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
235 errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
236 JoinedFlavor::Trace(oks.enter_region(inner))
237 }
238 (_, initial_closure) => {
239 let (joined, errs) = inputs[linear_plan.source_relation]
242 .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
243 errors.push(errs.enter_region(inner));
244 let mut joined = joined.enter_region(inner);
245
246 if let Some(closure) = initial_closure {
249 let name = "LinearJoinInitialization";
253 type CB<C> = ConsolidatingContainerBuilder<C>;
254 let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
255 let mut datums = DatumVec::new();
257 move |row| {
258 let mut row_builder = SharedRow::get();
259 let temp_storage = RowArena::new();
260 let mut datums_local = datums.borrow_with(&row);
261 closure
263 .apply(&mut datums_local, &temp_storage, &mut row_builder)
264 .map(|row| row.cloned())
265 .map_err(DataflowErrorSer::from)
266 .transpose()
267 }
268 });
269 joined = j;
270 errors.push(errs);
271 }
272
273 JoinedFlavor::Collection(joined)
274 }
275 };
276
277 for stage_plan in linear_plan.stage_plans.into_iter() {
279 let stream = self.differential_join(
282 joined,
283 inputs[stage_plan.lookup_relation].enter_region(inner),
284 stage_plan,
285 &mut errors,
286 );
287 joined = JoinedFlavor::Collection(stream);
289 }
290
291 let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
295 if let Some(closure) = linear_plan.final_closure {
296 let name = "LinearJoinFinalization";
297 type CB<C> = ConsolidatingContainerBuilder<C>;
298 let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
299 let mut datums = DatumVec::new();
301 move |row| {
302 let mut row_builder = SharedRow::get();
303 let temp_storage = RowArena::new();
304 let mut datums_local = datums.borrow_with(&row);
305 closure
307 .apply(&mut datums_local, &temp_storage, &mut row_builder)
308 .map(|row| row.cloned())
309 .map_err(DataflowErrorSer::from)
310 .transpose()
311 }
312 });
313
314 joined = updates;
315 errors.push(errs);
316 }
317
318 CollectionBundle::from_collections(
320 joined,
321 differential_dataflow::collection::concatenate(inner, errors),
322 )
323 } else {
324 panic!("Unexpectedly arranged join output");
325 };
326 bundle.leave_region(self.scope)
327 }
328
329 fn differential_join<'s>(
332 &self,
333 mut joined: JoinedFlavor<'s, T>,
334 lookup_relation: CollectionBundle<'s, T>,
335 LinearStagePlan {
336 stream_key,
337 stream_thinning,
338 lookup_key,
339 closure,
340 lookup_relation: _,
341 }: LinearStagePlan,
342 errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
343 ) -> VecCollection<'s, T, Row, Diff> {
344 if let JoinedFlavor::Collection(stream) = joined {
346 let name = "LinearJoinKeyPreparation";
347 let (keyed, errs) = stream
348 .inner
349 .unary_fallible::<ColumnBuilder<((Row, Row), T, Diff)>, _, _, _>(
350 Pipeline,
351 name,
352 |_, _| {
353 Box::new(move |input, ok, errs| {
354 let mut temp_storage = RowArena::new();
355 let mut key_buf = Row::default();
356 let mut val_buf = Row::default();
357 let mut datums = DatumVec::new();
358 input.for_each(|time, data| {
359 let mut ok_session = ok.session_with_builder(&time);
360 let mut err_session = errs.session(&time);
361 for (row, time, diff) in data.iter() {
362 temp_storage.clear();
363 let datums_local = datums.borrow_with(row);
364 let datums = stream_key
365 .iter()
366 .map(|e| e.eval(&datums_local, &temp_storage));
367 let result = key_buf.packer().try_extend(datums);
368 match result {
369 Ok(()) => {
370 val_buf.packer().extend(
371 stream_thinning.iter().map(|e| datums_local[*e]),
372 );
373 ok_session.give(((&key_buf, &val_buf), time, diff));
374 }
375 Err(e) => {
376 err_session.give((e.into(), time.clone(), *diff));
377 }
378 }
379 }
380 });
381 })
382 },
383 );
384
385 errors.push(errs.as_collection());
386
387 let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
388 columnar_exchange::<Row, Row, T, Diff>,
389 );
390 let arranged = if ENABLE_COLUMN_PAGED_BATCHER.get(&self.config_set) {
391 keyed.mz_arrange_core::<
392 _,
393 batcher::ColumnChunker<_>,
394 Col2ValPagedBatcher<_, _, _, _>,
395 RowRowColPagedBuilder<_, _>,
396 RowRowSpine<_, _>,
397 >(exchange, "JoinStage")
398 } else {
399 keyed.mz_arrange_core::<
400 _,
401 batcher::Chunker<_>,
402 Col2ValBatcher<_, _, _, _>,
403 RowRowBuilder<_, _>,
404 RowRowSpine<_, _>,
405 >(exchange, "JoinStage")
406 };
407 joined = JoinedFlavor::Local(arranged);
408 }
409
410 let arrangement = lookup_relation
412 .arrangement(&lookup_key[..])
413 .expect("Arrangement absent despite explicit construction");
414
415 match joined {
416 JoinedFlavor::Collection(_) => {
417 unreachable!("JoinedFlavor::VecCollection variant avoided at top of method");
418 }
419 JoinedFlavor::Local(local) => match arrangement {
420 ArrangementFlavor::Local(oks, errs1) => {
421 let (oks, errs2) = self
422 .differential_join_inner::<RowRowAgent<_, _>, RowRowAgent<_, _>>(
423 local, oks, closure,
424 );
425
426 errors.push(errs1.as_collection(|k, _v| k.clone()));
427 errors.extend(errs2);
428 oks
429 }
430 ArrangementFlavor::Trace(_gid, oks, errs1) => {
431 let (oks, errs2) = self
432 .differential_join_inner::<RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
433 local, oks, closure,
434 );
435
436 errors.push(errs1.as_collection(|k, _v| k.clone()));
437 errors.extend(errs2);
438 oks
439 }
440 },
441 JoinedFlavor::Trace(trace) => match arrangement {
442 ArrangementFlavor::Local(oks, errs1) => {
443 let (oks, errs2) = self
444 .differential_join_inner::<RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
445 trace, oks, closure,
446 );
447
448 errors.push(errs1.as_collection(|k, _v| k.clone()));
449 errors.extend(errs2);
450 oks
451 }
452 ArrangementFlavor::Trace(_gid, oks, errs1) => {
453 let (oks, errs2) = self
454 .differential_join_inner::<RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
455 trace, oks, closure,
456 );
457
458 errors.push(errs1.as_collection(|k, _v| k.clone()));
459 errors.extend(errs2);
460 oks
461 }
462 },
463 }
464 }
465
466 fn differential_join_inner<'s, Tr1, Tr2>(
473 &self,
474 prev_keyed: Arranged<'s, Tr1>,
475 next_input: Arranged<'s, Tr2>,
476 closure: JoinClosure,
477 ) -> (
478 VecCollection<'s, T, Row, Diff>,
479 Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
480 )
481 where
482 Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
483 Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
484 for<'a> Tr1::Key<'a>: ToDatumIter,
485 for<'a> Tr1::Val<'a>: ToDatumIter,
486 for<'a> Tr2::Val<'a>: ToDatumIter,
487 {
488 let mut datums = DatumVec::new();
490
491 if closure.could_error() {
492 let (oks, err) = self
493 .linear_join_spec
494 .render(prev_keyed, next_input, move |key, old, new| {
495 let mut row_builder = SharedRow::get();
496 let temp_storage = RowArena::new();
497
498 let mut datums_local = datums.borrow();
499 datums_local.extend(key.to_datum_iter());
500 datums_local.extend(old.to_datum_iter());
501 datums_local.extend(new.to_datum_iter());
502
503 closure
504 .apply(&mut datums_local, &temp_storage, &mut row_builder)
505 .map(|row| row.cloned())
506 .map_err(DataflowErrorSer::from)
507 .transpose()
508 })
509 .inner
510 .ok_err(|(x, t, d)| {
511 match x {
513 Ok(x) => Ok((x, t, d)),
514 Err(x) => Err((x, t, d)),
515 }
516 });
517
518 (oks.as_collection(), Some(err.as_collection()))
519 } else {
520 let oks = self
521 .linear_join_spec
522 .render(prev_keyed, next_input, move |key, old, new| {
523 let mut row_builder = SharedRow::get();
524 let temp_storage = RowArena::new();
525
526 let mut datums_local = datums.borrow();
527 datums_local.extend(key.to_datum_iter());
528 datums_local.extend(old.to_datum_iter());
529 datums_local.extend(new.to_datum_iter());
530
531 closure
532 .apply(&mut datums_local, &temp_storage, &mut row_builder)
533 .expect("Closure claimed to never error")
534 .cloned()
535 });
536
537 (oks, None)
538 }
539 }
540}