use std::collections::BTreeMap;
use std::fmt::Debug;
use mz_repr::{ColumnType, Datum, RelationType, Row, RowArena, ScalarType};
use crate::{
BinaryFunc, EvalError, MapFilterProject, MfpPlan, MirScalarExpr, UnaryFunc,
UnmaterializableFunc, VariadicFunc,
};
/// An approximation of the set of non-null, non-error datums an expression may produce.
#[derive(Clone, Eq, PartialEq, Debug)]
enum Values<'a> {
/// No values at all.
Empty,
/// Every datum between the two endpoints, both inclusive, in `Datum` order.
Within(Datum<'a>, Datum<'a>),
/// Map datums whose entries satisfy the per-key specs. A key that is absent
/// from this map places no constraint on the corresponding entry.
Nested(BTreeMap<Datum<'a>, ResultSpec<'a>>),
/// Any value whatsoever.
All,
}
impl<'a> Values<'a> {
    /// Returns the spec that describes exactly the single datum `a`.
    ///
    /// A map datum becomes a `Nested` spec with one exact entry per key; any
    /// other datum becomes the degenerate range `[a, a]`.
    fn just(a: Datum<'a>) -> Values<'a> {
        match a {
            Datum::Map(datum_map) => Values::Nested(
                datum_map
                    .iter()
                    .map(|(key, val)| (key.into(), ResultSpec::value(val)))
                    .collect(),
            ),
            other => Self::Within(other, other),
        }
    }

    /// Returns a spec covering every value covered by `self` or by `other`.
    ///
    /// The result is allowed to be wider than the exact set union (two ranges
    /// merge into the single range spanning both), but never narrower.
    fn union(self, other: Values<'a>) -> Values<'a> {
        match (self, other) {
            (Values::Empty, r) => r,
            (r, Values::Empty) => r,
            (Values::Within(a0, a1), Values::Within(b0, b1)) => {
                Values::Within(a0.min(b0), a1.max(b1))
            }
            (Values::Nested(mut a), Values::Nested(mut b)) => {
                // A key that is missing from a `Nested` map is unconstrained
                // (`may_contain` accepts any value for it, and `intersect`
                // treats it as `ResultSpec::anything()`). A key can therefore
                // only stay constrained in the union when *both* sides
                // constrain it; keeping a one-sided constraint would wrongly
                // exclude values the other side allows.
                a.retain(|datum, values| match b.remove(datum) {
                    Some(other_values) => {
                        *values = values.clone().union(other_values);
                        // Entries that constrain nothing are dropped.
                        *values != ResultSpec::anything()
                    }
                    None => false,
                });
                if a.is_empty() {
                    Values::All
                } else {
                    Values::Nested(a)
                }
            }
            // A range and a nested spec have no common compact representation.
            _ => Values::All,
        }
    }

    /// Returns a spec covering only values covered by both `self` and `other`.
    fn intersect(self, other: Values<'a>) -> Values<'a> {
        match (self, other) {
            (Values::Empty, _) => Values::Empty,
            (_, Values::Empty) => Values::Empty,
            (Values::Within(a0, a1), Values::Within(b0, b1)) => {
                let min = a0.max(b0);
                let max = a1.min(b1);
                if min <= max {
                    Values::Within(min, max)
                } else {
                    Values::Empty
                }
            }
            (Values::Nested(mut a), Values::Nested(b)) => {
                // Keys missing on one side start out unconstrained there.
                for (datum, other_spec) in b {
                    let spec = a.entry(datum).or_insert_with(ResultSpec::anything);
                    *spec = spec.clone().intersect(other_spec);
                }
                Values::Nested(a)
            }
            (Values::All, v) => v,
            (v, Values::All) => v,
            (Values::Nested(_), Values::Within(_, _)) => Values::Empty,
            (Values::Within(_, _), Values::Nested(_)) => Values::Empty,
        }
    }

    /// Returns true if `value` is covered by this spec.
    ///
    /// For `Nested`, only map datums can match, and every entry of the map
    /// must satisfy the spec registered for its key (if there is one).
    fn may_contain(&self, value: Datum<'a>) -> bool {
        match self {
            Values::Empty => false,
            Values::Within(min, max) => *min <= value && value <= *max,
            Values::All => true,
            Values::Nested(field_map) => match value {
                Datum::Map(datum_map) => {
                    datum_map
                        .iter()
                        .all(|(key, val)| match field_map.get(&key.into()) {
                            None => true,
                            Some(nested) => nested.may_contain(val),
                        })
                }
                _ => false,
            },
        }
    }
}
/// A summary of the possible outcomes of evaluating an expression: whether it
/// may be null, whether it may fail, and which non-null values it may take.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ResultSpec<'a> {
/// True if the result may be `Datum::Null`.
nullable: bool,
/// True if evaluation may produce an error.
fallible: bool,
/// The non-null values the result may take.
values: Values<'a>,
}
impl<'a> ResultSpec<'a> {
    /// The empty spec: no value, no null, and no error is possible.
    pub fn nothing() -> Self {
        Self {
            nullable: false,
            fallible: false,
            values: Values::Empty,
        }
    }

    /// The widest spec: any value, null, or an error.
    pub fn anything() -> Self {
        Self {
            nullable: true,
            fallible: true,
            values: Values::All,
        }
    }

    /// Any value or null, but never an error.
    pub fn any_infallible() -> Self {
        Self {
            fallible: false,
            ..Self::anything()
        }
    }

    /// Exactly null.
    pub fn null() -> Self {
        let mut spec = Self::nothing();
        spec.nullable = true;
        spec
    }

    /// Exactly an error.
    pub fn fails() -> Self {
        let mut spec = Self::nothing();
        spec.fallible = true;
        spec
    }

    /// The spec implied by a column type alone, with the given fallibility.
    ///
    /// Only booleans get a bounded value range; every other type maps to all values.
    pub fn has_type(col: &ColumnType, fallible: bool) -> ResultSpec<'a> {
        let values = if matches!(col.scalar_type, ScalarType::Bool) {
            Values::Within(Datum::False, Datum::True)
        } else {
            Values::All
        };
        ResultSpec {
            nullable: col.nullable,
            fallible,
            values,
        }
    }

    /// The spec that contains exactly the given datum (null included).
    pub fn value(value: Datum<'a>) -> ResultSpec<'a> {
        if let Datum::Null = value {
            return Self::null();
        }
        let mut spec = Self::nothing();
        spec.values = Values::just(value);
        spec
    }

    /// The spec for all non-null values in the inclusive range `[min, max]`.
    ///
    /// An inverted range yields the empty spec.
    ///
    /// # Panics
    ///
    /// Panics if either endpoint is null.
    pub fn value_between(min: Datum<'a>, max: Datum<'a>) -> ResultSpec<'a> {
        assert!(!min.is_null());
        assert!(!max.is_null());
        let values = if min <= max {
            Values::Within(min, max)
        } else {
            Values::Empty
        };
        ResultSpec {
            values,
            ..ResultSpec::nothing()
        }
    }

    /// Any non-null value, with no possibility of null or error.
    pub fn value_all() -> ResultSpec<'a> {
        let mut spec = ResultSpec::nothing();
        spec.values = Values::All;
        spec
    }

