#![allow(missing_docs)]
use std::cmp::{max, min};
use std::iter::Sum;
use std::ops::Deref;
use std::{fmt, iter};
use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
use dec::OrderedDecimal;
use itertools::Itertools;
use mz_lowertest::MzReflect;
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_ore::str::separated;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::date::Date;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
use mz_repr::adt::regex::Regex as ReprRegex;
use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
use mz_repr::{ColumnName, ColumnType, Datum, Diff, RelationType, Row, RowArena, ScalarType};
use num::{CheckedAdd, Integer, Signed, ToPrimitive};
use ordered_float::OrderedFloat;
use proptest::prelude::{Arbitrary, Just};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use proptest_derive::Arbitrary;
use regex::Regex;
use serde::{Deserialize, Serialize};
use crate::explain::{HumanizedExpr, HumanizerMode};
use crate::relation::proto_aggregate_func::{self, ProtoColumnOrders, ProtoFusedValueWindowFunc};
use crate::relation::proto_table_func::ProtoTabletizedScalar;
use crate::relation::{
compare_columns, proto_table_func, ColumnOrder, ProtoAggregateFunc, ProtoTableFunc,
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
use crate::EvalError;
use crate::WindowFrameBound::{
CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
};
use crate::WindowFrameUnits::{Groups, Range, Rows};
include!(concat!(env!("OUT_DIR"), "/mz_expr.relation.func.rs"));
/// Returns the maximum of a collection of string datums, ignoring nulls.
///
/// Yields `Datum::Null` when the input contains no non-null values.
fn max_string<'a, I>(datums: I) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    datums
        .into_iter()
        .filter(|d| !d.is_null())
        .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
        .unwrap_or(Datum::Null)
}
fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
where
I: IntoIterator<Item = Datum<'a>>,
DatumType: TryFrom<Datum<'a>> + Ord,
<DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
Datum<'a>: From<Option<DatumType>>,
{
let x: Option<DatumType> = datums
.into_iter()
.filter(|d| !d.is_null())
.map(|d| DatumType::try_from(d).expect("unexpected type"))
.max();
x.into()
}
fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
where
I: IntoIterator<Item = Datum<'a>>,
DatumType: TryFrom<Datum<'a>> + Ord,
<DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
Datum<'a>: From<Option<DatumType>>,
{
let x: Option<DatumType> = datums
.into_iter()
.filter(|d| !d.is_null())
.map(|d| DatumType::try_from(d).expect("unexpected type"))
.min();
x.into()
}
/// Returns the minimum of a collection of string datums, ignoring nulls.
///
/// Yields `Datum::Null` when the input contains no non-null values.
fn min_string<'a, I>(datums: I) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    datums
        .into_iter()
        .filter(|d| !d.is_null())
        .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
        .unwrap_or(Datum::Null)
}
fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
where
I: IntoIterator<Item = Datum<'a>>,
DatumType: TryFrom<Datum<'a>>,
<DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
{
let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
if datums.peek().is_none() {
Datum::Null
} else {
let x = datums
.map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
.sum::<ResultType>();
x.into()
}
}
/// Sums a collection of `numeric` datums with a decimal context, ignoring
/// nulls.
///
/// Yields `Datum::Null` when the input contains no non-null values.
fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let mut cx = numeric::cx_datum();
    let mut total = Numeric::zero();
    let mut saw_value = false;
    for d in datums.into_iter().filter(|d| !d.is_null()) {
        saw_value = true;
        cx.add(&mut total, &d.unwrap_numeric().0);
    }
    if saw_value {
        Datum::from(total)
    } else {
        Datum::Null
    }
}
/// Counts the non-null datums in the input (SQL `COUNT`).
#[allow(clippy::as_conversions)]
fn count<'a, I>(datums: I) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let non_nulls = datums.into_iter().filter(|d| !d.is_null()).count();
    Datum::from(non_nulls as i64)
}
/// Three-valued-logic disjunction over a collection of boolean datums.
///
/// `True` wins over everything, `Null` wins over `False`; an empty input
/// yields `False`.
fn any<'a, I>(datums: I) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let mut acc = Datum::False;
    for next in datums {
        acc = match (acc, next) {
            (Datum::True, _) | (_, Datum::True) => Datum::True,
            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
            _ => Datum::False,
        };
    }
    acc
}
/// Three-valued-logic conjunction over a collection of boolean datums.
///
/// `False` wins over everything, `Null` wins over `True`; an empty input
/// yields `True`.
fn all<'a, I>(datums: I) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let mut acc = Datum::True;
    for next in datums {
        acc = match (acc, next) {
            (Datum::False, _) | (_, Datum::False) => Datum::False,
            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
            _ => Datum::True,
        };
    }
    acc
}
/// Concatenates string values with separators (PostgreSQL's `string_agg`).
///
/// Each input datum is a 2-element list `[value, separator]`; the separator
/// paired with a value is the one emitted *before* it, so the first value's
/// separator is dropped. Null values are skipped entirely; a null separator
/// behaves like the empty string. Returns `Datum::Null` when no non-null
/// values remain.
fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
I: IntoIterator<Item = Datum<'a>>,
{
const EMPTY_SEP: &str = "";
// Respect the ORDER BY clause before concatenating.
let datums = order_aggregate_datums(datums, order_by);
let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
if d.is_null() {
return None;
}
// Decode the [value, separator] pair.
let mut value_sep = d.unwrap_list().iter();
match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
// Null values contribute nothing.
(Datum::Null, _) => None,
// Null separator is treated as empty.
(Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
(Datum::String(val), Datum::String(sep)) => Some((sep, val)),
_ => unreachable!(),
}
});
let mut s = String::default();
// The first value is emitted without its preceding separator. If there is
// no first value at all, the aggregate is NULL.
match sep_value_pairs.next() {
Some((_, value)) => s.push_str(value),
None => return Datum::Null,
}
for (sep, value) in sep_value_pairs {
s.push_str(sep);
s.push_str(value);
}
Datum::String(temp_storage.push_string(s))
}
/// Collects the non-null input datums into a single list datum, in `order_by`
/// order (backs `jsonb_agg`-style aggregation).
fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let ordered = order_aggregate_datums(datums, order_by);
    temp_storage.make_datum(|packer| packer.push_list(ordered.filter(|d| !d.is_null())))
}
/// Assembles a map datum from a collection of `[key, value]` list datums.
///
/// Null entries and entries whose key is null are skipped. When a key occurs
/// more than once, the value from its *last* occurrence (in `order_by` order)
/// wins. Entries are sorted and deduplicated because `push_dict` needs its
/// input in ascending key order with unique keys.
fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
I: IntoIterator<Item = Datum<'a>>,
{
let datums = order_aggregate_datums(datums, order_by);
temp_storage.make_datum(|packer| {
let mut datums: Vec<_> = datums
.into_iter()
.filter_map(|d| {
if d.is_null() {
return None;
}
// Decode the [key, value] pair.
let mut list = d.unwrap_list().iter();
let key = list.next().unwrap();
let val = list.next().unwrap();
if key.is_null() {
None
} else {
Some((key.unwrap_str(), val))
}
})
.collect();
// The stable sort keeps equal keys in their ORDER BY order.
datums.sort_by_key(|(k, _v)| *k);
// `dedup_by_key` keeps the *first* of each run of equal keys; reverse,
// dedup (so the last-ordered value survives), then reverse back to
// ascending key order.
datums.reverse();
datums.dedup_by_key(|(k, _v)| *k);
datums.reverse();
packer.push_dict(datums);
})
}
/// Sorts a collection of encoded aggregate inputs by the given ORDER BY spec
/// and yields only the payload (first list element) of each input, dropping
/// the ORDER BY key datums.
pub fn order_aggregate_datums<'a: 'b, 'b, I>(
    datums: I,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let sorted = order_aggregate_datums_with_rank_inner(datums, order_by);
    sorted.into_iter().map(|(payload, _keys)| payload)
}
/// Sorts a collection of encoded aggregate inputs by the given ORDER BY spec
/// and yields each payload together with its ORDER BY key packed into a
/// `Row` (used by the rank-style window functions to detect ties).
fn order_aggregate_datums_with_rank<'a, I>(
    datums: I,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = (Datum<'a>, Row)>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let sorted = order_aggregate_datums_with_rank_inner(datums, order_by);
    sorted
        .into_iter()
        .map(|(payload, keys)| (payload, Row::pack(keys)))
}
/// Decodes each input list into `(payload, ORDER BY key datums)` and sorts
/// the result by the ORDER BY spec, breaking ties on the payload itself.
///
/// Panics if an input list carries fewer key datums than `order_by` has
/// entries.
fn order_aggregate_datums_with_rank_inner<'a, I>(
    datums: I,
    order_by: &[ColumnOrder],
) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let mut rows: Vec<(Datum, Vec<Datum>)> = datums
        .into_iter()
        .map(|d| {
            let mut parts = d.unwrap_list().iter();
            // The payload comes first, followed by one datum per ORDER BY
            // column.
            let payload = parts.next().unwrap();
            let keys: Vec<_> = (0..order_by.len())
                .map(|_| {
                    parts
                        .next()
                        .expect("must have exactly the same number of Datums as `order_by`")
                })
                .collect();
            (payload, keys)
        })
        .collect();
    rows.sort_unstable_by(|(left_payload, left_keys), (right_payload, right_keys)| {
        compare_columns(order_by, left_keys, right_keys, || {
            left_payload.cmp(right_payload)
        })
    });
    rows
}
/// Concatenates a collection of array datums into a single one-dimensional
/// array, in `order_by` order.
///
/// The result's single dimension is 1-based and covers all elements of all
/// input arrays.
fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let datums = order_aggregate_datums(datums, order_by);
    // `flat_map` instead of `map(..).flatten()` (clippy: map_flatten); the
    // redundant `into_iter` on an iterator is dropped too.
    let datums: Vec<_> = datums
        .flat_map(|d| d.unwrap_array().elements().iter())
        .collect();
    let dims = ArrayDimension {
        lower_bound: 1,
        length: datums.len(),
    };
    temp_storage.make_datum(|packer| {
        packer.push_array(&[dims], datums).unwrap();
    })
}
/// Concatenates a collection of list datums into a single list, in
/// `order_by` order.
fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let datums = order_aggregate_datums(datums, order_by);
    // `flat_map` instead of `map(..).flatten()` (clippy: map_flatten).
    temp_storage.make_datum(|packer| {
        packer.push_list(datums.flat_map(|d| d.unwrap_list().iter()));
    })
}
/// Computes `row_number()` over a window partition, packing the per-row
/// results into a single list datum in the caller's arena.
fn row_number<'a, I>(
    datums: I,
    callers_temp_storage: &'a RowArena,
    order_by: &[ColumnOrder],
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let numbered = row_number_no_list(datums, &local_storage, order_by);
    callers_temp_storage.make_datum(|packer| packer.push_list(numbered))
}
/// Computes `row_number()` over a window partition, yielding one
/// `[row_number, original_row]` list datum per input row, numbered from 1 in
/// `order_by` order.
fn row_number_no_list<'a: 'b, 'b, I>(
    datums: I,
    callers_temp_storage: &'b RowArena,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let datums = order_aggregate_datums(datums, order_by);
    callers_temp_storage.reserve(datums.size_hint().0);
    // `flat_map` instead of `map(..).flatten()` (clippy: map_flatten).
    datums
        .flat_map(|d| d.unwrap_list().iter())
        .zip(1i64..)
        .map(|(d, i)| {
            callers_temp_storage.make_datum(|packer| {
                packer.push_list_with(|packer| {
                    packer.push(Datum::Int64(i));
                    packer.push(d);
                });
            })
        })
}
/// Computes `rank()` over a window partition, packing the per-row results
/// into a single list datum in the caller's arena.
fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let ranked = rank_no_list(datums, &local_storage, order_by);
    callers_temp_storage.make_datum(|packer| packer.push_list(ranked))
}
/// Computes `rank()` over a window partition: rows with equal ORDER BY keys
/// share a rank, and the next distinct key jumps to its row number, so ranks
/// may have gaps (e.g. 1, 1, 3).
///
/// Yields one `[rank, original_row]` list datum per input row.
fn rank_no_list<'a: 'b, 'b, I>(
datums: I,
callers_temp_storage: &'b RowArena,
order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
{
// Keep each payload paired with its packed ORDER BY key row, which is how
// ties are detected below.
let datums = order_aggregate_datums_with_rank(datums, order_by);
let mut datums = datums
.into_iter()
.map(|(d0, order_row)| {
d0.unwrap_list()
.iter()
.map(move |d1| (d1, order_row.clone()))
})
.flatten();
callers_temp_storage.reserve(datums.size_hint().0);
// Fold state: (current ORDER BY row, current rank, current row number,
// accumulated (datum, rank) pairs). The rank only advances — jumping to
// the current row number — when the ORDER BY key changes.
datums
.next()
.map_or(vec![], |(first_datum, first_order_row)| {
datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
*acc_row_num += 1;
if *acc_row != next_order_row {
*acc_rank = *acc_row_num;
*acc_row = next_order_row;
}
(*output).push((next_datum, *acc_rank));
acc
})
}.3).into_iter().map(|(d, i)| {
// Wrap each (datum, rank) pair as the list [rank, original_row].
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer.push(Datum::Int64(i));
packer.push(d);
});
})
})
}
/// Computes `dense_rank()` over a window partition, packing the per-row
/// results into a single list datum in the caller's arena.
fn dense_rank<'a, I>(
    datums: I,
    callers_temp_storage: &'a RowArena,
    order_by: &[ColumnOrder],
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let ranked = dense_rank_no_list(datums, &local_storage, order_by);
    callers_temp_storage.make_datum(|packer| packer.push_list(ranked))
}
/// Computes `dense_rank()` over a window partition: rows with equal ORDER BY
/// keys share a rank, and the next distinct key gets the next consecutive
/// rank — unlike `rank()`, there are no gaps.
///
/// Yields one `[rank, original_row]` list datum per input row.
fn dense_rank_no_list<'a: 'b, 'b, I>(
datums: I,
callers_temp_storage: &'b RowArena,
order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
{
// Keep each payload paired with its packed ORDER BY key row, which is how
// ties are detected below.
let datums = order_aggregate_datums_with_rank(datums, order_by);
let mut datums = datums
.into_iter()
.map(|(d0, order_row)| {
d0.unwrap_list()
.iter()
.map(move |d1| (d1, order_row.clone()))
})
.flatten();
callers_temp_storage.reserve(datums.size_hint().0);
// Fold state: (current ORDER BY row, current rank, accumulated
// (datum, rank) pairs). The rank increments by exactly 1 whenever the
// ORDER BY key changes.
datums
.next()
.map_or(vec![], |(first_datum, first_order_row)| {
datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
if *acc_row != next_order_row {
*acc_rank += 1;
*acc_row = next_order_row;
}
(*output).push((next_datum, *acc_rank));
acc
})
}.2).into_iter().map(|(d, i)| {
// Wrap each (datum, rank) pair as the list [rank, original_row].
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer.push(Datum::Int64(i));
packer.push(d);
});
})
})
}
/// Computes `lag`/`lead` over a window partition, packing the per-row results
/// into a single list datum in the caller's arena.
fn lag_lead<'a, I>(
    datums: I,
    callers_temp_storage: &'a RowArena,
    order_by: &[ColumnOrder],
    lag_lead_type: &LagLeadType,
    ignore_nulls: &bool,
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let results = lag_lead_no_list(datums, &local_storage, order_by, lag_lead_type, ignore_nulls);
    callers_temp_storage.make_datum(|packer| packer.push_list(results))
}
/// Applies `lag`/`lead` to a whole window partition.
///
/// Each input datum is a 2-element list `[original_row, encoded_args]`, where
/// `encoded_args` is the `[input_value, offset, default_value]` triple.
/// Yields one `[result_value, original_row]` list datum per input row, in
/// `order_by` order.
fn lag_lead_no_list<'a: 'b, 'b, I>(
datums: I,
callers_temp_storage: &'b RowArena,
order_by: &[ColumnOrder],
lag_lead_type: &LagLeadType,
ignore_nulls: &bool,
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
{
let datums = order_aggregate_datums(datums, order_by);
// Split each encoded input into the original row and the argument triple.
let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
.into_iter()
.map(|d| {
let mut iter = d.unwrap_list().iter();
let original_row = iter.next().unwrap();
let (input_value, offset, default_value) =
unwrap_lag_lead_encoded_args(iter.next().unwrap());
(original_row, (input_value, offset, default_value))
})
.unzip();
let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
callers_temp_storage.reserve(result.len());
// Re-attach each result to its original row; `zip_eq` asserts the two
// sides have equal length.
result
.into_iter()
.zip_eq(orig_rows)
.map(|(result_value, original_row)| {
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer.push(result_value);
packer.push(original_row);
});
})
})
}
/// Decodes the 3-element argument list of `lag`/`lead` into its
/// `(input_value, offset, default_value)` parts.
fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
    let mut parts = encoded_args.unwrap_list().iter();
    let input_value = parts.next().unwrap();
    let offset = parts.next().unwrap();
    let default_value = parts.next().unwrap();
    (input_value, offset, default_value)
}
/// Dispatches `lag`/`lead` evaluation to the RESPECT NULLS or IGNORE NULLS
/// implementation.
fn lag_lead_inner<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
    ignore_nulls: &bool,
) -> Vec<Datum<'a>> {
    match *ignore_nulls {
        true => lag_lead_inner_ignore_nulls(args, lag_lead_type),
        false => lag_lead_inner_respect_nulls(args, lag_lead_type),
    }
}
/// `lag`/`lead` with the default RESPECT NULLS semantics: the offset counts
/// rows directly, nulls included.
///
/// Each element of `args` is `(input_value, offset, default_value)` for one
/// row, in ORDER BY order. A null offset yields a null result; an offset that
/// falls outside the partition yields the row's default value.
fn lag_lead_inner_respect_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }
        let idx = i64::try_from(idx).expect("Array index does not fit in i64");
        let offset = i64::from(offset.unwrap_int32());
        // LAG looks backwards, LEAD looks forwards.
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // The input value at position `i`, or `None` when `i` is negative or
        // past the end of the partition. (`.map(|d| Some(d.0)).unwrap_or(None)`
        // simplified to `.and_then(..).map(..)`.)
        let datums_get = |i: i64| -> Option<Datum> {
            u64::try_from(i)
                .ok()
                .and_then(|i| args.get(usize::cast_from(i)))
                .map(|d| d.0)
        };
        let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
        result.push(lagged_value);
    }
    result
}
/// `lag`/`lead` with IGNORE NULLS: only non-null input values are counted
/// when stepping off the offset, so runs of nulls are hopped over.
///
/// Each element of `args` is `(input_value, offset, default_value)` for one
/// row, in ORDER BY order. Stepping past either end of the partition yields
/// the row's default value.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
// Indexes below are manipulated as `i64`, so the partition must fit.
if i64::try_from(args.len()).is_err() {
panic!("window partition way too big")
}
// For each *null* position, precompute the index of the nearest non-null
// value before it (-1 when there is none). Non-null positions stay `None`.
let mut skip_nulls_backward = vec![None; args.len()];
let mut last_non_null: i64 = -1;
let pairs = args
.iter()
.enumerate()
.zip_eq(skip_nulls_backward.iter_mut());
for ((i, (d, _, _)), slot) in pairs {
if d.is_null() {
*slot = Some(last_non_null);
} else {
last_non_null = i as i64;
}
}
// Symmetrically: the nearest non-null value *after* each null position
// (`args.len()` when there is none).
let mut skip_nulls_forward = vec![None; args.len()];
let mut last_non_null: i64 = args.len() as i64;
let pairs = args
.iter()
.enumerate()
.rev()
.zip_eq(skip_nulls_forward.iter_mut().rev());
for ((i, (d, _, _)), slot) in pairs {
if d.is_null() {
*slot = Some(last_non_null);
} else {
last_non_null = i as i64;
}
}
let mut result: Vec<Datum> = Vec::with_capacity(args.len());
for (idx, (_, offset, default_value)) in args.iter().enumerate() {
// A null offset yields a null result.
if offset.is_null() {
result.push(Datum::Null);
continue;
}
let idx = idx as i64; let offset = i64::cast_from(offset.unwrap_int32());
// LAG steps backwards, LEAD steps forwards.
let offset = match lag_lead_type {
LagLeadType::Lag => -offset,
LagLeadType::Lead => offset,
};
let increment = offset.signum();
// The input value at position `i`, or `None` when out of bounds.
let datums_get = |i: i64| -> Option<Datum> {
match u64::try_from(i) {
Ok(i) => args
.get(usize::cast_from(i))
.map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
};
let lagged_value = if increment != 0 {
// Take |offset| steps from the current row; after each step, use the
// skip tables to hop over a run of nulls in the direction of travel.
let mut j = idx;
for _ in 0..num::abs(offset) {
j += increment;
if datums_get(j).is_some_and(|d| d.is_null()) {
let ju = j as usize; if increment > 0 {
j = skip_nulls_forward[ju].expect("checked above that it's null");
} else {
j = skip_nulls_backward[ju].expect("checked above that it's null");
}
}
if datums_get(j).is_none() {
// Walked off the partition; further steps can't come back.
break;
}
}
// Out of bounds means the frame ran out: use the default value.
match datums_get(j) {
Some(datum) => datum,
None => *default_value,
}
} else {
assert_eq!(offset, 0);
let datum = datums_get(idx).expect("known to exist");
if !datum.is_null() {
datum
} else {
// A 0 offset pointing at a null value has no meaningful
// IGNORE NULLS interpretation.
panic!("0 offset in lag/lead IGNORE NULLS");
}
};
result.push(lagged_value);
}
result
}
/// Computes `first_value` over a window partition, packing the per-row
/// results into a single list datum in the caller's arena.
fn first_value<'a, I>(
    datums: I,
    callers_temp_storage: &'a RowArena,
    order_by: &[ColumnOrder],
    window_frame: &WindowFrame,
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let results = first_value_no_list(datums, &local_storage, order_by, window_frame);
    callers_temp_storage.make_datum(|packer| packer.push_list(results))
}
/// Applies `first_value` to a whole window partition.
///
/// Each input datum is a 2-element list `[original_row, input_value]`.
/// Yields one `[result_value, original_row]` list datum per input row, in
/// `order_by` order.
fn first_value_no_list<'a: 'b, 'b, I>(
datums: I,
callers_temp_storage: &'b RowArena,
order_by: &[ColumnOrder],
window_frame: &WindowFrame,
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
{
let datums = order_aggregate_datums(datums, order_by);
// Split each encoded input into the original row and the input value.
let (orig_rows, args): (Vec<_>, Vec<_>) = datums
.into_iter()
.map(|d| {
let mut iter = d.unwrap_list().iter();
let original_row = iter.next().unwrap();
let input_value = iter.next().unwrap();
(original_row, input_value)
})
.unzip();
let results = first_value_inner(args, window_frame);
callers_temp_storage.reserve(results.len());
// Re-attach each result to its original row; `zip_eq` asserts the two
// sides have equal length.
results
.into_iter()
.zip_eq(orig_rows)
.map(|(result_value, original_row)| {
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer.push(result_value);
packer.push(original_row);
});
})
})
}
/// Computes `first_value` for every row of a window partition.
///
/// `datums` holds the partition's input values in ORDER BY order; the window
/// frame determines, per row, which value (if any) is first within that
/// row's frame. An empty frame yields `Datum::Null`.
fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
    let length = datums.len();
    let mut result: Vec<Datum> = Vec::with_capacity(length);
    for (idx, current_datum) in datums.iter().enumerate() {
        let first_value = match &window_frame.start_bound {
            // The frame starts at the current row, so that is the first value.
            WindowFrameBound::CurrentRow => *current_datum,
            WindowFrameBound::UnboundedPreceding => {
                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
                    let end_offset = usize::cast_from(*end_offset);
                    // Empty frame: the end bound falls before the partition's
                    // first row.
                    if idx < end_offset {
                        Datum::Null
                    } else {
                        datums[0]
                    }
                } else {
                    datums[0]
                }
            }
            WindowFrameBound::OffsetPreceding(offset) => {
                let start_offset = usize::cast_from(*offset);
                let start_idx = idx.saturating_sub(start_offset);
                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
                    let end_offset = usize::cast_from(*end_offset);
                    // Empty frame: the start bound is after the end bound, or
                    // the whole frame falls before the partition.
                    if start_offset < end_offset || idx < end_offset {
                        Datum::Null
                    } else {
                        datums[start_idx]
                    }
                } else {
                    datums[start_idx]
                }
            }
            WindowFrameBound::OffsetFollowing(offset) => {
                let start_offset = usize::cast_from(*offset);
                let start_idx = idx.saturating_add(start_offset);
                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
                    // Empty frame: start after end, or entirely past the
                    // partition.
                    if offset > end_offset || start_idx >= length {
                        Datum::Null
                    } else {
                        datums[start_idx]
                    }
                } else {
                    // `Datum` is `Copy`, so `copied` replaces the former
                    // `.map(|d| d.clone())`.
                    datums.get(start_idx).copied().unwrap_or(Datum::Null)
                }
            }
            // A frame cannot *start* at UNBOUNDED FOLLOWING.
            WindowFrameBound::UnboundedFollowing => unreachable!(),
        };
        result.push(first_value);
    }
    result
}
/// Computes `last_value` over a window partition, packing the per-row results
/// into a single list datum in the caller's arena.
fn last_value<'a, I>(
    datums: I,
    callers_temp_storage: &'a RowArena,
    order_by: &[ColumnOrder],
    window_frame: &WindowFrame,
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let results = last_value_no_list(datums, &local_storage, order_by, window_frame);
    callers_temp_storage.make_datum(|packer| packer.push_list(results))
}
/// Applies `last_value` to a whole window partition.
///
/// Each input datum is a 2-element list `[original_row, input_value]`. The
/// packed ORDER BY key rows are collected as well, because `last_value_inner`
/// uses them to find peer groups for RANGE frames. Yields one
/// `[result_value, original_row]` list datum per input row, in `order_by`
/// order.
fn last_value_no_list<'a: 'b, 'b, I>(
datums: I,
callers_temp_storage: &'b RowArena,
order_by: &[ColumnOrder],
window_frame: &WindowFrame,
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
{
let datums = order_aggregate_datums_with_rank(datums, order_by);
let size_hint = datums.size_hint().0;
let mut args = Vec::with_capacity(size_hint);
let mut original_rows = Vec::with_capacity(size_hint);
let mut order_by_rows = Vec::with_capacity(size_hint);
// Split each encoded input into (input value, original row, ORDER BY row).
for (d, order_by_row) in datums.into_iter() {
let mut iter = d.unwrap_list().iter();
let original_row = iter.next().unwrap();
let input_value = iter.next().unwrap();
order_by_rows.push(order_by_row);
original_rows.push(original_row);
args.push(input_value);
}
let results = last_value_inner(args, &order_by_rows, window_frame);
callers_temp_storage.reserve(results.len());
// Re-attach each result to its original row; `zip_eq` asserts the two
// sides have equal length.
results
.into_iter()
.zip_eq(original_rows)
.map(|(result_value, original_row)| {
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer.push(result_value);
packer.push(original_row);
});
})
})
}
/// Computes `last_value` for every row of a window partition.
///
/// `args` holds the input values in ORDER BY order and `order_by_rows` their
/// packed ORDER BY keys (used to find the current row's peer group for RANGE
/// frames). An empty frame yields `Datum::Null`.
fn last_value_inner<'a>(
args: Vec<Datum<'a>>,
order_by_rows: &Vec<Row>,
window_frame: &WindowFrame,
) -> Vec<Datum<'a>> {
let length = args.len();
let mut results: Vec<Datum> = Vec::with_capacity(length);
for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
let last_value = match &window_frame.end_bound {
WindowFrameBound::CurrentRow => match &window_frame.units {
// ROWS ... CURRENT ROW: the frame ends exactly at this row.
WindowFrameUnits::Rows => *current_datum,
WindowFrameUnits::Range => {
// RANGE ... CURRENT ROW: the frame extends through the last
// peer of the current row (same ORDER BY key).
let target_idx = order_by_rows[idx..]
.iter()
.enumerate()
.take_while(|(_, row)| *row == order_by_row)
.last()
.unwrap()
.0
+ idx;
args[target_idx]
}
WindowFrameUnits::Groups => unreachable!(),
},
WindowFrameBound::UnboundedFollowing => {
if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
let start_offset = usize::cast_from(*start_offset);
// Empty frame: the start bound is past the partition's end.
if idx + start_offset > length - 1 {
Datum::Null
} else {
args[length - 1]
}
} else {
args[length - 1]
}
}
WindowFrameBound::OffsetFollowing(offset) => {
let end_offset = usize::cast_from(*offset);
let end_idx = idx.saturating_add(end_offset);
if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
let start_offset = usize::cast_from(*start_offset);
let start_idx = idx.saturating_add(start_offset);
// Empty frame: start after end, or entirely past the
// partition. Otherwise clamp an end beyond the partition to
// the last row.
if end_offset < start_offset || start_idx >= length {
Datum::Null
} else {
args.get(end_idx).unwrap_or(&args[length - 1]).clone()
}
} else {
args.get(end_idx).unwrap_or(&args[length - 1]).clone()
}
}
WindowFrameBound::OffsetPreceding(offset) => {
let end_offset = usize::cast_from(*offset);
let end_idx = idx.saturating_sub(end_offset);
if idx < end_offset {
// The frame's end precedes the partition: empty frame.
Datum::Null
} else if let WindowFrameBound::OffsetPreceding(start_offset) =
&window_frame.start_bound
{
// Start after end: empty frame.
if offset > start_offset {
Datum::Null
} else {
args[end_idx]
}
} else {
args[end_idx]
}
}
// A frame cannot *end* at UNBOUNDED PRECEDING.
WindowFrameBound::UnboundedPreceding => unreachable!(),
};
results.push(last_value);
}
results
}
/// Evaluates a fused set of value window functions over one partition,
/// packing the per-row results into a single list datum in the caller's
/// arena.
fn fused_value_window_func<'a, I>(
    input_datums: I,
    callers_temp_storage: &'a RowArena,
    funcs: &Vec<AggregateFunc>,
    order_by: &Vec<ColumnOrder>,
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let results = fused_value_window_func_no_list(input_datums, &local_storage, funcs, order_by);
    callers_temp_storage.make_datum(|packer| packer.push_list(results))
}
/// Evaluates several fused value window functions (`lag`/`lead`/
/// `first_value`/`last_value`) over the same window partition in one pass.
///
/// Each input datum is `[original_row, [encoded_args_0, ..., encoded_args_k]]`
/// with one argument bundle per fused function. Yields, per row, the list
/// `[[result_0, ..., result_k], original_row]`, in `order_by` order.
fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
input_datums: I,
callers_temp_storage: &'b RowArena,
funcs: &Vec<AggregateFunc>,
order_by: &Vec<ColumnOrder>,
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
{
// ORDER BY rows are needed only by `last_value_inner`, so collect them
// only when a `LastValue` is among the fused functions.
let has_last_value = funcs
.iter()
.any(|f| matches!(f, AggregateFunc::LastValue { .. }));
let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
let size_hint = input_datums_with_ranks.size_hint().0;
// One column of encoded arguments per fused function.
let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
let mut original_rows = Vec::with_capacity(size_hint);
let mut order_by_rows = Vec::with_capacity(size_hint);
for (d, order_by_row) in input_datums_with_ranks {
let mut iter = d.unwrap_list().iter();
let original_row = iter.next().unwrap();
original_rows.push(original_row);
let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
for i in 0..funcs.len() {
let encoded_args = argss_iter.next().unwrap();
encoded_argsss[i].push(encoded_args);
}
if has_last_value {
order_by_rows.push(order_by_row);
}
}
// Evaluate each fused function over its argument column, transposing the
// per-function results into per-row result lists.
let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
let results = match func {
AggregateFunc::LagLead {
order_by: inner_order_by,
lag_lead,
ignore_nulls,
} => {
// The fused ORDER BY must agree with each inner function's.
assert_eq!(order_by, inner_order_by);
let unwrapped_argss = encoded_argss
.into_iter()
.map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
.collect();
lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
}
AggregateFunc::FirstValue {
order_by: inner_order_by,
window_frame,
} => {
assert_eq!(order_by, inner_order_by);
first_value_inner(encoded_argss, window_frame)
}
AggregateFunc::LastValue {
order_by: inner_order_by,
window_frame,
} => {
assert_eq!(order_by, inner_order_by);
last_value_inner(encoded_argss, &order_by_rows, window_frame)
}
_ => panic!("unknown window function in FusedValueWindowFunc"),
};
for (i, r) in results.into_iter().enumerate() {
results_per_row[i].push(r);
}
}
// Two datums are created per row below (the inner result list and the
// outer pair).
callers_temp_storage.reserve(2 * original_rows.len());
results_per_row
.into_iter()
.enumerate()
.map(move |(i, results)| {
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer
.push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
packer.push(original_rows[i]);
});
})
})
}
/// Evaluates a windowed aggregation (an aggregate function applied over a
/// window frame) for one partition, packing the per-row results into a
/// single list datum in the caller's arena.
fn window_aggr<'a, I, A>(
    input_datums: I,
    callers_temp_storage: &'a RowArena,
    wrapped_aggregate: &AggregateFunc,
    order_by: &[ColumnOrder],
    window_frame: &WindowFrame,
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
    A: OneByOneAggr,
{
    // Build intermediate datums in a local arena; only the final packed list
    // lands in the caller's arena.
    let local_storage = RowArena::new();
    let results = window_aggr_no_list::<I, A>(
        input_datums,
        &local_storage,
        wrapped_aggregate,
        order_by,
        window_frame,
    );
    callers_temp_storage.make_datum(|packer| packer.push_list(results))
}
fn window_aggr_no_list<'a: 'b, 'b, I, A>(
input_datums: I,
callers_temp_storage: &'b RowArena,
wrapped_aggregate: &AggregateFunc,
order_by: &[ColumnOrder],
window_frame: &WindowFrame,
) -> impl Iterator<Item = Datum<'b>>
where
I: IntoIterator<Item = Datum<'a>>,
A: OneByOneAggr,
{
let datums = order_aggregate_datums_with_rank(input_datums, order_by);
let mut input_datums = datums
.into_iter()
.map(|(d, order_by_row)| {
let mut iter = d.unwrap_list().iter();
let original_row = iter.next().unwrap();
let input_value = iter.next().unwrap();
(input_value, original_row, order_by_row)
})
.collect_vec();
let length = input_datums.len();
let mut result: Vec<(Datum, Datum)> = Vec::with_capacity(length);
callers_temp_storage.reserve(length);
soft_assert_or_log!(
!((matches!(window_frame.units, WindowFrameUnits::Groups)
|| matches!(window_frame.units, WindowFrameUnits::Range))
&& !window_frame.includes_current_row()),
"window frame without current row"
);
if (matches!(
window_frame.start_bound,
WindowFrameBound::UnboundedPreceding
) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
|| (order_by.is_empty()
&& (matches!(window_frame.units, WindowFrameUnits::Groups)
|| matches!(window_frame.units, WindowFrameUnits::Range))
&& window_frame.includes_current_row())
{
let input_values = input_datums
.iter()
.map(|(input_value, _original_row, _order_by_row)| input_value.clone())
.collect_vec(); let result_value = wrapped_aggregate.eval(input_values, callers_temp_storage);
for (_current_datum, original_row, _order_by_row) in input_datums.iter() {
result.push((result_value, *original_row));
}
} else {
fn rows_between_unbounded_preceding_and_current_row<'a, 'b, A>(
input_datums: Vec<(Datum<'a>, Datum<'b>, Row)>,
result: &mut Vec<(Datum<'a>, Datum<'b>)>,
mut one_by_one_aggr: A,
temp_storage: &'a RowArena,
) where
A: OneByOneAggr,
{
for (current_datum, original_row, _order_by_row) in input_datums.into_iter() {
one_by_one_aggr.give(¤t_datum);
let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
result.push((result_value, original_row));
}
}
fn groups_between_unbounded_preceding_and_current_row<'a, 'b, A>(
input_datums: Vec<(Datum<'a>, Datum<'b>, Row)>,
result: &mut Vec<(Datum<'a>, Datum<'b>)>,
mut one_by_one_aggr: A,
temp_storage: &'a RowArena,
) where
A: OneByOneAggr,
{
let mut peer_group_start = 0;
while peer_group_start < input_datums.len() {
let mut peer_group_end = peer_group_start + 1;
while peer_group_end < input_datums.len()
&& input_datums[peer_group_start].2 == input_datums[peer_group_end].2
{
peer_group_end += 1;
}
for (current_datum, _original_row, _order_by_row) in
input_datums[peer_group_start..peer_group_end].iter()
{
one_by_one_aggr.give(current_datum);
}
let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
for (_current_datum, original_row, _order_by_row) in
input_datums[peer_group_start..peer_group_end].iter()
{
result.push((agg_for_peer_group, *original_row));
}
peer_group_start = peer_group_end;
}
}
fn rows_between_offset_and_offset<'a, 'b>(
input_datums: Vec<(Datum<'a>, Datum<'b>, Row)>,
result: &mut Vec<(Datum<'a>, Datum<'b>)>,
wrapped_aggregate: &AggregateFunc,
temp_storage: &'a RowArena,
offset_start: i64,
offset_end: i64,
) {
let len = input_datums
.len()
.to_i64()
.expect("window partition's len should fit into i64");
for (i, (_current_datum, original_row, _order_by_row)) in
input_datums.iter().enumerate()
{
let i = i.to_i64().expect("window partition shouldn't be super big");
let frame_start = max(i + offset_start, 0)
.to_usize()
.expect("The max made sure it's not negative");
let frame_end = min(i + offset_end, len - 1).to_usize();
match frame_end {
Some(frame_end) => {
if frame_start <= frame_end {
let frame_values = input_datums[frame_start..=frame_end].iter().map(
|(input_value, _original_row, _order_by_row)| input_value.clone(),
);
let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
result.push((result_value, original_row.clone()));
} else {
let result_value = wrapped_aggregate.default();
result.push((result_value, original_row.clone()));
}
}
None => {
let result_value = wrapped_aggregate.default();
result.push((result_value, original_row.clone()));
}
}
}
}
match (
&window_frame.units,
&window_frame.start_bound,
&window_frame.end_bound,
) {
(Rows, UnboundedPreceding, CurrentRow) => {
rows_between_unbounded_preceding_and_current_row::<A>(
input_datums,
&mut result,
A::new(wrapped_aggregate, false),
callers_temp_storage,
);
}
(Rows, CurrentRow, UnboundedFollowing) => {
input_datums.reverse();
rows_between_unbounded_preceding_and_current_row::<A>(
input_datums,
&mut result,
A::new(wrapped_aggregate, true),
callers_temp_storage,
);
result.reverse();
}
(Range, UnboundedPreceding, CurrentRow) => {
groups_between_unbounded_preceding_and_current_row::<A>(
input_datums,
&mut result,
A::new(wrapped_aggregate, false),
callers_temp_storage,
);
}
(Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
let start_prec = start_prec.to_i64().expect(
"window frame start OFFSET shouldn't be super big (the planning ensured this)",
);
let end_prec = end_prec.to_i64().expect(
"window frame end OFFSET shouldn't be super big (the planning ensured this)",
);
rows_between_offset_and_offset(
input_datums,
&mut result,
wrapped_aggregate,
callers_temp_storage,
-start_prec,
-end_prec,
);
}
(Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
let start_prec = start_prec.to_i64().expect(
"window frame start OFFSET shouldn't be super big (the planning ensured this)",
);
let end_fol = end_fol.to_i64().expect(
"window frame end OFFSET shouldn't be super big (the planning ensured this)",
);
rows_between_offset_and_offset(
input_datums,
&mut result,
wrapped_aggregate,
callers_temp_storage,
-start_prec,
end_fol,
);
}
(Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
let start_fol = start_fol.to_i64().expect(
"window frame start OFFSET shouldn't be super big (the planning ensured this)",
);
let end_fol = end_fol.to_i64().expect(
"window frame end OFFSET shouldn't be super big (the planning ensured this)",
);
rows_between_offset_and_offset(
input_datums,
&mut result,
wrapped_aggregate,
callers_temp_storage,
start_fol,
end_fol,
);
}
(Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
unreachable!() }
(Rows, OffsetPreceding(start_prec), CurrentRow) => {
let start_prec = start_prec.to_i64().expect(
"window frame start OFFSET shouldn't be super big (the planning ensured this)",
);
let end_fol = 0;
rows_between_offset_and_offset(
input_datums,
&mut result,
wrapped_aggregate,
callers_temp_storage,
-start_prec,
end_fol,
);
}
(Rows, CurrentRow, OffsetFollowing(end_fol)) => {
let start_fol = 0;
let end_fol = end_fol.to_i64().expect(
"window frame end OFFSET shouldn't be super big (the planning ensured this)",
);
rows_between_offset_and_offset(
input_datums,
&mut result,
wrapped_aggregate,
callers_temp_storage,
start_fol,
end_fol,
);
}
(Rows, CurrentRow, CurrentRow) => {
let start_fol = 0;
let end_fol = 0;
rows_between_offset_and_offset(
input_datums,
&mut result,
wrapped_aggregate,
callers_temp_storage,
start_fol,
end_fol,
);
}
(Rows, CurrentRow, OffsetPreceding(_))
| (Rows, UnboundedFollowing, _)
| (Rows, _, UnboundedPreceding)
| (Rows, OffsetFollowing(..), CurrentRow) => {
unreachable!() }
(Rows, UnboundedPreceding, UnboundedFollowing) => {
unreachable!()
}
(Rows, UnboundedPreceding, OffsetPreceding(_))
| (Rows, UnboundedPreceding, OffsetFollowing(_))
| (Rows, OffsetPreceding(..), UnboundedFollowing)
| (Rows, OffsetFollowing(..), UnboundedFollowing) => {
unreachable!()
}
(Range, _, _) => {
unreachable!()
}
(Groups, _, _) => {
unreachable!()
}
}
}
result.into_iter().map(|(result_value, original_row)| {
callers_temp_storage.make_datum(|packer| {
packer.push_list_with(|packer| {
packer.push(result_value);
packer.push(original_row);
});
})
})
}
/// An aggregation that can be fed input elements one at a time and asked at any
/// point for the aggregate of everything seen so far.
///
/// Used by window aggregations, where growing prefixes of a window frame need
/// to be aggregated incrementally (see `rows_between_unbounded_preceding_and_current_row`).
pub trait OneByOneAggr {
/// Creates a new instance for evaluating `agg`. If `reverse` is true,
/// implementations evaluate the aggregate over the inputs in reverse of the
/// order in which they were given (see `NaiveOneByOneAggr` for reference
/// behavior); callers use this together with reversing the input, e.g. for
/// `ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING` frames.
fn new(agg: &AggregateFunc, reverse: bool) -> Self;
/// Feeds one input element into the aggregation.
fn give(&mut self, d: &Datum);
/// Returns the aggregate of all elements given so far. The returned `Datum`
/// is valid as long as `temp_storage` is.
fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
/// Naive implementation of [`OneByOneAggr`]: it remembers every input element
/// and re-evaluates the wrapped aggregation from scratch each time the current
/// aggregate is requested.
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
// The aggregation being evaluated.
agg: AggregateFunc,
// All elements given so far, each packed into its own `Row`.
input: Vec<Row>,
// If true, `get_current_aggregate` evaluates the inputs in reverse order.
reverse: bool,
}
impl OneByOneAggr for NaiveOneByOneAggr {
/// Creates an empty aggregation state for `agg`.
fn new(agg: &AggregateFunc, reverse: bool) -> Self {
    Self {
        agg: agg.clone(),
        input: Vec::new(),
        reverse,
    }
}

/// Remembers `d` for later evaluation, packed into its own `Row`.
fn give(&mut self, d: &Datum) {
    let mut packed = Row::default();
    packed.packer().push(d);
    self.input.push(packed);
}

/// Re-evaluates the wrapped aggregation over everything given so far
/// (in reverse order when `self.reverse` is set).
fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
    temp_storage.make_datum(|packer| {
        let aggregated = if self.reverse {
            self.agg.eval(
                self.input.iter().rev().map(|row| row.unpack_first()),
                temp_storage,
            )
        } else {
            self.agg.eval(
                self.input.iter().map(|row| row.unpack_first()),
                temp_storage,
            )
        };
        packer.push(aggregated);
    })
}
}
/// Distinguishes the `lag` and `lead` variants of the `LagLead` window
/// function (see `AggregateFunc::LagLead`).
#[derive(
Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum LagLeadType {
Lag,
Lead,
}
/// A function that reduces a collection of input datums to a single output
/// datum.
///
/// The `Max*`/`Min*`/`Sum*` families have one variant per supported input
/// type. Variants carrying an `order_by` are order-sensitive aggregations or
/// window functions; window functions produce a list of
/// (result, original row) records (see `eval` and `output_type`).
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
// `max` over each supported input type.
MaxNumeric,
MaxInt16,
MaxInt32,
MaxInt64,
MaxUInt16,
MaxUInt32,
MaxUInt64,
MaxMzTimestamp,
MaxFloat32,
MaxFloat64,
MaxBool,
MaxString,
MaxDate,
MaxTimestamp,
MaxTimestampTz,
MaxInterval,
MaxTime,
// `min` over each supported input type.
MinNumeric,
MinInt16,
MinInt32,
MinInt64,
MinUInt16,
MinUInt32,
MinUInt64,
MinMzTimestamp,
MinFloat32,
MinFloat64,
MinBool,
MinString,
MinDate,
MinTimestamp,
MinTimestampTz,
MinInterval,
MinTime,
// `sum` over each supported input type. Results accumulate in a wider type
// than the input (see the type parameters passed to `sum_datum` in `eval`).
SumInt16,
SumInt32,
SumInt64,
SumUInt16,
SumUInt32,
SumUInt64,
SumFloat32,
SumFloat64,
SumNumeric,
Count,
Any,
All,
// Order-sensitive aggregations: `order_by` determines the order in which
// input elements are combined.
JsonbAgg {
order_by: Vec<ColumnOrder>,
},
// NOTE(review): evaluated by the same helper as `MapAgg` (`dict_agg` in
// `eval`).
JsonbObjectAgg {
order_by: Vec<ColumnOrder>,
},
MapAgg {
order_by: Vec<ColumnOrder>,
value_type: ScalarType,
},
ArrayConcat {
order_by: Vec<ColumnOrder>,
},
ListConcat {
order_by: Vec<ColumnOrder>,
},
StringAgg {
order_by: Vec<ColumnOrder>,
},
// Window functions. These return a list of (result, original row) records;
// see `eval` and `can_fuse_with_unnest_list`.
RowNumber {
order_by: Vec<ColumnOrder>,
},
Rank {
order_by: Vec<ColumnOrder>,
},
DenseRank {
order_by: Vec<ColumnOrder>,
},
LagLead {
order_by: Vec<ColumnOrder>,
lag_lead: LagLeadType,
ignore_nulls: bool,
},
FirstValue {
order_by: Vec<ColumnOrder>,
window_frame: WindowFrame,
},
LastValue {
order_by: Vec<ColumnOrder>,
window_frame: WindowFrame,
},
// An ordinary aggregation (`wrapped_aggregate`) evaluated as a window
// aggregate over `window_frame`.
WindowAggregate {
wrapped_aggregate: Box<AggregateFunc>,
order_by: Vec<ColumnOrder>,
window_frame: WindowFrame,
},
// Several value window functions fused into one evaluation; `output_type`
// panics if `funcs` contains anything other than `LagLead`, `FirstValue`,
// or `LastValue`.
FusedValueWindowFunc {
funcs: Vec<AggregateFunc>,
order_by: Vec<ColumnOrder>,
},
// Placeholder that evaluates to `Datum::Dummy` (see `eval`).
Dummy,
}
/// Hand-written `Arbitrary` impl, because the struct-typed variants need
/// composed strategies for their fields.
///
/// NOTE(review): `WindowAggregate` and `FusedValueWindowFunc` are not
/// generated here — presumably to keep the strategy non-recursive (both embed
/// further `AggregateFunc`s); confirm before relying on this impl for full
/// variant coverage.
impl Arbitrary for AggregateFunc {
    type Parameters = ();
    type Strategy = Union<BoxedStrategy<Self>>;

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        use proptest::collection::vec;
        use proptest::prelude::any as proptest_any;
        Union::new(vec![
            Just(AggregateFunc::MaxNumeric).boxed(),
            Just(AggregateFunc::MaxInt16).boxed(),
            Just(AggregateFunc::MaxInt32).boxed(),
            Just(AggregateFunc::MaxInt64).boxed(),
            Just(AggregateFunc::MaxUInt16).boxed(),
            Just(AggregateFunc::MaxUInt32).boxed(),
            Just(AggregateFunc::MaxUInt64).boxed(),
            Just(AggregateFunc::MaxMzTimestamp).boxed(),
            Just(AggregateFunc::MaxFloat32).boxed(),
            Just(AggregateFunc::MaxFloat64).boxed(),
            Just(AggregateFunc::MaxBool).boxed(),
            Just(AggregateFunc::MaxString).boxed(),
            Just(AggregateFunc::MaxDate).boxed(),
            Just(AggregateFunc::MaxTimestamp).boxed(),
            Just(AggregateFunc::MaxTimestampTz).boxed(),
            Just(AggregateFunc::MaxInterval).boxed(),
            Just(AggregateFunc::MaxTime).boxed(),
            Just(AggregateFunc::MinNumeric).boxed(),
            Just(AggregateFunc::MinInt16).boxed(),
            Just(AggregateFunc::MinInt32).boxed(),
            Just(AggregateFunc::MinInt64).boxed(),
            Just(AggregateFunc::MinUInt16).boxed(),
            Just(AggregateFunc::MinUInt32).boxed(),
            Just(AggregateFunc::MinUInt64).boxed(),
            Just(AggregateFunc::MinMzTimestamp).boxed(),
            Just(AggregateFunc::MinFloat32).boxed(),
            Just(AggregateFunc::MinFloat64).boxed(),
            Just(AggregateFunc::MinBool).boxed(),
            Just(AggregateFunc::MinString).boxed(),
            Just(AggregateFunc::MinDate).boxed(),
            Just(AggregateFunc::MinTimestamp).boxed(),
            Just(AggregateFunc::MinTimestampTz).boxed(),
            Just(AggregateFunc::MinInterval).boxed(),
            Just(AggregateFunc::MinTime).boxed(),
            Just(AggregateFunc::SumInt16).boxed(),
            Just(AggregateFunc::SumInt32).boxed(),
            Just(AggregateFunc::SumInt64).boxed(),
            Just(AggregateFunc::SumUInt16).boxed(),
            Just(AggregateFunc::SumUInt32).boxed(),
            Just(AggregateFunc::SumUInt64).boxed(),
            Just(AggregateFunc::SumFloat32).boxed(),
            Just(AggregateFunc::SumFloat64).boxed(),
            Just(AggregateFunc::SumNumeric).boxed(),
            Just(AggregateFunc::Count).boxed(),
            Just(AggregateFunc::Any).boxed(),
            Just(AggregateFunc::All).boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::JsonbAgg { order_by })
                .boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::JsonbObjectAgg { order_by })
                .boxed(),
            (
                vec(proptest_any::<ColumnOrder>(), 1..4),
                proptest_any::<ScalarType>(),
            )
                .prop_map(|(order_by, value_type)| AggregateFunc::MapAgg {
                    order_by,
                    value_type,
                })
                .boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::ArrayConcat { order_by })
                .boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::ListConcat { order_by })
                .boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::StringAgg { order_by })
                .boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::RowNumber { order_by })
                .boxed(),
            // BUG FIX: `Rank` was missing from the strategy (while its siblings
            // `RowNumber` and `DenseRank` were present), so property tests never
            // exercised the `Rank` variant.
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::Rank { order_by })
                .boxed(),
            vec(proptest_any::<ColumnOrder>(), 1..4)
                .prop_map(|order_by| AggregateFunc::DenseRank { order_by })
                .boxed(),
            (
                vec(proptest_any::<ColumnOrder>(), 1..4),
                proptest_any::<LagLeadType>(),
                proptest_any::<bool>(),
            )
                .prop_map(
                    |(order_by, lag_lead, ignore_nulls)| AggregateFunc::LagLead {
                        order_by,
                        lag_lead,
                        ignore_nulls,
                    },
                )
                .boxed(),
            (
                vec(proptest_any::<ColumnOrder>(), 1..4),
                proptest_any::<WindowFrame>(),
            )
                .prop_map(|(order_by, window_frame)| AggregateFunc::FirstValue {
                    order_by,
                    window_frame,
                })
                .boxed(),
            (
                vec(proptest_any::<ColumnOrder>(), 1..4),
                proptest_any::<WindowFrame>(),
            )
                .prop_map(|(order_by, window_frame)| AggregateFunc::LastValue {
                    order_by,
                    window_frame,
                })
                .boxed(),
            Just(AggregateFunc::Dummy).boxed(),
        ])
    }
}
/// Wraps a list of column orders in its dedicated proto message, so it can be
/// embedded as a single field in other messages.
impl RustType<ProtoColumnOrders> for Vec<ColumnOrder> {
    fn into_proto(&self) -> ProtoColumnOrders {
        let orders = self.into_proto();
        ProtoColumnOrders { orders }
    }

