use std::time::{Duration, Instant};
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
use mz_compute_types::plan::join::JoinClosure;
use mz_dyncfg::ConfigSet;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use timely::container::columnation::Columnation;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::OkErr;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::{Refines, Timestamp};
use crate::extensions::arrange::MzArrange;
use crate::render::context::{
ArrangementFlavor, CollectionBundle, Context, MzArrangement, MzArrangementImport, ShutdownToken,
};
use crate::render::join::mz_join_core::mz_join_core;
use crate::row_spine::RowRowSpine;
use crate::typedefs::{RowRowAgent, RowRowEnter};
/// Which join primitive a linear join stage is rendered with.
#[derive(Clone, Copy)]
enum LinearJoinImpl {
    /// Materialize's `mz_join_core`, which is given a yield function built from a `YieldSpec`.
    Materialize,
    /// Differential dataflow's `join_core` on `Arranged`; yielding configuration is ignored.
    DifferentialDataflow,
}
/// Settings that control how the stages of a linear join are rendered.
///
/// Usually built from dynamic configuration via [`LinearJoinSpec::from_config`].
#[derive(Clone, Copy)]
pub struct LinearJoinSpec {
    /// The join primitive to use for each stage.
    implementation: LinearJoinImpl,
    /// Yield thresholds passed to `mz_join_core`; unused by the differential implementation.
    yielding: YieldSpec,
}
impl Default for LinearJoinSpec {
fn default() -> Self {
Self {
implementation: LinearJoinImpl::Materialize,
yielding: Default::default(),
}
}
}
impl LinearJoinSpec {
pub fn from_config(config: &ConfigSet) -> Self {
let implementation = match ENABLE_MZ_JOIN_CORE.get(config) {
true => LinearJoinImpl::Materialize,
false => LinearJoinImpl::DifferentialDataflow,
};
let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
YieldSpec::default()
});
Self {
implementation,
yielding,
}
}
fn render<G, Tr1, Tr2, L, I>(
&self,
arranged1: &Arranged<G, Tr1>,
arranged2: &Arranged<G, Tr2>,
shutdown_token: ShutdownToken,
result: L,
) -> Collection<G, I::Item, Diff>
where
G: Scope,
G::Timestamp: Lattice,
Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
+ Clone
+ 'static,
L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
I: IntoIterator,
I::Item: Data,
{
use LinearJoinImpl::*;
match (
self.implementation,
self.yielding.after_work,
self.yielding.after_time,
) {
(DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
(Materialize, Some(work_limit), Some(time_limit)) => {
let yield_fn =
move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
}
(Materialize, Some(work_limit), None) => {
let yield_fn = move |_start, work| work >= work_limit;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
}
(Materialize, None, Some(time_limit)) => {
let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
}
(Materialize, None, None) => {
let yield_fn = |_start, _work| false;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
}
}
}
}
/// Thresholds after which `mz_join_core` is told to yield.
///
/// Parsed from the `LINEAR_JOIN_YIELDING` config by `YieldSpec::try_from_str`. Each limit
/// is optional, and a `None` limit is simply not checked. When both are `None`, the yield
/// function built by `LinearJoinSpec::render` always returns `false`.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield once the `work` count reported to the yield function reaches this value.
    after_work: Option<usize>,
    /// Yield once this much time has elapsed since the `start` instant reported to the
    /// yield function (presumably the start of the current activation — TODO confirm).
    after_time: Option<Duration>,
}
impl Default for YieldSpec {
fn default() -> Self {
Self {
after_work: Some(1_000_000),
after_time: Some(Duration::from_millis(100)),
}
}
}
impl YieldSpec {
    /// Parses a yield specification such as `work:1000000,time:100`.
    ///
    /// The input is a comma-separated list of `key:value` options. Whitespace around
    /// options, keys, and values is ignored. Recognized keys are:
    /// - `work` — a `usize` work threshold.
    /// - `time` — a time threshold in milliseconds, parsed as `u64`.
    ///
    /// Omitted keys stay `None`, and a repeated key keeps its last value.
    ///
    /// Returns `None` if any option does not have exactly one `:`, uses an unknown key, or
    /// has a value that fails to parse. This includes empty options, so `""` and
    /// `"work:1,"` are rejected.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',').map(str::trim) {
            let mut fields = option.split(':').map(str::trim);
            let (key, value) = match (fields.next(), fields.next(), fields.next()) {
                (Some(key), Some(value), None) => (key, value),
                _ => return None,
            };
            match key {
                "work" => spec.after_work = Some(value.parse().ok()?),
                "time" => spec.after_time = Some(Duration::from_millis(value.parse().ok()?)),
                _ => return None,
            }
        }

        Some(spec)
    }
}
/// The running result of a linear join, in whichever form it is currently held.
enum JoinedFlavor<G, T>
where
    G: Scope,
    G::Timestamp: Lattice + Refines<T> + Columnation,
    T: Timestamp + Lattice + Columnation,
{
    /// A plain collection of rows; `differential_join` keys and arranges it before joining.
    Collection(Collection<G, Row, Diff>),
    /// An arrangement from `ArrangementFlavor::Local`, or one built by a join stage.
    Local(MzArrangement<G>),
    /// An arrangement from `ArrangementFlavor::Trace`.
    Trace(MzArrangementImport<G, T>),
}
/// Rendering of linear joins.
impl<G, T> Context<G, T>
where
    G: Scope,
    G::Timestamp: Lattice + Refines<T> + Columnation,
    T: Timestamp + Lattice + Columnation,
{
    /// Renders a linear join of `inputs` as described by `linear_plan`.
    ///
    /// All operators are created inside a region named `Join(Linear)`. The construction
    /// itself happens in `render_join_inner`.
    pub(crate) fn render_join(
        &self,
        inputs: Vec<CollectionBundle<G, T>>,
        linear_plan: LinearJoinPlan,
    ) -> CollectionBundle<G, T> {
        self.scope.clone().region_named("Join(Linear)", |inner| {
            self.render_join_inner(inputs, linear_plan, inner)
        })
    }

    /// Builds the linear join inside the region `inner`.
    ///
    /// 1. Start from the source relation. An existing arrangement keyed by the first
    ///    stage's stream key is reused when there is no initial closure. Otherwise the
    ///    relation is taken as a collection keyed by `source_key`, and the initial
    ///    closure, if any, is applied.
    /// 2. Join the running result with each stage's lookup relation in order.
    /// 3. Apply the final closure, if any.
    ///
    /// Error collections from every step are concatenated into the error output of the
    /// returned bundle, which is then taken back out of the region.
    ///
    /// # Panics
    ///
    /// Panics with "Unexpectedly arranged join output" if the result after all stages is
    /// not a `JoinedFlavor::Collection`. The code below never produces that case: every
    /// stage returns a collection, and an arrangement is only looked up when at least one
    /// stage exists.
    fn render_join_inner(
        &self,
        inputs: Vec<CollectionBundle<G, T>>,
        linear_plan: LinearJoinPlan,
        inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
    ) -> CollectionBundle<G, T> {
        // Error collections from every step; concatenated into the output at the end.
        let mut errors = Vec::new();
        // An existing arrangement of the source relation is only usable if it is keyed
        // by the first stage's stream key. Without stages, no arrangement is looked up.
        let arrangement = linear_plan
            .stage_plans
            .get(0)
            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
        // Reuse that arrangement only when no initial closure has to run first. Otherwise
        // start from the source relation as a collection.
        let mut joined = match (arrangement, linear_plan.initial_closure) {
            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
                // The error arrangement is turned into a collection of its keys.
                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
                JoinedFlavor::Local(oks.enter_region(inner))
            }
            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
                JoinedFlavor::Trace(oks.enter_region(inner))
            }
            (_, initial_closure) => {
                let (joined, errs) = inputs[linear_plan.source_relation]
                    .as_specific_collection(linear_plan.source_key.as_deref());
                errors.push(errs.enter_region(inner));
                let mut joined = joined.enter_region(inner);
                if let Some(closure) = initial_closure {
                    let name = "LinearJoinInitialization";
                    type CB<C> = ConsolidatingContainerBuilder<C>;
                    // `transpose` turns the closure's `Result<Option<Row>, _>` into
                    // `Option<Result<Row, _>>`. A `None` should yield no output row, and
                    // errors go to the separate error output.
                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                        let mut datums = DatumVec::new();
                        move |row| {
                            let binding = SharedRow::get();
                            let mut row_builder = binding.borrow_mut();
                            let temp_storage = RowArena::new();
                            let mut datums_local = datums.borrow_with(&row);
                            closure
                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                .map_err(DataflowError::from)
                                .transpose()
                        }
                    });
                    joined = j;
                    errors.push(errs);
                }
                JoinedFlavor::Collection(joined)
            }
        };
        // Each stage joins the running result with one more input and yields a collection.
        for stage_plan in linear_plan.stage_plans.into_iter() {
            let stream = self.differential_join(
                joined,
                inputs[stage_plan.lookup_relation].enter_region(inner),
                stage_plan,
                &mut errors,
            );
            joined = JoinedFlavor::Collection(stream);
        }
        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
            if let Some(closure) = linear_plan.final_closure {
                let name = "LinearJoinFinalization";
                type CB<C> = ConsolidatingContainerBuilder<C>;
                // Same shape as the initial closure: keep `Some` rows, split out errors.
                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                    let mut datums = DatumVec::new();
                    move |row| {
                        let binding = SharedRow::get();
                        let mut row_builder = binding.borrow_mut();
                        let temp_storage = RowArena::new();
                        let mut datums_local = datums.borrow_with(&row);
                        closure
                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
                            .map_err(DataflowError::from)
                            .transpose()
                    }
                });
                joined = updates;
                errors.push(errs);
            }
            CollectionBundle::from_collections(
                joined,
                differential_dataflow::collection::concatenate(inner, errors),
            )
        } else {
            panic!("Unexpectedly arranged join output");
        };
        bundle.leave_region()
    }

    /// Renders one stage of a linear join by joining `joined` with `lookup_relation`.
    ///
    /// If `joined` is still a collection, it is first keyed and arranged:
    /// - the key is the `stream_key` expressions evaluated on each row;
    /// - the value is the row's `stream_thinning` columns.
    ///
    /// The result is then joined against the arrangement of `lookup_relation` keyed by
    /// `lookup_key`, applying `closure` to each match.
    ///
    /// The following errors are appended to `errors`:
    /// - key-evaluation errors;
    /// - the lookup arrangement's errors;
    /// - closure errors.
    ///
    /// # Panics
    ///
    /// Panics if `lookup_relation` has no arrangement keyed by `lookup_key`.
    fn differential_join<S>(
        &self,
        mut joined: JoinedFlavor<S, T>,
        lookup_relation: CollectionBundle<S, T>,
        LinearStagePlan {
            stream_key,
            stream_thinning,
            lookup_key,
            closure,
            // The caller already resolved this index into `lookup_relation`.
            lookup_relation: _,
        }: LinearStagePlan,
        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
    ) -> Collection<S, Row, Diff>
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Key and arrange a collection input so that it can be joined as an arrangement.
        if let JoinedFlavor::Collection(stream) = joined {
            let name = "LinearJoinKeyPreparation";
            type CB<C> = CapacityContainerBuilder<C>;
            let (keyed, errs) = stream.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
                let mut datums = DatumVec::new();
                move |row| {
                    let binding = SharedRow::get();
                    let mut row_builder = binding.borrow_mut();
                    let temp_storage = RowArena::new();
                    let datums_local = datums.borrow_with(&row);
                    // Key: the stream key expressions evaluated on the row. An evaluation
                    // error is returned via `?` and ends up in the error output.
                    row_builder.packer().try_extend(
                        stream_key
                            .iter()
                            .map(|e| e.eval(&datums_local, &temp_storage)),
                    )?;
                    let key = row_builder.clone();
                    // Value: the columns listed in `stream_thinning`, copied from the row.
                    row_builder
                        .packer()
                        .extend(stream_thinning.iter().map(|e| datums_local[*e]));
                    let value = row_builder.clone();
                    Ok((key, value))
                }
            });
            errors.push(errs);
            let arranged = keyed.mz_arrange::<RowRowSpine<_, _>>("JoinStage");
            joined = JoinedFlavor::Local(MzArrangement::RowRow(arranged));
        }
        let arrangement = lookup_relation
            .arrangement(&lookup_key[..])
            .expect("Arrangement absent despite explicit construction");
        use MzArrangement as A;
        use MzArrangementImport as I;
        // Dispatch on whether each side is a local arrangement or an imported trace, which
        // selects the trace types for `differential_join_inner`. `RowRow` is the only
        // arrangement variant handled.
        match joined {
            JoinedFlavor::Collection(_) => {
                unreachable!("JoinedFlavor::Collection variant avoided at top of method");
            }
            JoinedFlavor::Local(local) => match arrangement {
                ArrangementFlavor::Local(oks, errs1) => {
                    let (oks, errs2) = match (local, oks) {
                        (A::RowRow(prev_keyed), A::RowRow(next_input)) => self
                            .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
                                prev_keyed, next_input, closure,
                            ),
                    };
                    errors.push(errs1.as_collection(|k, _v| k.clone()));
                    errors.extend(errs2);
                    oks
                }
                ArrangementFlavor::Trace(_gid, oks, errs1) => {
                    let (oks, errs2) = match (local, oks) {
                        (A::RowRow(prev_keyed), I::RowRow(next_input)) => self
                            .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
                                prev_keyed, next_input, closure,
                            ),
                    };
                    errors.push(errs1.as_collection(|k, _v| k.clone()));
                    errors.extend(errs2);
                    oks
                }
            },
            JoinedFlavor::Trace(trace) => match arrangement {
                ArrangementFlavor::Local(oks, errs1) => {
                    let (oks, errs2) = match (trace, oks) {
                        (I::RowRow(prev_keyed), A::RowRow(next_input)) => self
                            .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
                                prev_keyed, next_input, closure,
                            ),
                    };
                    errors.push(errs1.as_collection(|k, _v| k.clone()));
                    errors.extend(errs2);
                    oks
                }
                ArrangementFlavor::Trace(_gid, oks, errs1) => {
                    let (oks, errs2) = match (trace, oks) {
                        (I::RowRow(prev_keyed), I::RowRow(next_input)) => self
                            .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
                                prev_keyed, next_input, closure,
                            ),
                    };
                    errors.push(errs1.as_collection(|k, _v| k.clone()));
                    errors.extend(errs2);
                    oks
                }
            },
        }
    }

    /// Joins `prev_keyed` with `next_input` using the configured `LinearJoinSpec`.
    ///
    /// For every match, the datums of the key, the `prev_keyed` value, and the
    /// `next_input` value are concatenated in that order and passed to `closure`.
    ///
    /// Returns the output rows together with an optional error collection:
    /// - if `closure.could_error()`, the second element is `Some` and holds the closure's
    ///   errors;
    /// - otherwise it is `None`, and an error from the closure panics.
    fn differential_join_inner<S, Tr1, Tr2>(
        &self,
        prev_keyed: Arranged<S, Tr1>,
        next_input: Arranged<S, Tr2>,
        closure: JoinClosure,
    ) -> (
        Collection<S, Row, Diff>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
            + Clone
            + 'static,
        for<'a> Tr1::Key<'a>: ToDatumIter,
        for<'a> Tr1::Val<'a>: ToDatumIter,
        for<'a> Tr2::Val<'a>: ToDatumIter,
    {
        // Reusable datum buffer, moved into whichever join closure is built below.
        let mut datums = DatumVec::new();
        if closure.could_error() {
            let (oks, err) = self
                .linear_join_spec
                .render(
                    &prev_keyed,
                    &next_input,
                    self.shutdown_token.clone(),
                    move |key, old, new| {
                        let binding = SharedRow::get();
                        let mut row_builder = binding.borrow_mut();
                        let temp_storage = RowArena::new();
                        let key = key.to_datum_iter();
                        let old = old.to_datum_iter();
                        let new = new.to_datum_iter();
                        let mut datums_local = datums.borrow();
                        datums_local.extend(key);
                        datums_local.extend(old);
                        datums_local.extend(new);
                        closure
                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
                            .map_err(DataflowError::from)
                            .transpose()
                    },
                )
                .inner
                // Route each `Result` update to the ok or error stream, keeping its
                // time and diff.
                .ok_err(|(x, t, d)| {
                    match x {
                        Ok(x) => Ok((x, t, d)),
                        Err(x) => Err((x, t, d)),
                    }
                });
            (oks.as_collection(), Some(err.as_collection()))
        } else {
            let oks = self.linear_join_spec.render(
                &prev_keyed,
                &next_input,
                self.shutdown_token.clone(),
                move |key, old, new| {
                    let binding = SharedRow::get();
                    let mut row_builder = binding.borrow_mut();
                    let temp_storage = RowArena::new();
                    let key = key.to_datum_iter();
                    let old = old.to_datum_iter();
                    let new = new.to_datum_iter();
                    let mut datums_local = datums.borrow();
                    datums_local.extend(key);
                    datums_local.extend(old);
                    datums_local.extend(new);
                    // `could_error()` returned false, so an error here is a broken invariant.
                    closure
                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                        .expect("Closure claimed to never error")
                },
            );
            (oks, None)
        }
    }
}