    /// A spec for map values whose entries are constrained by the per-key specs in `map`.
    pub fn map_spec(map: BTreeMap<Datum<'a>, ResultSpec<'a>>) -> ResultSpec<'a> {
        let mut spec = ResultSpec::nothing();
        spec.values = Values::Nested(map);
        spec
    }

    /// A spec that admits every outcome admitted by either input.
    pub fn union(self, other: ResultSpec<'a>) -> ResultSpec<'a> {
        let nullable = self.nullable || other.nullable;
        let fallible = self.fallible || other.fallible;
        let values = self.values.union(other.values);
        ResultSpec {
            nullable,
            fallible,
            values,
        }
    }

    /// A spec that admits only the outcomes admitted by both inputs.
    pub fn intersect(self, other: ResultSpec<'a>) -> ResultSpec<'a> {
        let nullable = self.nullable && other.nullable;
        let fallible = self.fallible && other.fallible;
        let values = self.values.intersect(other.values);
        ResultSpec {
            nullable,
            fallible,
            values,
        }
    }

    /// Returns true if `value` is a possible (non-error) outcome under this spec.
    pub fn may_contain(&self, value: Datum<'a>) -> bool {
        if value == Datum::Null {
            self.nullable
        } else {
            self.values.may_contain(value)
        }
    }

    /// Returns true if an error is a possible outcome under this spec.
    pub fn may_fail(&self) -> bool {
        self.fallible
    }

    /// Applies `result_map` to representative outcomes of this spec and unions
    /// the specs it returns.
    ///
    /// `result_map` is called with null (if nullable), with a placeholder error
    /// (if fallible), and with either the single value, both booleans, or the
    /// two range endpoints (the latter only when `is_monotone` is set). When
    /// the values can't be enumerated or bounded this way, the value part of
    /// the result is `ResultSpec::anything()`.
    fn flat_map(
        &self,
        is_monotone: bool,
        mut result_map: impl FnMut(Result<Datum<'a>, EvalError>) -> ResultSpec<'a>,
    ) -> ResultSpec<'a> {
        let from_null = match self.nullable {
            true => result_map(Ok(Datum::Null)),
            false => ResultSpec::nothing(),
        };

        let from_error = match self.fallible {
            true => {
                // The mapped function may either propagate the error or
                // swallow it, so both outcomes are accounted for.
                let mapped = result_map(Err(EvalError::Internal("".into())));
                ResultSpec::fails().union(mapped)
            }
            false => ResultSpec::nothing(),
        };

        let from_values = match &self.values {
            Values::Empty => ResultSpec::nothing(),
            // A single value: evaluate it directly.
            Values::Within(min, max) if min == max => result_map(Ok(*min)),
            // The full boolean range: try both values.
            Values::Within(Datum::False, Datum::True) => {
                let on_false = result_map(Ok(Datum::False));
                on_false.union(result_map(Ok(Datum::True)))
            }
            // A monotone function: map the endpoints and span the results.
            Values::Within(min, max) if is_monotone => {
                let lo = result_map(Ok(*min));
                let hi = result_map(Ok(*max));
                let both_plain = !(lo.nullable || lo.fallible || hi.nullable || hi.fallible);
                if both_plain {
                    ResultSpec {
                        nullable: false,
                        fallible: false,
                        values: lo.values.union(hi.values),
                    }
                } else if lo == ResultSpec::null() && hi == ResultSpec::null() {
                    // Both endpoints map to exactly null.
                    ResultSpec::null()
                } else {
                    ResultSpec::anything()
                }
            }
            Values::Within(_, _) | Values::Nested(_) | Values::All => ResultSpec::anything(),
        };

        from_null.union(from_error).union(from_values)
    }
}
/// An abstract interpreter over `MirScalarExpr`: implementors summarize each
/// kind of expression node, and the provided methods walk whole expressions.
pub trait Interpreter {
    /// The summary computed for every (sub)expression.
    type Summary: Clone + Debug + Sized;

    /// Summarizes a reference to the column with the given index.
    fn column(&self, id: usize) -> Self::Summary;

    /// Summarizes a literal value or literal error of the given type.
    fn literal(&self, result: &Result<Row, EvalError>, col_type: &ColumnType) -> Self::Summary;

    /// Summarizes a call to an unmaterializable function.
    fn unmaterializable(&self, func: &UnmaterializableFunc) -> Self::Summary;

    /// Summarizes a unary call, given the summary of its argument.
    fn unary(&self, func: &UnaryFunc, expr: Self::Summary) -> Self::Summary;

    /// Summarizes a binary call, given the summaries of its arguments.
    fn binary(&self, func: &BinaryFunc, left: Self::Summary, right: Self::Summary)
        -> Self::Summary;

    /// Summarizes a variadic call, given the summaries of its arguments.
    fn variadic(&self, func: &VariadicFunc, exprs: Vec<Self::Summary>) -> Self::Summary;

    /// Summarizes a conditional, given the summaries of its three parts.
    fn cond(&self, cond: Self::Summary, then: Self::Summary, els: Self::Summary) -> Self::Summary;

    /// Summarizes an arbitrary expression by recursively summarizing its
    /// children and combining them with the node-specific methods.
    fn expr(&self, expr: &MirScalarExpr) -> Self::Summary {
        match expr {
            MirScalarExpr::Column(id) => self.column(*id),
            MirScalarExpr::Literal(value, col_type) => self.literal(value, col_type),
            MirScalarExpr::CallUnmaterializable(func) => self.unmaterializable(func),
            MirScalarExpr::CallUnary { func, expr } => {
                let operand = self.expr(expr);
                self.unary(func, operand)
            }
            MirScalarExpr::CallBinary { func, expr1, expr2 } => {
                let lhs = self.expr(expr1);
                let rhs = self.expr(expr2);
                self.binary(func, lhs, rhs)
            }
            MirScalarExpr::CallVariadic { func, exprs } => {
                let mut operands = Vec::with_capacity(exprs.len());
                for operand in exprs.iter() {
                    operands.push(self.expr(operand));
                }
                self.variadic(func, operands)
            }
            MirScalarExpr::If { cond, then, els } => {
                let on_cond = self.expr(cond);
                let on_then = self.expr(then);
                let on_els = self.expr(els);
                self.cond(on_cond, on_then, on_els)
            }
        }
    }

    /// Summarizes the conjunction of all predicates of `mfp`, resolving
    /// references to the MFP's own expressions along the way.
    fn mfp_filter(&self, mfp: &MapFilterProject) -> Self::Summary {
        let mfp_eval = MfpEval::new(self, mfp.input_arity, &mfp.expressions);
        let mut conjuncts = Vec::with_capacity(mfp.predicates.len());
        for (_, predicate) in &mfp.predicates {
            conjuncts.push(mfp_eval.expr(predicate));
        }
        mfp_eval.variadic(&VariadicFunc::And, conjuncts)
    }

    /// Summarizes the conjunction of all predicates of `plan`, including its
    /// temporal bounds: each lower bound must be `<=` `mz_now()` and each upper
    /// bound must be `>=` `mz_now()`.
    fn mfp_plan_filter(&self, plan: &MfpPlan) -> Self::Summary {
        let mfp_eval = MfpEval::new(self, plan.mfp.input_arity, &plan.mfp.expressions);
        let mut results: Vec<_> = plan
            .mfp
            .predicates
            .iter()
            .map(|(_, e)| mfp_eval.expr(e))
            .collect();

        let mz_now = mfp_eval.unmaterializable(&UnmaterializableFunc::MzNow);
        results.extend(plan.lower_bounds.iter().map(|bound| {
            let bound_range = mfp_eval.expr(bound);
            mfp_eval.binary(&BinaryFunc::Lte, bound_range, mz_now.clone())
        }));
        results.extend(plan.upper_bounds.iter().map(|bound| {
            let bound_range = mfp_eval.expr(bound);
            mfp_eval.binary(&BinaryFunc::Gte, bound_range, mz_now.clone())
        }));

        self.variadic(&VariadicFunc::And, results)
    }
}
/// Wraps another interpreter so that column references at or beyond
/// `input_arity` resolve to the summaries of an MFP's own expressions.
pub(crate) struct MfpEval<'a, E: Interpreter + ?Sized> {
    /// The interpreter every node-level call is delegated to.
    evaluator: &'a E,
    /// Number of input columns; ids below this refer to the input.
    input_arity: usize,
    /// Summaries of the MFP expressions, in definition order.
    expressions: Vec<E::Summary>,
}

impl<'a, E: Interpreter + ?Sized> MfpEval<'a, E> {
    /// Summarizes `expressions` one after another. Each expression is
    /// interpreted with the summaries of all earlier ones already available,
    /// so it may refer back to them by column id.
    pub(crate) fn new(evaluator: &'a E, input_arity: usize, expressions: &[MirScalarExpr]) -> Self {
        let seed = MfpEval {
            evaluator,
            input_arity,
            expressions: Vec::new(),
        };
        expressions.iter().fold(seed, |mut acc, expr| {
            let summary = acc.expr(expr);
            acc.expressions.push(summary);
            acc
        })
    }
}