    fn from_proto(proto: ProtoColumnOrders) -> Result<Self, TryFromProtoError> {
        let ProtoColumnOrders { orders } = proto;
        orders.into_rust()
    }
}
// Protobuf (de)serialization for `AggregateFunc`. `into_proto` and `from_proto`
// must stay in sync with each other (every `Kind` produced by `into_proto` is
// accepted by `from_proto`, and vice versa). Match-arm order is irrelevant to
// behavior; arms are roughly grouped by variant family.
impl RustType<ProtoAggregateFunc> for AggregateFunc {
fn into_proto(&self) -> ProtoAggregateFunc {
use proto_aggregate_func::Kind;
ProtoAggregateFunc {
kind: Some(match self {
AggregateFunc::MaxNumeric => Kind::MaxNumeric(()),
AggregateFunc::MaxInt16 => Kind::MaxInt16(()),
AggregateFunc::MaxInt32 => Kind::MaxInt32(()),
AggregateFunc::MaxInt64 => Kind::MaxInt64(()),
AggregateFunc::MaxUInt16 => Kind::MaxUint16(()),
AggregateFunc::MaxUInt32 => Kind::MaxUint32(()),
AggregateFunc::MaxUInt64 => Kind::MaxUint64(()),
AggregateFunc::MaxMzTimestamp => Kind::MaxMzTimestamp(()),
AggregateFunc::MaxFloat32 => Kind::MaxFloat32(()),
AggregateFunc::MaxFloat64 => Kind::MaxFloat64(()),
AggregateFunc::MaxBool => Kind::MaxBool(()),
AggregateFunc::MaxString => Kind::MaxString(()),
AggregateFunc::MaxDate => Kind::MaxDate(()),
AggregateFunc::MaxTimestamp => Kind::MaxTimestamp(()),
AggregateFunc::MaxTimestampTz => Kind::MaxTimestampTz(()),
AggregateFunc::MinNumeric => Kind::MinNumeric(()),
AggregateFunc::MaxInterval => Kind::MaxInterval(()),
AggregateFunc::MaxTime => Kind::MaxTime(()),
AggregateFunc::MinInt16 => Kind::MinInt16(()),
AggregateFunc::MinInt32 => Kind::MinInt32(()),
AggregateFunc::MinInt64 => Kind::MinInt64(()),
AggregateFunc::MinUInt16 => Kind::MinUint16(()),
AggregateFunc::MinUInt32 => Kind::MinUint32(()),
AggregateFunc::MinUInt64 => Kind::MinUint64(()),
AggregateFunc::MinMzTimestamp => Kind::MinMzTimestamp(()),
AggregateFunc::MinFloat32 => Kind::MinFloat32(()),
AggregateFunc::MinFloat64 => Kind::MinFloat64(()),
AggregateFunc::MinBool => Kind::MinBool(()),
AggregateFunc::MinString => Kind::MinString(()),
AggregateFunc::MinDate => Kind::MinDate(()),
AggregateFunc::MinTimestamp => Kind::MinTimestamp(()),
AggregateFunc::MinTimestampTz => Kind::MinTimestampTz(()),
AggregateFunc::MinInterval => Kind::MinInterval(()),
AggregateFunc::MinTime => Kind::MinTime(()),
AggregateFunc::SumInt16 => Kind::SumInt16(()),
AggregateFunc::SumInt32 => Kind::SumInt32(()),
AggregateFunc::SumInt64 => Kind::SumInt64(()),
AggregateFunc::SumUInt16 => Kind::SumUint16(()),
AggregateFunc::SumUInt32 => Kind::SumUint32(()),
AggregateFunc::SumUInt64 => Kind::SumUint64(()),
AggregateFunc::SumFloat32 => Kind::SumFloat32(()),
AggregateFunc::SumFloat64 => Kind::SumFloat64(()),
AggregateFunc::SumNumeric => Kind::SumNumeric(()),
AggregateFunc::Count => Kind::Count(()),
AggregateFunc::Any => Kind::Any(()),
AggregateFunc::All => Kind::All(()),
AggregateFunc::JsonbAgg { order_by } => Kind::JsonbAgg(order_by.into_proto()),
AggregateFunc::JsonbObjectAgg { order_by } => {
Kind::JsonbObjectAgg(order_by.into_proto())
}
AggregateFunc::MapAgg {
order_by,
value_type,
} => Kind::MapAgg(proto_aggregate_func::ProtoMapAgg {
order_by: Some(order_by.into_proto()),
value_type: Some(value_type.into_proto()),
}),
AggregateFunc::ArrayConcat { order_by } => Kind::ArrayConcat(order_by.into_proto()),
AggregateFunc::ListConcat { order_by } => Kind::ListConcat(order_by.into_proto()),
AggregateFunc::StringAgg { order_by } => Kind::StringAgg(order_by.into_proto()),
AggregateFunc::RowNumber { order_by } => Kind::RowNumber(order_by.into_proto()),
AggregateFunc::Rank { order_by } => Kind::Rank(order_by.into_proto()),
AggregateFunc::DenseRank { order_by } => Kind::DenseRank(order_by.into_proto()),
AggregateFunc::LagLead {
order_by,
lag_lead,
ignore_nulls,
} => Kind::LagLead(proto_aggregate_func::ProtoLagLead {
order_by: Some(order_by.into_proto()),
lag_lead: Some(match lag_lead {
LagLeadType::Lag => proto_aggregate_func::proto_lag_lead::LagLead::Lag(()),
LagLeadType::Lead => {
proto_aggregate_func::proto_lag_lead::LagLead::Lead(())
}
}),
ignore_nulls: *ignore_nulls,
}),
AggregateFunc::FirstValue {
order_by,
window_frame,
} => Kind::FirstValue(proto_aggregate_func::ProtoFramedWindowFunc {
order_by: Some(order_by.into_proto()),
window_frame: Some(window_frame.into_proto()),
}),
AggregateFunc::LastValue {
order_by,
window_frame,
} => Kind::LastValue(proto_aggregate_func::ProtoFramedWindowFunc {
order_by: Some(order_by.into_proto()),
window_frame: Some(window_frame.into_proto()),
}),
// Boxed because `ProtoWindowAggregate` recursively contains a
// `ProtoAggregateFunc` (the wrapped aggregation).
AggregateFunc::WindowAggregate {
wrapped_aggregate,
order_by,
window_frame,
} => Kind::WindowAggregate(Box::new(proto_aggregate_func::ProtoWindowAggregate {
wrapped_aggregate: Some(wrapped_aggregate.into_proto()),
order_by: Some(order_by.into_proto()),
window_frame: Some(window_frame.into_proto()),
})),
AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
Kind::FusedValueWindowFunc(ProtoFusedValueWindowFunc {
funcs: funcs.into_proto(),
order_by: Some(order_by.into_proto()),
})
}
AggregateFunc::Dummy => Kind::Dummy(()),
}),
}
}
// Inverse of `into_proto`; fails with `TryFromProtoError` when a required
// field (e.g. `kind` or a nested `order_by`) is absent.
fn from_proto(proto: ProtoAggregateFunc) -> Result<Self, TryFromProtoError> {
use proto_aggregate_func::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoAggregateFunc::kind"))?;
Ok(match kind {
Kind::MaxNumeric(()) => AggregateFunc::MaxNumeric,
Kind::MaxInt16(()) => AggregateFunc::MaxInt16,
Kind::MaxInt32(()) => AggregateFunc::MaxInt32,
Kind::MaxInt64(()) => AggregateFunc::MaxInt64,
Kind::MaxUint16(()) => AggregateFunc::MaxUInt16,
Kind::MaxUint32(()) => AggregateFunc::MaxUInt32,
Kind::MaxUint64(()) => AggregateFunc::MaxUInt64,
Kind::MaxMzTimestamp(()) => AggregateFunc::MaxMzTimestamp,
Kind::MaxFloat32(()) => AggregateFunc::MaxFloat32,
Kind::MaxFloat64(()) => AggregateFunc::MaxFloat64,
Kind::MaxBool(()) => AggregateFunc::MaxBool,
Kind::MaxString(()) => AggregateFunc::MaxString,
Kind::MaxDate(()) => AggregateFunc::MaxDate,
Kind::MaxTimestamp(()) => AggregateFunc::MaxTimestamp,
Kind::MaxTimestampTz(()) => AggregateFunc::MaxTimestampTz,
Kind::MaxInterval(()) => AggregateFunc::MaxInterval,
Kind::MaxTime(()) => AggregateFunc::MaxTime,
Kind::MinNumeric(()) => AggregateFunc::MinNumeric,
Kind::MinInt16(()) => AggregateFunc::MinInt16,
Kind::MinInt32(()) => AggregateFunc::MinInt32,
Kind::MinInt64(()) => AggregateFunc::MinInt64,
Kind::MinUint16(()) => AggregateFunc::MinUInt16,
Kind::MinUint32(()) => AggregateFunc::MinUInt32,
Kind::MinUint64(()) => AggregateFunc::MinUInt64,
Kind::MinMzTimestamp(()) => AggregateFunc::MinMzTimestamp,
Kind::MinFloat32(()) => AggregateFunc::MinFloat32,
Kind::MinFloat64(()) => AggregateFunc::MinFloat64,
Kind::MinBool(()) => AggregateFunc::MinBool,
Kind::MinString(()) => AggregateFunc::MinString,
Kind::MinDate(()) => AggregateFunc::MinDate,
Kind::MinTimestamp(()) => AggregateFunc::MinTimestamp,
Kind::MinTimestampTz(()) => AggregateFunc::MinTimestampTz,
Kind::MinInterval(()) => AggregateFunc::MinInterval,
Kind::MinTime(()) => AggregateFunc::MinTime,
Kind::SumInt16(()) => AggregateFunc::SumInt16,
Kind::SumInt32(()) => AggregateFunc::SumInt32,
Kind::SumInt64(()) => AggregateFunc::SumInt64,
Kind::SumUint16(()) => AggregateFunc::SumUInt16,
Kind::SumUint32(()) => AggregateFunc::SumUInt32,
Kind::SumUint64(()) => AggregateFunc::SumUInt64,
Kind::SumFloat32(()) => AggregateFunc::SumFloat32,
Kind::SumFloat64(()) => AggregateFunc::SumFloat64,
Kind::SumNumeric(()) => AggregateFunc::SumNumeric,
Kind::Count(()) => AggregateFunc::Count,
Kind::Any(()) => AggregateFunc::Any,
Kind::All(()) => AggregateFunc::All,
Kind::JsonbAgg(order_by) => AggregateFunc::JsonbAgg {
order_by: order_by.into_rust()?,
},
Kind::JsonbObjectAgg(order_by) => AggregateFunc::JsonbObjectAgg {
order_by: order_by.into_rust()?,
},
Kind::MapAgg(pma) => AggregateFunc::MapAgg {
order_by: pma.order_by.into_rust_if_some("ProtoMapAgg::order_by")?,
value_type: pma
.value_type
.into_rust_if_some("ProtoMapAgg::value_type")?,
},
Kind::ArrayConcat(order_by) => AggregateFunc::ArrayConcat {
order_by: order_by.into_rust()?,
},
Kind::ListConcat(order_by) => AggregateFunc::ListConcat {
order_by: order_by.into_rust()?,
},
Kind::StringAgg(order_by) => AggregateFunc::StringAgg {
order_by: order_by.into_rust()?,
},
Kind::RowNumber(order_by) => AggregateFunc::RowNumber {
order_by: order_by.into_rust()?,
},
Kind::Rank(order_by) => AggregateFunc::Rank {
order_by: order_by.into_rust()?,
},
Kind::DenseRank(order_by) => AggregateFunc::DenseRank {
order_by: order_by.into_rust()?,
},
Kind::LagLead(pll) => AggregateFunc::LagLead {
order_by: pll.order_by.into_rust_if_some("ProtoLagLead::order_by")?,
lag_lead: match pll.lag_lead {
Some(proto_aggregate_func::proto_lag_lead::LagLead::Lag(())) => {
LagLeadType::Lag
}
Some(proto_aggregate_func::proto_lag_lead::LagLead::Lead(())) => {
LagLeadType::Lead
}
None => {
return Err(TryFromProtoError::MissingField(
"ProtoLagLead::lag_lead".into(),
))
}
},
ignore_nulls: pll.ignore_nulls,
},
Kind::FirstValue(pfv) => AggregateFunc::FirstValue {
order_by: pfv
.order_by
.into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
window_frame: pfv
.window_frame
.into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
},
Kind::LastValue(pfv) => AggregateFunc::LastValue {
order_by: pfv
.order_by
.into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
window_frame: pfv
.window_frame
.into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
},
Kind::WindowAggregate(paf) => AggregateFunc::WindowAggregate {
wrapped_aggregate: paf
.wrapped_aggregate
.into_rust_if_some("ProtoWindowAggregate::wrapped_aggregate")?,
order_by: paf
.order_by
.into_rust_if_some("ProtoWindowAggregate::order_by")?,
window_frame: paf
.window_frame
.into_rust_if_some("ProtoWindowAggregate::window_frame")?,
},
Kind::FusedValueWindowFunc(fvwf) => AggregateFunc::FusedValueWindowFunc {
funcs: fvwf.funcs.into_rust()?,
order_by: fvwf
.order_by
.into_rust_if_some("ProtoFusedValueWindowFunc::order_by")?,
},
Kind::Dummy(()) => AggregateFunc::Dummy,
})
}
}
impl AggregateFunc {
/// Evaluates the aggregation over `datums`, returning a single result datum
/// allocated in `temp_storage`.
///
/// For window function variants the result is a list of
/// (result, original row) records; see the corresponding helper functions
/// (`row_number`, `lag_lead`, `window_aggr`, ...). `WindowAggregate` uses the
/// naive per-frame evaluation here; see `eval_with_fast_window_agg` for
/// supplying a more efficient `OneByOneAggr`.
pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
where
I: IntoIterator<Item = Datum<'a>>,
{
match self {
AggregateFunc::MaxNumeric => {
max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
}
AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
AggregateFunc::MaxString => max_string(datums),
AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
AggregateFunc::MaxTimestamp => {
max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
}
AggregateFunc::MaxTimestampTz => {
max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
}
AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
AggregateFunc::MinNumeric => {
min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
}
AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
AggregateFunc::MinString => min_string(datums),
AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
AggregateFunc::MinTimestamp => {
min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
}
AggregateFunc::MinTimestampTz => {
min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
}
AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
// Sums accumulate into a wider type than the input (second type
// parameter), e.g. i16 sums into i64.
AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
AggregateFunc::SumNumeric => sum_numeric(datums),
AggregateFunc::Count => count(datums),
AggregateFunc::Any => any(datums),
AggregateFunc::All => all(datums),
AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
// `MapAgg` and `JsonbObjectAgg` share the same evaluation helper.
AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
dict_agg(datums, temp_storage, order_by)
}
AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
AggregateFunc::LagLead {
order_by,
lag_lead: lag_lead_type,
ignore_nulls,
} => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
AggregateFunc::FirstValue {
order_by,
window_frame,
} => first_value(datums, temp_storage, order_by, window_frame),
AggregateFunc::LastValue {
order_by,
window_frame,
} => last_value(datums, temp_storage, order_by, window_frame),
// Uses the naive one-by-one aggregator; `eval_with_fast_window_agg`
// allows callers to supply a more efficient implementation.
AggregateFunc::WindowAggregate {
wrapped_aggregate,
order_by,
window_frame,
} => window_aggr::<_, NaiveOneByOneAggr>(
datums,
temp_storage,
wrapped_aggregate,
order_by,
window_frame,
),
AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
fused_value_window_func(datums, temp_storage, funcs, order_by)
}
AggregateFunc::Dummy => Datum::Dummy,
}
}
/// Like `eval`, but `WindowAggregate` is evaluated with the caller-supplied
/// [`OneByOneAggr`] implementation `W` instead of the naive one. All other
/// variants are delegated to `eval` unchanged.
pub fn eval_with_fast_window_agg<'a, I, W>(
    &self,
    datums: I,
    temp_storage: &'a RowArena,
) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
    W: OneByOneAggr,
{
    // Only `WindowAggregate` benefits from the custom aggregator.
    if let AggregateFunc::WindowAggregate {
        wrapped_aggregate,
        order_by,
        window_frame,
    } = self
    {
        window_aggr::<_, W>(
            datums,
            temp_storage,
            wrapped_aggregate,
            order_by,
            window_frame,
        )
    } else {
        self.eval(datums, temp_storage)
    }
}
/// Evaluates a window function and yields the result datums directly, instead
/// of wrapping them in the list that `eval` would build. Callers must first
/// check [`AggregateFunc::can_fuse_with_unnest_list`]; this method panics
/// otherwise.
pub fn eval_with_unnest_list<'a, I, W>(
    &self,
    datums: I,
    temp_storage: &'a RowArena,
) -> impl Iterator<Item = Datum<'a>>
where
    I: IntoIterator<Item = Datum<'a>>,
    W: OneByOneAggr,
{
    // Only window functions support the fused, list-free evaluation path.
    assert!(self.can_fuse_with_unnest_list());
    let results: Vec<Datum<'a>> = match self {
        AggregateFunc::RowNumber { order_by } => {
            row_number_no_list(datums, temp_storage, order_by).collect()
        }
        AggregateFunc::Rank { order_by } => {
            rank_no_list(datums, temp_storage, order_by).collect()
        }
        AggregateFunc::DenseRank { order_by } => {
            dense_rank_no_list(datums, temp_storage, order_by).collect()
        }
        AggregateFunc::LagLead {
            order_by,
            lag_lead: lag_lead_type,
            ignore_nulls,
        } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
            .collect(),
        AggregateFunc::FirstValue {
            order_by,
            window_frame,
        } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect(),
        AggregateFunc::LastValue {
            order_by,
            window_frame,
        } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect(),
        AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
            fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect()
        }
        AggregateFunc::WindowAggregate {
            wrapped_aggregate,
            order_by,
            window_frame,
        } => window_aggr_no_list::<_, W>(
            datums,
            temp_storage,
            wrapped_aggregate,
            order_by,
            window_frame,
        )
        .collect(),
        _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
    };
    results.into_iter()
}
/// The output of the aggregation on an empty input collection.
pub fn default(&self) -> Datum<'static> {
    // Only these four variants produce something other than NULL for an
    // empty input.
    match self {
        AggregateFunc::Dummy => Datum::Dummy,
        AggregateFunc::Count => Datum::Int64(0),
        AggregateFunc::Any => Datum::False,
        AggregateFunc::All => Datum::True,
        _ => Datum::Null,
    }
}
/// An input datum that acts as the identity element for this aggregation:
/// `false` for `Any` (OR), `true` for `All` (AND), the empty array/list for
/// the concatenating and window functions, and NULL for the rest.
pub fn identity_datum(&self) -> Datum<'static> {
    // All variants are listed explicitly (no `_` arm), so adding a new
    // `AggregateFunc` forces a decision here.
    match self {
        AggregateFunc::Any => Datum::False,
        AggregateFunc::All => Datum::True,
        AggregateFunc::Dummy => Datum::Dummy,
        AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
        // `ListConcat` and all the window functions are list-valued.
        AggregateFunc::ListConcat { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. } => Datum::empty_list(),
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinInterval
        | AggregateFunc::MinTime
        | AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::StringAgg { .. } => Datum::Null,
    }
}
/// Whether this function supports the fused, list-free evaluation path
/// [`AggregateFunc::eval_with_unnest_list`] (which asserts this). True
/// exactly for the window function variants.
///
/// All variants are enumerated explicitly (no `_` arm) so that adding a new
/// `AggregateFunc` variant forces a decision here.
pub fn can_fuse_with_unnest_list(&self) -> bool {
match self {
AggregateFunc::RowNumber { .. }
| AggregateFunc::Rank { .. }
| AggregateFunc::DenseRank { .. }
| AggregateFunc::LagLead { .. }
| AggregateFunc::FirstValue { .. }
| AggregateFunc::LastValue { .. }
| AggregateFunc::WindowAggregate { .. }
| AggregateFunc::FusedValueWindowFunc { .. } => true,
AggregateFunc::ArrayConcat { .. }
| AggregateFunc::ListConcat { .. }
| AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::Dummy
| AggregateFunc::MaxNumeric
| AggregateFunc::MaxInt16
| AggregateFunc::MaxInt32
| AggregateFunc::MaxInt64
| AggregateFunc::MaxUInt16
| AggregateFunc::MaxUInt32
| AggregateFunc::MaxUInt64
| AggregateFunc::MaxMzTimestamp
| AggregateFunc::MaxFloat32
| AggregateFunc::MaxFloat64
| AggregateFunc::MaxBool
| AggregateFunc::MaxString
| AggregateFunc::MaxDate
| AggregateFunc::MaxTimestamp
| AggregateFunc::MaxTimestampTz
| AggregateFunc::MaxInterval
| AggregateFunc::MaxTime
| AggregateFunc::MinNumeric
| AggregateFunc::MinInt16
| AggregateFunc::MinInt32
| AggregateFunc::MinInt64
| AggregateFunc::MinUInt16
| AggregateFunc::MinUInt32
| AggregateFunc::MinUInt64
| AggregateFunc::MinMzTimestamp
| AggregateFunc::MinFloat32
| AggregateFunc::MinFloat64
| AggregateFunc::MinBool
| AggregateFunc::MinString
| AggregateFunc::MinDate
| AggregateFunc::MinTimestamp
| AggregateFunc::MinTimestampTz
| AggregateFunc::MinInterval
| AggregateFunc::MinTime
| AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64
| AggregateFunc::SumFloat32
| AggregateFunc::SumFloat64
| AggregateFunc::SumNumeric
| AggregateFunc::Count
| AggregateFunc::JsonbAgg { .. }
| AggregateFunc::JsonbObjectAgg { .. }
| AggregateFunc::MapAgg { .. }
| AggregateFunc::StringAgg { .. } => false,
}
}
pub fn output_type(&self, input_type: ColumnType) -> ColumnType {
let scalar_type = match self {
AggregateFunc::Count => ScalarType::Int64,
AggregateFunc::Any => ScalarType::Bool,
AggregateFunc::All => ScalarType::Bool,
AggregateFunc::JsonbAgg { .. } => ScalarType::Jsonb,
AggregateFunc::JsonbObjectAgg { .. } => ScalarType::Jsonb,
AggregateFunc::SumInt16 => ScalarType::Int64,
AggregateFunc::SumInt32 => ScalarType::Int64,
AggregateFunc::SumInt64 => ScalarType::Numeric {
max_scale: Some(NumericMaxScale::ZERO),
},
AggregateFunc::SumUInt16 => ScalarType::UInt64,
AggregateFunc::SumUInt32 => ScalarType::UInt64,
AggregateFunc::SumUInt64 => ScalarType::Numeric {
max_scale: Some(NumericMaxScale::ZERO),
},
AggregateFunc::MapAgg { value_type, .. } => ScalarType::Map {
value_type: Box::new(value_type.clone()),
custom_id: None,
},
AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
match input_type.scalar_type {
ScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
_ => unreachable!(),
}
}
AggregateFunc::StringAgg { .. } => ScalarType::String,
AggregateFunc::RowNumber { .. } => {
AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
}
AggregateFunc::Rank { .. } => {
AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
}
AggregateFunc::DenseRank { .. } => {
AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
}
AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
let fields = input_type.scalar_type.unwrap_record_element_type();
let original_row_type = fields[0].unwrap_record_element_type()[0]
.clone()
.nullable(false);
let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
let column_name = Self::lag_lead_result_column_name(lag_lead_type);
ScalarType::List {
element_type: Box::new(ScalarType::Record {
fields: vec![
(column_name, output_type_inner),
(ColumnName::from("?orig_row?"), original_row_type),
],
custom_id: None,
}),
custom_id: None,
}
}
AggregateFunc::FirstValue { .. } => {
let fields = input_type.scalar_type.unwrap_record_element_type();
let original_row_type = fields[0].unwrap_record_element_type()[0]
.clone()
.nullable(false);
let value_type = fields[0].unwrap_record_element_type()[1]
.clone()
.nullable(true); ScalarType::List {
element_type: Box::new(ScalarType::Record {
fields: vec![
(ColumnName::from("?first_value?"), value_type),
(ColumnName::from("?orig_row?"), original_row_type),
],
custom_id: None,
}),
custom_id: None,
}
}
AggregateFunc::LastValue { .. } => {
let fields = input_type.scalar_type.unwrap_record_element_type();
let original_row_type = fields[0].unwrap_record_element_type()[0]
.clone()
.nullable(false);
let value_type = fields[0].unwrap_record_element_type()[1]
.clone()
.nullable(true); ScalarType::List {
element_type: Box::new(ScalarType::Record {
fields: vec![
(ColumnName::from("?last_value?"), value_type),
(ColumnName::from("?orig_row?"), original_row_type),
],
custom_id: None,
}),
custom_id: None,
}
}
AggregateFunc::WindowAggregate {
wrapped_aggregate, ..
} => {
let fields = input_type.scalar_type.unwrap_record_element_type();
let original_row_type = fields[0].unwrap_record_element_type()[0]
.clone()
.nullable(false);
let arg_type = fields[0].unwrap_record_element_type()[1]
.clone()
.nullable(true);
let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
ScalarType::List {
element_type: Box::new(ScalarType::Record {
fields: vec![
(ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
(ColumnName::from("?orig_row?"), original_row_type),
],
custom_id: None,
}),
custom_id: None,
}
}
AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
let fields = input_type.scalar_type.unwrap_record_element_type();
let original_row_type = fields[0].unwrap_record_element_type()[0]
.clone()
.nullable(false);
let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();
ScalarType::List {
element_type: Box::new(ScalarType::Record {
fields: vec![
(ColumnName::from("?fused_value_window_func?"), ScalarType::Record {
fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
match func {
AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
(
Self::lag_lead_result_column_name(lag_lead_type),
Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
)
},
AggregateFunc::FirstValue { .. } => {
(
ColumnName::from("?first_value?"),
arg_type.clone().nullable(true),
)
}
AggregateFunc::LastValue { .. } => {
(
ColumnName::from("?last_value?"),
arg_type.clone().nullable(true),
)
}
_ => panic!("FusedValueWindowFunc has an unknown function"),
}
}).collect(),
custom_id: None,
}.nullable(false)),
(ColumnName::from("?orig_row?"), original_row_type),
],
custom_id: None,
}),
custom_id: None,
}
}
AggregateFunc::Dummy
| AggregateFunc::MaxNumeric
| AggregateFunc::MaxInt16
| AggregateFunc::MaxInt32
| AggregateFunc::MaxInt64
| AggregateFunc::MaxUInt16
| AggregateFunc::MaxUInt32
| AggregateFunc::MaxUInt64
| AggregateFunc::MaxMzTimestamp
| AggregateFunc::MaxFloat32
| AggregateFunc::MaxFloat64
| AggregateFunc::MaxBool
| AggregateFunc::MaxString
| AggregateFunc::MaxDate
| AggregateFunc::MaxTimestamp
| AggregateFunc::MaxTimestampTz
| AggregateFunc::MaxInterval
| AggregateFunc::MaxTime
| AggregateFunc::MinNumeric
| AggregateFunc::MinInt16
| AggregateFunc::MinInt32
| AggregateFunc::MinInt64
| AggregateFunc::MinUInt16
| AggregateFunc::MinUInt32
| AggregateFunc::MinUInt64
| AggregateFunc::MinMzTimestamp
| AggregateFunc::MinFloat32
| AggregateFunc::MinFloat64
| AggregateFunc::MinBool
| AggregateFunc::MinString
| AggregateFunc::MinDate
| AggregateFunc::MinTimestamp
| AggregateFunc::MinTimestampTz
| AggregateFunc::MinInterval
| AggregateFunc::MinTime
| AggregateFunc::SumFloat32
| AggregateFunc::SumFloat64
| AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
};
let nullable = match self {
AggregateFunc::Count => false,
AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
ScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
ScalarType::Record { fields, .. } => fields[0].1.nullable,
_ => unreachable!(),
},
_ => unreachable!(),
},
_ => input_type.nullable,
};
scalar_type.nullable(nullable)
}
/// Computes the shared output type of the ranking window functions
/// (`row_number`, `rank`, `dense_rank`): a list of records pairing an
/// `Int64` rank value (named `col_name`) with the original row.
fn output_type_ranking_window_funcs(input_type: &ColumnType, col_name: &str) -> ScalarType {
    // The input is encoded as a record whose first field wraps the
    // original row in a list.
    let fields = match input_type.scalar_type {
        ScalarType::Record { ref fields, .. } => fields,
        _ => unreachable!(),
    };
    let orig_row_type = match &fields[0].1.scalar_type {
        ScalarType::List { element_type, .. } => element_type.clone().nullable(false),
        _ => unreachable!(),
    };
    ScalarType::List {
        element_type: Box::new(ScalarType::Record {
            fields: vec![
                (ColumnName::from(col_name), ScalarType::Int64.nullable(false)),
                (ColumnName::from("?orig_row?"), orig_row_type),
            ],
            custom_id: None,
        }),
        custom_id: None,
    }
}
/// Extracts the result type of a `lag`/`lead` call from the type of its
/// encoded argument record: the first field's type, made nullable (the
/// function yields NULL when the offset runs off the partition).
fn lag_lead_output_type_inner_from_encoded_args(encoded_args_type: &ScalarType) -> ColumnType {
    let value_type = encoded_args_type.unwrap_record_element_type()[0].clone();
    value_type.nullable(true)
}
/// Returns the synthetic result column name for a `lag` or `lead` call.
fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
    let name = match lag_lead_type {
        LagLeadType::Lag => "?lag?",
        LagLeadType::Lead => "?lead?",
    };
    ColumnName::from(name)
}
/// Reports whether this aggregate preserves a non-null constraint on its
/// input — i.e., whether its output is guaranteed non-null whenever the
/// input column is non-null.
///
/// The `min`/`max`/`sum`/`string_agg` family returns values derived
/// directly from input values, so the constraint carries over. Everything
/// else — including `Count`, whose output nullability is decided
/// independently of its input — conservatively reports `false`.
///
/// NOTE(review): `MaxInterval`/`MaxTime` and `MinInterval`/`MinTime` are
/// absent from the `true` list even though their siblings are present —
/// confirm whether this omission is intentional.
pub fn propagates_nonnull_constraint(&self) -> bool {
    match self {
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::StringAgg { .. } => true,
        // All remaining aggregates, `Count` included (its former explicit
        // `=> false` arm was redundant with this catch-all).
        _ => false,
    }
}
}
/// Expands a jsonb map into one (key, value) row per entry. Non-map inputs
/// produce zero rows. When `stringify` is set, values are rendered as
/// strings (allocated in `temp_storage`).
fn jsonb_each<'a>(
    a: Datum<'a>,
    temp_storage: &'a RowArena,
    stringify: bool,
) -> impl Iterator<Item = (Row, Diff)> + 'a {
    let entries = if let Datum::Map(dict) = a {
        dict
    } else {
        mz_repr::DatumMap::empty()
    };
    entries.iter().map(move |(key, value)| {
        let value = if stringify {
            jsonb_stringify(value, temp_storage)
        } else {
            value
        };
        (Row::pack_slice(&[Datum::String(key), value]), 1)
    })
}
/// Emits one single-column row per key of the jsonb map `a`. Non-map
/// inputs produce zero rows.
fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
    let entries = if let Datum::Map(dict) = a {
        dict
    } else {
        mz_repr::DatumMap::empty()
    };
    entries
        .iter()
        .map(move |(key, _value)| (Row::pack_slice(&[Datum::String(key)]), 1))
}
/// Expands a jsonb array into one row per element. Non-list inputs produce
/// zero rows. When `stringify` is set, elements are rendered as strings
/// (allocated in `temp_storage`).
fn jsonb_array_elements<'a>(
    a: Datum<'a>,
    temp_storage: &'a RowArena,
    stringify: bool,
) -> impl Iterator<Item = (Row, Diff)> + 'a {
    let elements = if let Datum::List(list) = a {
        list
    } else {
        mz_repr::DatumList::empty()
    };
    elements.iter().map(move |element| {
        let element = if stringify {
            jsonb_stringify(element, temp_storage)
        } else {
            element
        };
        (Row::pack_slice(&[element]), 1)
    })
}
/// Matches `r` against the string `a` and, if it matches, returns a single
/// row containing one column per capture group (NULL for groups that did
/// not participate in the match). Returns `None` when there is no match.
fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
    let haystack = a.unwrap_str();
    let captures = r.inner().captures(haystack)?;
    // Group 0 is the implicit whole-match group; emit only explicit groups.
    let datums = captures
        .iter()
        .skip(1)
        .map(|group| Datum::from(group.map(|m| m.as_str())));
    Some((Row::pack(datums), 1))
}
/// Produces the inclusive integer series `start`, `start + step`, … up to
/// (or down to, for negative steps) `stop`, one row per value.
///
/// # Errors
///
/// Returns an error if `step` is zero, which would never terminate.
fn generate_series<N>(
    start: N,
    stop: N,
    step: N,
) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
where
    N: Integer + Signed + CheckedAdd + Clone,
    Datum<'static>: From<N>,
{
    if step == N::zero() {
        return Err(EvalError::InvalidParameterValue(
            "step size cannot equal zero".to_owned(),
        ));
    }
    let series = num::range_step_inclusive(start, stop, step);
    Ok(series.map(move |n| (Row::pack_slice(&[Datum::from(n)]), 1)))
}
/// An iterator over timestamps from a start up to (and including) `stop`,
/// advancing by `step` each call. Constructed by `generate_series_ts`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    // The next timestamp to yield.
    state: CheckedTimestamp<T>,
    // Inclusive endpoint of the series.
    stop: CheckedTimestamp<T>,
    // Amount to advance by per step; may be negative (see `rev`).
    step: Interval,
    // Whether the series descends (negative step), which flips the
    // termination comparison in `next`.
    rev: bool,
    // Set once advancing overflows the valid timestamp range; ends iteration.
    done: bool,
}
impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
    type Item = CheckedTimestamp<T>;

    /// Yields the current timestamp and advances by `step`. Iteration ends
    /// once the state passes `stop`, or permanently once advancing would
    /// overflow the valid timestamp range (the pre-overflow value is still
    /// yielded).
    #[inline]
    fn next(&mut self) -> Option<CheckedTimestamp<T>> {
        if self.done {
            return None;
        }
        let in_range = if self.rev {
            self.state >= self.stop
        } else {
            self.state <= self.stop
        };
        if !in_range {
            return None;
        }
        let current = self.state.clone();
        // Advance by the month component first, then by the sub-month
        // duration; any failure along the way terminates the series.
        let advanced = add_timestamp_months(self.state.deref(), self.step.months)
            .ok()
            .and_then(|t| t.checked_add_signed(self.step.duration_as_chrono()))
            .and_then(|t| CheckedTimestamp::from_timestamplike(t).ok());
        match advanced {
            Some(next_state) => self.state = next_state,
            None => self.done = true,
        }
        Some(current)
    }
}
/// Produces the inclusive timestamp series from `start` to `stop` stepping
/// by `step`, converting each value to a `Datum` via `conv`.
///
/// # Errors
///
/// Returns an error if `step` normalizes to zero microseconds, which would
/// never terminate.
fn generate_series_ts<T: TimestampLike>(
    start: CheckedTimestamp<T>,
    stop: CheckedTimestamp<T>,
    step: Interval,
    conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
    let micros = step.as_microseconds();
    if micros == 0 {
        return Err(EvalError::InvalidParameterValue(
            "step size cannot equal zero".to_owned(),
        ));
    }
    let series = TimestampRangeStepInclusive {
        state: start,
        stop,
        step,
        // A negative step means the series descends.
        rev: micros < 0,
        done: false,
    };
    Ok(series.map(move |ts| (Row::pack_slice(&[conv(ts)]), 1)))
}
/// Produces the subscripts of dimension `dim` (1-based) of the array `a`,
/// one row per subscript. Non-positive or out-of-range dimensions yield an
/// empty result.
fn generate_subscripts_array(
    a: Datum,
    dim: i32,
) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
    if dim <= 0 {
        return Ok(Box::new(iter::empty()));
    }
    let dim_idx: usize = (dim - 1)
        .try_into()
        .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string()))?;
    match a.unwrap_array().dims().into_iter().nth(dim_idx) {
        Some(requested_dim) => {
            let lower: i32 = requested_dim
                .lower_bound
                .try_into()
                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string()))?;
            let stop: i32 = requested_dim
                .length
                .try_into()
                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string()))?;
            // NOTE(review): the series runs `lower_bound..=length`, which
            // matches Postgres's `generate_subscripts` only when the lower
            // bound is 1 — confirm whether other lower bounds can reach here.
            Ok(Box::new(generate_series::<i32>(lower, stop, 1)?))
        }
        None => Ok(Box::new(iter::empty())),
    }
}
/// Emits one single-column row per element of the array `a`, in order.
fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
    a.unwrap_array()
        .elements()
        .iter()
        .map(move |element| (Row::pack_slice(&[element]), 1))
}
/// Emits one single-column row per element of the list `a`, in order.
fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
    a.unwrap_list()
        .iter()
        .map(move |element| (Row::pack_slice(&[element]), 1))
}
/// Emits one (key, value) row per entry of the map `a`.
fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
    a.unwrap_map()
        .iter()
        .map(move |(key, value)| (Row::pack_slice(&[Datum::from(key), value]), 1))
}
impl AggregateFunc {
    /// Returns the user-facing SQL name of this aggregate function.
    pub fn name(&self) -> &'static str {
        match self {
            Self::MaxNumeric
            | Self::MaxInt16
            | Self::MaxInt32
            | Self::MaxInt64
            | Self::MaxUInt16
            | Self::MaxUInt32
            | Self::MaxUInt64
            | Self::MaxMzTimestamp
            | Self::MaxFloat32
            | Self::MaxFloat64
            | Self::MaxBool
            | Self::MaxString
            | Self::MaxDate
            | Self::MaxTimestamp
            | Self::MaxTimestampTz
            | Self::MaxInterval
            | Self::MaxTime => "max",
            Self::MinNumeric
            | Self::MinInt16
            | Self::MinInt32
            | Self::MinInt64
            | Self::MinUInt16
            | Self::MinUInt32
            | Self::MinUInt64
            | Self::MinMzTimestamp
            | Self::MinFloat32
            | Self::MinFloat64
            | Self::MinBool
            | Self::MinString
            | Self::MinDate
            | Self::MinTimestamp
            | Self::MinTimestampTz
            | Self::MinInterval
            | Self::MinTime => "min",
            Self::SumInt16
            | Self::SumInt32
            | Self::SumInt64
            | Self::SumUInt16
            | Self::SumUInt32
            | Self::SumUInt64
            | Self::SumFloat32
            | Self::SumFloat64
            | Self::SumNumeric => "sum",
            Self::Count => "count",
            Self::Any => "any",
            Self::All => "all",
            Self::JsonbAgg { .. } => "jsonb_agg",
            Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
            Self::MapAgg { .. } => "map_agg",
            Self::ArrayConcat { .. } => "array_agg",
            Self::ListConcat { .. } => "list_agg",
            Self::StringAgg { .. } => "string_agg",
            Self::RowNumber { .. } => "row_number",
            Self::Rank { .. } => "rank",
            Self::DenseRank { .. } => "dense_rank",
            Self::LagLead {
                lag_lead: LagLeadType::Lag,
                ..
            } => "lag",
            Self::LagLead {
                lag_lead: LagLeadType::Lead,
                ..
            } => "lead",
            Self::FirstValue { .. } => "first_value",
            Self::LastValue { .. } => "last_value",
            Self::WindowAggregate { .. } => "window_agg",
            Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
            Self::Dummy => "dummy",
        }
    }
}
impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
where
    M: HumanizerMode,
{
    /// Renders the aggregate for EXPLAIN output. Window functions include
    /// their ORDER BY (plus flags and frame where applicable); every other
    /// aggregate renders as its bare name.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use AggregateFunc::*;
        let name = self.expr.name();
        match self.expr {
            JsonbAgg { order_by }
            | JsonbObjectAgg { order_by }
            | MapAgg { order_by, .. }
            | ArrayConcat { order_by }
            | ListConcat { order_by }
            | StringAgg { order_by }
            | RowNumber { order_by }
            | Rank { order_by }
            | DenseRank { order_by } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
            }
            LagLead {
                lag_lead: _,
                ignore_nulls,
                order_by,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                write!(f, "{}[", name)?;
                if *ignore_nulls {
                    write!(f, "ignore_nulls=true, ")?;
                }
                write!(f, "order_by=[{}]]", separated(", ", order_by))
            }
            // `first_value` and `last_value` render identically apart from
            // the name, so they share one arm.
            FirstValue {
                order_by,
                window_frame,
            }
            | LastValue {
                order_by,
                window_frame,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                write!(f, "{}[order_by=[{}]", name, separated(", ", order_by))?;
                // Only print the frame when it differs from the default.
                if *window_frame != WindowFrame::default() {
                    write!(f, " {}", window_frame)?;
                }
                f.write_str("]")
            }
            WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                let wrapped = self.child(wrapped_aggregate.deref());
                write!(
                    f,
                    "{}[{} order_by=[{}]",
                    name,
                    wrapped,
                    separated(", ", order_by)
                )?;
                if *window_frame != WindowFrame::default() {
                    write!(f, " {}", window_frame)?;
                }
                f.write_str("]")
            }
            FusedValueWindowFunc { funcs, order_by } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
                write!(
                    f,
                    "{}[{} order_by=[{}]]",
                    name,
                    funcs,
                    separated(", ", order_by)
                )
            }
            _ => f.write_str(name),
        }
    }
}
/// Metadata describing one explicit capture group of an analyzed regex.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct CaptureGroupDesc {
    // 1-based index of the group within the regex; group 0 (the whole
    // match) is never described.
    pub index: u32,
    // The group's name, if it is a named capture group.
    pub name: Option<String>,
    // Whether the group's output column may be NULL (a group that does not
    // participate in a match produces NULL).
    pub nullable: bool,
}
impl RustType<ProtoCaptureGroupDesc> for CaptureGroupDesc {
    /// Converts to the protobuf representation (field-for-field copy).
    fn into_proto(&self) -> ProtoCaptureGroupDesc {
        let Self {
            index,
            name,
            nullable,
        } = self;
        ProtoCaptureGroupDesc {
            index: *index,
            name: name.clone(),
            nullable: *nullable,
        }
    }