impl<'a, E: Interpreter + ?Sized> Interpreter for MfpEval<'a, E> {
    type Summary = E::Summary;

    /// Input columns are delegated; higher ids index into the stored
    /// expression summaries.
    fn column(&self, id: usize) -> Self::Summary {
        match id.checked_sub(self.input_arity) {
            None => self.evaluator.column(id),
            Some(offset) => self.expressions[offset].clone(),
        }
    }

    fn literal(&self, result: &Result<Row, EvalError>, col_type: &ColumnType) -> Self::Summary {
        let inner = self.evaluator;
        inner.literal(result, col_type)
    }

    fn unmaterializable(&self, func: &UnmaterializableFunc) -> Self::Summary {
        let inner = self.evaluator;
        inner.unmaterializable(func)
    }

    fn unary(&self, func: &UnaryFunc, expr: Self::Summary) -> Self::Summary {
        let inner = self.evaluator;
        inner.unary(func, expr)
    }

    fn binary(
        &self,
        func: &BinaryFunc,
        left: Self::Summary,
        right: Self::Summary,
    ) -> Self::Summary {
        let inner = self.evaluator;
        inner.binary(func, left, right)
    }

    fn variadic(&self, func: &VariadicFunc, exprs: Vec<Self::Summary>) -> Self::Summary {
        let inner = self.evaluator;
        inner.variadic(func, exprs)
    }

    fn cond(&self, cond: Self::Summary, then: Self::Summary, els: Self::Summary) -> Self::Summary {
        let inner = self.evaluator;
        inner.cond(cond, then, els)
    }
}
/// A hand-written range mapping for a unary function, used in place of the
/// generic `flat_map`-based evaluation.
struct SpecialUnary {
/// Maps the argument's spec to the result's spec.
map_fn: for<'a, 'b> fn(&'b ColumnSpecs<'a>, ResultSpec<'a>) -> ResultSpec<'a>,
/// Whether the `Trace` interpreter treats this function as pushdownable.
pushdownable: bool,
}
impl SpecialUnary {
/// Returns the special-case handling for `func`, or `None` if `func` has none.
fn for_func(func: &UnaryFunc) -> Option<SpecialUnary> {
// Applies `value_fn` to the argument's values (skipping it entirely when
// there are none) and carries the argument's nullability and fallibility
// over into the result.
fn eagerly<'b>(
spec: ResultSpec<'b>,
value_fn: impl FnOnce(Values<'b>) -> ResultSpec<'b>,
) -> ResultSpec<'b> {
let result = match spec.values {
Values::Empty => ResultSpec::nothing(),
other => value_fn(other),
};
ResultSpec {
fallible: spec.fallible || result.fallible,
nullable: spec.nullable || result.nullable,
values: result.values,
}
}
match func {
UnaryFunc::TryParseMonotonicIso8601Timestamp(_) => Some(SpecialUnary {
map_fn: |specs, range| {
// Evaluate the real function on a single datum by binding it to column 0.
let expr = MirScalarExpr::CallUnary {
func: UnaryFunc::TryParseMonotonicIso8601Timestamp(
crate::func::TryParseMonotonicIso8601Timestamp,
),
expr: Box::new(MirScalarExpr::Column(0)),
};
let eval = |d| specs.eval_result(expr.eval(&[d], specs.arena));
eagerly(range, |values| {
match values {
// A single input value: evaluate it directly.
Values::Within(a, b) if a == b => eval(a),
Values::Within(a, b) => {
let spec = eval(a).union(eval(b));
// If either endpoint evaluated to null, no value range is
// derived from the endpoints; fall back to all values.
let values_spec = if spec.nullable {
ResultSpec::value_all()
} else {
spec
};
// A proper range of inputs is always treated as possibly
// producing null.
values_spec.union(ResultSpec::null())
}
// Any other input shape: any value or null, but no error.
_ => ResultSpec::any_infallible(),
}
})
},
pushdownable: true,
}),
_ => None,
}
}
}
/// A hand-written range mapping for a binary function, used in place of the
/// generic `flat_map`-based evaluation.
struct SpecialBinary {
    /// Maps the specs of the two arguments to the spec of the result.
    map_fn: for<'a> fn(ResultSpec<'a>, ResultSpec<'a>) -> ResultSpec<'a>,
    /// Whether the `Trace` interpreter treats the (left, right) argument as pushdownable.
    pushdownable: (bool, bool),
}

impl SpecialBinary {
    /// Returns the special-case handling for `func`, or `None` if `func` has none.
    fn for_func(func: &BinaryFunc) -> Option<SpecialBinary> {
        /// Applies `value_fn` to the arguments' values (skipping it when either
        /// side has none) and carries both arguments' nullability and
        /// fallibility over into the result.
        fn eagerly<'b>(
            left: ResultSpec<'b>,
            right: ResultSpec<'b>,
            value_fn: impl FnOnce(Values<'b>, Values<'b>) -> ResultSpec<'b>,
        ) -> ResultSpec<'b> {
            let ResultSpec {
                nullable: left_nullable,
                fallible: left_fallible,
                values: left_values,
            } = left;
            let ResultSpec {
                nullable: right_nullable,
                fallible: right_fallible,
                values: right_values,
            } = right;

            let either_empty =
                matches!(left_values, Values::Empty) || matches!(right_values, Values::Empty);
            let inner = if either_empty {
                ResultSpec::nothing()
            } else {
                value_fn(left_values, right_values)
            };

            ResultSpec {
                fallible: left_fallible || right_fallible || inner.fallible,
                nullable: left_nullable || right_nullable || inner.nullable,
                values: inner.values,
            }
        }

        /// Looks up a single, exactly-known key in a nested map spec.
        fn jsonb_get_string<'b>(
            left: ResultSpec<'b>,
            right: ResultSpec<'b>,
            stringify: bool,
        ) -> ResultSpec<'b> {
            eagerly(left, right, |json, key_range| {
                let field = match (json, key_range) {
                    (Values::Nested(mut fields), Values::Within(lo, hi)) if lo == hi => {
                        fields.remove(&lo)
                    }
                    _ => None,
                };
                match field {
                    // Unknown field or key: any value or null, but no error.
                    None => ResultSpec::any_infallible(),
                    Some(field_spec) if !stringify => field_spec,
                    Some(field_spec) => {
                        // When stringifying, a value range is only kept if
                        // both of its endpoints are already strings.
                        let values = match field_spec.values {
                            Values::Empty => Values::Empty,
                            Values::Within(min @ Datum::String(_), max @ Datum::String(_)) => {
                                Values::Within(min, max)
                            }
                            Values::Within(_, _) | Values::Nested(_) | Values::All => Values::All,
                        };
                        ResultSpec {
                            values,
                            ..field_spec
                        }
                    }
                }
            })
        }

        /// Equality may be true when the operand ranges overlap, and may be
        /// false unless both operands are the same single value.
        fn eq<'b>(left: ResultSpec<'b>, right: ResultSpec<'b>) -> ResultSpec<'b> {
            eagerly(left, right, |lhs, rhs| {
                let overlap = lhs.clone().intersect(rhs.clone());
                let maybe_true = if let Values::Empty = overlap {
                    ResultSpec::nothing()
                } else {
                    ResultSpec::value(Datum::True)
                };

                let maybe_false = match lhs.union(rhs) {
                    Values::Within(a, b) if a == b => ResultSpec::nothing(),
                    _ => ResultSpec::value(Datum::False),
                };

                maybe_true.union(maybe_false)
            })
        }

        match func {
            BinaryFunc::JsonbGetString { stringify } => Some(SpecialBinary {
                map_fn: if *stringify {
                    |json, key| jsonb_get_string(json, key, true)
                } else {
                    |json, key| jsonb_get_string(json, key, false)
                },
                pushdownable: (true, false),
            }),
            BinaryFunc::Eq => Some(SpecialBinary {
                map_fn: eq,
                pushdownable: (true, true),
            }),
            _ => None,
        }
    }
}
/// The summary type of the `ColumnSpecs` interpreter: an expression's type
/// together with the range of results it may produce.
#[derive(Clone, Debug)]
pub struct ColumnSpec<'a> {
/// The type of the expression.
pub col_type: ColumnType,
/// The possible results of the expression.
pub range: ResultSpec<'a>,
}
#[derive(Clone, Debug)]
pub struct ColumnSpecs<'a> {
pub relation: &'a RelationType,
pub columns: Vec<ResultSpec<'a>>,
pub unmaterializables: BTreeMap<UnmaterializableFunc, ResultSpec<'a>>,
pub arena: &'a RowArena,
}
impl<'a> ColumnSpecs<'a> {
const MAX_EVAL_ARGS: usize = 6;
pub fn new(relation: &'a RelationType, arena: &'a RowArena) -> Self {
let columns = relation
.column_types
.iter()
.map(|ct| ResultSpec::has_type(ct, false))
.collect();
ColumnSpecs {
relation,
columns,
unmaterializables: Default::default(),
arena,
}
}
pub fn push_column(&mut self, id: usize, update: ResultSpec<'a>) {
let range = self.columns.get_mut(id).expect("valid column id");
*range = range.clone().intersect(update);
}
pub fn push_unmaterializable(&mut self, func: UnmaterializableFunc, update: ResultSpec<'a>) {
let range = self
.unmaterializables
.entry(func.clone())
.or_insert_with(|| ResultSpec::has_type(&func.output_type(), true));
*range = range.clone().intersect(update);
}
fn eval_result<'b, E>(&self, result: Result<Datum<'b>, E>) -> ResultSpec<'a> {
match result {
Ok(Datum::Null) => ResultSpec {
nullable: true,
..ResultSpec::nothing()
},
Ok(d) => ResultSpec {
values: Values::just(self.arena.make_datum(|packer| packer.push(d))),
..ResultSpec::nothing()
},
Err(_) => ResultSpec {
fallible: true,
..ResultSpec::nothing()
},
}
}
fn set_literal(expr: &mut MirScalarExpr, update: Result<Datum, EvalError>) {
match expr {
MirScalarExpr::Literal(literal, col_type) => match update {
Err(error) => *literal = Err(error),
Ok(datum) => {
assert!(
datum.is_instance_of(col_type),
"{datum:?} must be an instance of {col_type:?}"
);
match literal {
Ok(row) => row.packer().push(datum),
literal => *literal = Ok(Row::pack_slice(&[datum])),
}
}
},
_ => panic!("not a literal"),
}
}
fn set_argument(expr: &mut MirScalarExpr, arg: usize, value: Result<Datum, EvalError>) {
match (expr, arg) {
(MirScalarExpr::CallUnary { expr, .. }, 0) => Self::set_literal(expr, value),
(MirScalarExpr::CallBinary { expr1, .. }, 0) => Self::set_literal(expr1, value),
(MirScalarExpr::CallBinary { expr2, .. }, 1) => Self::set_literal(expr2, value),
(MirScalarExpr::CallVariadic { exprs, .. }, n) if n < exprs.len() => {
Self::set_literal(&mut exprs[n], value)
}
_ => panic!("illegal argument for expression"),
}
}
fn placeholder(col_type: ColumnType) -> MirScalarExpr {
MirScalarExpr::Literal(Err(EvalError::Internal("".into())), col_type)
}
}
impl<'a> Interpreter for ColumnSpecs<'a> {
type Summary = ColumnSpec<'a>;
fn column(&self, id: usize) -> Self::Summary {
let col_type = self.relation.column_types[id].clone();
let range = self.columns[id].clone();
ColumnSpec { col_type, range }
}
fn literal(&self, result: &Result<Row, EvalError>, col_type: &ColumnType) -> Self::Summary {
let col_type = col_type.clone();
let range = self.eval_result(result.as_ref().map(|row| {
self.arena
.make_datum(|packer| packer.push(row.unpack_first()))
}));
ColumnSpec { col_type, range }
}
fn unmaterializable(&self, func: &UnmaterializableFunc) -> Self::Summary {
let col_type = func.output_type();
let range = self
.unmaterializables
.get(func)
.cloned()
.unwrap_or(ResultSpec::has_type(&func.output_type(), true));
ColumnSpec { col_type, range }
}
fn unary(&self, func: &UnaryFunc, summary: Self::Summary) -> Self::Summary {
let fallible = func.could_error() || summary.range.fallible;
let mapped_spec = if let Some(special) = SpecialUnary::for_func(func) {
(special.map_fn)(self, summary.range)
} else {
let is_monotone = func.is_monotone();
let mut expr = MirScalarExpr::CallUnary {
func: func.clone(),
expr: Box::new(Self::placeholder(summary.col_type.clone())),
};
summary.range.flat_map(is_monotone, |datum| {
Self::set_argument(&mut expr, 0, datum);
self.eval_result(expr.eval(&[], self.arena))
})
};
let col_type = func.output_type(summary.col_type);
let range = mapped_spec.intersect(ResultSpec::has_type(&col_type, fallible));
ColumnSpec { col_type, range }
}
fn binary(
&self,
func: &BinaryFunc,
left: Self::Summary,
right: Self::Summary,
) -> Self::Summary {
let (left_monotonic, right_monotonic) = func.is_monotone();
let fallible = func.could_error() || left.range.fallible || right.range.fallible;
let mapped_spec = if let Some(special) = SpecialBinary::for_func(func) {
(special.map_fn)(left.range, right.range)
} else {
let mut expr = MirScalarExpr::CallBinary {
func: func.clone(),
expr1: Box::new(Self::placeholder(left.col_type.clone())),
expr2: Box::new(Self::placeholder(right.col_type.clone())),
};
left.range.flat_map(left_monotonic, |left_result| {
Self::set_argument(&mut expr, 0, left_result);
right.range.flat_map(right_monotonic, |right_result| {
Self::set_argument(&mut expr, 1, right_result);
self.eval_result(expr.eval(&[], self.arena))
})
})
};
let col_type = func.output_type(left.col_type, right.col_type);
let range = mapped_spec.intersect(ResultSpec::has_type(&col_type, fallible));
ColumnSpec { col_type, range }
}
fn variadic(&self, func: &VariadicFunc, args: Vec<Self::Summary>) -> Self::Summary {
let fallible = func.could_error() || args.iter().any(|s| s.range.fallible);
if func.is_associative() && args.len() > 2 {
return args
.into_iter()
.reduce(|a, b| self.variadic(func, vec![a, b]))
.expect("reducing over a non-empty argument list");
}
let mapped_spec = if args.len() >= Self::MAX_EVAL_ARGS {
ResultSpec::anything()
} else {
fn eval_loop<'a>(
is_monotonic: bool,
expr: &mut MirScalarExpr,
args: &[ColumnSpec<'a>],
index: usize,
datum_map: &mut impl FnMut(&MirScalarExpr) -> ResultSpec<'a>,
) -> ResultSpec<'a> {
if index >= args.len() {
datum_map(expr)
} else {
args[index].range.flat_map(is_monotonic, |datum| {
ColumnSpecs::set_argument(expr, index, datum);
eval_loop(is_monotonic, expr, args, index + 1, datum_map)
})
}
}
let mut fn_expr = MirScalarExpr::CallVariadic {
func: func.clone(),
exprs: args
.iter()
.map(|spec| Self::placeholder(spec.col_type.clone()))
.collect(),
};
eval_loop(func.is_monotone(), &mut fn_expr, &args, 0, &mut |expr| {
self.eval_result(expr.eval(&[], self.arena))
})
};
let col_types = args.into_iter().map(|spec| spec.col_type).collect();
let col_type = func.output_type(col_types);
let range = mapped_spec.intersect(ResultSpec::has_type(&col_type, fallible));
ColumnSpec { col_type, range }
}
fn cond(&self, cond: Self::Summary, then: Self::Summary, els: Self::Summary) -> Self::Summary {
let col_type = ColumnType {
scalar_type: then.col_type.scalar_type,
nullable: then.col_type.nullable || els.col_type.nullable,
};
let range = cond
.range
.flat_map(true, |datum| match datum {
Ok(Datum::True) => then.range.clone(),
Ok(Datum::False) => els.range.clone(),
_ => ResultSpec::fails(),
})
.intersect(ResultSpec::has_type(&col_type, true));
ColumnSpec { col_type, range }
}
}
/// An interpreter that classifies expressions by how much can be determined
/// about them, producing a `TraceSummary` for each.
#[derive(Debug)]
pub struct Trace;
/// The classification computed by the `Trace` interpreter. Variants are
/// ordered from most to least known: `Constant < Dynamic < Unknown`.
#[derive(Copy, Clone, Debug, PartialOrd, PartialEq, Ord, Eq)]
pub enum TraceSummary {
    /// The expression does not depend on any input.
    Constant,
    /// The expression depends on inputs, but only through pushdownable functions.
    Dynamic,
    /// Nothing useful is known about the expression.
    Unknown,
}