    /// Reconstructs from the protobuf representation; cannot fail since
    /// every field maps directly.
    fn from_proto(proto: ProtoCaptureGroupDesc) -> Result<Self, TryFromProtoError> {
        let ProtoCaptureGroupDesc {
            index,
            name,
            nullable,
        } = proto;
        Ok(Self {
            index,
            name,
            nullable,
        })
    }
}
/// A compiled regex together with descriptors for its explicit capture
/// groups, as used by `TableFunc::RegexpExtract`.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct AnalyzedRegex(
    // The compiled regex (proptest generates arbitrary instances for
    // roundtrip tests).
    #[proptest(strategy = "mz_repr::adt::regex::any_regex()")] ReprRegex,
    // One descriptor per explicit capture group (group 0 excluded).
    Vec<CaptureGroupDesc>,
);
impl RustType<ProtoAnalyzedRegex> for AnalyzedRegex {
fn into_proto(&self) -> ProtoAnalyzedRegex {
ProtoAnalyzedRegex {
regex: Some(self.0.into_proto()),
groups: self.1.into_proto(),
}
}
fn from_proto(proto: ProtoAnalyzedRegex) -> Result<Self, TryFromProtoError> {
Ok(AnalyzedRegex(
proto.regex.into_rust_if_some("ProtoAnalyzedRegex::regex")?,
proto.groups.into_rust()?,
))
}
}
impl AnalyzedRegex {
    /// Compiles `s` into a regex (the `false` flag is forwarded to
    /// `ReprRegex::new` — presumably a case-insensitivity toggle; confirm)
    /// and records a descriptor for each explicit capture group.
    ///
    /// # Errors
    ///
    /// Returns the underlying `regex::Error` if `s` fails to compile.
    pub fn new(s: String) -> Result<Self, regex::Error> {
        let regex = ReprRegex::new(s, false)?;
        // Skip group 0, the implicit whole-match group.
        #[allow(clippy::as_conversions)]
        let groups = regex
            .capture_names()
            .enumerate()
            .skip(1)
            .map(|(index, name)| CaptureGroupDesc {
                index: index as u32,
                name: name.map(String::from),
                nullable: true,
            })
            .collect::<Vec<_>>();
        Ok(Self(regex, groups))
    }