impl TraceSummary {
    /// Returns the summary of a function call given the summary of its
    /// argument: a dynamic argument passed through a non-pushdownable function
    /// becomes unknown, and everything else is unchanged.
    fn apply_fn(self, pushdownable: bool) -> Self {
        if self == TraceSummary::Dynamic && !pushdownable {
            TraceSummary::Unknown
        } else {
            self
        }
    }

    /// Returns true unless the summary is `Unknown`.
    pub fn pushdownable(self) -> bool {
        !matches!(self, TraceSummary::Unknown)
    }
}
impl Interpreter for Trace {
    type Summary = TraceSummary;

    /// Columns depend on the input.
    fn column(&self, _id: usize) -> Self::Summary {
        TraceSummary::Dynamic
    }

    /// Literals are constant.
    fn literal(&self, _result: &Result<Row, EvalError>, _col_type: &ColumnType) -> Self::Summary {
        TraceSummary::Constant
    }

    /// Unmaterializable functions are treated like inputs.
    fn unmaterializable(&self, _func: &UnmaterializableFunc) -> Self::Summary {
        TraceSummary::Dynamic
    }

    /// A unary function is pushdownable if its special-case handler says so
    /// or, lacking one, if it is monotone.
    fn unary(&self, func: &UnaryFunc, expr: Self::Summary) -> Self::Summary {
        let pushdownable = SpecialUnary::for_func(func)
            .map_or_else(|| func.is_monotone(), |special| special.pushdownable);
        expr.apply_fn(pushdownable)
    }

    /// Each argument is judged separately; the result is the worse of the two.
    fn binary(
        &self,
        func: &BinaryFunc,
        left: Self::Summary,
        right: Self::Summary,
    ) -> Self::Summary {
        let (left_pushdownable, right_pushdownable) = SpecialBinary::for_func(func)
            .map_or_else(|| func.is_monotone(), |special| special.pushdownable);
        let left_summary = left.apply_fn(left_pushdownable);
        let right_summary = right.apply_fn(right_pushdownable);
        left_summary.max(right_summary)
    }

    /// Non-associative calls with `ColumnSpecs::MAX_EVAL_ARGS` or more
    /// arguments are unknown; otherwise the result is the worst argument
    /// summary (or constant when there are no arguments).
    fn variadic(&self, func: &VariadicFunc, exprs: Vec<Self::Summary>) -> Self::Summary {
        if !func.is_associative() && exprs.len() >= ColumnSpecs::MAX_EVAL_ARGS {
            return TraceSummary::Unknown;
        }

        let pushdownable_fn = func.is_monotone();
        // `Constant` is the least summary, so it is a neutral starting point.
        exprs
            .into_iter()
            .fold(TraceSummary::Constant, |worst, arg| {
                worst.max(arg.apply_fn(pushdownable_fn))
            })
    }

    /// The condition contributes at most `Dynamic`; the result is the worst
    /// of that and the two branches.
    fn cond(&self, cond: Self::Summary, then: Self::Summary, els: Self::Summary) -> Self::Summary {
        let capped_cond = cond.min(TraceSummary::Dynamic);
        let with_then = capped_cond.max(then);
        with_then.max(els)
    }
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use mz_repr::adt::datetime::DateTimeUnits;
use mz_repr::{Datum, PropDatum, RowArena, ScalarType};
use proptest::prelude::*;
use proptest::sample::{select, Index};
use crate::func::*;
use crate::{BinaryFunc, MirScalarExpr, UnaryFunc};
use super::*;
/// One generated test case: a relation, per-column specs, concrete rows, and
/// an expression to interpret and evaluate against them.
#[derive(Debug)]
struct ExpressionData {
// Type of the generated relation.
relation_type: RelationType,
// One spec per column, pushed into the interpreter before interpreting `expr`.
specs: Vec<ResultSpec<'static>>,
// Concrete rows that `expr` is evaluated against.
rows: Vec<Row>,
// The generated expression under test.
expr: MirScalarExpr,
}
// The numeric type (without a scale limit) used throughout the generators.
const NUM_TYPE: ScalarType = ScalarType::Numeric { max_scale: None };
// Scalar types from which generated columns and literals are drawn.
static SCALAR_TYPES: &[ScalarType] = &[
ScalarType::Bool,
ScalarType::Jsonb,
NUM_TYPE,
ScalarType::Date,
ScalarType::Timestamp { precision: None },
ScalarType::MzTimestamp,
ScalarType::String,
];
// Unary functions that the expression generator may pick from. A function is
// only used when `unary_typecheck` accepts it for the argument's type.
const INTERESTING_UNARY_FUNCS: &[UnaryFunc] = {
&[
UnaryFunc::CastNumericToMzTimestamp(CastNumericToMzTimestamp),
UnaryFunc::NegNumeric(NegNumeric),
UnaryFunc::CastJsonbToNumeric(CastJsonbToNumeric(None)),
UnaryFunc::CastJsonbToBool(CastJsonbToBool),
UnaryFunc::CastJsonbToString(CastJsonbToString),
UnaryFunc::DateTruncTimestamp(DateTruncTimestamp(DateTimeUnits::Epoch)),
UnaryFunc::ExtractTimestamp(ExtractTimestamp(DateTimeUnits::Epoch)),
UnaryFunc::ExtractDate(ExtractDate(DateTimeUnits::Epoch)),
UnaryFunc::Not(Not),
UnaryFunc::IsNull(IsNull),
UnaryFunc::IsFalse(IsFalse),
UnaryFunc::TryParseMonotonicIso8601Timestamp(TryParseMonotonicIso8601Timestamp),
]
};
/// Returns true if `func` may be applied to an argument of type `arg` in a
/// generated expression. Functions without an arm are never accepted.
fn unary_typecheck(func: &UnaryFunc, arg: &ColumnType) -> bool {
    use UnaryFunc::*;
    // NOTE(review): `IsFalse` appears in `INTERESTING_UNARY_FUNCS` but has no
    // arm here, so it is always rejected — confirm whether that is intended.
    let expected = match func {
        IsNull(_) => return true,
        CastNumericToMzTimestamp(_) | NegNumeric(_) => NUM_TYPE,
        CastJsonbToNumeric(_) | CastJsonbToBool(_) | CastJsonbToString(_) => ScalarType::Jsonb,
        ExtractTimestamp(_) | DateTruncTimestamp(_) => ScalarType::Timestamp { precision: None },
        ExtractDate(_) => ScalarType::Date,
        Not(_) => ScalarType::Bool,
        TryParseMonotonicIso8601Timestamp(_) => ScalarType::String,
        _ => return false,
    };
    arg.scalar_type.base_eq(&expected)
}
// Binary functions that the expression generator may pick from. A function is
// only used when `binary_typecheck` accepts it for the argument types.
const INTERESTING_BINARY_FUNCS: &[BinaryFunc] = {
use BinaryFunc::*;
&[
AddTimestampInterval,
AddNumeric,
SubNumeric,
MulNumeric,
DivNumeric,
Eq,
Lt,
Gt,
Lte,
Gte,
DateTruncTimestamp,
JsonbGetString { stringify: true },
JsonbGetString { stringify: false },
]
};
/// Returns true if `func` may be applied to arguments of types `arg0` and
/// `arg1` in a generated expression. Functions without an arm are never accepted.
fn binary_typecheck(func: &BinaryFunc, arg0: &ColumnType, arg1: &ColumnType) -> bool {
    use BinaryFunc::*;
    let (left, right) = (&arg0.scalar_type, &arg1.scalar_type);
    let timestamp = ScalarType::Timestamp { precision: None };
    let (want_left, want_right) = match func {
        // Comparisons only need both sides to share a base type.
        Eq | Lt | Gt | Lte | Gte => return left.base_eq(right),
        AddTimestampInterval => (timestamp, ScalarType::Interval),
        AddNumeric | SubNumeric | MulNumeric | DivNumeric => (NUM_TYPE, NUM_TYPE),
        DateTruncTimestamp => (ScalarType::String, timestamp),
        JsonbGetString { .. } => (ScalarType::Jsonb, ScalarType::String),
        _ => return false,
    };
    left.base_eq(&want_left) && right.base_eq(&want_right)
}
// Variadic functions that the expression generator may pick from. A function is
// only used when `variadic_typecheck` accepts it for the argument types.
const INTERESTING_VARIADIC_FUNCS: &[VariadicFunc] = {
use VariadicFunc::*;
&[Coalesce, Greatest, Least, And, Or, Concat, ConcatWs]
};
/// Returns true if `func` may be applied to arguments of the given types in a
/// generated expression. Functions without an arm are never accepted.
fn variadic_typecheck(func: &VariadicFunc, args: &[ColumnType]) -> bool {
    use VariadicFunc::*;

    /// True if every type in `iter` has the same base scalar type as `other`.
    fn all_eq<'a>(iter: impl IntoIterator<Item = &'a ColumnType>, other: &ScalarType) -> bool {
        !iter
            .into_iter()
            .any(|t| !t.scalar_type.base_eq(other))
    }