    /// Number of explicit capture groups.
    pub fn capture_groups_len(&self) -> usize {
        self.1.len()
    }

    /// Iterates over the capture group descriptors.
    pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
        self.1.iter()
    }

    /// The underlying compiled `regex::Regex`.
    pub fn inner(&self) -> &Regex {
        &(self.0).regex
    }
}
/// Parses `a` as headerless CSV, yielding one row per record that has
/// exactly `n_cols` columns; malformed records and records of any other
/// width are silently dropped.
pub fn csv_extract(a: Datum, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
    let input = a.unwrap_str().as_bytes();
    let reader = csv::ReaderBuilder::new()
        .has_headers(false)
        .from_reader(input);
    // One `Row` allocation is reused across all records.
    let mut row = Row::default();
    reader.into_records().filter_map(move |record| {
        let record = record.ok()?;
        if record.len() != n_cols {
            return None;
        }
        row.packer().extend(record.iter().map(Datum::String));
        Some((row.clone(), 1))
    })
}
/// Emits the empty row with multiplicity `a` (an int64); a repeat count of
/// zero produces no output at all.
pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
    let n = a.unwrap_int64();
    if n == 0 {
        None
    } else {
        Some((Row::default(), n))
    }
}
/// Splits `datums` into consecutive chunks of `width` columns and emits one
/// row per chunk (a trailing short chunk is emitted as-is, per
/// `slice::chunks` semantics).
fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
    let chunks = datums.chunks(width);
    chunks.map(|chunk| (Row::pack(chunk), 1))
}
/// Expands an array of ACL items into one row per (item, privilege) pair:
/// grantor OID, grantee OID, privilege name, and a constant `false` column.
///
/// # Errors
///
/// Returns an error if the array contains a NULL element.
fn acl_explode<'a>(
    acl_items: Datum<'a>,
    temp_storage: &'a RowArena,
) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
    let array = acl_items.unwrap_array();
    let mut rows = Vec::new();
    for item in array.elements().iter() {
        if item.is_null() {
            return Err(EvalError::AclArrayNullElement);
        }
        let item = item.unwrap_acl_item();
        for privilege in item.acl_mode.explode() {
            let datums = [
                Datum::UInt32(item.grantor.0),
                Datum::UInt32(item.grantee.0),
                Datum::String(temp_storage.push_string(privilege.to_string())),
                Datum::False,
            ];
            rows.push((Row::pack_slice(&datums), 1));
        }
    }
    Ok(rows.into_iter())
}
/// Expands an array of Materialize ACL items into one row per
/// (item, privilege) pair: grantor, grantee, privilege name (all as
/// strings), and a constant `false` column.
///
/// # Errors
///
/// Returns an error if the array contains a NULL element.
fn mz_acl_explode<'a>(
    mz_acl_items: Datum<'a>,
    temp_storage: &'a RowArena,
) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
    let array = mz_acl_items.unwrap_array();
    let mut rows = Vec::new();
    for item in array.elements().iter() {
        if item.is_null() {
            return Err(EvalError::MzAclArrayNullElement);
        }
        let item = item.unwrap_mz_acl_item();
        for privilege in item.acl_mode.explode() {
            let datums = [
                Datum::String(temp_storage.push_string(item.grantor.to_string())),
                Datum::String(temp_storage.push_string(item.grantee.to_string())),
                Datum::String(temp_storage.push_string(privilege.to_string())),
                Datum::False,
            ];
            rows.push((Row::pack_slice(&datums), 1));
        }
    }
    Ok(rows.into_iter())
}
/// A function that maps one input row to zero or more output rows.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum TableFunc {
    /// Expands an array of ACL items into one row per privilege:
    /// (grantor, grantee, privilege, false).
    AclExplode,
    /// Like `AclExplode`, but for Materialize ACL items (string-typed
    /// grantor/grantee).
    MzAclExplode,
    /// One (key, value) row per entry of a jsonb map; `stringify` renders
    /// values as strings.
    JsonbEach {
        stringify: bool,
    },
    /// One row per key of a jsonb map.
    JsonbObjectKeys,
    /// One row per element of a jsonb array; `stringify` renders elements
    /// as strings.
    JsonbArrayElements {
        stringify: bool,
    },
    /// At most one row: the capture groups of the regex's first match
    /// against the input string.
    RegexpExtract(AnalyzedRegex),
    /// Parses the input string as CSV with the given number of columns.
    CsvExtract(usize),
    /// Inclusive int32 series from start to stop by step.
    GenerateSeriesInt32,
    /// Inclusive int64 series from start to stop by step.
    GenerateSeriesInt64,
    /// Inclusive timestamp series stepped by an interval.
    GenerateSeriesTimestamp,
    /// Inclusive timestamptz series stepped by an interval.
    GenerateSeriesTimestampTz,
    /// Emits the empty row with the multiplicity given by its argument.
    Repeat,
    /// One row per element of an array.
    UnnestArray {
        el_typ: ScalarType,
    },
    /// One row per element of a list.
    UnnestList {
        el_typ: ScalarType,
    },
    /// One (key, value) row per entry of a map.
    UnnestMap {
        value_type: ScalarType,
    },
    /// Chunks the input datums into rows of `width` columns with the given
    /// column types.
    Wrap {
        types: Vec<ColumnType>,
        width: usize,
    },
    /// The subscripts of a requested dimension of an array.
    GenerateSubscriptsArray,
    /// Passes its input datums through as a single row — presumably used to
    /// treat a scalar-valued call as a relation; confirm against callers.
    TabletizedScalar {
        name: String,
        relation: RelationType,
    },
}
impl RustType<ProtoTableFunc> for TableFunc {
    /// Converts this `TableFunc` into its protobuf representation, mapping
    /// each variant onto the corresponding `Kind`.
    fn into_proto(&self) -> ProtoTableFunc {
        use proto_table_func::{Kind, ProtoWrap};
        ProtoTableFunc {
            kind: Some(match self {
                TableFunc::AclExplode => Kind::AclExplode(()),
                TableFunc::MzAclExplode => Kind::MzAclExplode(()),
                TableFunc::JsonbEach { stringify } => Kind::JsonbEach(*stringify),
                TableFunc::JsonbObjectKeys => Kind::JsonbObjectKeys(()),
                TableFunc::JsonbArrayElements { stringify } => Kind::JsonbArrayElements(*stringify),
                TableFunc::RegexpExtract(x) => Kind::RegexpExtract(x.into_proto()),
                TableFunc::CsvExtract(x) => Kind::CsvExtract(x.into_proto()),
                TableFunc::GenerateSeriesInt32 => Kind::GenerateSeriesInt32(()),
                TableFunc::GenerateSeriesInt64 => Kind::GenerateSeriesInt64(()),
                TableFunc::GenerateSeriesTimestamp => Kind::GenerateSeriesTimestamp(()),
                TableFunc::GenerateSeriesTimestampTz => Kind::GenerateSeriesTimestampTz(()),
                TableFunc::Repeat => Kind::Repeat(()),
                TableFunc::UnnestArray { el_typ } => Kind::UnnestArray(el_typ.into_proto()),
                TableFunc::UnnestList { el_typ } => Kind::UnnestList(el_typ.into_proto()),
                TableFunc::UnnestMap { value_type } => Kind::UnnestMap(value_type.into_proto()),
                TableFunc::Wrap { types, width } => Kind::Wrap(ProtoWrap {
                    types: types.into_proto(),
                    width: width.into_proto(),
                }),
                TableFunc::GenerateSubscriptsArray => Kind::GenerateSubscriptsArray(()),
                TableFunc::TabletizedScalar { name, relation } => {
                    Kind::TabletizedScalar(ProtoTabletizedScalar {
                        name: name.into_proto(),
                        relation: Some(relation.into_proto()),
                    })
                }
            }),
        }
    }

    /// Reconstructs a `TableFunc` from its protobuf representation.
    ///
    /// # Errors
    ///
    /// Fails if `kind` is unset, or if any nested field fails to convert.
    fn from_proto(proto: ProtoTableFunc) -> Result<Self, TryFromProtoError> {
        use proto_table_func::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTableFunc::Kind"))?;
        Ok(match kind {
            Kind::AclExplode(()) => TableFunc::AclExplode,
            Kind::MzAclExplode(()) => TableFunc::MzAclExplode,
            Kind::JsonbEach(stringify) => TableFunc::JsonbEach { stringify },
            Kind::JsonbObjectKeys(()) => TableFunc::JsonbObjectKeys,
            Kind::JsonbArrayElements(stringify) => TableFunc::JsonbArrayElements { stringify },
            Kind::RegexpExtract(x) => TableFunc::RegexpExtract(x.into_rust()?),
            Kind::CsvExtract(x) => TableFunc::CsvExtract(x.into_rust()?),
            Kind::GenerateSeriesInt32(()) => TableFunc::GenerateSeriesInt32,
            Kind::GenerateSeriesInt64(()) => TableFunc::GenerateSeriesInt64,
            Kind::GenerateSeriesTimestamp(()) => TableFunc::GenerateSeriesTimestamp,
            Kind::GenerateSeriesTimestampTz(()) => TableFunc::GenerateSeriesTimestampTz,
            Kind::Repeat(()) => TableFunc::Repeat,
            Kind::UnnestArray(x) => TableFunc::UnnestArray {
                el_typ: x.into_rust()?,
            },
            Kind::UnnestList(x) => TableFunc::UnnestList {
                el_typ: x.into_rust()?,
            },
            Kind::UnnestMap(value_type) => TableFunc::UnnestMap {
                value_type: value_type.into_rust()?,
            },
            Kind::Wrap(x) => TableFunc::Wrap {
                width: x.width.into_rust()?,
                types: x.types.into_rust()?,
            },
            Kind::GenerateSubscriptsArray(()) => TableFunc::GenerateSubscriptsArray,
            Kind::TabletizedScalar(v) => TableFunc::TabletizedScalar {
                name: v.name,
                relation: v
                    .relation
                    .into_rust_if_some("ProtoTabletizedScalar::relation")?,
            },
        })
    }
}
impl TableFunc {
    /// Evaluates this table function on `datums`, returning an iterator of
    /// output rows, each paired with a multiplicity (`Diff`). String data
    /// that must outlive the call is allocated in `temp_storage`.
    ///
    /// # Errors
    ///
    /// Propagates `EvalError`s from the per-variant implementations (e.g. a
    /// zero step for `generate_series`, or NULL elements in ACL arrays).
    pub fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Variants that declare `empty_on_null_input` produce no rows at
        // all when any argument is NULL.
        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
            return Ok(Box::new(vec![].into_iter()));
        }
        match self {
            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
            TableFunc::JsonbEach { stringify } => {
                Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
            }
            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
            TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
                datums[0],
                temp_storage,
                *stringify,
            ))),
            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
            TableFunc::GenerateSeriesInt32 => {
                let res = generate_series(
                    datums[0].unwrap_int32(),
                    datums[1].unwrap_int32(),
                    datums[2].unwrap_int32(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesInt64 => {
                let res = generate_series(
                    datums[0].unwrap_int64(),
                    datums[1].unwrap_int64(),
                    datums[2].unwrap_int64(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestamp => {
                // Identity conversion; exists to name the `Datum` lifetime.
                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamp(),
                    datums[1].unwrap_timestamp(),
                    datums[2].unwrap_interval(),
                    pass_through,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestampTz => {
                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamptz(),
                    datums[1].unwrap_timestamptz(),
                    datums[2].unwrap_interval(),
                    gen_ts_tz,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSubscriptsArray => {
                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
            }
            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
            TableFunc::TabletizedScalar { .. } => {
                // Passes the input datums through as a single output row.
                let r = Row::pack_slice(datums);
                Ok(Box::new(std::iter::once((r, 1))))
            }
        }
    }

    /// Returns the relation type (column types plus any unique keys) of the
    /// rows this function produces.
    pub fn output_type(&self) -> RelationType {
        let (column_types, keys) = match self {
            TableFunc::AclExplode => {
                let column_types = vec![
                    ScalarType::Oid.nullable(false),
                    ScalarType::Oid.nullable(false),
                    ScalarType::String.nullable(false),
                    ScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::MzAclExplode => {
                let column_types = vec![
                    ScalarType::String.nullable(false),
                    ScalarType::String.nullable(false),
                    ScalarType::String.nullable(false),
                    ScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            // When stringifying, a jsonb `null` value becomes a NULL
            // string, so the value column is nullable.
            TableFunc::JsonbEach { stringify: true } => {
                let column_types = vec![
                    ScalarType::String.nullable(false),
                    ScalarType::String.nullable(true),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEach { stringify: false } => {
                let column_types = vec![
                    ScalarType::String.nullable(false),
                    ScalarType::Jsonb.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbObjectKeys => {
                let column_types = vec![ScalarType::String.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElements { stringify: true } => {
                let column_types = vec![ScalarType::String.nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElements { stringify: false } => {
                let column_types = vec![ScalarType::Jsonb.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            // One string column per capture group; nullability comes from
            // the group descriptor.
            TableFunc::RegexpExtract(a) => {
                let column_types = a
                    .capture_groups_iter()
                    .map(|cg| ScalarType::String.nullable(cg.nullable))
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::CsvExtract(n_cols) => {
                let column_types = iter::repeat(ScalarType::String.nullable(false))
                    .take(*n_cols)
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            // Series values are distinct, so the single column is a key.
            TableFunc::GenerateSeriesInt32 => {
                let column_types = vec![ScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt64 => {
                let column_types = vec![ScalarType::Int64.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestamp => {
                let column_types = vec![ScalarType::Timestamp { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestampTz => {
                let column_types =
                    vec![ScalarType::TimestampTz { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSubscriptsArray => {
                let column_types = vec![ScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            // `repeat` emits the empty row: zero columns.
            TableFunc::Repeat => {
                let column_types = vec![];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestArray { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestList { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            // Map keys are unique, so the key column is a unique key.
            TableFunc::UnnestMap { value_type } => {
                let column_types = vec![
                    ScalarType::String.nullable(false),
                    value_type.clone().nullable(true),
                ];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::Wrap { types, .. } => {
                let column_types = types.clone();
                let keys = vec![];
                (column_types, keys)
            }
            // The relation type is carried in the variant itself.
            TableFunc::TabletizedScalar { relation, .. } => {
                return relation.clone();
            }
        };
        if !keys.is_empty() {
            RelationType::new(column_types).with_keys(keys)
        } else {
            RelationType::new(column_types)
        }
    }

    /// Number of columns in the rows this function produces.
    pub fn output_arity(&self) -> usize {
        match self {
            TableFunc::AclExplode => 4,
            TableFunc::MzAclExplode => 4,
            TableFunc::JsonbEach { .. } => 2,
            TableFunc::JsonbObjectKeys => 1,
            TableFunc::JsonbArrayElements { .. } => 1,
            TableFunc::RegexpExtract(a) => a.capture_groups_len(),
            TableFunc::CsvExtract(n_cols) => *n_cols,
            TableFunc::GenerateSeriesInt32 => 1,
            TableFunc::GenerateSeriesInt64 => 1,
            TableFunc::GenerateSeriesTimestamp => 1,
            TableFunc::GenerateSeriesTimestampTz => 1,
            TableFunc::GenerateSubscriptsArray => 1,
            TableFunc::Repeat => 0,
            TableFunc::UnnestArray { .. } => 1,
            TableFunc::UnnestList { .. } => 1,
            TableFunc::UnnestMap { .. } => 2,
            TableFunc::Wrap { width, .. } => *width,
            TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
        }
    }

    /// Whether a NULL argument makes this function produce zero rows; this
    /// is checked up front by `eval`.
    pub fn empty_on_null_input(&self) -> bool {
        match self {
            TableFunc::AclExplode
            | TableFunc::MzAclExplode
            | TableFunc::JsonbEach { .. }
            | TableFunc::JsonbObjectKeys
            | TableFunc::JsonbArrayElements { .. }
            | TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz
            | TableFunc::GenerateSubscriptsArray
            | TableFunc::RegexpExtract(_)
            | TableFunc::CsvExtract(_)
            | TableFunc::Repeat
            | TableFunc::UnnestArray { .. }
            | TableFunc::UnnestList { .. }
            | TableFunc::UnnestMap { .. } => true,
            TableFunc::Wrap { .. } => false,
            TableFunc::TabletizedScalar { .. } => false,
        }
    }

    /// Whether this function preserves monotonicity of its input —
    /// presumably: whether an append-only input guarantees an append-only
    /// output; confirm against call sites.
    pub fn preserves_monotonicity(&self) -> bool {
        match self {
            TableFunc::AclExplode => false,
            TableFunc::MzAclExplode => false,
            TableFunc::JsonbEach { .. } => true,
            TableFunc::JsonbObjectKeys => true,
            TableFunc::JsonbArrayElements { .. } => true,
            TableFunc::RegexpExtract(_) => true,
            TableFunc::CsvExtract(_) => true,
            TableFunc::GenerateSeriesInt32 => true,
            TableFunc::GenerateSeriesInt64 => true,
            TableFunc::GenerateSeriesTimestamp => true,
            TableFunc::GenerateSeriesTimestampTz => true,
            TableFunc::GenerateSubscriptsArray => true,
            TableFunc::Repeat => false,
            TableFunc::UnnestArray { .. } => true,
            TableFunc::UnnestList { .. } => true,
            TableFunc::UnnestMap { .. } => true,
            TableFunc::Wrap { .. } => true,
            TableFunc::TabletizedScalar { .. } => true,
        }
    }
}
impl fmt::Display for TableFunc {
    /// Renders the SQL-facing name of the table function; the regex, CSV,
    /// and wrap variants include their static configuration.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            TableFunc::AclExplode => f.write_str("aclexplode"),
            TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
            TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
            TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
            TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
            TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
            TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
            // Every series generator shares the same SQL name.
            TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
            TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
            TableFunc::Repeat => f.write_str("repeat_row"),
            TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
            TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
            TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
            TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
            TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{AggregateFunc, ProtoAggregateFunc, ProtoTableFunc, TableFunc};
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    proptest! {
        // Round-trips arbitrary `AggregateFunc`s through their protobuf
        // encoding and checks the decoded value equals the original.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] fn aggregate_func_protobuf_roundtrip(expect in any::<AggregateFunc>() ) {
            let actual = protobuf_roundtrip::<_, ProtoAggregateFunc>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }

    proptest! {
        // Same round-trip property for `TableFunc`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] fn table_func_protobuf_roundtrip(expect in any::<TableFunc>() ) {
            let actual = protobuf_roundtrip::<_, ProtoTableFunc>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}