    match func {
        // All arguments must agree with the first; no arguments is fine too.
        Coalesce | Greatest | Least => match args.split_first() {
            None => true,
            Some((first, rest)) => all_eq(rest, &first.scalar_type),
        },
        And | Or => all_eq(args, &ScalarType::Bool),
        Concat => all_eq(args, &ScalarType::String),
        ConcatWs => args.len() > 1 && all_eq(args, &ScalarType::String),
        _ => false,
    }
}
/// Strategy that picks one of the type's interesting datums, or null when the
/// type is nullable.
fn gen_datums_for_type(typ: &ColumnType) -> BoxedStrategy<Datum<'static>> {
    let null = if typ.nullable {
        Some(Datum::Null)
    } else {
        None
    };
    let values: Vec<Datum<'static>> = typ
        .scalar_type
        .interesting_datums()
        .chain(null)
        .collect();
    select(values).boxed()
}
/// Strategy that generates a column: its type, one concrete datum of that
/// type, and a spec that is guaranteed to contain that datum.
fn gen_column() -> impl Strategy<Value = (ColumnType, Datum<'static>, ResultSpec<'static>)> {
// Pick a scalar type and nullability, skipping types with no interesting datums.
let col_type = (select(SCALAR_TYPES), any::<bool>())
.prop_map(|(t, b)| t.nullable(b))
.prop_filter("need at least one value", |c| {
c.scalar_type.interesting_datums().count() > 0
});
// Base specs of varying width; the chosen datum is unioned in below.
let result_spec = select(vec![
ResultSpec::nothing(),
ResultSpec::null(),
ResultSpec::anything(),
ResultSpec::value_all(),
]);
(col_type, result_spec).prop_flat_map(|(col, result_spec)| {
gen_datums_for_type(&col).prop_map(move |datum| {
// Widen the base spec so that it always covers the generated datum.
let result_spec = result_spec.clone().union(ResultSpec::value(datum));
(col.clone(), datum, result_spec)
})
})
}
/// Strategy that generates an expression over the columns of `relation`,
/// together with the expression's type.
///
/// Leaves are column references or literals (values or errors); inner nodes
/// are unary, binary, or variadic calls drawn from the `INTERESTING_*` lists
/// and kept only when the matching `*_typecheck` function accepts them.
fn gen_expr_for_relation(
relation: &RelationType,
) -> BoxedStrategy<(MirScalarExpr, ColumnType)> {
// Leaf: a reference to a randomly chosen column of the relation.
let column_gen = {
let column_types = relation.column_types.clone();
any::<Index>()
.prop_map(move |idx| {
let id = idx.index(column_types.len());
(MirScalarExpr::Column(id), column_types[id].clone())
})
.boxed()
};
// Leaf: a literal of a random type, holding either an error or a datum of that type.
let literal_gen = (select(SCALAR_TYPES), any::<bool>())
.prop_map(|(s, b)| s.nullable(b))
.prop_flat_map(|ct| {
let error_gen = any::<EvalError>().prop_map(Err).boxed();
let value_gen = gen_datums_for_type(&ct)
.prop_map(move |datum| Ok(Row::pack_slice(&[datum])))
.boxed();
error_gen.prop_union(value_gen).prop_map(move |result| {
(MirScalarExpr::Literal(result, ct.clone()), ct.clone())
})
})
.boxed();
column_gen
.prop_union(literal_gen)
.prop_recursive(4, 64, 8, |self_gen| {
// Inner node: a unary call on a generated subexpression.
let unary_gen = (select(INTERESTING_UNARY_FUNCS), self_gen.clone())
.prop_filter_map("unary func", |(func, (expr_in, type_in))| {
if !unary_typecheck(&func, &type_in) {
return None;
}
let type_out = func.output_type(type_in);
let expr_out = MirScalarExpr::CallUnary {
func,
expr: Box::new(expr_in),
};
Some((expr_out, type_out))
})
.boxed();
// Inner node: a binary call on two generated subexpressions.
let binary_gen = (
select(INTERESTING_BINARY_FUNCS),
self_gen.clone(),
self_gen.clone(),
)
.prop_filter_map(
"binary func",
|(func, (expr_left, type_left), (expr_right, type_right))| {
if !binary_typecheck(&func, &type_left, &type_right) {
return None;
}
let type_out = func.output_type(type_left, type_right);
let expr_out = MirScalarExpr::CallBinary {
func,
expr1: Box::new(expr_left),
expr2: Box::new(expr_right),
};
Some((expr_out, type_out))
},
)
.boxed();
// Inner node: a variadic call on one to three generated subexpressions.
let variadic_gen = (
select(INTERESTING_VARIADIC_FUNCS),
prop::collection::vec(self_gen, 1..4),
)
.prop_filter_map("variadic func", |(func, exprs)| {
let (exprs_in, type_in): (_, Vec<_>) = exprs.into_iter().unzip();
if !variadic_typecheck(&func, &type_in) {
return None;
}
let type_out = func.output_type(type_in);
let expr_out = MirScalarExpr::CallVariadic {
func,
exprs: exprs_in,
};
Some((expr_out, type_out))
})
.boxed();
unary_gen
.prop_union(binary_gen)
.boxed()
.prop_union(variadic_gen)
})
.boxed()
}
/// Strategy that generates a complete test case: one to nine columns, a single
/// row built from the columns' datums, and an expression over that relation.
fn gen_expr_data() -> impl Strategy<Value = ExpressionData> {
let columns = prop::collection::vec(gen_column(), 1..10);
columns.prop_flat_map(|data| {
// Split the per-column triples into parallel vectors.
let (columns, datums, specs): (Vec<_>, Vec<_>, Vec<_>) = data.into_iter().multiunzip();
let relation = RelationType::new(columns);
let row = Row::pack_slice(&datums);
gen_expr_for_relation(&relation).prop_map(move |(expr, _)| ExpressionData {
relation_type: relation.clone(),
specs: specs.clone(),
rows: vec![row.clone()],
expr,
})
})
}
// Checks that the spec built from a single datum contains that datum, and
// that the failing spec reports that it may fail.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn test_trivial_spec_matches() {
fn check(datum: PropDatum) -> Result<(), TestCaseError> {
let datum: Datum = (&datum).into();
let spec = if datum.is_null() {
ResultSpec::null()
} else {
ResultSpec::value(datum)
};
assert!(spec.may_contain(datum));
Ok(())
}
proptest!(|(datum in mz_repr::arb_datum())| {
check(datum)?;
});
assert!(ResultSpec::fails().may_fail());
}
// Checks that interpretation is sound with respect to real evaluation: for
// every generated case, the interpreted spec must admit whatever the
// expression actually evaluates to on each row.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn test_equivalence() {
fn check(data: ExpressionData) -> Result<(), TestCaseError> {
let ExpressionData {
relation_type,
specs,
rows,
expr,
} = data;
let arena = RowArena::new();
let mut interpreter = ColumnSpecs::new(&relation_type, &arena);
// Narrow every column to its generated spec.
for (id, spec) in specs.into_iter().enumerate() {
interpreter.push_column(id, spec);
}
let spec = interpreter.expr(&expr);
for row in &rows {
let datums: Vec<_> = row.iter().collect();
let eval_result = expr.eval(&datums, &arena);
match eval_result {
// A successful result must be within the interpreted range.
Ok(value) => {
assert!(spec.range.may_contain(value))
}
// An error is only acceptable if the spec allows failure.
Err(_) => {
assert!(spec.range.may_fail());
}
}
}
Ok(())
}
proptest!(|(data in gen_expr_data())| {
check(data)?;
});
}
// Checks that `mfp_filter` reports possible failure for an MFP with two
// predicates over one Int32 column: `(#0 * #0) IS NULL` and `#0 = 1727694505`,
// with the column pinned to -1294725158.
#[mz_ore::test]
fn test_mfp() {
use MirScalarExpr::*;
let mfp = MapFilterProject {
expressions: vec![],
predicates: vec![
(
1,
CallUnary {
func: UnaryFunc::IsNull(IsNull),
expr: Box::new(CallBinary {
func: BinaryFunc::MulInt32,
expr1: Box::new(Column(0)),
expr2: Box::new(Column(0)),
}),
},
),
(
1,
CallBinary {
func: BinaryFunc::Eq,
expr1: Box::new(Column(0)),
expr2: Box::new(Literal(
Ok(Row::pack_slice(&[Datum::Int32(1727694505)])),
ScalarType::Int32.nullable(false),
)),
},
),
],
projection: vec![],
input_arity: 1,
};
let relation = RelationType::new(vec![ScalarType::Int32.nullable(true)]);
let arena = RowArena::new();
let mut interpreter = ColumnSpecs::new(&relation, &arena);
interpreter.push_column(0, ResultSpec::value(Datum::Int32(-1294725158)));
let spec = interpreter.mfp_filter(&mfp);
// NOTE(review): presumably the multiplication overflows Int32 for this
// value, which is why failure is expected — confirm.
assert!(spec.range.may_fail());
}
// Checks that concatenating an unconstrained string column with the literals
// "a" and "b" yields a spec that admits "blab".
#[mz_ore::test]
fn test_concat() {
let expr = MirScalarExpr::CallVariadic {
func: VariadicFunc::Concat,
exprs: vec![
MirScalarExpr::Column(0),
MirScalarExpr::literal_ok(Datum::String("a"), ScalarType::String),
MirScalarExpr::literal_ok(Datum::String("b"), ScalarType::String),
],
};
let relation = RelationType::new(vec![ScalarType::String.nullable(false)]);
let arena = RowArena::new();
// No column spec is pushed, so column 0 is constrained only by its type.
let interpreter = ColumnSpecs::new(&relation, &arena);
let spec = interpreter.expr(&expr);
assert!(spec.range.may_contain(Datum::String("blab")));
}
// Checks range propagation through
// `mz_now() >= cast_to_mz_timestamp(10 * (#0 / 10))`
// for two different `mz_now()` ranges, with column 0 between 30 and 40.
#[mz_ore::test]
fn test_eval_range() {
let period_ms = MirScalarExpr::Literal(
Ok(Row::pack_slice(&[Datum::Int64(10)])),
ScalarType::Int64.nullable(false),
);
let expr = MirScalarExpr::CallBinary {
func: BinaryFunc::Gte,
expr1: Box::new(MirScalarExpr::CallUnmaterializable(
UnmaterializableFunc::MzNow,
)),
expr2: Box::new(MirScalarExpr::CallUnary {
func: UnaryFunc::CastInt64ToMzTimestamp(CastInt64ToMzTimestamp),
expr: Box::new(MirScalarExpr::CallBinary {
func: BinaryFunc::MulInt64,
expr1: Box::new(period_ms.clone()),
expr2: Box::new(MirScalarExpr::CallBinary {
func: BinaryFunc::DivInt64,
expr1: Box::new(MirScalarExpr::Column(0)),
expr2: Box::new(period_ms),
}),
}),
}),
};
let relation = RelationType::new(vec![ScalarType::Int64.nullable(false)]);
// Case 1: `mz_now()` in [10, 20]. The comparison is expected to be
// definitely false, non-null, and infallible.
{
let arena = RowArena::new();
let mut interpreter = ColumnSpecs::new(&relation, &arena);
interpreter.push_unmaterializable(
UnmaterializableFunc::MzNow,
ResultSpec::value_between(
Datum::MzTimestamp(10.into()),
Datum::MzTimestamp(20.into()),
),
);
interpreter.push_column(0, ResultSpec::value_between(30i64.into(), 40i64.into()));
let range_out = interpreter.expr(&expr).range;
assert!(range_out.may_contain(Datum::False));
assert!(!range_out.may_contain(Datum::True));
assert!(!range_out.may_contain(Datum::Null));
assert!(!range_out.may_fail());
}
// Case 2: `mz_now()` in [10, 35]. Both true and false are expected to be
// possible, still non-null and infallible.
{
let arena = RowArena::new();
let mut interpreter = ColumnSpecs::new(&relation, &arena);
interpreter.push_unmaterializable(
UnmaterializableFunc::MzNow,
ResultSpec::value_between(
Datum::MzTimestamp(10.into()),
Datum::MzTimestamp(35.into()),
),
);
interpreter.push_column(0, ResultSpec::value_between(30i64.into(), 40i64.into()));
let range_out = interpreter.expr(&expr).range;
assert!(range_out.may_contain(Datum::False));
assert!(range_out.may_contain(Datum::True));
assert!(!range_out.may_contain(Datum::Null));
assert!(!range_out.may_fail());
}
}
/// Checks that bounds known for a nested JSON field carry through a field
/// lookup followed by a cast.
///
/// Column 0 is described as a map whose `"ts"` entry lies in `[100, 300]`;
/// the expression `(#0 -> 'ts')::numeric` must then admit `200` and exclude
/// both `0` and `400`. The test is skipped under Miri.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_jsonb() {
    let arena = RowArena::new();

    // Build the expression from the inside out: field name, lookup, cast.
    let field_name = MirScalarExpr::Literal(
        Ok(Row::pack_slice(&["ts".into()])),
        ScalarType::String.nullable(false),
    );
    let ts_lookup = MirScalarExpr::CallBinary {
        func: BinaryFunc::JsonbGetString { stringify: false },
        expr1: Box::new(MirScalarExpr::Column(0)),
        expr2: Box::new(field_name),
    };
    let expr = MirScalarExpr::CallUnary {
        func: UnaryFunc::CastJsonbToNumeric(CastJsonbToNumeric(None)),
        expr: Box::new(ts_lookup),
    };

    let relation = RelationType::new(vec![ScalarType::Jsonb.nullable(true)]);

    // Only the "ts" key of the JSON object is constrained.
    let ts_bounds =
        ResultSpec::value_between(Datum::Numeric(100.into()), Datum::Numeric(300.into()));
    let column_spec = ResultSpec::map_spec([("ts".into(), ts_bounds)].into_iter().collect());

    let mut interpreter = ColumnSpecs::new(&relation, &arena);
    interpreter.push_column(0, column_spec);

    let range_out = interpreter.expr(&expr).range;
    assert!(!range_out.may_contain(Datum::Numeric(0.into())));
    assert!(range_out.may_contain(Datum::Numeric(200.into())));
    assert!(!range_out.may_contain(Datum::Numeric(400.into())));
}
/// Checks the interpreted range of a `%whatever%` LIKE match over a nullable
/// string column bounded by `"aardvark"` and `"zebra"`: the result must not
/// be fallible and must admit `true`, `false` and null.
#[mz_ore::test]
fn test_like() {
    let arena = RowArena::new();

    let matcher = crate::like_pattern::compile("%whatever%", true).unwrap();
    let expr = MirScalarExpr::CallUnary {
        func: UnaryFunc::IsLikeMatch(IsLikeMatch(matcher)),
        expr: Box::new(MirScalarExpr::Column(0)),
    };

    let relation = RelationType::new(vec![ScalarType::String.nullable(true)]);
    let column_spec =
        ResultSpec::value_between(Datum::String("aardvark"), Datum::String("zebra"));
    let mut interpreter = ColumnSpecs::new(&relation, &arena);
    interpreter.push_column(0, column_spec);

    let range_out = interpreter.expr(&expr).range;
    assert!(
        !range_out.fallible,
        "like function should not error on non-error input"
    );
    // Every boolean outcome, including null, has to remain possible.
    for outcome in [Datum::True, Datum::False, Datum::Null] {
        assert!(range_out.may_contain(outcome));
    }
}
/// Exercises range propagation through `TryParseMonotonicIso8601Timestamp`
/// for four different bounds on the input string column.
///
/// Each scenario probes the output range with a timestamp on 2024-01-10,
/// 2024-01-11 and 2024-01-12 (all at 10:00:00) and checks the `fallible` and
/// `nullable` flags of the result.
#[mz_ore::test]
fn test_try_parse_monotonic_iso8601_timestamp() {
    use chrono::NaiveDateTime;

    let arena = RowArena::new();
    let expr = MirScalarExpr::CallUnary {
        func: UnaryFunc::TryParseMonotonicIso8601Timestamp(TryParseMonotonicIso8601Timestamp),
        expr: Box::new(MirScalarExpr::Column(0)),
    };
    let relation = RelationType::new(vec![ScalarType::String.nullable(true)]);

    // Builds a timestamp datum from a `YYYY-MM-DDTHH:MM:SS` string.
    let timestamp = |text| {
        let parsed = NaiveDateTime::parse_from_str(text, "%Y-%m-%dT%H:%M:%S").unwrap();
        Datum::Timestamp(parsed.try_into().unwrap())
    };
    let day_before = timestamp("2024-01-10T10:00:00");
    let same_day = timestamp("2024-01-11T10:00:00");
    let day_after = timestamp("2024-01-12T10:00:00");

    // Scenario 1: both bounds are full timestamps on 2024-01-11. Only that
    // day is admitted, and the output is still reported as nullable.
    {
        let mut interpreter = ColumnSpecs::new(&relation, &arena);
        interpreter.push_column(
            0,
            ResultSpec::value_between(
                Datum::String("2024-01-11T00:00:00.000Z"),
                Datum::String("2024-01-11T20:00:00.000Z"),
            ),
        );
        let range_out = interpreter.expr(&expr).range;
        assert!(!range_out.fallible);
        assert!(range_out.nullable);
        assert!(!range_out.may_contain(day_before));
        assert!(range_out.may_contain(same_day));
        assert!(!range_out.may_contain(day_after));
    }

    // Scenario 2: both bounds are truncated strings, and all three probe
    // days remain possible.
    {
        let mut interpreter = ColumnSpecs::new(&relation, &arena);
        interpreter.push_column(
            0,
            ResultSpec::value_between(Datum::String("2024-01-1"), Datum::String("2024-01-2")),
        );
        let range_out = interpreter.expr(&expr).range;
        assert!(!range_out.fallible);
        assert!(range_out.nullable);
        assert!(range_out.may_contain(day_before));
        assert!(range_out.may_contain(same_day));
        assert!(range_out.may_contain(day_after));
    }

    // Scenario 3: a truncated lower bound, a full upper bound, and the input
    // may additionally be null. All three probe days remain possible.
    {
        let mut interpreter = ColumnSpecs::new(&relation, &arena);
        interpreter.push_column(
            0,
            ResultSpec::value_between(
                Datum::String("2024-01-1"),
                Datum::String("2024-01-12T10:00:00"),
            )
            .union(ResultSpec::null()),
        );
        let range_out = interpreter.expr(&expr).range;
        assert!(!range_out.fallible);
        assert!(range_out.nullable);
        assert!(range_out.may_contain(day_before));
        assert!(range_out.may_contain(same_day));
        assert!(range_out.may_contain(day_after));
    }

    // Scenario 4: the input is a single exact timestamp string, so the output
    // is not nullable and admits only the matching day.
    {
        let mut interpreter = ColumnSpecs::new(&relation, &arena);
        interpreter.push_column(
            0,
            ResultSpec::value_between(
                Datum::String("2024-01-11T10:00:00.000Z"),
                Datum::String("2024-01-11T10:00:00.000Z"),
            ),
        );
        let range_out = interpreter.expr(&expr).range;
        assert!(!range_out.fallible);
        assert!(!range_out.nullable);
        assert!(!range_out.may_contain(day_before));
        assert!(range_out.may_contain(same_day));
        assert!(!range_out.may_contain(day_after));
    }
}
/// Checks range propagation through `#0 >= mz_now()`.
///
/// Column 0 is either null or the single timestamp `1704736444949`, while
/// `mz_now()` lies anywhere from `1704738791000` up to `u64::MAX`. The column
/// value is therefore always strictly below `mz_now()`, so the comparison may
/// yield `false` or null but never `true`, and it must not be fallible.
#[mz_ore::test]
fn test_inequality() {
    let arena = RowArena::new();

    let expr = MirScalarExpr::CallBinary {
        func: BinaryFunc::Gte,
        expr1: Box::new(MirScalarExpr::Column(0)),
        expr2: Box::new(MirScalarExpr::CallUnmaterializable(
            UnmaterializableFunc::MzNow,
        )),
    };

    let relation = RelationType::new(vec![ScalarType::MzTimestamp.nullable(true)]);
    let mut interpreter = ColumnSpecs::new(&relation, &arena);

    // A single known timestamp, or null.
    interpreter.push_column(
        0,
        ResultSpec::value_between(
            Datum::MzTimestamp(1704736444949u64.into()),
            Datum::MzTimestamp(1704736444949u64.into()),
        )
        .union(ResultSpec::null()),
    );
    // `mz_now()` is bounded below only; the upper bound is the largest `u64`.
    interpreter.push_unmaterializable(
        UnmaterializableFunc::MzNow,
        ResultSpec::value_between(
            Datum::MzTimestamp(1704738791000u64.into()),
            Datum::MzTimestamp(u64::MAX.into()),
        ),
    );

    let range_out = interpreter.expr(&expr).range;
    // The expression under test is `Gte`, so the message names `>=`.
    assert!(
        !range_out.fallible,
        ">= function should not error on non-error input"
    );
    assert!(!range_out.may_contain(Datum::True));
    assert!(range_out.may_contain(Datum::False));
    assert!(range_out.may_contain(Datum::Null));
}
/// Checks that the `Trace` interpreter reports `#0 >= #1 + (-#3)` as
/// pushdownable.
#[mz_ore::test]
fn test_trace() {
    use super::Trace;

    // Assemble the expression bottom-up: `-#3`, then `#1 + (-#3)`, then the
    // comparison against `#0`.
    let negated = MirScalarExpr::CallUnary {
        func: UnaryFunc::NegInt64(NegInt64),
        expr: Box::new(MirScalarExpr::Column(3)),
    };
    let sum = MirScalarExpr::CallBinary {
        func: BinaryFunc::AddInt64,
        expr1: Box::new(MirScalarExpr::Column(1)),
        expr2: Box::new(negated),
    };
    let expr = MirScalarExpr::CallBinary {
        func: BinaryFunc::Gte,
        expr1: Box::new(MirScalarExpr::Column(0)),
        expr2: Box::new(sum),
    };

    let trace_summary = Trace.expr(&expr);
    assert!(trace_summary.pushdownable());
}
}