use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::convert::{TryFrom, TryInto};
use std::num::NonZeroU64;
use std::{iter, mem};
use itertools::Itertools;
use mz_expr::virtual_syntax::AlgExcept;
use mz_expr::{func as expr_func, Id, LetRecLimit, LocalId, MirScalarExpr, RowSetFinishing};
use mz_ore::assert_none;
use mz_ore::collections::CollectionExt;
use mz_ore::option::FallibleMapExt;
use mz_ore::stack::{CheckedRecursion, RecursionGuard};
use mz_ore::str::StrExt;
use mz_repr::adt::char::CharLength;
use mz_repr::adt::numeric::{NumericMaxScale, NUMERIC_DATUM_MAX_PRECISION};
use mz_repr::adt::timestamp::TimestampPrecision;
use mz_repr::adt::varchar::VarCharMaxLength;
use mz_repr::{
strconv, CatalogItemId, ColumnName, ColumnType, Datum, RelationDesc, RelationType,
RelationVersionSelector, Row, RowArena, ScalarType,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::Visit;
use mz_sql_parser::ast::visit_mut::{self, VisitMut};
use mz_sql_parser::ast::{
visit, AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
WindowFrameBound, WindowFrameUnits, WindowSpec,
};
use mz_sql_parser::ident;
use uuid::Uuid;
use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
use crate::func::{self, Func, FuncSpec};
use crate::names::{
Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
};
use crate::normalize;
use crate::plan::error::PlanError;
use crate::plan::expr::{
AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
};
use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
use crate::plan::statement::{show, StatementContext, StatementDesc};
use crate::plan::typeconv::{self, CastContext};
use crate::plan::PlanError::InvalidWmrRecursionLimit;
use crate::plan::{
literal, transform_ast, Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation,
WebhookValidationSecret,
};
use crate::session::vars::{self, FeatureFlag};
/// The output of planning a root query: the planned expression together with
/// the metadata (`desc`, `finishing`, `scope`) callers need to interpret it.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression — `HirRelationExpr` for whole queries, or a
    /// `Vec<HirScalarExpr>` for RETURNING clauses.
    pub expr: E,
    /// Names and types of the columns produced after `finishing` is applied.
    pub desc: RelationDesc,
    /// Row-set finishing (ORDER BY / LIMIT / OFFSET / projection) to apply to
    /// the result of `expr`.
    pub finishing: RowSetFinishing<HirScalarExpr>,
    /// The scope of the planned query, mapping column names to positions.
    pub scope: Scope,
}
/// Plans a top-level query, returning the `HirRelationExpr` describing the
/// query as well as its type, the row-set finishing to apply, and its scope.
///
/// Note that the returned `RelationDesc` describes the result *after* the
/// returned `RowSetFinishing` has been applied.
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_root_query(
    scx: &StatementContext,
    mut query: Query<Aug>,
    lifetime: QueryLifetime,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    transform_ast::transform(scx, &mut query)?;
    let mut qcx = QueryContext::root(scx, lifetime);
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(&mut qcx, &query)?;

    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Attempt to fold the finishing's projection into `expr` itself. (No-op
    // when the ORDER BY references columns the projection would drop; the
    // finishing then keeps the original projection.)
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    // Maintained (non-one-shot) dataflows absorb the whole finishing into the
    // expression, honoring any query group-size hints.
    if lifetime.is_maintained() {
        expr.finish_maintained(&mut finishing, group_size_hints);
    }

    // The result type is the planned type restricted to the finishing's
    // projection, in projection order.
    let typ = qcx.relation_type(&expr);
    let typ = RelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
/// Plans a CT (presumably "continual task" — confirm against callers) query.
///
/// Like [`plan_root_query`], but reuses the caller's `QueryContext` and
/// always folds the finishing into the expression, since the result is
/// always maintained rather than one-shot.
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_ct_query(
    qcx: &mut QueryContext,
    mut query: Query<Aug>,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    transform_ast::transform(qcx.scx, &mut query)?;
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(qcx, &query)?;

    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Fold the projection into `expr` where possible, then absorb the whole
    // finishing into the maintained expression.
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    expr.finish_maintained(&mut finishing, group_size_hints);

    // The result type is the planned type restricted to the finishing's
    // projection, in projection order.
    let typ = qcx.relation_type(&expr);
    let typ = RelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
/// Attempts to fold `project` into `expr` itself, rewriting `order_by` into
/// the projected coordinate space.
///
/// Succeeds (returning `true`, with `project` replaced by the identity
/// projection) only when every ORDER BY key is visible through the
/// projection; otherwise leaves all three arguments untouched and returns
/// `false`.
fn try_push_projection_order_by(
    expr: &mut HirRelationExpr,
    project: &mut Vec<usize>,
    order_by: &mut Vec<ColumnOrder>,
) -> bool {
    // Invert the projection: `inverse[input_col]` is the output position that
    // exposes `input_col`, if any. (An input column may be dropped, or — if
    // duplicated — mapped to its last output occurrence.)
    let mut inverse: Vec<Option<usize>> = vec![None; expr.arity()];
    for (output_col, input_col) in project.iter().copied().enumerate() {
        inverse[input_col] = Some(output_col);
    }

    // Every ORDER BY key must survive the projection, or we bail.
    let all_keys_visible = order_by
        .iter()
        .all(|key| inverse.get(key.column).map_or(false, |slot| slot.is_some()));
    if !all_keys_visible {
        return false;
    }

    // Fold the projection into `expr` and replace it with the identity.
    let identity: Vec<usize> = (0..project.len()).collect();
    let old_project = mem::replace(project, identity);
    *expr = expr.take().project(old_project);

    // Rewrite the ORDER BY keys into output coordinates.
    for key in order_by.iter_mut() {
        key.column = inverse[key.column].expect("visibility checked above");
    }
    true
}
/// Plans an `INSERT ... VALUES/query/DEFAULT VALUES [RETURNING ...]`.
///
/// Returns the target table's ID, the relation expression producing the rows
/// to insert (already cast and permuted into the table's declared column
/// order, with defaults filled in), and the planned RETURNING projection.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target: must be a writeable, non-system table.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Resolve the target column list: `ordering[i]` is the table column fed
    // by the `i`th source column; `source_types[i]` is its expected type.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // No explicit column list: insert into all columns in declaration
        // order.
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        // Explicit column list: resolve each name to its position.
        let column_by_name: BTreeMap<&ColumnName, (usize, &ColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.as_str().quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.as_str().quoted());
        }
    };

    // Plan the source of the rows to insert.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case a bare VALUES clause (no CTEs, ORDER BY,
                // LIMIT, or OFFSET) so each value can be coerced directly
                // against its target column's type.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                // Otherwise plan as an ordinary nested query.
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        // `DEFAULT VALUES`: a single zero-column row; every column is filled
        // in from its default below.
        InsertSource::DefaultValues => {
            HirRelationExpr::constant(vec![vec![]], RelationType::empty())
        }
    };

    // Validate that the source arity matches the target column list.
    let expr_arity = expr.arity();
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // A source narrower than the target leaves trailing columns to be filled
    // with their defaults.
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Assignment-cast each source column to its target column's type.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).as_str().quoted(),
            qcx.humanize_scalar_type(&e.target_type),
            qcx.humanize_scalar_type(&e.source_type),
        )
    })?;

    // Fill in omitted columns with their defaults (appended via `map`) and
    // permute everything into the table's declared column order.
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against the table's own scope (not the
    // inserted expression's); aggregates, subqueries, parameters, and window
    // functions are disallowed.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), RelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = RelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // Expand wildcards (`*`) into one item per column.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            // RETURNING has no ORDER BY/LIMIT/OFFSET; the finishing is the
            // identity.
            finishing: RowSetFinishing {
                order_by: vec![],
                limit: None,
                offset: 0,
                project: (0..desc_arity).collect(),
            },
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
/// Determines the output relation description and column ordering for a
/// `COPY` of `item_name`.
///
/// If `columns` is empty, every column of the item is used in declaration
/// order; otherwise only the named columns, in the given order. Returns the
/// item's ID, the (possibly narrowed) `RelationDesc`, and, for each output
/// column, the index of the underlying column it maps to.
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<(CatalogItemId, RelationDesc, Vec<usize>), PlanError> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let mut desc = item
        .desc(&scx.catalog.resolve_full_name(item.name()))?
        .into_owned();
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // No explicit column list: copy every column in order.
        ordering.extend(0..desc.arity());
    } else {
        // Resolve each named column to its position in the item's desc.
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (usize, &ColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.as_str().quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }

        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.as_str().quoted());
        }

        // Narrow the description to just the requested columns.
        desc = RelationDesc::new(RelationType::new(source_types), names);
    };

    Ok((item.id(), desc, ordering))
}
/// Plans a `COPY ... FROM` targeting `table_name`, restricted to `columns`
/// if non-empty.
///
/// Validates that the target is an ordinary, writeable, non-system table,
/// then delegates column resolution to [`plan_copy_item`].
pub fn plan_copy_from(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<(CatalogItemId, RelationDesc, Vec<usize>), PlanError> {
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Only ordinary tables can be copied into.
    match table.item_type() {
        CatalogItemType::Table => {}
        other => {
            sql_bail!(
                "cannot insert into {} '{}'",
                other,
                table_name.full_name_str()
            );
        }
    }

    // The table must be writeable...
    if table.writable_table_details().is_none() {
        return Err(sql_err!(
            "cannot insert into non-writeable table '{}'",
            table_name.full_name_str()
        ));
    }

    // ...and must not be a system table.
    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    // Resolve the column list and output description.
    plan_copy_item(scx, table_name, columns)
}
/// Builds a constant `HirRelationExpr` from rows received via `COPY FROM`,
/// filling any omitted table columns with their default expressions and
/// permuting the supplied `columns` into the table's declared order.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    columns: Vec<usize>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .get_item(&id)
        .at_version(RelationVersionSelector::Latest);
    let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    // NOTE(review): the supplied columns are widened to nullable here —
    // presumably nullability of the incoming rows is enforced elsewhere;
    // confirm against the COPY FROM pipeline.
    let column_types = columns
        .iter()
        .map(|x| desc.typ().column_types[*x].clone())
        .map(|mut x| {
            x.nullable = true;
            x
        })
        .collect();
    let typ = RelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Fast path: all columns were supplied in declaration order, so no
    // defaults or permutation are needed.
    let default: Vec<_> = (0..desc.arity()).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns with their defaults (appended via `map`)
    // and permute into the table's declared column order.
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
/// The parts of a planned mutation (`DELETE`/`UPDATE`): read `selection`,
/// then write back to `id`, applying `assignments` to each selected row.
pub struct ReadThenWritePlan {
    /// The table being mutated.
    pub id: CatalogItemId,
    /// The rows to operate on.
    pub selection: HirRelationExpr,
    /// For `UPDATE`: replacement expressions keyed by column index. Empty for
    /// `DELETE`.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Finishing applied when reading `selection` (identity projection, no
    /// ordering or limit).
    pub finishing: RowSetFinishing,
}
pub fn plan_delete_query(
scx: &StatementContext,
mut delete_stmt: DeleteStatement<Aug>,
) -> Result<ReadThenWritePlan, PlanError> {
transform_ast::transform(scx, &mut delete_stmt)?;
let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
plan_mutation_query_inner(
qcx,
delete_stmt.table_name,
delete_stmt.alias,
delete_stmt.using,
vec![],
delete_stmt.selection,
)
}
pub fn plan_update_query(
scx: &StatementContext,
mut update_stmt: UpdateStatement<Aug>,
) -> Result<ReadThenWritePlan, PlanError> {
transform_ast::transform(scx, &mut update_stmt)?;
let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
plan_mutation_query_inner(
qcx,
update_stmt.table_name,
update_stmt.alias,
vec![],
update_stmt.assignments,
update_stmt.selection,
)
}
/// Shared planning for `DELETE` and `UPDATE`: validates the target table,
/// plans the `WHERE` clause (and, for `DELETE`, the `USING` relations), and
/// plans any `SET` assignments.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Only fully resolved user items carry an ID and version to mutate.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // The target must be a writeable, non-system table.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Plan the source relation: the target table, optionally aliased.
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // Simple case: filter the table directly by the WHERE clause.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &ScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // DELETE ... USING: correlate the table with the USING relations.
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan the SET assignments (UPDATE only), assignment-casting each value
    // to its column's declared type.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                // Each column may appear at most once in SET.
                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // Mutations read the whole table unordered with an identity projection.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
/// Plans the `USING` clause of a `DELETE`, turning it into a filter on the
/// target table of the shape `<target> WHERE EXISTS (<using> WHERE <selection>)`.
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan the USING relations as one big cross join.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Plan the WHERE clause against the joined scope of USING × target,
        // so it may reference columns from either side.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &ScalarType::Bool)?;

        // The filter will live inside the USING expression, which becomes a
        // correlated EXISTS subquery below. References to target-table
        // columns — those at or past the USING arity — therefore must be
        // rewritten into outer (level + 1) column references.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        #[allow(deprecated)]
        expr.visit_mut(&mut |e| {
            if let HirScalarExpr::Column(c) = e {
                if c.column >= using_rel_arity {
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        });

        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Even without a WHERE clause, form the scope product so its checks
        // (e.g. for ambiguous table names) still run; the result is unused.
        let _joined_scope = using_scope.product(outer_scope)?;
    }

    // Keep each target row for which the (filtered, correlated) USING
    // relation is non-empty.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
/// Describes the column position and the two types involved in a failed
/// column cast reported by [`cast_relation`].
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Index of the column that could not be cast.
    pub(crate) column: usize,
    /// The column's actual type.
    pub(crate) source_type: ScalarType,
    /// The type the column was meant to be cast to.
    pub(crate) target_type: ScalarType,
}
/// Casts each column of `expr` to the corresponding type in `target_types`,
/// using casts of the given [`CastContext`].
///
/// Columns that already have the target type are projected through untouched;
/// the rest are routed through a mapped cast expression. On failure, reports
/// the offending column and the two types involved.
pub(crate) fn cast_relation<'a, I>(
    qcx: &QueryContext,
    ccx: CastContext,
    expr: HirRelationExpr,
    target_types: I,
) -> Result<HirRelationExpr, CastRelationError>
where
    I: IntoIterator<Item = &'a ScalarType>,
{
    let ecx = &ExprContext {
        qcx,
        name: "values",
        scope: &Scope::empty(),
        relation_type: &qcx.relation_type(&expr),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };
    let mut map_exprs = vec![];
    let mut project_key = vec![];
    for (i, target_typ) in target_types.into_iter().enumerate() {
        let expr = HirScalarExpr::column(i);
        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
            Ok(cast_expr) => {
                if expr == cast_expr {
                    // Cast was a no-op: keep the column as-is.
                    project_key.push(i);
                } else {
                    // Cast required: the new expression is appended via `map`
                    // and projected in place of column `i`.
                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
                    map_exprs.push(cast_expr);
                }
            }
            Err(_) => {
                return Err(CastRelationError {
                    column: i,
                    source_type: ecx.scalar_type(&expr),
                    target_type: target_typ.clone(),
                });
            }
        }
    }
    Ok(expr.map(map_exprs).project(project_key))
}
/// Plans a `SUBSCRIBE ... UP TO <expr>` timestamp expression.
///
/// The expression is planned in an empty scope — it may not reference
/// columns, aggregates, subqueries, parameters, or window functions — and is
/// lowered to an uncorrelated MIR expression.
pub fn plan_up_to(
    scx: &StatementContext,
    mut up_to: Expr<Aug>,
) -> Result<MirScalarExpr, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    transform_ast::transform(scx, &mut up_to)?;

    let empty_scope = Scope::empty();
    let empty_desc = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "UP TO",
        scope: &empty_scope,
        relation_type: empty_desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let planned = plan_expr(ecx, &up_to)?.type_as_any(ecx)?;
    planned.lower_uncorrelated()
}
/// Plans an optional `AS OF` clause into a [`QueryWhen`].
///
/// `None` plans to `QueryWhen::Immediately`. Otherwise the timestamp
/// expression is planned in an empty scope (no columns, aggregates,
/// subqueries, or parameters) and lowered to an uncorrelated MIR expression.
pub fn plan_as_of(
    scx: &StatementContext,
    as_of: Option<AsOf<Aug>>,
) -> Result<QueryWhen, PlanError> {
    match as_of {
        None => Ok(QueryWhen::Immediately),
        Some(mut as_of) => match as_of {
            AsOf::At(ref mut expr) | AsOf::AtLeast(ref mut expr) => {
                let scope = Scope::empty();
                let desc = RelationDesc::empty();
                let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
                transform_ast::transform(scx, expr)?;
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "AS OF",
                    scope: &scope,
                    relation_type: desc.typ(),
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: false,
                    allow_windows: false,
                };
                let expr = plan_expr(ecx, expr)?
                    .type_as_any(ecx)?
                    .lower_uncorrelated()?;
                // `AT` demands exactly that timestamp; `AT LEAST` allows any
                // timestamp at or after it.
                match as_of {
                    AsOf::At(_) => Ok(QueryWhen::AtTimestamp(expr)),
                    AsOf::AtLeast(_) => Ok(QueryWhen::AtLeastTimestamp(expr)),
                }
            }
        },
    }
}
/// Plans the `AS <expr>` value of a `CREATE SECRET`.
///
/// The expression is planned in an empty scope (no columns, aggregates,
/// subqueries, parameters, or window functions), typed as `bytes`, and
/// lowered to an uncorrelated MIR expression.
pub fn plan_secret_as(
    scx: &StatementContext,
    mut expr: Expr<Aug>,
) -> Result<MirScalarExpr, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    transform_ast::transform(scx, &mut expr)?;

    let empty_scope = Scope::empty();
    let empty_desc = RelationDesc::empty();
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "AS",
        scope: &empty_scope,
        relation_type: empty_desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    plan_expr(ecx, &expr)?
        .type_as(ecx, &ScalarType::Bytes)?
        .lower_uncorrelated()
}
/// Plans the `CHECK` clause of a webhook source.
///
/// Builds the relation of values available to the check expression — request
/// bodies, headers, and secrets, each exposed as `text` or (when requested)
/// `bytes` — then plans the boolean check expression against it and lowers it
/// to an uncorrelated MIR expression.
pub fn plan_webhook_validate_using(
    scx: &StatementContext,
    validate_using: CreateWebhookSourceCheck<Aug>,
) -> Result<WebhookValidation, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::Source);

    let CreateWebhookSourceCheck {
        options,
        using: mut expr,
    } = validate_using;

    let mut column_typs = vec![];
    let mut column_names = vec![];

    let (bodies, headers, secrets) = options
        .map(|o| (o.bodies, o.headers, o.secrets))
        .unwrap_or_default();

    // Expose each requested body as a column (default name "body").
    let mut body_tuples = vec![];
    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
        let scalar_type = use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "body".to_string());

        column_typs.push(ColumnType {
            scalar_type,
            nullable: false,
        });
        column_names.push(name);

        // Record where the validation finds this body at runtime, checking
        // that names and types stayed in lock-step.
        let column_idx = column_typs.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "body column names and types don't match"
        );

        body_tuples.push((column_idx, use_bytes));
    }

    // Expose each requested header set as a map column (default name
    // "headers") from header name to value.
    let mut header_tuples = vec![];
    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
        let value_type = use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "headers".to_string());

        column_typs.push(ColumnType {
            scalar_type: ScalarType::Map {
                value_type: Box::new(value_type),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push(name);

        let column_idx = column_typs.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );

        header_tuples.push((column_idx, use_bytes));
    }

    // Expose each referenced secret as a column, named after its alias or,
    // failing that, the secret's own item name.
    let mut validation_secrets = vec![];
    for CreateWebhookSourceSecret {
        secret,
        alias,
        use_bytes,
    } in secrets
    {
        let scalar_type = use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);

        column_typs.push(ColumnType {
            scalar_type,
            nullable: false,
        });

        // The secret must be a fully resolved catalog item.
        let ResolvedItemName::Item {
            id,
            full_name: FullItemName { item, .. },
            ..
        } = secret
        else {
            return Err(PlanError::InvalidSecret(Box::new(secret)));
        };
        let name = if let Some(alias) = alias {
            alias.into_string()
        } else {
            item
        };
        column_names.push(name);

        let column_idx = column_typs.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "column names and types don't match"
        );

        validation_secrets.push(WebhookValidationSecret {
            id,
            column_idx,
            use_bytes,
        });
    }

    // Plan the CHECK expression against the relation built above; it must
    // type as `bool` and may not use aggregates, subqueries, parameters, or
    // window functions.
    let relation_typ = RelationType::new(column_typs);
    let desc = RelationDesc::new(relation_typ, column_names.clone());
    let scope = Scope::from_source(None, column_names);

    transform_ast::transform(scx, &mut expr)?;

    let ecx = &ExprContext {
        qcx: &qcx,
        name: "CHECK",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    let expr = plan_expr(ecx, &expr)?
        .type_as(ecx, &ScalarType::Bool)?
        .lower_uncorrelated()?;
    let validation = WebhookValidation {
        expression: expr,
        relation_desc: desc,
        bodies: body_tuples,
        headers: header_tuples,
        secrets: validation_secrets,
    };
    Ok(validation)
}
/// Plans a column `DEFAULT` expression.
///
/// The expression is planned in an empty scope — it may not reference
/// columns, aggregates, subqueries, parameters, or window functions — and is
/// assignment-cast to the column's declared type `target_ty`.
pub fn plan_default_expr(
    scx: &StatementContext,
    expr: &Expr<Aug>,
    target_ty: &ScalarType,
) -> Result<HirScalarExpr, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let empty_scope = Scope::empty();
    let empty_type = RelationType::empty();
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "DEFAULT expression",
        scope: &empty_scope,
        relation_type: &empty_type,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)
}
/// Plans and eagerly evaluates the parameter values of an `EXECUTE`.
///
/// Each parameter expression must plan to exactly the type declared by the
/// prepared statement (no implicit casts) and must be constant: it is lowered
/// to MIR and evaluated immediately into the returned `Params`.
pub fn plan_params<'a>(
    scx: &'a StatementContext,
    params: Vec<Expr<Aug>>,
    desc: &StatementDesc,
) -> Result<Params, PlanError> {
    // The number of supplied parameters must match the statement's declared
    // parameter types.
    if params.len() != desc.param_types.len() {
        sql_bail!(
            "expected {} params, got {}",
            desc.param_types.len(),
            params.len()
        );
    }

    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let scope = Scope::empty();
    let rel_type = RelationType::empty();
    // The expression context is identical for every parameter, so build it
    // once rather than per loop iteration.
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "EXECUTE",
        scope: &scope,
        relation_type: &rel_type,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let mut datums = Row::default();
    let mut packer = datums.packer();
    let mut types = Vec::with_capacity(params.len());
    let temp_storage = &RowArena::new();
    for (mut expr, ty) in params.into_iter().zip(&desc.param_types) {
        transform_ast::transform(scx, &mut expr)?;

        let ex = plan_expr(ecx, &expr)?.type_as_any(ecx)?;
        let st = ecx.scalar_type(&ex);
        // Types must match exactly; no implicit parameter casts.
        if st != *ty {
            sql_bail!(
                "mismatched parameter type: expected {}, got {}",
                ecx.humanize_scalar_type(ty),
                ecx.humanize_scalar_type(&st),
            );
        }
        // Parameters must be constant: lower and evaluate eagerly.
        let ex = ex.lower_uncorrelated()?;
        let evaled = ex.eval(&[], temp_storage)?;
        packer.push(evaled);
        types.push(st);
    }
    Ok(Params { datums, types })
}
/// Plans the key expressions of a `CREATE INDEX`.
///
/// Each expression may be either a 1-based column ordinal or an arbitrary
/// scalar expression over the indexed relation; expressions are lowered to
/// MIR and constant-reduced against the relation's column types.
pub fn plan_index_exprs<'a>(
    scx: &'a StatementContext,
    on_desc: &RelationDesc,
    exprs: Vec<Expr<Aug>>,
) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
    let scope = Scope::from_source(None, on_desc.iter_names());
    let qcx = QueryContext::root(scx, QueryLifetime::Index);

    // Index keys may not use aggregates, subqueries, parameters, or window
    // functions.
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "CREATE INDEX",
        scope: &scope,
        relation_type: on_desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    let mut out = vec![];
    for mut expr in exprs {
        transform_ast::transform(scx, &mut expr)?;
        let expr = plan_expr_or_col_index(ecx, &expr)?;
        // Keys must be uncorrelated; reduce eagerly so constant expressions
        // become literals.
        let mut expr = expr.lower_uncorrelated()?;
        expr.reduce(&on_desc.typ().column_types);
        out.push(expr);
    }
    Ok(out)
}
/// Plans `e` either as a 1-based output-column ordinal (when it is a bare
/// integer literal) or as an ordinary scalar expression.
fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
    if let Some(column) = check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
        Ok(HirScalarExpr::column(column))
    } else {
        plan_expr(ecx, e)?.type_as_any(ecx)
    }
}
/// If `e` is a bare numeric literal, interprets it as a 1-based column
/// ordinal bounded by `max` and returns the corresponding 0-based index;
/// returns `Ok(None)` for any other expression.
fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
    // Only numeric literals are treated as column ordinals.
    let Expr::Value(Value::Number(n)) = e else {
        return Ok(None);
    };
    let idx = n.parse::<usize>().map_err(|err| {
        sql_err!("unable to parse column reference in {}: {}: {}", name, n, err)
    })?;
    // Ordinals are 1-based and must address an existing output column.
    if !(1..=max).contains(&idx) {
        sql_bail!(
            "column reference {} in {} is out of range (1 - {})",
            idx,
            name,
            max
        );
    }
    Ok(Some(idx - 1))
}
/// Output of [`plan_query`]: the planned expression plus the row-set
/// finishing pieces (ordering, limit, offset, projection) and the group-size
/// hints gathered from the query's options.
struct PlannedQuery {
    expr: HirRelationExpr,
    scope: Scope,
    order_by: Vec<ColumnOrder>,
    // An expression rather than a constant, because LIMIT may be a
    // (feature-gated) arbitrary expression.
    limit: Option<HirScalarExpr>,
    offset: usize,
    project: Vec<usize>,
    group_size_hints: GroupSizeHints,
}
/// Plans a `Query`, guarding against stack overflow on deeply nested queries
/// via the context's checked-recursion machinery.
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
/// The body of [`plan_query`]: plans CTEs, LIMIT/OFFSET, and the query body,
/// then wraps the result in `Let`/`LetRec` bindings for the CTEs used.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan the CTEs and register them in `qcx.ctes`, remembering any bindings
    // they shadow so they can be restored afterwards.
    let cte_bindings = plan_ctes(qcx, q)?;

    // Plan LIMIT. A constant LIMIT is evaluated eagerly and must reduce to a
    // non-negative INT or NULL; a non-constant one is feature-gated.
    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &RelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?;

            let limit = if limit.is_constant() {
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated()?;

                match limit.eval(&[], &arena)? {
                    d @ Datum::Int64(v) if v >= 0 => HirScalarExpr::literal(d, ScalarType::Int64),
                    d @ Datum::Null => HirScalarExpr::literal(d, ScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Arbitrary LIMIT expressions are behind a feature flag.
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };
            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };

    // OFFSET must be an integer literal.
    let offset = match &q.offset {
        None => 0,
        Some(Expr::Value(Value::Number(x))) => x.parse()?,
        _ => sql_bail!("OFFSET must be an integer constant"),
    };

    let mut planned_query = match &q.body {
        // SELECT bodies plan their own ORDER BY (which may reference output
        // columns and expressions) and carry group-size hint options.
        SetExpr::Select(s) => {
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
            let plan = plan_view_select(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        // Other set expressions (UNION, VALUES, ...) get their ORDER BY
        // planned against their output columns here.
        _ => {
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // Project away any columns `map_exprs` appended for ordering.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Wrap the planned body in bindings for the CTEs it used, restoring any
    // shadowed entries in `qcx.ctes` as we go.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Bind in reverse order so earlier CTEs wrap (and are thus in
            // scope for) later ones.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            // At most one of the three recursion-limit options may be given;
            // the RETURN/ERROR variants fix the at-limit behavior.
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
                }
            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
                return_at_limit: *return_at_limit,
            }))?;

            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.typ().clone()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
// Generates `MutRecBlockOptionExtracted`, which collects the `WITH MUTUALLY
// RECURSIVE` block's recursion-limit options (each an optional `u64`), plus a
// `seen` field (destructured but ignored at the use site above).
generate_extracted_config!(
MutRecBlockOption,
(RecursionLimit, u64),
(ReturnAtRecursionLimit, u64),
(ErrorAtRecursionLimit, u64)
);
/// Plans all CTEs in `q`'s `WITH` block, registering each in `qcx.ctes`.
///
/// Returns one entry per CTE: its `LocalId`, its planned value expression, and
/// the `CteDesc` (if any) that this CTE shadowed in `qcx.ctes`, so the caller
/// can restore the shadowed binding once the query body has been planned.
///
/// For `WITH MUTUALLY RECURSIVE` blocks, every CTE's declared relation
/// description is inserted into `qcx.ctes` *before* any CTE body is planned,
/// so the bodies may refer to one another; each planned body is then cast to
/// its declared type, erroring on a mismatch.
pub fn plan_ctes(
    qcx: &mut QueryContext,
    q: &Query<Aug>,
) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
    let mut result = Vec::new();
    let mut shadowed_descs = BTreeMap::new();
    // CTE names must be unique within a single WITH block.
    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
        sql_bail!(
            "WITH query name {} specified more than once",
            normalize::ident_ref(ident).quoted()
        )
    }
    match &q.ctes {
        CteBlock::Simple(ctes) => {
            // Plan each CTE in order; later CTEs may reference earlier ones.
            for cte in ctes.iter() {
                let cte_name = normalize::ident(cte.alias.name.clone());
                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
                let typ = qcx.relation_type(&val);
                let mut desc = RelationDesc::new(typ, scope.column_names());
                plan_utils::maybe_rename_columns(
                    format!("CTE {}", cte.alias.name),
                    &mut desc,
                    &cte.alias.columns,
                )?;
                let shadowed = qcx.ctes.insert(
                    cte.id,
                    CteDesc {
                        name: cte_name,
                        desc,
                    },
                );
                result.push((cte.id, val, shadowed));
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
            // First pass: register each CTE's declared relation description so
            // that the (mutually recursive) bodies can resolve references to
            // any CTE in the block.
            for cte in ctes.iter() {
                let cte_name = normalize::ident(cte.name.clone());
                // One entry per declared column. (Previously this was sized by
                // `capacity()`, which may exceed the column count; `len()` is
                // the intended size.)
                let mut desc_columns = Vec::with_capacity(cte.columns.len());
                for column in cte.columns.iter() {
                    desc_columns.push((
                        normalize::column_name(column.name.clone()),
                        ColumnType {
                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
                            // WMR CTE columns are always nullable; the second
                            // pass rejects NOT NULL proposals outright.
                            nullable: true,
                        },
                    ));
                }
                let desc = RelationDesc::from_names_and_types(desc_columns);
                let shadowed = qcx.ctes.insert(
                    cte.id,
                    CteDesc {
                        name: cte_name,
                        desc,
                    },
                );
                if let Some(shadowed) = shadowed {
                    shadowed_descs.insert(cte.id, shadowed);
                }
            }
            // Second pass: plan each body and reconcile it with the declared type.
            for cte in ctes.iter() {
                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
                    sql_bail!("[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types");
                }
                if !proposed_typ.keys.is_empty() {
                    sql_bail!("[internal error]: WMR CTEs do not support keys");
                }
                let derived_typ = qcx.relation_type(&val);
                // Builds the error for a declared-vs-inferred type mismatch,
                // with both column lists rendered human-readably.
                let type_err = |proposed_typ: &RelationType, derived_typ: RelationType| {
                    let cte_name = normalize::ident(cte.name.clone());
                    let proposed_typ = proposed_typ
                        .column_types
                        .iter()
                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type))
                        .collect::<Vec<_>>();
                    let inferred_typ = derived_typ
                        .column_types
                        .iter()
                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type))
                        .collect::<Vec<_>>();
                    Err(PlanError::RecursiveTypeMismatch(
                        cte_name,
                        proposed_typ,
                        inferred_typ,
                    ))
                };
                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
                    return type_err(proposed_typ, derived_typ);
                }
                // Coerce the body to the declared column types; a failed cast
                // is reported as a recursive type mismatch.
                let val = match cast_relation(
                    qcx,
                    CastContext::Assignment,
                    val,
                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
                ) {
                    Ok(val) => val,
                    Err(_) => return type_err(proposed_typ, derived_typ),
                };
                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
            }
        }
    }
    Ok(result)
}
/// Plans `q` as a subquery, applying any LIMIT/OFFSET via a `TopK` operator
/// and projecting down to the query's output columns.
pub fn plan_nested_query(
    qcx: &mut QueryContext,
    q: &Query<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let PlannedQuery {
        expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
    // Only introduce a TopK when LIMIT/OFFSET actually constrain the result.
    let expr = if limit.is_none() && offset == 0 {
        expr
    } else {
        HirRelationExpr::top_k(
            expr,
            vec![],
            order_by,
            limit,
            offset,
            group_size_hints.limit_input_group_size,
        )
    };
    Ok((expr.project(project), scope))
}
/// Plans a query body (`SetExpr`): a `SELECT`, a set operation
/// (`UNION`/`EXCEPT`/`INTERSECT`), a `VALUES` list, a `TABLE` reference, a
/// parenthesized subquery, or a supported `SHOW` statement.
fn plan_set_expr(
qcx: &mut QueryContext,
q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
match q {
SetExpr::Select(select) => {
// A bare SELECT carries no ORDER BY at this level, so the plan must
// not request any ordering and its projection can be applied
// immediately.
let order_by_exprs = Vec::new();
let plan = plan_view_select(qcx, *select.clone(), order_by_exprs)?;
assert!(plan.order_by.is_empty());
Ok((plan.expr.project(plan.project), plan.scope))
}
SetExpr::SetOperation {
op,
all,
left,
right,
} => {
// Plan both inputs (guarding against deep recursion) and require
// equal arity.
let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
let (right_expr, right_scope) =
qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
let left_type = qcx.relation_type(&left_expr);
let right_type = qcx.relation_type(&right_expr);
if left_type.arity() != right_type.arity() {
sql_bail!(
"each {} query must have the same number of columns: {} vs {}",
op,
left_type.arity(),
right_type.arity(),
);
}
// Expression contexts used only for planning the per-column casts
// below; aggregates, subqueries, parameters, and window functions
// are all disallowed here.
let left_ecx = &ExprContext {
qcx,
name: &op.to_string(),
scope: &left_scope,
relation_type: &left_type,
allow_aggregates: false,
allow_subqueries: false,
allow_parameters: false,
allow_windows: false,
};
let right_ecx = &ExprContext {
qcx,
name: &op.to_string(),
scope: &right_scope,
relation_type: &right_type,
allow_aggregates: false,
allow_subqueries: false,
allow_parameters: false,
allow_windows: false,
};
// For each column pair, pick the best common type and implicitly
// cast both sides to it, erroring if either side cannot be cast.
let mut left_casts = vec![];
let mut right_casts = vec![];
for (i, (left_type, right_type)) in left_type
.column_types
.iter()
.zip(right_type.column_types.iter())
.enumerate()
{
let types = &[
CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
];
let target =
typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
match typeconv::plan_cast(
left_ecx,
CastContext::Implicit,
HirScalarExpr::column(i),
&target,
) {
Ok(expr) => left_casts.push(expr),
Err(_) => sql_bail!(
"{} types {} and {} cannot be matched",
op,
qcx.humanize_scalar_type(&left_type.scalar_type),
qcx.humanize_scalar_type(&target),
),
}
match typeconv::plan_cast(
right_ecx,
CastContext::Implicit,
HirScalarExpr::column(i),
&target,
) {
Ok(expr) => right_casts.push(expr),
Err(_) => sql_bail!(
"{} types {} and {} cannot be matched",
op,
qcx.humanize_scalar_type(&target),
qcx.humanize_scalar_type(&right_type.scalar_type),
),
}
}
// Apply the casts only when at least one is not a bare column
// reference: the `map` appends the cast columns, and the projection
// then keeps just those appended columns.
let lhs = if left_casts
.iter()
.enumerate()
.any(|(i, e)| e != &HirScalarExpr::column(i))
{
let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
left_expr.map(left_casts).project(project_key)
} else {
left_expr
};
let rhs = if right_casts
.iter()
.enumerate()
.any(|(i, e)| e != &HirScalarExpr::column(i))
{
let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
right_expr.map(right_casts).project(project_key)
} else {
right_expr
};
let relation_expr = match op {
SetOperator::Union => {
if *all {
lhs.union(rhs)
} else {
lhs.union(rhs).distinct()
}
}
SetOperator::Except => Hir::except(all, lhs, rhs),
SetOperator::Intersect => {
// INTERSECT is expressed as A ∪ −threshold(A ∪ −B):
// `threshold` keeps only the positive part of the difference
// (A − B), which is then negated and unioned back onto A,
// i.e. A − (A − B).
let left_clone = lhs.clone();
if *all {
lhs.union(left_clone.union(rhs.negate()).threshold().negate())
} else {
lhs.union(left_clone.union(rhs.negate()).threshold().negate())
.distinct()
}
}
};
// The output scope takes the left input's column names, with no
// table qualifications.
let scope = Scope::from_source(
None,
left_scope.column_names(),
);
Ok((relation_expr, scope))
}
SetExpr::Values(Values(values)) => plan_values(qcx, values),
SetExpr::Table(name) => {
let (expr, scope) = qcx.resolve_table_name(name.clone())?;
Ok((expr, scope))
}
SetExpr::Query(query) => {
let (expr, scope) = plan_nested_query(qcx, query)?;
Ok((expr, scope))
}
SetExpr::Show(stmt) => {
// SHOW is only plannable in contexts whose lifetime permits it;
// views, for instance, reject it.
if !qcx.lifetime.allow_show() {
return Err(PlanError::ShowCommandInView);
}
// Converts a planned `SHOW CREATE ...` (one precomputed row plus
// its statement description) into a constant relation and scope.
fn to_hirscope(
plan: ShowCreatePlan,
desc: StatementDesc,
) -> Result<(HirRelationExpr, Scope), PlanError> {
let rows = vec![plan.row.iter().collect::<Vec<_>>()];
let desc = desc.relation_desc.expect("must exist");
let expr = HirRelationExpr::constant(rows, desc.typ().clone());
let scope = Scope::from_source(None, desc.iter_names());
Ok((expr, scope))
}
match stmt.clone() {
ShowStatement::ShowColumns(stmt) => {
show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
}
ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
show::plan_show_create_connection(qcx.scx, stmt.clone())?,
show::describe_show_create_connection(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
show::describe_show_create_cluster(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
show::plan_show_create_index(qcx.scx, stmt.clone())?,
show::describe_show_create_index(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateSink(stmt) => to_hirscope(
show::plan_show_create_sink(qcx.scx, stmt.clone())?,
show::describe_show_create_sink(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateSource(stmt) => to_hirscope(
show::plan_show_create_source(qcx.scx, stmt.clone())?,
show::describe_show_create_source(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateTable(stmt) => to_hirscope(
show::plan_show_create_table(qcx.scx, stmt.clone())?,
show::describe_show_create_table(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateView(stmt) => to_hirscope(
show::plan_show_create_view(qcx.scx, stmt.clone())?,
show::describe_show_create_view(qcx.scx, stmt)?,
),
ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
show::describe_show_create_materialized_view(qcx.scx, stmt)?,
),
ShowStatement::ShowObjects(stmt) => {
show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
}
ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
}
}
}
}
/// Plans a standalone `VALUES` clause, coercing each column's expressions to
/// a common type. Output columns are named `column1`, `column2`, ....
fn plan_values(
    qcx: &QueryContext,
    values: &[Vec<Expr<Aug>>],
) -> Result<(HirRelationExpr, Scope), PlanError> {
    assert!(!values.is_empty());

    let ecx = &ExprContext {
        qcx,
        name: "VALUES",
        scope: &Scope::empty(),
        relation_type: &RelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let ncols = values[0].len();
    let nrows = values.len();

    // Transpose the row-major input into columns so each column can be
    // type-homogenized as a unit.
    let mut cols = vec![vec![]; ncols];
    for row in values {
        if row.len() != ncols {
            sql_bail!(
                "VALUES expression has varying number of columns: {} vs {}",
                row.len(),
                ncols
            );
        }
        for (i, v) in row.iter().enumerate() {
            cols[i].push(v);
        }
    }

    // Plan and coerce each column, computing its unified column type.
    let mut col_iters = Vec::with_capacity(ncols);
    let mut col_types = Vec::with_capacity(ncols);
    for col in &cols {
        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
        let mut col_type = ecx.column_type(&col[0]);
        for val in &col[1..] {
            col_type = col_type.union(&ecx.column_type(val))?;
        }
        col_types.push(col_type);
        col_iters.push(col.into_iter());
    }

    // Re-interleave the planned expressions back into row-major order, as
    // `TableFunc::Wrap` expects.
    let mut exprs = vec![];
    for _ in 0..nrows {
        for iter in col_iters.iter_mut() {
            exprs.push(iter.next().unwrap());
        }
    }

    let out = HirRelationExpr::CallTable {
        func: mz_expr::TableFunc::Wrap {
            width: ncols,
            types: col_types,
        },
        exprs,
    };

    // Name the output columns column1, column2, ....
    let mut scope = Scope::empty();
    for i in 0..ncols {
        scope
            .items
            .push(ScopeItem::from_column_name(format!("column{}", i + 1)));
    }

    Ok((out, scope))
}
fn plan_values_insert(
qcx: &QueryContext,
target_names: &[&ColumnName],
target_types: &[&ScalarType],
values: &[Vec<Expr<Aug>>],
) -> Result<HirRelationExpr, PlanError> {
assert!(!values.is_empty());
if !values.iter().map(|row| row.len()).all_equal() {
sql_bail!("VALUES lists must all be the same length");
}
let ecx = &ExprContext {
qcx,
name: "VALUES",
scope: &Scope::empty(),
relation_type: &RelationType::empty(),
allow_aggregates: false,
allow_subqueries: true,
allow_parameters: true,
allow_windows: false,
};
let mut exprs = vec![];
let mut types = vec![];
for row in values {
if row.len() > target_names.len() {
sql_bail!("INSERT has more expressions than target columns");
}
for (column, val) in row.into_iter().enumerate() {
let target_type = &target_types[column];
let val = plan_expr(ecx, val)?;
let val = typeconv::plan_coerce(ecx, val, target_type)?;
let source_type = &ecx.scalar_type(&val);
let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
Ok(val) => val,
Err(_) => sql_bail!(
"column {} is of type {} but expression is of type {}",
target_names[column].as_str().quoted(),
qcx.humanize_scalar_type(target_type),
qcx.humanize_scalar_type(source_type),
),
};
if column >= types.len() {
types.push(ecx.column_type(&val));
} else {
types[column] = types[column].union(&ecx.column_type(&val))?;
}
exprs.push(val);
}
}
Ok(HirRelationExpr::CallTable {
func: mz_expr::TableFunc::Wrap {
width: values[0].len(),
types,
},
exprs,
})
}
/// Returns the identity element for joins: a constant relation containing a
/// single zero-column row, paired with an empty scope.
fn plan_join_identity() -> (HirRelationExpr, Scope) {
    (
        HirRelationExpr::constant(vec![vec![]], RelationType::new(vec![])),
        Scope::empty(),
    )
}
/// The result of planning a `SELECT` via `plan_view_select`.
#[derive(Debug)]
struct SelectPlan {
/// The planned relation (before `project` is applied by the caller).
expr: HirRelationExpr,
/// Names/provenance of the output columns.
scope: Scope,
/// Requested ordering, in terms of columns of `expr`.
order_by: Vec<ColumnOrder>,
/// The columns of `expr` to retain, in output order.
project: Vec<usize>,
}
// Generates `SelectOptionExtracted`, which collects the per-SELECT group-size
// hint options (each an optional `u64`) from a list of `SelectOption`s; it is
// converted into `GroupSizeHints` in `plan_view_select`.
generate_extracted_config!(
SelectOption,
(ExpectedGroupSize, u64),
(AggregateInputGroupSize, u64),
(DistinctOnInputGroupSize, u64),
(LimitInputGroupSize, u64)
);
/// Plans a `SELECT` together with any attached ORDER BY expressions,
/// producing the relation expression, output scope, column ordering, and
/// output projection.
///
/// Planning proceeds in phases: FROM, WHERE, extraction of aggregates and
/// scalar-position table functions, SELECT-list expansion, GROUP BY +
/// aggregation, HAVING, window functions, SELECT-list planning, and finally
/// ORDER BY / DISTINCT handling.
fn plan_view_select(
qcx: &QueryContext,
mut s: Select<Aug>,
mut order_by_exprs: Vec<OrderByExpr<Aug>>,
) -> Result<SelectPlan, PlanError> {
// Extract group-size query hints from the SELECT's OPTIONS.
let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
// Step 1. Plan the FROM clause as a left-deep tree of cross joins (each
// item's own join conditions are handled inside `plan_join`).
let (mut relation_expr, mut from_scope) =
s.from.iter().try_fold(plan_join_identity(), |l, twj| {
let (left, left_scope) = l;
plan_join(
qcx,
left,
left_scope,
&Join {
relation: TableFactor::NestedJoin {
join: Box::new(twj.clone()),
alias: None,
},
join_operator: JoinOperator::CrossJoin,
},
)
})?;
// Step 2. Plan the WHERE clause: a boolean expression in which aggregates
// and window functions are disallowed.
if let Some(selection) = &s.selection {
let ecx = &ExprContext {
qcx,
name: "WHERE clause",
scope: &from_scope,
relation_type: &qcx.relation_type(&relation_expr),
allow_aggregates: false,
allow_subqueries: true,
allow_parameters: true,
allow_windows: false,
};
let expr = plan_expr(ecx, selection)
.map_err(|e| sql_err!("WHERE clause error: {}", e))?
.type_as(ecx, &ScalarType::Bool)?;
relation_expr = relation_expr.filter(vec![expr]);
}
// Step 3. Gather aggregates and scalar-position table functions, rewriting
// `s` and the ORDER BY expressions in place as the visitor walks them.
let (aggregates, table_funcs) = {
let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
visitor.visit_select_mut(&mut s);
for o in order_by_exprs.iter_mut() {
visitor.visit_order_by_expr_mut(o);
}
visitor.into_result()?
};
let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
// Join any extracted table functions onto the FROM relation.
if !table_funcs.is_empty() {
let (expr, scope) = plan_scalar_table_funcs(
qcx,
table_funcs,
&mut table_func_names,
&relation_expr,
&from_scope,
)?;
relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
from_scope = from_scope.product(scope)?;
}
// Step 4. Expand the SELECT list (wildcards etc.) into concrete
// (item, output-column-name) pairs.
let projection = {
let ecx = &ExprContext {
qcx,
name: "SELECT clause",
scope: &from_scope,
relation_type: &qcx.relation_type(&relation_expr),
allow_aggregates: true,
allow_subqueries: true,
allow_parameters: true,
allow_windows: true,
};
let mut out = vec![];
for si in &s.projection {
if *si == SelectItem::Wildcard && s.from.is_empty() {
sql_bail!("SELECT * with no tables specified is not valid");
}
out.extend(expand_select_item(ecx, si, &table_func_names)?);
}
out
};
// Step 5. Plan GROUP BY and aggregates. `group_scope` describes the
// post-aggregation columns; `select_all_mapping` maps pre-aggregation
// column numbers to their post-aggregation positions (only for input
// columns that appear in the group key).
let (mut group_scope, select_all_mapping) = {
let ecx = &ExprContext {
qcx,
name: "GROUP BY clause",
scope: &from_scope,
relation_type: &qcx.relation_type(&relation_expr),
allow_aggregates: false,
allow_subqueries: true,
allow_parameters: true,
allow_windows: false,
};
let mut group_key = vec![];
let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
let mut group_hir_exprs = vec![];
let mut group_scope = Scope::empty();
let mut select_all_mapping = BTreeMap::new();
for group_expr in &s.group_by {
let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
let new_column = group_key.len();
// A duplicate group expression contributes no new key column; just
// record the additional AST form on the existing scope item.
if let Some(group_expr) = group_expr {
if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
existing_scope_item.exprs.insert(group_expr.clone());
continue;
}
}
// Grouping directly by an input column keeps that column's scope
// item and records the old→new position in `select_all_mapping`.
let mut scope_item = if let HirScalarExpr::Column(ColumnRef {
level: 0,
column: old_column,
}) = &expr
{
select_all_mapping.insert(*old_column, new_column);
let scope_item = ecx.scope.items[*old_column].clone();
scope_item
} else {
ScopeItem::empty()
};
if let Some(group_expr) = group_expr.cloned() {
scope_item.exprs.insert(group_expr);
}
// Group key columns refer to the group expressions mapped onto the
// end of the input relation.
group_key.push(from_scope.len() + group_exprs.len());
group_hir_exprs.push(expr.clone());
group_exprs.insert(expr, scope_item);
}
assert_eq!(group_hir_exprs.len(), group_exprs.len());
// Emit the scope items in group-key order (the map is keyed by
// expression, not position).
for expr in &group_hir_exprs {
if let Some(scope_item) = group_exprs.remove(expr) {
group_scope.items.push(scope_item);
}
}
// Aggregates are planned against the input relation extended with the
// group expressions.
let ecx = &ExprContext {
qcx,
name: "aggregate function",
scope: &from_scope,
relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
allow_aggregates: false,
allow_subqueries: true,
allow_parameters: true,
allow_windows: false,
};
let mut agg_exprs = vec![];
for sql_function in aggregates {
if sql_function.over.is_some() {
unreachable!(
"Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
);
}
agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
group_scope
.items
.push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
}
// Aggregation is required if there are aggregates, a group key, or a
// HAVING clause (HAVING with no GROUP BY reduces to a single group).
if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
relation_expr = relation_expr.map(group_hir_exprs).reduce(
group_key,
agg_exprs,
group_size_hints.aggregate_input_group_size,
);
// Input columns not in the group key are remembered so later
// references to them can produce a helpful "ungrouped column" error.
for i in 0..from_scope.len() {
if !select_all_mapping.contains_key(&i) {
let scope_item = &ecx.scope.items[i];
group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
table_name: scope_item.table_name.clone(),
column_name: scope_item.column_name.clone(),
allow_unqualified_references: scope_item.allow_unqualified_references,
});
}
}
(group_scope, select_all_mapping)
} else {
// No aggregation: the grouped scope is the input scope and every
// column maps to itself.
(
from_scope.clone(),
(0..from_scope.len()).map(|i| (i, i)).collect(),
)
}
};
// Step 6. Plan the HAVING clause against the post-aggregation scope.
if let Some(ref having) = s.having {
let ecx = &ExprContext {
qcx,
name: "HAVING clause",
scope: &group_scope,
relation_type: &qcx.relation_type(&relation_expr),
allow_aggregates: true,
allow_subqueries: true,
allow_parameters: true,
allow_windows: false,
};
let expr = plan_expr(ecx, having)?.type_as(ecx, &ScalarType::Bool)?;
relation_expr = relation_expr.filter(vec![expr]);
}
// Step 7. Collect window functions from the SELECT list and ORDER BY and
// map each onto the relation as a new column.
let window_funcs = {
let mut visitor = WindowFuncCollector::default();
visitor.visit_select(&s);
for o in order_by_exprs.iter() {
visitor.visit_order_by_expr(o);
}
visitor.into_result()
};
for window_func in window_funcs {
let ecx = &ExprContext {
qcx,
name: "window function",
scope: &group_scope,
relation_type: &qcx.relation_type(&relation_expr),
allow_aggregates: true,
allow_subqueries: true,
allow_parameters: true,
allow_windows: true,
};
relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
group_scope.items.push(ScopeItem::from_expr(window_func));
}
// Step 8. Plan the SELECT-list expressions. Bare column references reuse
// existing columns; everything else is appended as a new mapped column.
// `output_columns` records (column index, output name) pairs.
let output_columns = {
let mut new_exprs = vec![];
let mut new_type = qcx.relation_type(&relation_expr);
let mut output_columns = vec![];
for (select_item, column_name) in &projection {
let ecx = &ExprContext {
qcx,
name: "SELECT clause",
scope: &group_scope,
relation_type: &new_type,
allow_aggregates: true,
allow_subqueries: true,
allow_parameters: true,
allow_windows: true,
};
let expr = match select_item {
ExpandedSelectItem::InputOrdinal(i) => {
if let Some(column) = select_all_mapping.get(i).copied() {
HirScalarExpr::column(column)
} else {
// A wildcard-expanded column that was grouped away.
return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
}
}
ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
};
if let HirScalarExpr::Column(ColumnRef { level: 0, column }) = expr {
output_columns.push((column, column_name));
} else {
let typ = ecx.column_type(&expr);
new_type.column_types.push(typ);
new_exprs.push(expr);
output_columns.push((group_scope.len(), column_name));
group_scope
.items
.push(ScopeItem::from_expr(select_item.as_expr().cloned()));
}
}
relation_expr = relation_expr.map(new_exprs);
output_columns
};
let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
// Step 9. Plan ORDER BY and DISTINCT.
let order_by = {
let relation_type = qcx.relation_type(&relation_expr);
let (mut order_by, mut map_exprs) = plan_order_by_exprs(
&ExprContext {
qcx,
name: "ORDER BY clause",
scope: &group_scope,
relation_type: &relation_type,
allow_aggregates: true,
allow_subqueries: true,
allow_parameters: true,
allow_windows: true,
},
&order_by_exprs,
&output_columns,
)?;
match s.distinct {
// Plain SELECT: just map on any extra expressions ORDER BY needs.
None => relation_expr = relation_expr.map(map_exprs),
Some(Distinct::EntireRow) => {
if relation_type.arity() == 0 {
sql_bail!("SELECT DISTINCT must have at least one column");
}
// With SELECT DISTINCT, every ORDER BY expression must already
// appear in the select list; `try_push_projection_order_by`
// rewrites the orderings in terms of the projection.
if !try_push_projection_order_by(
&mut relation_expr,
&mut project_key,
&mut order_by,
) {
sql_bail!(
"for SELECT DISTINCT, ORDER BY expressions must appear in select list"
);
}
assert!(map_exprs.is_empty());
relation_expr = relation_expr.distinct();
}
Some(Distinct::On(exprs)) => {
let ecx = &ExprContext {
qcx,
name: "DISTINCT ON clause",
scope: &group_scope,
relation_type: &qcx.relation_type(&relation_expr),
allow_aggregates: true,
allow_subqueries: true,
allow_parameters: true,
allow_windows: true,
};
let mut distinct_exprs = vec![];
for expr in &exprs {
let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
distinct_exprs.push(expr);
}
let mut distinct_key = vec![];
// The leading ORDER BY expressions must match the DISTINCT ON
// expressions (in any order); the matched ones become the TopK
// group key, in ORDER BY order.
let arity = relation_type.arity();
for ord in order_by.iter().take(distinct_exprs.len()) {
// The ordering may refer to a column about to be mapped on.
let mut expr = &HirScalarExpr::column(ord.column);
if ord.column >= arity {
expr = &map_exprs[ord.column - arity];
};
match distinct_exprs.iter().position(move |e| e == expr) {
None => sql_bail!("SELECT DISTINCT ON expressions must match initial ORDER BY expressions"),
Some(pos) => {
distinct_exprs.remove(pos);
}
}
distinct_key.push(ord.column);
}
// Any DISTINCT ON expressions beyond the matched ORDER BY prefix
// are appended to the group key, mapping new columns as needed.
for expr in distinct_exprs {
let column = match expr {
HirScalarExpr::Column(ColumnRef { level: 0, column }) => column,
_ => {
map_exprs.push(expr);
arity + map_exprs.len() - 1
}
};
distinct_key.push(column);
}
// DISTINCT ON is a per-group TopK with limit 1; the orderings
// consumed into the group key are dropped from the TopK ordering.
let distinct_len = distinct_key.len();
relation_expr = HirRelationExpr::top_k(
relation_expr.map(map_exprs),
distinct_key,
order_by.iter().skip(distinct_len).cloned().collect(),
Some(HirScalarExpr::literal(Datum::Int64(1), ScalarType::Int64)),
0,
group_size_hints.distinct_on_input_group_size,
);
}
}
order_by
};
// The output scope is named after the select list.
let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
Ok(SelectPlan {
expr: relation_expr,
scope,
order_by,
project: project_key,
})
}
/// Plans table functions that appeared in scalar position (e.g. in the SELECT
/// list) as a relation to be joined onto `relation_expr`. A single function
/// is planned directly; multiple functions go through the `ROWS FROM`
/// machinery. Also records, per generated function id, the identifier under
/// which its output is referenced.
fn plan_scalar_table_funcs(
qcx: &QueryContext,
table_funcs: BTreeMap<Function<Aug>, String>,
table_func_names: &mut BTreeMap<String, Ident>,
relation_expr: &HirRelationExpr,
from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
// Plan the functions against the current FROM scope/type so their
// arguments may refer to its columns.
let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
for (table_func, id) in table_funcs.iter() {
table_func_names.insert(
id.clone(),
Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
);
}
// Special case: a single table function needs no ROWS FROM coordination.
if table_funcs.len() == 1 {
let (table_func, id) = table_funcs.iter().next().unwrap();
let (expr, mut scope) =
plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
// Qualify every output column with the generated id and forbid
// unqualified references to it.
let num_cols = scope.len();
for i in 0..scope.len() {
scope.items[i].table_name = Some(PartialItemName {
database: None,
schema: None,
item: id.clone(),
});
scope.items[i].from_single_column_function = num_cols == 1;
scope.items[i].allow_unqualified_references = false;
}
return Ok((expr, scope));
}
// Multiple functions: plan them jointly. `num_cols` holds each function's
// data-column count (excluding its trailing per-function column).
let (expr, mut scope, num_cols) =
plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
// Walk the combined scope: each function contributes `num_cols` data
// columns followed by one extra column; qualify them all with that
// function's generated id.
let mut i = 0;
for (id, num_cols) in table_funcs.values().zip(num_cols) {
for _ in 0..num_cols {
scope.items[i].table_name = Some(PartialItemName {
database: None,
schema: None,
item: id.clone(),
});
scope.items[i].from_single_column_function = num_cols == 1;
scope.items[i].allow_unqualified_references = false;
i += 1;
}
// The trailing per-function column records whether the function
// produced a row (its "exists" column).
scope.items[i].table_name = Some(PartialItemName {
database: None,
schema: None,
item: id.clone(),
});
scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
scope.items[i].allow_unqualified_references = false;
i += 1;
}
// The one remaining item is the "ordinality" column that
// `plan_rows_from_internal` appends at the end of the scope.
scope.items[i].allow_unqualified_references = false;
Ok((expr, scope))
}
/// Plans an expression in a GROUP BY clause.
///
/// Alongside the planned `HirScalarExpr`, returns the AST expression (if any)
/// that should be recorded in the scope for later name resolution; `None` is
/// returned when the GROUP BY item was an output-column ordinal.
///
/// Resolution order: an integer literal is a select-list ordinal; a bare
/// identifier is first resolved against the input scope and, if that fails
/// with an unknown-column error, against the select-list output names;
/// anything else is planned as an ordinary expression.
fn plan_group_by_expr<'a>(
ecx: &ExprContext,
group_expr: &'a Expr<Aug>,
projection: &'a [(ExpandedSelectItem, ColumnName)],
) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
// Plans a reference to select-list item `column`.
let plan_projection = |column: usize| match &projection[column].0 {
ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
ExpandedSelectItem::Expr(expr) => {
Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
}
};
// GROUP BY <ordinal>.
if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
return plan_projection(column);
}
match group_expr {
Expr::Identifier(names) => match plan_identifier(ecx, names) {
// Unknown in the input scope: fall back to select-list output names.
Err(PlanError::UnknownColumn {
table: None,
column,
similar,
}) => {
let mut iter = projection.iter().map(|(_expr, name)| name);
if let Some(i) = iter.position(|n| *n == column) {
// `position` consumed the iterator through the first match,
// so `any` here detects a *second* select-list item with the
// same name, making the reference ambiguous.
if iter.any(|n| *n == column) {
Err(PlanError::AmbiguousColumn(column))
} else {
plan_projection(i)
}
} else {
// No output name matched either; report the original error.
Err(PlanError::UnknownColumn {
table: None,
column,
similar,
})
}
}
res => Ok((Some(group_expr), res?)),
},
_ => Ok((
Some(group_expr),
plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
)),
}
}
/// Plans a list of `ORDER BY` expressions.
///
/// Returns the resulting `ColumnOrder`s plus any new scalar expressions that
/// must be appended (via `map`) to the relation so the orderings have columns
/// to refer to. `output_columns` maps select-list ordinals to output column
/// names, for resolving ordinal and alias references.
pub(crate) fn plan_order_by_exprs(
    ecx: &ExprContext,
    order_by_exprs: &[OrderByExpr<Aug>],
    output_columns: &[(usize, &ColumnName)],
) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
    let arity = ecx.relation_type.arity();
    let mut map_exprs = vec![];
    let mut order_by = Vec::with_capacity(order_by_exprs.len());
    for obe in order_by_exprs {
        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
        // A bare column reference orders by that column directly; any other
        // expression is appended as a new mapped column and ordered by its
        // (future) position past the current arity.
        let column = match expr {
            HirScalarExpr::Column(ColumnRef { level: 0, column }) => column,
            other => {
                map_exprs.push(other);
                arity + map_exprs.len() - 1
            }
        };
        order_by.push(resolve_desc_and_nulls_last(obe, column));
    }
    Ok((order_by, map_exprs))
}
/// Plans an expression appearing in `ORDER BY` or `DISTINCT ON`.
///
/// Resolution order: a literal integer is an output-column ordinal; a bare
/// identifier preferentially names an output column (erroring if the name
/// maps to two *different* columns); otherwise the expression is planned
/// against the input scope.
fn plan_order_by_or_distinct_expr(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    output_columns: &[(usize, &ColumnName)],
) -> Result<HirScalarExpr, PlanError> {
    // Ordinal reference, e.g. `ORDER BY 2`.
    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
        return Ok(HirScalarExpr::column(output_columns[i].0));
    }

    // Bare identifier: try to resolve it as an output-column name first.
    if let Expr::Identifier(names) = expr {
        if let [name] = &names[..] {
            let name = normalize::column_name(name.clone());
            let mut candidates = output_columns
                .iter()
                .filter(|(_, n)| **n == name)
                .map(|(i, _)| *i);
            if let Some(first) = candidates.next() {
                return match candidates.next() {
                    // A second match pointing at a different column makes the
                    // name ambiguous.
                    Some(second) if second != first => {
                        Err(PlanError::AmbiguousColumn(name))
                    }
                    _ => Ok(HirScalarExpr::column(first)),
                };
            }
        }
    }

    // Fall back to planning as an ordinary expression over the input scope.
    plan_expr(ecx, expr)?.type_as_any(ecx)
}
/// Plans one `FROM` item: a base table factor followed by zero or more joins,
/// folding each join onto the accumulated relation and scope.
fn plan_table_with_joins(
    qcx: &QueryContext,
    table_with_joins: &TableWithJoins<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let base = plan_table_factor(qcx, &table_with_joins.relation)?;
    table_with_joins
        .joins
        .iter()
        .try_fold(base, |(expr, scope), join| plan_join(qcx, expr, scope, join))
}
/// Plans a single `FROM`-clause table factor: a named table, a table
/// function, a `ROWS FROM` list, a derived (sub)query, or a parenthesized
/// join.
fn plan_table_factor(
qcx: &QueryContext,
table_factor: &TableFactor<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
match table_factor {
TableFactor::Table { name, alias } => {
let (expr, scope) = qcx.resolve_table_name(name.clone())?;
let scope = plan_table_alias(scope, alias.as_ref())?;
Ok((expr, scope))
}
TableFactor::Function {
function,
alias,
with_ordinality,
} => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
TableFactor::RowsFrom {
functions,
alias,
with_ordinality,
} => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
TableFactor::Derived {
lateral,
subquery,
alias,
} => {
let mut qcx = (*qcx).clone();
// A non-LATERAL subquery may not reference columns from sibling
// FROM items, so blank out the items of every outer scope up to
// (but not including) the nearest lateral barrier.
if !lateral {
for scope in &mut qcx.outer_scopes {
if scope.lateral_barrier {
break;
}
scope.items.clear();
}
}
// Mark the nearest outer scope as a lateral barrier before planning
// the subquery — presumably so nested subqueries cannot clear or
// see past this query's boundary (confirm against the loop above).
qcx.outer_scopes[0].lateral_barrier = true;
let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
let scope = plan_table_alias(scope, alias.as_ref())?;
Ok((expr, scope))
}
TableFactor::NestedJoin { join, alias } => {
let (expr, scope) = plan_table_with_joins(qcx, join)?;
let scope = plan_table_alias(scope, alias.as_ref())?;
Ok((expr, scope))
}
}
}
/// Plans a `ROWS FROM (...)` clause.
///
/// `plan_rows_from_internal` yields, for each function, its data columns
/// followed by one extra (per-function) column, plus a trailing ordinality
/// column; this wrapper projects the result down to just the data columns
/// (plus the final ordinality column when `WITH ORDINALITY` was specified)
/// and fixes up the scope to match.
fn plan_rows_from(
qcx: &QueryContext,
functions: &[Function<Aug>],
alias: Option<&TableAlias>,
with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
// A single function behaves exactly like an ordinary table function.
if let [function] = functions {
return plan_solitary_table_function(qcx, function, alias, with_ordinality);
}
let (expr, mut scope, num_cols) = plan_rows_from_internal(
qcx,
functions,
Some(functions[0].name.full_item_name().clone()),
)?;
// Keep only each function's data columns, skipping the extra column that
// follows each function's block.
let mut columns = Vec::new();
let mut offset = 0;
for (idx, cols) in num_cols.into_iter().enumerate() {
for i in 0..cols {
columns.push(offset + i);
}
offset += cols + 1;
// Drop the skipped column's scope item; `idx` compensates for items
// already removed on earlier iterations.
scope.items.remove(offset - idx - 1);
}
// Keep the trailing ordinality column only under WITH ORDINALITY.
if with_ordinality {
columns.push(scope.items.len());
} else {
scope.items.pop();
}
let expr = expr.project(columns);
let scope = plan_table_alias(scope, alias)?;
Ok((expr, scope))
}
/// Plans the core of a `ROWS FROM` clause: each function is planned `WITH
/// ORDINALITY`, and successive functions are combined with a full outer join
/// on their ordinality columns (coalesced into a single running ordinality)
/// so the i-th rows of all functions line up.
///
/// Returns the joined expression, its scope (each function's columns followed
/// by a final "ordinality" item), and the number of data columns each
/// function produced (excluding its own ordinality column).
///
/// `table_name`, if given, overrides the scope table name for all functions.
fn plan_rows_from_internal<'a>(
qcx: &QueryContext,
functions: impl IntoIterator<Item = &'a Function<Aug>>,
table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
let mut functions = functions.into_iter();
let mut num_cols = Vec::new();
// Plan the first function; a copy of its ordinality column is mapped on as
// the initial running "ordinality" column.
let (mut left_expr, mut left_scope) =
plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
num_cols.push(left_scope.len() - 1);
left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
left_scope
.items
.push(ScopeItem::from_column_name("ordinality"));
for function in functions {
// Each subsequent function is planned in an empty derived context: it
// may not reference columns from its siblings.
let qcx = qcx.empty_derived_context();
let (right_expr, mut right_scope) =
plan_table_function_internal(&qcx, function, true, table_name.clone())?;
num_cols.push(right_scope.len() - 1);
// Full-outer-join on the two ordinality columns so rows with equal
// ordinals pair up and unmatched rows on either side survive.
let left_col = left_scope.len() - 1;
let right_col = left_scope.len() + right_scope.len() - 1;
let on = HirScalarExpr::CallBinary {
func: BinaryFunc::Eq,
expr1: Box::new(HirScalarExpr::column(left_col)),
expr2: Box::new(HirScalarExpr::column(right_col)),
};
// The new running ordinality is the coalesce of the two inputs'
// ordinality columns (either may be NULL after the outer join).
left_expr = left_expr
.join(right_expr, on, JoinKind::FullOuter)
.map(vec![HirScalarExpr::CallVariadic {
func: VariadicFunc::Coalesce,
exprs: vec![
HirScalarExpr::column(left_col),
HirScalarExpr::column(right_col),
],
}]);
// Project away the old running ordinality column, keeping everything
// else plus the freshly mapped coalesced ordinality.
left_expr = left_expr.project(
(0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
);
// Keep the "ordinality" scope item at the very end of the scope.
right_scope.items.push(left_scope.items.pop().unwrap());
left_scope.items.extend(right_scope.items);
}
Ok((left_expr, left_scope, num_cols))
}
/// Plans a table function that appears alone in `FROM` (not inside `ROWS
/// FROM`), applying the table alias. For a function whose only output column
/// (ignoring any ordinality column) is named after the function itself, an
/// alias renames that column as well.
fn plan_solitary_table_function(
    qcx: &QueryContext,
    function: &Function<Aug>,
    alias: Option<&TableAlias>,
    with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;

    // Does the function itself produce exactly one column (not counting any
    // appended ordinality column)?
    let expected_len = 1 + usize::from(with_ordinality);
    if scope.len() == expected_len {
        let item = &mut scope.items[0];
        item.from_single_column_function = true;
        if let (
            Some(alias),
            ScopeItem {
                table_name: Some(table_name),
                column_name,
                ..
            },
        ) = (alias, item)
        {
            // When the lone column carries the function's own name, the alias
            // renames the column too.
            if table_name.item.as_str() == column_name.as_str() {
                *column_name = normalize::column_name(alias.name.clone());
            }
        }
    }

    Ok((expr, plan_table_alias(scope, alias)?))
}
/// Plans an invocation of a function appearing in table position in `FROM`.
///
/// Handles both true table functions and scalar functions; the latter are
/// wrapped via `TableFunc::TabletizedScalar` into a relation with one column
/// named after the function. When `with_ordinality` is set, a trailing
/// "ordinality" column is appended. `table_name`, if provided, overrides the
/// function's own name when naming the output scope.
fn plan_table_function_internal(
qcx: &QueryContext,
Function {
name,
args,
filter,
over,
distinct,
}: &Function<Aug>,
with_ordinality: bool,
table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
// The parser should never attach FILTER, OVER, or DISTINCT to a function
// in table position.
assert_none!(filter, "cannot parse table function with FILTER");
assert_none!(over, "cannot parse table function with OVER");
assert!(!*distinct, "cannot parse table function with DISTINCT");
// Arguments are planned in an empty scope: they may not reference columns.
let ecx = &ExprContext {
qcx,
name: "table function arguments",
scope: &Scope::empty(),
relation_type: &RelationType::empty(),
allow_aggregates: false,
allow_subqueries: true,
allow_parameters: true,
allow_windows: false,
};
let scalar_args = match args {
FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
FunctionArgs::Args { args, order_by } => {
if !order_by.is_empty() {
sql_bail!(
"ORDER BY specified, but {} is not an aggregate function",
name
);
}
plan_exprs(ecx, args)?
}
};
// The scope's table name defaults to the function's own name.
let table_name = match table_name {
Some(table_name) => table_name.item,
None => name.full_item_name().item.clone(),
};
let scope_name = Some(PartialItemName {
database: None,
schema: None,
item: table_name,
});
let (mut expr, mut scope) = match resolve_func(ecx, name, args)? {
Func::Table(impls) => {
let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
let scope = Scope::from_source(scope_name.clone(), tf.column_names);
(tf.expr, scope)
}
Func::Scalar(impls) => {
// A scalar function in FROM: evaluate it and wrap the result into a
// one-column relation named after the function.
let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
let output = expr.typ(
&qcx.outer_relation_types,
&RelationType::new(vec![]),
&qcx.scx.param_types.borrow(),
);
let relation = RelationType::new(vec![output]);
let function_ident = Ident::new(name.full_item_name().item.clone())?;
let column_name = normalize::column_name(function_ident);
let name = column_name.to_string();
let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
(
HirRelationExpr::CallTable {
func: mz_expr::TableFunc::TabletizedScalar { relation, name },
exprs: vec![expr],
},
scope,
)
}
o => sql_bail!(
"{} functions are not supported in functions in FROM",
o.class()
),
};
if with_ordinality {
// WITH ORDINALITY numbers the function's rows using a `row_number`
// scalar window expression with an empty ORDER BY.
expr = expr.map(vec![HirScalarExpr::Windowing(WindowExpr {
func: WindowExprType::Scalar(ScalarWindowExpr {
func: ScalarWindowFunc::RowNumber,
order_by: vec![],
}),
partition_by: vec![],
order_by: vec![],
})]);
scope
.items
.push(ScopeItem::from_name(scope_name, "ordinality"));
}
Ok((expr, scope))
}
/// Applies an optional table alias (`FROM t AS a (c1, c2, ...)`) to a scope.
///
/// Items that allow unqualified references are re-qualified with the alias
/// name; all others lose their table name. Explicit column names replace the
/// items' column names positionally. With `strict` aliasing the column list
/// must match the scope arity exactly; otherwise it may name a prefix.
fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
    let Some(TableAlias {
        name,
        columns,
        strict,
    }) = alias
    else {
        return Ok(scope);
    };
    let arity_mismatch =
        columns.len() > scope.items.len() || (*strict && columns.len() != scope.items.len());
    if arity_mismatch {
        sql_bail!(
            "{} has {} columns available but {} columns specified",
            name,
            scope.items.len(),
            columns.len()
        );
    }
    let table_name = normalize::ident(name.to_owned());
    for (i, item) in scope.items.iter_mut().enumerate() {
        // Aliasing replaces the original table qualification entirely.
        item.table_name = item.allow_unqualified_references.then(|| PartialItemName {
            database: None,
            schema: None,
            item: table_name.clone(),
        });
        if let Some(col) = columns.get(i) {
            item.column_name = normalize::column_name(col.clone());
        }
    }
    Ok(scope)
}
/// Invents a column name for a `SELECT` item without an explicit alias, based
/// on the shape of the expression.
///
/// Returns `None` when no sensible name can be derived; the caller falls back
/// to `?column?`.
fn invent_column_name(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Option<ColumnName> {
    // Confidence in an invented name. Allows e.g. a cast's inner-expression
    // name (High) to win over the cast's type name (Low).
    #[derive(Debug)]
    enum NameQuality {
        Low,
        High,
    }
    fn invent(
        ecx: &ExprContext,
        expr: &Expr<Aug>,
        table_func_names: &BTreeMap<String, Ident>,
    ) -> Option<(ColumnName, NameQuality)> {
        match expr {
            Expr::Identifier(names) => {
                // A bare identifier that names a table function from the
                // target list inherits that function's name.
                if let [name] = names.as_slice() {
                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
                        return Some((
                            normalize::column_name(table_func_name.clone()),
                            NameQuality::High,
                        ));
                    }
                }
                // Otherwise use the last path component, e.g. `t.a` -> `a`.
                names
                    .last()
                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
            }
            Expr::Value(v) => match v {
                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
                _ => None,
            },
            Expr::Function(func) => {
                let (schema, item) = match &func.name {
                    ResolvedItemName::Item {
                        qualifiers,
                        full_name,
                        ..
                    } => (&qualifiers.schema_spec, full_name.item.clone()),
                    _ => unreachable!(),
                };
                // Functions in the internal/unsafe schemas do not lend their
                // names to output columns.
                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
                    || schema
                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
                {
                    None
                } else {
                    Some((item.into(), NameQuality::High))
                }
            }
            Expr::HomogenizingFunction { function, .. } => Some((
                function.to_string().to_lowercase().into(),
                NameQuality::High,
            )),
            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
            // Prefer the name of the cast's inner expression; fall back to the
            // target type name with low confidence.
            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
            },
            // A CASE is named after its ELSE branch when that yields a
            // high-quality name; otherwise just "case".
            Expr::Case { else_result, .. } => {
                match else_result
                    .as_ref()
                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
                {
                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                    _ => Some(("case".into(), NameQuality::Low)),
                }
            }
            Expr::FieldAccess { field, .. } => {
                Some((normalize::column_name(field.clone()), NameQuality::High))
            }
            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
            // Scalar-producing subqueries take the name of their first output
            // column; planning errors here simply yield no name.
            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
                let (_expr, scope) =
                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
                scope
                    .items
                    .first()
                    .map(|name| (name.column_name.clone(), NameQuality::High))
            }
            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
            _ => None,
        }
    }
    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
}
/// A single output column produced by expanding a `SELECT` item.
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// An existing input column, referenced by position (wildcard expansion).
    InputOrdinal(usize),
    /// An expression (possibly synthesized, e.g. a field access) to be planned.
    Expr(Cow<'a, Expr<Aug>>),
}
impl ExpandedSelectItem<'_> {
    /// Returns the underlying expression, if this item carries one.
    fn as_expr(&self) -> Option<&Expr<Aug>> {
        if let ExpandedSelectItem::Expr(expr) = self {
            Some(expr)
        } else {
            None
        }
    }
}
/// Expands one `SELECT` item into its constituent output columns.
///
/// Handles `t.*`, `(expr).*`, bare `*`, and ordinary expressions, returning
/// each resulting column together with its (invented or aliased) name.
fn expand_select_item<'a>(
    ecx: &ExprContext,
    s: &'a SelectItem<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
    match s {
        // `t.*`: all columns belonging to table `t` in the current scope.
        SelectItem::Expr {
            expr: Expr::QualifiedWildcard(table_name),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let table_name =
                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
            let out: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.is_from_table(&table_name))
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            if out.is_empty() {
                sql_bail!("no table named '{}' in scope", table_name);
            }
            Ok(out)
        }
        // `(expr).*`: expand a composite-typed expression into one field
        // access per record field.
        SelectItem::Expr {
            expr: Expr::WildcardAccess(sql_expr),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
            let fields = match ecx.scalar_type(&expr) {
                ScalarType::Record { fields, .. } => fields,
                ty => sql_bail!("type {} is not composite", ecx.humanize_scalar_type(&ty)),
            };
            // When the base is a bare table identifier, suppress synthetic
            // "exists" columns introduced for table functions in the target
            // list.
            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
            if let Expr::Identifier(ident) = sql_expr.as_ref() {
                if let [name] = ident.as_slice() {
                    if let Ok(items) = ecx.scope.items_from_table(
                        &[],
                        &PartialItemName {
                            database: None,
                            schema: None,
                            item: name.as_str().to_string(),
                        },
                    ) {
                        for (_, item) in items {
                            if item
                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
                            {
                                skip_cols.insert(item.column_name.clone());
                            }
                        }
                    }
                }
            }
            let items = fields
                .iter()
                .filter_map(|(name, _ty)| {
                    if skip_cols.contains(name) {
                        None
                    } else {
                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
                            expr: sql_expr.clone(),
                            field: name.clone().into(),
                        }));
                        Some((item, name.clone()))
                    }
                })
                .collect();
            Ok(items)
        }
        // Bare `*`: all columns that allow unqualified references.
        SelectItem::Wildcard => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let items: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.allow_unqualified_references)
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            Ok(items)
        }
        // Ordinary expression: use the alias, an invented name, or `?column?`.
        SelectItem::Expr { expr, alias } => {
            let name = alias
                .clone()
                .map(normalize::column_name)
                .or_else(|| invent_column_name(ecx, expr, table_func_names))
                .unwrap_or_else(|| "?column?".into());
            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
        }
    }
}
/// Plans one `JOIN` clause against an already-planned left-hand side.
///
/// The right-hand side is planned in a context derived from the left so that
/// `LATERAL` references work; for join kinds that cannot be correlated, any
/// reference to a left-hand column from the right side produces an error.
fn plan_join(
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    join: &Join<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // CROSS JOIN is treated as an inner join with a trivially true predicate.
    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
    let (kind, constraint) = match &join.join_operator {
        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
    };
    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
    if !kind.can_be_correlated() {
        // Poison the left-hand columns in the right side's outer scope so a
        // lateral reference produces a targeted error.
        for item in &mut right_qcx.outer_scopes[0].items {
            item.error_if_referenced =
                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
                    table: table.cloned(),
                    column: column.clone(),
                });
        }
    }
    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
    let (expr, scope) = match constraint {
        JoinConstraint::On(expr) => {
            // ON expressions see both sides' columns.
            let product_scope = left_scope.product(right_scope)?;
            let ecx = &ExprContext {
                qcx: left_qcx,
                name: "ON clause",
                scope: &product_scope,
                relation_type: &RelationType::new(
                    left_qcx
                        .relation_type(&left)
                        .column_types
                        .into_iter()
                        .chain(right_qcx.relation_type(&right).column_types)
                        .collect(),
                ),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let on = plan_expr(ecx, expr)?.type_as(ecx, &ScalarType::Bool)?;
            let joined = left.join(right, on, kind);
            (joined, product_scope)
        }
        JoinConstraint::Using { columns, alias } => {
            let column_names = columns
                .iter()
                .map(|ident| normalize::column_name(ident.clone()))
                .collect::<Vec<_>>();
            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                alias.as_ref(),
            )?
        }
        // NATURAL JOIN: desugar to USING over the column names the two sides
        // have in common (in left-hand order).
        JoinConstraint::Natural => {
            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
            let left_column_names = left_scope.column_names();
            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
            let column_names: Vec<_> = left_column_names
                .filter(|col| right_column_names.contains(col))
                .cloned()
                .collect();
            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                None,
            )?
        }
    };
    Ok((expr, scope))
}
#[allow(clippy::too_many_arguments)]
/// Plans a `USING (cols) [AS alias]` join constraint (also used for NATURAL
/// joins after their common columns have been computed).
///
/// Builds the equality predicate over the named columns, decides which side's
/// copy of each join column remains visible (FULL OUTER gets a synthesized
/// COALESCE column instead), hides the duplicates, and reorders the output so
/// the join columns come first.
fn plan_using_constraint(
    column_names: &[ColumnName],
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    right_qcx: &QueryContext,
    right: HirRelationExpr,
    right_scope: Scope,
    kind: JoinKind,
    alias: Option<&Ident>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
    // A column may appear at most once in the USING list.
    let mut unique_column_names = BTreeSet::new();
    for c in column_names {
        if !unique_column_names.insert(c) {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "column name {} appears more than once in USING clause",
                    c.as_str().quoted()
                ),
                discussion_no: None,
            });
        }
    }
    // `USING ... AS alias` must not collide with an existing table name.
    let alias_item_name = alias.map(|alias| PartialItemName {
        database: None,
        schema: None,
        item: alias.clone().to_string(),
    });
    if let Some(alias_item_name) = &alias_item_name {
        for partial_item_name in both_scope.table_names() {
            if partial_item_name.matches(alias_item_name) {
                sql_bail!(
                    "table name \"{}\" specified more than once",
                    alias_item_name
                )
            }
        }
    }
    let ecx = &ExprContext {
        qcx: right_qcx,
        name: "USING clause",
        scope: &both_scope,
        relation_type: &RelationType::new(
            left_qcx
                .relation_type(&left)
                .column_types
                .into_iter()
                .chain(right_qcx.relation_type(&right).column_types)
                .collect(),
        ),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    let mut join_exprs = vec![];       // equality predicates, ANDed together
    let mut map_exprs = vec![];        // columns appended via `map` (COALESCE / alias)
    let mut new_items = vec![];        // scope items for the appended columns
    let mut join_cols = vec![];        // output positions of the visible join columns
    let mut hidden_cols = vec![];      // positions demoted to qualified-only access
    for column_name in column_names {
        let lhs = left_scope.resolve_using_column(column_name, JoinSide::Left)?;
        let mut rhs = right_scope.resolve_using_column(column_name, JoinSide::Right)?;
        // Right-hand columns sit after all left-hand columns in the product.
        rhs.column += left_scope.len();
        // Both sides must coerce to a common type for the equality check.
        let mut exprs = coerce_homogeneous_exprs(
            &ecx.with_name(&format!(
                "NATURAL/USING join column {}",
                column_name.as_str().quoted()
            )),
            vec![
                CoercibleScalarExpr::Coerced(HirScalarExpr::Column(lhs)),
                CoercibleScalarExpr::Coerced(HirScalarExpr::Column(rhs)),
            ],
            None,
        )?;
        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
        match kind {
            // Keep the non-nullable side's column visible; hide the other.
            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
                join_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
            }
            JoinKind::RightOuter => {
                join_cols.push(rhs.column);
                hidden_cols.push(lhs.column);
            }
            // FULL OUTER: neither side alone is correct, so synthesize
            // COALESCE(lhs, rhs) as a new column and hide both originals.
            JoinKind::FullOuter => {
                join_cols.push(both_scope.items.len() + map_exprs.len());
                hidden_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
                map_exprs.push(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::Coalesce,
                    exprs: vec![expr1.clone(), expr2.clone()],
                });
                new_items.push(ScopeItem::from_column_name(column_name));
            }
        }
        // The alias introduces an additional (hidden) copy of each join
        // column qualified by the alias name.
        if alias_item_name.is_some() {
            let new_item_col = both_scope.items.len() + new_items.len();
            join_cols.push(new_item_col);
            hidden_cols.push(new_item_col);
            new_items.push(ScopeItem::from_name(
                alias_item_name.clone(),
                column_name.clone().to_string(),
            ));
            map_exprs.push(HirScalarExpr::Column(lhs));
        }
        join_exprs.push(HirScalarExpr::CallBinary {
            func: BinaryFunc::Eq,
            expr1: Box::new(expr1),
            expr2: Box::new(expr2),
        });
    }
    both_scope.items.extend(new_items);
    // Hidden columns remain addressable only with explicit qualification.
    for c in hidden_cols {
        both_scope.items[c].allow_unqualified_references = false;
    }
    // Project the join columns to the front, then everything else in order
    // (duplicates removed).
    let project_key = join_cols
        .into_iter()
        .chain(0..both_scope.items.len())
        .unique()
        .collect::<Vec<_>>();
    both_scope = both_scope.project(&project_key);
    let on = HirScalarExpr::variadic_and(join_exprs);
    let both = left
        .join(right, on, kind)
        .map(map_exprs)
        .project(project_key);
    Ok((both, both_scope))
}
/// Plans an arbitrary SQL expression into a [`CoercibleScalarExpr`].
///
/// This is a thin wrapper around `plan_expr_inner` that guards against stack
/// exhaustion when planning deeply nested expressions.
//
// Fix: dropped the needless explicit lifetime parameter (clippy
// `needless_lifetimes`); lifetime elision produces the identical signature.
pub fn plan_expr(ecx: &ExprContext, e: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
}
/// Dispatches expression planning by AST variant.
///
/// Pre-analyzed expressions (e.g. GROUP BY keys) that already resolve to a
/// scope column short-circuit to a column reference. Variants marked
/// `unreachable!` are desugared earlier and never reach this function.
//
// Fix: dropped the needless explicit lifetime parameter (clippy
// `needless_lifetimes`); elision yields the same signature. Body unchanged.
fn plan_expr_inner(ecx: &ExprContext, e: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
    // An expression the scope has already planned (e.g. a GROUP BY key)
    // becomes a direct column reference.
    if let Some(i) = ecx.scope.resolve_expr(e) {
        return Ok(HirScalarExpr::Column(i).into());
    }
    match e {
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // NULLIF(a, b) desugars to CASE WHEN a = b THEN NULL ELSE a END.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),
        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
/// Plans a positional parameter (`$n`).
///
/// Rejects parameters where they are not allowed and out-of-range positions.
/// A parameter whose type is already known becomes a typed `Parameter`
/// expression; otherwise it stays coercible so a type can be inferred later.
fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
    // Same error for "not allowed here" and "index out of range".
    if !ecx.allow_parameters || n == 0 || n > 65536 {
        return Err(PlanError::UnknownParameter(n));
    }
    let has_known_type = ecx.param_types().borrow().contains_key(&n);
    if has_known_type {
        Ok(HirScalarExpr::Parameter(n).into())
    } else {
        Ok(CoercibleScalarExpr::Parameter(n))
    }
}
/// Plans a `ROW(...)` constructor by planning each element in order.
fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
    let planned = exprs
        .iter()
        .map(|e| plan_expr(ecx, e))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(CoercibleScalarExpr::LiteralRecord(planned))
}
/// Plans an explicit cast (`expr::type` / `CAST(expr AS type)`).
///
/// Array, list, and map constructors are planned with the target type as a
/// hint so element types can be inferred; other expressions are planned
/// normally, then coerced and explicitly cast to the target type.
fn plan_cast(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    data_type: &ResolvedDataType,
) -> Result<CoercibleScalarExpr, PlanError> {
    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
    let expr = match expr {
        // Propagate the target type into constructors so otherwise untyped
        // elements (e.g. an empty constructor) can take their type from it.
        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
        _ => plan_expr(ecx, expr)?,
    };
    let ecx = &ecx.with_name("CAST");
    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
    Ok(expr.into())
}
/// Plans `NOT expr`, requiring the operand to type as `bool`.
fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
    let ecx = ecx.with_name("NOT argument");
    let operand = plan_expr(&ecx, expr)?.type_as(&ecx, &ScalarType::Bool)?;
    let negated = HirScalarExpr::CallUnary {
        func: UnaryFunc::Not(expr_func::Not),
        expr: Box::new(operand),
    };
    Ok(negated.into())
}
/// Plans `left AND right`, requiring both operands to type as `bool`.
fn plan_and(
    ecx: &ExprContext,
    left: &Expr<Aug>,
    right: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let ecx = ecx.with_name("AND argument");
    let lhs = plan_expr(&ecx, left)?.type_as(&ecx, &ScalarType::Bool)?;
    let rhs = plan_expr(&ecx, right)?.type_as(&ecx, &ScalarType::Bool)?;
    Ok(HirScalarExpr::variadic_and(vec![lhs, rhs]).into())
}
/// Plans `left OR right`, requiring both operands to type as `bool`.
fn plan_or(
    ecx: &ExprContext,
    left: &Expr<Aug>,
    right: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let ecx = ecx.with_name("OR argument");
    let lhs = plan_expr(&ecx, left)?.type_as(&ecx, &ScalarType::Bool)?;
    let rhs = plan_expr(&ecx, right)?.type_as(&ecx, &ScalarType::Bool)?;
    Ok(HirScalarExpr::variadic_or(vec![lhs, rhs]).into())
}
/// Plans `<expr> [NOT] IN (<list>)` by desugaring to a disjunction of
/// equality comparisons, one per list element, each typed as `bool`.
//
// Fix: replaced `list.into_iter()` — which on a `&Vec` merely iterates by
// reference — with the explicit `list.iter()` (clippy `into_iter_on_ref`).
fn plan_in_list(
    ecx: &ExprContext,
    lhs: &Expr<Aug>,
    list: &Vec<Expr<Aug>>,
    negated: &bool,
) -> Result<CoercibleScalarExpr, PlanError> {
    let ecx = ecx.with_name("IN list");
    let or = HirScalarExpr::variadic_or(
        list.iter()
            .map(|e| {
                // `lhs IN (a, b)` is planned as `lhs = a OR lhs = b`.
                let eq = lhs.clone().equals(e.clone());
                plan_expr(&ecx, &eq)?.type_as(&ecx, &ScalarType::Bool)
            })
            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
    );
    Ok(if *negated {
        or.call_unary(UnaryFunc::Not(expr_func::Not))
    } else {
        or
    }
    .into())
}
/// Plans COALESCE/GREATEST/LEAST, coercing all arguments to a common type.
fn plan_homogenizing_function(
    ecx: &ExprContext,
    function: &HomogenizingFunction,
    exprs: &[Expr<Aug>],
) -> Result<CoercibleScalarExpr, PlanError> {
    // The parser guarantees at least one argument.
    assert!(!exprs.is_empty());
    let func = match function {
        HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
        HomogenizingFunction::Greatest => VariadicFunc::Greatest,
        HomogenizingFunction::Least => VariadicFunc::Least,
    };
    // All arguments must homogenize to a single common type.
    let coerced = coerce_homogeneous_exprs(
        &ecx.with_name(&function.to_string().to_lowercase()),
        plan_exprs(ecx, exprs)?,
        None,
    )?;
    Ok(HirScalarExpr::CallVariadic {
        func,
        exprs: coerced,
    }
    .into())
}
/// Plans `expr.field` access on a record-typed expression.
fn plan_field_access(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    field: &Ident,
) -> Result<CoercibleScalarExpr, PlanError> {
    let field = normalize::column_name(field.clone());
    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
    let ty = ecx.scalar_type(&expr);
    // Field access is only defined on composite (record) types.
    let ScalarType::Record { fields, .. } = &ty else {
        sql_bail!(
            "column notation applied to type {}, which is not a composite type",
            ecx.humanize_scalar_type(&ty)
        )
    };
    match fields.iter().position(|(name, _ty)| *name == field) {
        Some(i) => Ok(expr
            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
            .into()),
        None => sql_bail!(
            "field {} not found in data type {}",
            field,
            ecx.humanize_scalar_type(&ty)
        ),
    }
}
/// Plans a subscript expression (`expr[...]`), dispatching on the type of the
/// subscripted expression: arrays, `int2vector`, `jsonb`, and lists each have
/// their own planning path.
fn plan_subscript(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    positions: &[SubscriptPosition<Aug>],
) -> Result<CoercibleScalarExpr, PlanError> {
    // The parser only produces Expr::Subscript with at least one position.
    assert!(
        !positions.is_empty(),
        "subscript expression must contain at least one position"
    );
    let ecx = &ecx.with_name("subscripting");
    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
    let ty = ecx.scalar_type(&expr);
    match &ty {
        // int2vector uses 0-based indexing; arrays use 1-based, hence the
        // differing offset passed to the planner.
        ScalarType::Array(..) | ScalarType::Int2Vector => plan_subscript_array(
            ecx,
            expr,
            positions,
            if ty == ScalarType::Int2Vector { 1 } else { 0 },
        ),
        ScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
        ScalarType::List { element_type, .. } => {
            let elem_type_name = ecx.humanize_scalar_type(element_type);
            let n_layers = ty.unwrap_list_n_layers();
            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
        }
        ty => sql_bail!("cannot subscript type {}", ecx.humanize_scalar_type(ty)),
    }
}
/// Extracts the index expression from each subscript position, rejecting any
/// slice syntax (`expr[a:b]`) with an error naming `expr_type_name`.
fn extract_scalar_subscript_from_positions<'a>(
    positions: &'a [SubscriptPosition<Aug>],
    expr_type_name: &str,
) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
    positions
        .iter()
        .map(|p| {
            if p.explicit_slice {
                sql_bail!("{} subscript does not support slices", expr_type_name);
            }
            // The parser only attaches an end bound to explicit slices.
            assert!(
                p.end.is_none(),
                "index-appearing subscripts cannot have end value"
            );
            Ok(p.start.as_ref().expect("has start if not slice"))
        })
        .collect()
}
/// Plans array (or int2vector) indexing; each index casts to `int8`.
/// `offset` adjusts for the container's index base.
fn plan_subscript_array(
    ecx: &ExprContext,
    expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    offset: i64,
) -> Result<CoercibleScalarExpr, PlanError> {
    // Slices are not valid on arrays; this also unwraps each index expr.
    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
    // ArrayIndex takes the array first, then one expression per dimension.
    let mut exprs = Vec::with_capacity(indexes.len() + 1);
    exprs.push(expr);
    for index in indexes {
        let planned =
            plan_expr(ecx, index)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?;
        exprs.push(planned);
    }
    Ok(HirScalarExpr::CallVariadic {
        func: VariadicFunc::ArrayIndex { offset },
        exprs,
    }
    .into())
}
/// Plans list subscripting, which may interleave indexing (`l[1]`) and
/// slicing (`l[1:2]`) operations.
///
/// Consecutive index positions are planned together (each peels one list
/// layer), then consecutive slice positions (which preserve layers), and so
/// on, alternating until all positions are consumed.
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    let mut i = 0;
    while i < positions.len() {
        // Run of index (non-slice) positions starting at `i`.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            // Empty type name: the slice error cannot trigger here because the
            // run contains no slices by construction.
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }
        // Run of slice positions starting at the new `i`.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }
    Ok(expr.into())
}
/// Plans a run of list index operations.
///
/// Each index removes one list layer; returns the number of layers remaining
/// after indexing together with the `ListIndex` expression. Errors if the
/// indexes descend deeper than the list is nested.
fn plan_index_list(
    ecx: &ExprContext,
    expr: HirScalarExpr,
    indexes: &[&Expr<Aug>],
    n_layers: usize,
    elem_type_name: &str,
) -> Result<(usize, HirScalarExpr), PlanError> {
    let depth = indexes.len();
    if depth > n_layers {
        if n_layers == 0 {
            // Fully unwrapped already: the element type is not subscriptable.
            sql_bail!("cannot subscript type {}", elem_type_name)
        } else {
            sql_bail!(
                "cannot index into {} layers; list only has {} layer{}",
                depth,
                n_layers,
                if n_layers == 1 { "" } else { "s" }
            )
        }
    }
    // ListIndex takes the list first, then one int8 index per layer.
    let mut exprs = Vec::with_capacity(depth + 1);
    exprs.push(expr);
    for i in indexes {
        exprs.push(plan_expr(ecx, i)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?);
    }
    Ok((
        n_layers - depth,
        HirScalarExpr::CallVariadic {
            func: VariadicFunc::ListIndex,
            exprs,
        },
    ))
}
/// Plans a run of list slice operations (`l[a:b]`), which preserve the list's
/// layer count.
///
/// Omitted bounds default to 1 (start) and effectively-unbounded (end).
fn plan_slice_list(
    ecx: &ExprContext,
    expr: HirScalarExpr,
    slices: &[SubscriptPosition<Aug>],
    n_layers: usize,
    elem_type_name: &str,
) -> Result<HirScalarExpr, PlanError> {
    if n_layers == 0 {
        sql_bail!("cannot subscript type {}", elem_type_name)
    }
    // ListSliceLinear takes the list first, then (start, end) per slice.
    let mut exprs = Vec::with_capacity(slices.len() + 1);
    exprs.push(expr);
    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
        Ok(match position {
            Some(p) => {
                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?
            }
            None => HirScalarExpr::literal(Datum::Int64(default), ScalarType::Int64),
        })
    };
    for p in slices {
        let start = extract_position_or_default(p.start.as_ref(), 1)?;
        // i64::MAX - 1 stands in for "no upper bound".
        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
        exprs.push(start);
        exprs.push(end);
    }
    Ok(HirScalarExpr::CallVariadic {
        func: VariadicFunc::ListSliceLinear,
        exprs,
    })
}
/// Plans `expr [NOT] [I]LIKE pattern [ESCAPE escape]`.
///
/// The haystack is coerced to `text`; `char` values are space-padded first so
/// their fixed-width semantics are preserved. An ESCAPE clause is folded into
/// the pattern via `LikeEscape` before matching.
fn plan_like(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    pattern: &Expr<Aug>,
    escape: Option<&Expr<Aug>>,
    case_insensitive: bool,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    use CastContext::Implicit;
    let ecx = ecx.with_name("LIKE argument");
    let expr = plan_expr(&ecx, expr)?;
    let haystack = match ecx.scalar_type(&expr) {
        // `char(n)` is padded to its declared length rather than cast, so
        // trailing spaces participate in the match.
        CoercibleScalarType::Coerced(ref ty @ ScalarType::Char { length }) => expr
            .type_as(&ecx, ty)?
            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
        _ => expr.cast_to(&ecx, Implicit, &ScalarType::String)?,
    };
    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &ScalarType::String)?;
    if let Some(escape) = escape {
        pattern = pattern.call_binary(
            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &ScalarType::String)?,
            BinaryFunc::LikeEscape,
        );
    }
    let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
    if not {
        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
    } else {
        Ok(like)
    }
}
/// Plans `jsonb` subscripting (`j[...]`).
///
/// Each subscript must coerce to `text` or, failing that, to `int8` (which is
/// then stringified); the subscripts form a text-array path passed to
/// `JsonbGetPath`.
fn plan_subscript_jsonb(
    ecx: &ExprContext,
    expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
) -> Result<CoercibleScalarExpr, PlanError> {
    use CastContext::Implicit;
    use ScalarType::{Int64, String};
    // Slices are not valid on jsonb.
    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
    let mut exprs = Vec::with_capacity(subscripts.len());
    for s in subscripts {
        let subscript = plan_expr(ecx, s)?;
        // Try text first; clone so the integer fallback can retry the cast.
        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
            subscript
        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
            typeconv::to_string(ecx, subscript)
        } else {
            sql_bail!("jsonb subscript type must be coercible to integer or text");
        };
        exprs.push(subscript);
    }
    let expr = expr.call_binary(
        HirScalarExpr::CallVariadic {
            func: VariadicFunc::ArrayCreate {
                elem_type: ScalarType::String,
            },
            exprs,
        },
        BinaryFunc::JsonbGetPath { stringify: false },
    );
    Ok(expr.into())
}
/// Plans `EXISTS (subquery)` as an existential test over the nested query.
fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }
    let (relation, _scope) = plan_nested_query(&mut ecx.derived_query_context(), query)?;
    Ok(relation.exists().into())
}
/// Plans a scalar subquery, which must yield exactly one column.
fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }
    let mut qcx = ecx.derived_query_context();
    let (relation, _scope) = plan_nested_query(&mut qcx, query)?;
    let arity = qcx.relation_type(&relation).column_types.len();
    if arity != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            arity
        );
    }
    Ok(relation.select().into())
}
/// Plans `LIST(subquery)`: collects the subquery's single column into a list,
/// preserving the query's ordering, via the generic vector-like planner.
fn plan_list_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    plan_vector_like_subquery(
        ecx,
        query,
        // Lists place no restriction on their element type.
        |_| false,
        |elem_type| VariadicFunc::ListCreate { elem_type },
        |order_by| AggregateFunc::ListConcat { order_by },
        BinaryFunc::ListListConcat,
        // An empty result concatenates with this empty-list literal.
        |elem_type| {
            HirScalarExpr::literal(
                Datum::empty_list(),
                ScalarType::List {
                    element_type: Box::new(elem_type),
                    custom_id: None,
                },
            )
        },
        "list",
    )
}
/// Plans `ARRAY(subquery)`: collects the subquery's single column into an
/// array, preserving the query's ordering, via the generic vector-like
/// planner.
fn plan_array_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    plan_vector_like_subquery(
        ecx,
        query,
        // These element types are rejected for array construction.
        |elem_type| {
            matches!(
                elem_type,
                ScalarType::Char { .. }
                    | ScalarType::Array { .. }
                    | ScalarType::List { .. }
                    | ScalarType::Map { .. }
            )
        },
        |elem_type| VariadicFunc::ArrayCreate { elem_type },
        |order_by| AggregateFunc::ArrayConcat { order_by },
        BinaryFunc::ArrayArrayConcat,
        // An empty result concatenates with this empty-array literal.
        |elem_type| {
            HirScalarExpr::literal(Datum::empty_array(), ScalarType::Array(Box::new(elem_type)))
        },
        "[]",
    )
}
/// Shared planner for `LIST(subquery)` and `ARRAY(subquery)`.
///
/// Plans the subquery, enforces a single output column, and aggregates that
/// column into a container with `aggregate_concat`, carrying the subquery's
/// ORDER BY columns through the aggregation so ordering is preserved. The
/// aggregate result is concatenated with `empty_literal` so an empty subquery
/// yields an empty container rather than NULL.
///
/// * `is_unsupported_type` rejects element types the container cannot hold.
/// * `vector_create` wraps each element row into a singleton container.
/// * `binary_concat` joins the aggregate with the empty literal.
//
// Fix: replaced `*planned_query.project.get(0).unwrap()` with direct indexing
// (`project[0]`) — the length-1 check immediately above makes the lookup
// infallible, so the `get`+`unwrap` detour was noise (clippy `get_first`).
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&ScalarType) -> bool,
    F2: Fn(ScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(ScalarType) -> HirScalarExpr,
{
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }
    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // LIMIT/OFFSET must be applied before aggregation.
    if planned_query.limit.is_some() || planned_query.offset > 0 {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }
    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }
    let project_column = planned_query.project[0];
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();
    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_scalar_type(&elem_type),
            vector_type_string
        ));
    }
    // Aggregate over: the singleton container, then each ORDER BY column so
    // the aggregate can sort its inputs.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::CallVariadic {
        func: vector_create(elem_type.clone()),
        exprs: vec![HirScalarExpr::column(project_column)],
    })
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();
    let aggregation_projection = vec![0];
    // Re-key the ORDER BY columns to their positions within the record
    // constructed below (the container sits at position 0).
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();
    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    exprs: aggregation_exprs,
                }),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);
    // Concatenating with the empty literal turns a NULL aggregate (empty
    // input) into an empty container.
    Ok(HirScalarExpr::CallBinary {
        func: binary_concat,
        expr1: Box::new(HirScalarExpr::Select(Box::new(reduced_expr))),
        expr2: Box::new(empty_literal(elem_type)),
    }
    .into())
}
/// Plans `MAP(subquery)`: the subquery must yield exactly two columns — a
/// `text` key and a value — which are aggregated with `MapAgg` into a map,
/// preserving the query's ordering. An empty result yields an empty map via
/// the trailing COALESCE.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }
    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // LIMIT/OFFSET must be applied before aggregation.
    if query.limit.is_some() || query.offset > 0 {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }
    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();
    // Map keys are always text.
    if key_type != ScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }
    // Aggregate over: the (key, value) record, then each ORDER BY column so
    // the aggregate can sort its inputs.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::CallVariadic {
        func: VariadicFunc::RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        exprs: vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    })
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    // Re-key ORDER BY columns to their positions within the
                    // record constructed below.
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    exprs: aggregation_exprs,
                }),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);
    // COALESCE with an empty map literal so an empty subquery does not
    // produce NULL.
    let expr = HirScalarExpr::CallVariadic {
        func: VariadicFunc::Coalesce,
        exprs: vec![
            HirScalarExpr::Select(Box::new(expr)),
            HirScalarExpr::literal(
                Datum::empty_map(),
                ScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    };
    Ok(expr.into())
}
/// Plans an `<expr> COLLATE <collation>` expression.
///
/// Only the trivial collation `pg_catalog.default` is supported, in which
/// case the expression is planned as if no `COLLATE` clause were present.
fn plan_collate(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    collation: &UnresolvedItemName,
) -> Result<CoercibleScalarExpr, PlanError> {
    // Accept exactly `pg_catalog.default`, which is a no-op collation.
    let is_default_collation = match collation.0.as_slice() {
        [schema, name] => {
            *schema == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA) && *name == ident!("default")
        }
        _ => false,
    };
    if !is_default_collation {
        bail_unsupported!("COLLATE");
    }
    plan_expr(ecx, expr)
}
/// Plans each expression in a slice, preserving order and short-circuiting
/// on the first error.
///
/// Accepts anything that borrows as an `Expr<Aug>` so callers can pass
/// slices of either owned or borrowed expressions.
fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
where
    E: std::borrow::Borrow<Expr<Aug>>,
{
    exprs
        .iter()
        .map(|expr| plan_expr(ecx, expr.borrow()))
        .collect()
}
/// Plans an `ARRAY[...]` constructor.
///
/// `type_hint` is the type the surrounding context expects: it supplies the
/// element type for empty arrays and guides coercion of the elements. Nested
/// `ARRAY` literals are planned recursively so multidimensional arrays share
/// one element type.
///
/// Errors when the element type cannot be determined (empty array with no
/// hint) or when the element type is unsupported in arrays (`char`, `list`,
/// `map`).
fn plan_array(
    ecx: &ExprContext,
    exprs: &[Expr<Aug>],
    type_hint: Option<&ScalarType>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let mut out = vec![];
    for expr in exprs {
        out.push(match expr {
            // Recurse directly into nested array literals, passing the same
            // hint along. (`type_hint` is `Copy`, so no `.clone()` is needed.)
            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint)?,
            _ => plan_expr(ecx, expr)?,
        });
    }
    // Decide which hint to push down to the elements: if the hint is an array
    // type and any element is itself an array, the elements are subarrays and
    // take the full hint; otherwise they take the hint's element type.
    let type_hint = match type_hint {
        Some(ScalarType::Array(elem_type)) => {
            let multidimensional = out.iter().any(|e| {
                matches!(
                    ecx.scalar_type(e),
                    CoercibleScalarType::Coerced(ScalarType::Array(_))
                )
            });
            if multidimensional {
                type_hint
            } else {
                Some(&**elem_type)
            }
        }
        Some(_) => None,
        None => None,
    };
    let (elem_type, exprs) = if exprs.is_empty() {
        // An empty array is only typeable if the context tells us the type.
        if let Some(elem_type) = type_hint {
            (elem_type.clone(), vec![])
        } else {
            sql_bail!("cannot determine type of empty array");
        }
    } else {
        // Coerce all elements to a single common type.
        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
        (ecx.scalar_type(&out[0]), out)
    };
    // These element types are not currently supported inside arrays.
    if matches!(
        elem_type,
        ScalarType::Char { .. } | ScalarType::List { .. } | ScalarType::Map { .. }
    ) {
        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type)));
    }
    Ok(HirScalarExpr::CallVariadic {
        func: VariadicFunc::ArrayCreate { elem_type },
        exprs,
    }
    .into())
}
/// Plans a `LIST[...]` constructor.
///
/// `type_hint` is the type expected by the surrounding context; a `list`
/// hint supplies the element type for empty lists and guides coercion of
/// the elements. Nested `LIST` literals are planned recursively.
fn plan_list(
    ecx: &ExprContext,
    exprs: &[Expr<Aug>],
    type_hint: Option<&ScalarType>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let (elem_type, exprs) = if exprs.is_empty() {
        // An empty list is only typeable if the context tells us the type.
        if let Some(ScalarType::List { element_type, .. }) = type_hint {
            (element_type.without_modifiers(), vec![])
        } else {
            sql_bail!("cannot determine type of empty list");
        }
    } else {
        // Push the hint's element type down to each element expression.
        let type_hint = match type_hint {
            Some(ScalarType::List { element_type, .. }) => Some(&**element_type),
            _ => None,
        };
        let mut out = vec![];
        for expr in exprs {
            out.push(match expr {
                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
                _ => plan_expr(ecx, expr)?,
            });
        }
        // Coerce all elements to one common type; type modifiers (e.g.
        // varchar length) are dropped from the resulting element type.
        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
        (ecx.scalar_type(&out[0]).without_modifiers(), out)
    };
    // `char` elements are not supported in lists.
    if matches!(elem_type, ScalarType::Char { .. }) {
        bail_unsupported!("char list");
    }
    Ok(HirScalarExpr::CallVariadic {
        func: VariadicFunc::ListCreate { elem_type },
        exprs,
    }
    .into())
}
/// Plans a `MAP[...]` constructor from explicit key/value entries.
///
/// Keys are always planned as `text`. A `map` `type_hint` supplies the value
/// type for empty maps and guides coercion of the values. Nested `MAP`
/// literals in value position are planned recursively.
fn plan_map(
    ecx: &ExprContext,
    entries: &[MapEntry<Aug>],
    type_hint: Option<&ScalarType>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let (value_type, exprs) = if entries.is_empty() {
        // An empty map is only typeable if the context tells us the type.
        if let Some(ScalarType::Map { value_type, .. }) = type_hint {
            (value_type.without_modifiers(), vec![])
        } else {
            sql_bail!("cannot determine type of empty map");
        }
    } else {
        // Push the hint's value type down to each value expression.
        let type_hint = match type_hint {
            Some(ScalarType::Map { value_type, .. }) => Some(&**value_type),
            _ => None,
        };
        let mut keys = vec![];
        let mut values = vec![];
        for MapEntry { key, value } in entries {
            // Map keys are always text.
            let key = plan_expr(ecx, key)?.type_as(ecx, &ScalarType::String)?;
            let value = match value {
                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
                _ => plan_expr(ecx, value)?,
            };
            keys.push(key);
            values.push(value);
        }
        // Coerce all values to a common type, dropping type modifiers.
        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
        // MapBuild expects its arguments as alternating key, value pairs.
        let out = itertools::interleave(keys, values).collect();
        (value_type, out)
    };
    // `char` values are not supported in maps.
    if matches!(value_type, ScalarType::Char { .. }) {
        bail_unsupported!("char map");
    }
    let expr = HirScalarExpr::CallVariadic {
        func: VariadicFunc::MapBuild { value_type },
        exprs,
    };
    Ok(expr.into())
}
/// Coerces a non-empty list of expressions to a single common type.
///
/// When `force_type` is given, all expressions are cast to it (explicit
/// casts allowed); otherwise the best common type is guessed from the
/// expressions' types and only implicit casts are used. Returns an error
/// naming the offending type if any expression cannot be converted.
///
/// # Panics
///
/// Panics if `exprs` is empty.
pub fn coerce_homogeneous_exprs(
    ecx: &ExprContext,
    exprs: Vec<CoercibleScalarExpr>,
    force_type: Option<&ScalarType>,
) -> Result<Vec<HirScalarExpr>, PlanError> {
    assert!(!exprs.is_empty());
    // Holds the guessed type so `target` can borrow from either source.
    let target_holder;
    let target = match force_type {
        Some(t) => t,
        None => {
            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
            &target_holder
        }
    };
    let mut out = Vec::new();
    for expr in exprs {
        // First coerce (e.g. assign a type to a string literal), then cast.
        let arg = typeconv::plan_coerce(ecx, expr, target)?;
        // A forced type permits explicit casts; a guessed type only implicit.
        let ccx = match force_type {
            None => CastContext::Implicit,
            Some(_) => CastContext::Explicit,
        };
        // `arg` is cloned so the error path below can still report its type.
        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
            Ok(expr) => out.push(expr),
            Err(_) => sql_bail!(
                "{} could not convert type {} to {}",
                ecx.name,
                ecx.humanize_scalar_type(&ecx.scalar_type(&arg)),
                ecx.humanize_scalar_type(target),
            ),
        }
    }
    Ok(out)
}
/// Converts the ASC/DESC and NULLS FIRST/LAST options of an `ORDER BY`
/// expression into a `ColumnOrder` for the given column index.
///
/// Defaults: ascending order, and nulls last when ascending / nulls first
/// when descending.
pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
    obe: &OrderByExpr<T>,
    column: usize,
) -> ColumnOrder {
    // ASC is the default ordering direction.
    let desc = !obe.asc.unwrap_or(true);
    // When unspecified: NULLS LAST for ascending, NULLS FIRST for descending.
    let nulls_last = match obe.nulls_last {
        Some(nulls_last) => nulls_last,
        None => !desc,
    };
    ColumnOrder {
        column,
        desc,
        nulls_last,
    }
}
/// Plans the `ORDER BY` clause of an aggregate or window function call.
///
/// Returns the planned order-by expressions alongside `ColumnOrder`s whose
/// `column` indices refer to positions within the returned expression list
/// (not to columns of the input relation).
fn plan_function_order_by(
    ecx: &ExprContext,
    order_by: &[OrderByExpr<Aug>],
) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
    let mut order_by_exprs = vec![];
    let mut col_orders = vec![];
    // (A redundant nested block scope around this loop was removed.)
    for (i, obe) in order_by.iter().enumerate() {
        let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
        // The column index `i` is the expression's position in `order_by_exprs`.
        order_by_exprs.push(expr);
        col_orders.push(resolve_desc_and_nulls_last(obe, i));
    }
    Ok((order_by_exprs, col_orders))
}
/// Plans an aggregate function call (shared between regular and windowed
/// aggregations; any `OVER` clause is handled by the caller, hence `over: _`).
///
/// Handles `*` arguments, `ORDER BY` within the call, `FILTER`, and wraps
/// order-sensitive aggregates' arguments in a record that carries the
/// order-by expressions.
fn plan_aggregate_common(
    ecx: &ExprContext,
    Function::<Aug> {
        name,
        args,
        filter,
        over: _,
        distinct,
    }: &Function<Aug>,
) -> Result<AggregateExpr, PlanError> {
    // The caller must have already verified this is an aggregate function.
    let impls = match resolve_func(ecx, name, args)? {
        Func::Aggregate(impls) => impls,
        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
    };
    // `*` is accepted only for parameterless aggregates (e.g. `count(*)`).
    let (args, order_by) = match &args {
        FunctionArgs::Star => (vec![], vec![]),
        FunctionArgs::Args { args, order_by } => {
            if args.is_empty() {
                sql_bail!(
                    "{}(*) must be used to call a parameterless aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("name actually resolved")
                );
            }
            let args = plan_exprs(ecx, args)?;
            (args, order_by.clone())
        }
    };
    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
    if let Some(filter) = &filter {
        // Rewrite `agg(x) FILTER (WHERE p)` as `agg(CASE WHEN p THEN x ELSE
        // <identity> END)`, where <identity> is a value the aggregate ignores.
        let cond = plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &ScalarType::Bool)?;
        let expr_typ = ecx.scalar_type(&expr);
        expr = HirScalarExpr::If {
            cond: Box::new(cond),
            then: Box::new(expr),
            els: Box::new(HirScalarExpr::literal(func.identity_datum(), expr_typ)),
        };
    }
    // Aggregates that reference *only* outer (correlated) columns are not
    // supported; detect that case by scanning the column references.
    let mut seen_outer = false;
    let mut seen_inner = false;
    #[allow(deprecated)]
    expr.visit_columns(0, &mut |depth, col| {
        if depth == 0 && col.level == 0 {
            seen_inner = true;
        } else if col.level > depth {
            seen_outer = true;
        }
    });
    if seen_outer && !seen_inner {
        bail_unsupported!(
            3720,
            "aggregate functions that refer exclusively to outer columns"
        );
    }
    // Order-sensitive aggregates receive their argument and the order-by
    // expressions packed together in a single record.
    if func.is_order_sensitive() {
        let field_names = iter::repeat(ColumnName::from(""))
            .take(1 + order_by_exprs.len())
            .collect();
        let mut exprs = vec![expr];
        exprs.extend(order_by_exprs);
        expr = HirScalarExpr::CallVariadic {
            func: VariadicFunc::RecordCreate { field_names },
            exprs,
        };
    }
    Ok(AggregateExpr {
        func,
        expr: Box::new(expr),
        distinct: *distinct,
    })
}
/// Plans a possibly-qualified identifier as a column reference.
///
/// A qualified name (`t.c`) resolves against the named table. A bare name
/// first resolves as a column; failing that, it may name a whole table
/// (e.g. `SELECT t FROM t`), in which case the table's columns are packed
/// into a record. Tables introduced by table functions in the target list
/// get special NULL handling via their synthetic "exists" column.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    let col_name = normalize::column_name(names.pop().unwrap());
    // Qualified reference: resolve the column within the named table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let i = ecx
            .scope
            .resolve_table_column(&ecx.qcx.outer_scopes, &table_name, &col_name)?;
        return Ok(HirScalarExpr::Column(i));
    }
    // Bare reference: try to resolve as a column, remembering similarly
    // named columns for the error message if everything below fails.
    let similar_names = match ecx.scope.resolve_column(&ecx.qcx.outer_scopes, &col_name) {
        Ok(i) => return Ok(HirScalarExpr::Column(i)),
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };
    // Not a column; try to interpret the name as a table reference.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // Not a table either: report the unknown column.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // A single-column table function's table reference is the column itself.
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::Column(*column)),
        _ => {
            // Pack the table's columns into a record, excluding any synthetic
            // "exists" column, which instead drives NULL handling below.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::Column(column);
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // With exactly one real column plus an exists column, the
            // reference is that column rather than a one-field record.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate { field_names },
                    exprs,
                }
            };
            if let Some(has_exists_column) = has_exists_column {
                // When the table function produced no row, the exists column
                // is NULL, and the whole reference must evaluate to NULL.
                Ok(HirScalarExpr::If {
                    cond: Box::new(HirScalarExpr::CallUnary {
                        func: UnaryFunc::IsNull(mz_expr::func::IsNull),
                        expr: Box::new(HirScalarExpr::Column(has_exists_column)),
                    }),
                    then: Box::new(HirScalarExpr::literal_null(ecx.scalar_type(&expr))),
                    els: Box::new(expr),
                })
            } else {
                Ok(expr)
            }
        }
    }
}
/// Plans a unary or binary operator application.
///
/// `expr2` is `None` for unary operators. Resolves the operator's candidate
/// implementations and selects the one matching the argument types.
fn plan_op(
    ecx: &ExprContext,
    op: &str,
    expr1: &Expr<Aug>,
    expr2: Option<&Expr<Aug>>,
) -> Result<HirScalarExpr, PlanError> {
    let impls = func::resolve_op(op)?;
    // Gather the one or two operands, then plan them together.
    let mut operands = vec![expr1];
    operands.extend(expr2);
    let args = plan_exprs(ecx, &operands)?;
    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
}
/// Plans a scalar function call, dispatching on the kind of function.
///
/// Table functions and non-windowed aggregates are rejected here — they are
/// planned elsewhere. Window functions (scalar, value, and windowed
/// aggregates) are planned in full, including their `OVER` clause, and
/// return early; only ordinary scalar functions fall through to the end.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!("OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations).");
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            // Scalar window functions (e.g. row_number, rank): they take no
            // arguments and ignore the window frame.
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;
            // All scalar window functions take zero arguments.
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }
            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            if ignore_nulls {
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }
            return Ok(HirScalarExpr::Windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            // Value window functions (e.g. lag, lead, first_value): their
            // arguments are encoded together by `select_impl`.
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
                plan_window_function_non_aggr(ecx, f)?;
            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            // IGNORE NULLS is implemented only for LAG and LEAD.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }
            return Ok(HirScalarExpr::Windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // Non-windowed aggregates are extracted and planned before
                // scalar planning; reaching this point without an OVER clause
                // is either a planner bug or an invalid context.
                if ecx.allow_aggregates {
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                // Window aggregate, e.g. `sum(x) OVER (...)`.
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;
                // Frames that mix an unbounded bound with an offset bound are
                // not supported for window aggregates.
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    (_, _) => {}
                }
                if ignore_nulls {
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }
                let aggregate_expr = plan_aggregate_common(ecx, f)?;
                if aggregate_expr.distinct {
                    bail_unsupported!("DISTINCT in window aggregates");
                }
                return Ok(HirScalarExpr::Windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };
    // From here on we are planning an ordinary scalar function call.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }
    // DISTINCT, FILTER, `*`, and in-call ORDER BY are aggregate-only syntax.
    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };
    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
/// Error message for the unsupported `IGNORE NULLS` / `RESPECT NULLS`
/// options, which are accepted only for the `LAG` and `LEAD` window
/// functions.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
/// Resolves the `Func` catalog entry named by a function call.
///
/// On failure, the argument expressions are planned anyway so that the
/// resulting `UnknownFunction` error can report the argument types the
/// caller supplied.
pub fn resolve_func(
    ecx: &ExprContext,
    name: &ResolvedItemName,
    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
) -> Result<&'static Func, PlanError> {
    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
        if let Ok(f) = i.func() {
            return Ok(f);
        }
    }
    // The name did not resolve to a function. Plan the arguments purely to
    // produce a helpful error message listing their types.
    let cexprs = match args {
        mz_sql_parser::ast::FunctionArgs::Star => vec![],
        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };
    let arg_types: Vec<_> = cexprs
        .into_iter()
        // (The closure parameter is an expression, not a type — previously
        // misleadingly named `ty`.)
        .map(|expr| match ecx.scalar_type(&expr) {
            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty),
            CoercibleScalarType::Record(_) => "record".to_string(),
            CoercibleScalarType::Uncoerced => "unknown".to_string(),
        })
        .collect();
    Err(PlanError::UnknownFunction {
        name: name.to_string(),
        arg_types,
    })
}
/// Plans an `IS` construct: `IS [NOT] NULL / UNKNOWN / TRUE / FALSE /
/// DISTINCT FROM <expr>`. `not` negates the final result.
fn plan_is_expr<'a>(
    ecx: &ExprContext,
    expr: &'a Expr<Aug>,
    construct: &IsExprConstruct<Aug>,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    let expr = plan_expr(ecx, expr)?;
    let mut expr = match construct {
        IsExprConstruct::Null => {
            // IS NULL accepts an operand of any type.
            let expr = expr.type_as_any(ecx)?;
            expr.call_is_null()
        }
        IsExprConstruct::Unknown => {
            // IS UNKNOWN is IS NULL restricted to boolean operands.
            let expr = expr.type_as(ecx, &ScalarType::Bool)?;
            expr.call_is_null()
        }
        IsExprConstruct::True => {
            let expr = expr.type_as(ecx, &ScalarType::Bool)?;
            expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
        }
        IsExprConstruct::False => {
            let expr = expr.type_as(ecx, &ScalarType::Bool)?;
            expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
        }
        IsExprConstruct::DistinctFrom(expr2) => {
            // `a IS DISTINCT FROM b` is a null-safe inequality:
            //   (a <> b OR a IS NULL OR b IS NULL)   -- values differ or one is NULL
            //   AND (a IS NOT NULL OR b IS NOT NULL) -- ...but not both NULL
            let expr1 = expr.type_as_any(ecx)?;
            let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
            let term1 = HirScalarExpr::variadic_or(vec![
                expr1.clone().call_binary(expr2.clone(), BinaryFunc::NotEq),
                expr1.clone().call_is_null(),
                expr2.clone().call_is_null(),
            ]);
            let term2 = HirScalarExpr::variadic_or(vec![
                expr1.call_is_null().not(),
                expr2.call_is_null().not(),
            ]);
            term1.and(term2)
        }
    };
    if not {
        expr = expr.not();
    }
    Ok(expr)
}
/// Plans a `CASE` expression into a chain of nested `If`s.
///
/// With an operand (`CASE <operand> WHEN ...`), each condition is rewritten
/// as `<operand> = <condition>`. All result expressions (including the
/// implicit `ELSE NULL`) are coerced to a common type.
fn plan_case<'a>(
    ecx: &ExprContext,
    operand: &'a Option<Box<Expr<Aug>>>,
    conditions: &'a [Expr<Aug>],
    results: &'a [Expr<Aug>],
    else_result: &'a Option<Box<Expr<Aug>>>,
) -> Result<HirScalarExpr, PlanError> {
    let mut cond_exprs = Vec::new();
    let mut result_exprs = Vec::new();
    for (c, r) in conditions.iter().zip(results) {
        // In operand form, the condition is an equality against the operand.
        let c = match operand {
            Some(operand) => operand.clone().equals(c.clone()),
            None => c.clone(),
        };
        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &ScalarType::Bool)?;
        cond_exprs.push(cexpr);
        result_exprs.push(r);
    }
    // A missing ELSE branch defaults to NULL.
    result_exprs.push(match else_result {
        Some(else_result) => else_result,
        None => &Expr::Value(Value::Null),
    });
    let mut result_exprs = coerce_homogeneous_exprs(
        &ecx.with_name("CASE"),
        plan_exprs(ecx, &result_exprs)?,
        None,
    )?;
    // Build the If chain from the inside (ELSE) out by iterating in reverse.
    let mut expr = result_exprs.pop().unwrap();
    assert_eq!(cond_exprs.len(), result_exprs.len());
    for (cexpr, rexpr) in cond_exprs.into_iter().zip(result_exprs).rev() {
        expr = HirScalarExpr::If {
            cond: Box::new(cexpr),
            then: Box::new(rexpr),
            els: Box::new(expr),
        }
    }
    Ok(expr)
}
/// Plans a literal value from the AST.
///
/// Numbers without an exponent or decimal point become the narrowest of
/// `int4`/`int8` that fits, falling back to `numeric`. Strings and NULL are
/// returned uncoerced so the surrounding context can assign their type.
fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
    let (datum, scalar_type) = match l {
        Value::Number(s) => {
            let d = strconv::parse_numeric(s.as_str())?;
            if !s.contains(&['E', '.'][..]) {
                // Integer literal: prefer int4, then int8, then numeric.
                // (The target type of each `try_into` is inferred from the
                // Datum variant it feeds.)
                if let Ok(n) = d.0.try_into() {
                    (Datum::Int32(n), ScalarType::Int32)
                } else if let Ok(n) = d.0.try_into() {
                    (Datum::Int64(n), ScalarType::Int64)
                } else {
                    (Datum::Numeric(d), ScalarType::Numeric { max_scale: None })
                }
            } else {
                (Datum::Numeric(d), ScalarType::Numeric { max_scale: None })
            }
        }
        Value::HexString(_) => bail_unsupported!("hex string literals"),
        Value::Boolean(b) => match b {
            false => (Datum::False, ScalarType::Bool),
            true => (Datum::True, ScalarType::Bool),
        },
        Value::Interval(i) => {
            let i = literal::plan_interval(i)?;
            (Datum::Interval(i), ScalarType::Interval)
        }
        // Strings and NULL remain coercible; their type comes from context.
        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
    };
    let expr = HirScalarExpr::literal(datum, scalar_type);
    Ok(expr.into())
}
/// Plans the parts of a non-aggregate window function call shared between
/// scalar and value window functions.
///
/// Validates that DISTINCT, FILTER, `*`, and in-call ORDER BY — all
/// aggregate-only features — are absent, and plans the scalar arguments.
/// Returns `(ignore_nulls, order_by_exprs, col_orders, window_frame,
/// partition_by, scalar_args)`.
fn plan_window_function_non_aggr<'a>(
    ecx: &ExprContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<
    (
        bool,
        Vec<HirScalarExpr>,
        Vec<ColumnOrder>,
        mz_expr::WindowFrame,
        Vec<HirScalarExpr>,
        Vec<CoercibleScalarExpr>,
    ),
    PlanError,
> {
    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
        plan_window_function_common(ecx, name, over)?;
    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            name
        );
    }
    if filter.is_some() {
        bail_unsupported!("FILTER in non-aggregate window functions");
    }
    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!("* argument is invalid with non-aggregate function {}", name)
        }
        FunctionArgs::Args { args, order_by } => {
            // This is the ORDER BY inside the call parentheses (aggregate
            // syntax), not the one in the OVER clause.
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };
    Ok((
        ignore_nulls,
        order_by_exprs,
        col_orders,
        window_frame,
        partition,
        scalar_args,
    ))
}
/// Plans the `OVER` clause shared by all window function kinds.
///
/// Checks that window functions are allowed in this context, that an OVER
/// clause is present, and that IGNORE NULLS and RESPECT NULLS are not both
/// given. Returns `(ignore_nulls, order_by_exprs, col_orders, window_frame,
/// partition_by)`.
fn plan_window_function_common(
    ecx: &ExprContext,
    name: &<Aug as AstInfo>::ItemName,
    over: &Option<WindowSpec<Aug>>,
) -> Result<
    (
        bool,
        Vec<HirScalarExpr>,
        Vec<ColumnOrder>,
        mz_expr::WindowFrame,
        Vec<HirScalarExpr>,
    ),
    PlanError,
> {
    if !ecx.allow_windows {
        sql_bail!(
            "window functions are not allowed in {} (function {})",
            ecx.name,
            name
        );
    }
    let window_spec = match over.as_ref() {
        Some(over) => over,
        None => sql_bail!("window function {} requires an OVER clause", name),
    };
    if window_spec.ignore_nulls && window_spec.respect_nulls {
        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
    }
    // An absent frame clause means the default frame.
    let window_frame = match window_spec.window_frame.as_ref() {
        Some(frame) => plan_window_frame(frame)?,
        None => mz_expr::WindowFrame::default(),
    };
    let mut partition = Vec::new();
    for expr in &window_spec.partition_by {
        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
    }
    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
    Ok((
        window_spec.ignore_nulls,
        order_by_exprs,
        col_orders,
        window_frame,
        partition,
    ))
}
fn plan_window_frame(
WindowFrame {
units,
start_bound,
end_bound,
}: &WindowFrame,
) -> Result<mz_expr::WindowFrame, PlanError> {
use mz_expr::WindowFrameBound::*;
let units = window_frame_unit_ast_to_expr(units)?;
let start_bound = window_frame_bound_ast_to_expr(start_bound);
let end_bound = end_bound
.as_ref()
.map(window_frame_bound_ast_to_expr)
.unwrap_or(CurrentRow);
match (&start_bound, &end_bound) {
(UnboundedFollowing, _) => {
sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
}
(_, UnboundedPreceding) => {
sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
}
(CurrentRow, OffsetPreceding(_)) => {
sql_bail!("frame starting from current row cannot have preceding rows")
}
(OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
sql_bail!("frame starting from following row cannot have preceding rows")
}
(OffsetPreceding(o1), OffsetFollowing(o2)) => {
if *o1 > 1000000 || *o2 > 1000000 {
sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
}
}
(OffsetPreceding(o1), OffsetPreceding(o2)) => {
if *o1 > 1000000 || *o2 > 1000000 {
sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
}
}
(OffsetFollowing(o1), OffsetFollowing(o2)) => {
if *o1 > 1000000 || *o2 > 1000000 {
sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
}
}
(OffsetPreceding(o), CurrentRow) => {
if *o > 1000000 {
sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
}
}
(CurrentRow, OffsetFollowing(o)) => {
if *o > 1000000 {
sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
}
}
(_, _) => (),
}
if units == mz_expr::WindowFrameUnits::Range
&& (start_bound != UnboundedPreceding || end_bound != CurrentRow)
{
bail_unsupported!("RANGE in non-default window frames")
}
let frame = mz_expr::WindowFrame {
units,
start_bound,
end_bound,
};
Ok(frame)
}
/// Translates an AST window frame unit into its `mz_expr` counterpart.
///
/// `GROUPS` frames are not yet supported.
fn window_frame_unit_ast_to_expr(
    unit: &WindowFrameUnits,
) -> Result<mz_expr::WindowFrameUnits, PlanError> {
    let units = match unit {
        WindowFrameUnits::Rows => mz_expr::WindowFrameUnits::Rows,
        WindowFrameUnits::Range => mz_expr::WindowFrameUnits::Range,
        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
    };
    Ok(units)
}
/// Translates an AST window frame bound into its `mz_expr` counterpart.
///
/// A `PRECEDING`/`FOLLOWING` bound without an explicit offset is unbounded
/// in its direction.
fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
    match bound {
        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
        WindowFrameBound::Preceding(offset) => match offset {
            Some(offset) => mz_expr::WindowFrameBound::OffsetPreceding(*offset),
            None => mz_expr::WindowFrameBound::UnboundedPreceding,
        },
        WindowFrameBound::Following(offset) => match offset {
            Some(offset) => mz_expr::WindowFrameBound::OffsetFollowing(*offset),
            None => mz_expr::WindowFrameBound::UnboundedFollowing,
        },
    }
}
/// Converts a resolved SQL data type into a `ScalarType`.
///
/// Anonymous `list` and `map` types are converted structurally (map keys
/// must be `text`; `char` list elements are rejected); named types are
/// looked up in the catalog together with their type modifiers.
pub fn scalar_type_from_sql(
    scx: &StatementContext,
    data_type: &ResolvedDataType,
) -> Result<ScalarType, PlanError> {
    match data_type {
        ResolvedDataType::AnonymousList(elem_type) => {
            let elem_type = scalar_type_from_sql(scx, elem_type)?;
            if matches!(elem_type, ScalarType::Char { .. }) {
                bail_unsupported!("char list");
            }
            Ok(ScalarType::List {
                element_type: Box::new(elem_type),
                custom_id: None,
            })
        }
        ResolvedDataType::AnonymousMap {
            key_type,
            value_type,
        } => {
            // Map keys must be text.
            match scalar_type_from_sql(scx, key_type)? {
                ScalarType::String => {}
                other => sql_bail!(
                    "map key type must be {}, got {}",
                    scx.humanize_scalar_type(&ScalarType::String),
                    scx.humanize_scalar_type(&other)
                ),
            }
            Ok(ScalarType::Map {
                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
                custom_id: None,
            })
        }
        ResolvedDataType::Named { id, modifiers, .. } => {
            scalar_type_from_catalog(scx.catalog, *id, modifiers)
        }
        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
    }
}
/// Converts a catalog type (plus SQL type modifiers) into a `ScalarType`.
///
/// Types that accept modifiers (`numeric`, `char`, `varchar`, `timestamp`,
/// `timestamptz`) validate and apply them; all other types reject any
/// modifiers. Composite types (arrays, lists, maps, ranges, records)
/// recurse on their element types.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<ScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    match &type_details.typ {
        CatalogType::Numeric => {
            // Modifiers are (precision, scale), both optional.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            // Only the scale is retained; the precision is validated but not
            // stored in the resulting type.
            Ok(ScalarType::Numeric { max_scale: scale })
        }
        CatalogType::Char => {
            // `char` with no modifier defaults to length 1.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(ScalarType::Char { length })
        }
        CatalogType::VarChar => {
            // `varchar` with no modifier has no length limit.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(ScalarType::VarChar { max_length: length })
        }
        CatalogType::Timestamp => {
            // Optional fractional-seconds precision modifier.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(ScalarType::Timestamp { precision })
        }
        CatalogType::TimestampTz => {
            // Optional fractional-seconds precision modifier.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(ScalarType::TimestampTz { precision })
        }
        t => {
            // No other type accepts modifiers.
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // Composite types: recurse on the element/value type.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(ScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(ScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(ScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(ScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    let scalars: Box<[(ColumnName, ColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                ColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(ScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                // Simple one-to-one mappings.
                CatalogType::AclItem => Ok(ScalarType::AclItem),
                CatalogType::Bool => Ok(ScalarType::Bool),
                CatalogType::Bytes => Ok(ScalarType::Bytes),
                CatalogType::Date => Ok(ScalarType::Date),
                CatalogType::Float32 => Ok(ScalarType::Float32),
                CatalogType::Float64 => Ok(ScalarType::Float64),
                CatalogType::Int16 => Ok(ScalarType::Int16),
                CatalogType::Int32 => Ok(ScalarType::Int32),
                CatalogType::Int64 => Ok(ScalarType::Int64),
                CatalogType::UInt16 => Ok(ScalarType::UInt16),
                CatalogType::UInt32 => Ok(ScalarType::UInt32),
                CatalogType::UInt64 => Ok(ScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(ScalarType::MzTimestamp),
                CatalogType::Interval => Ok(ScalarType::Interval),
                CatalogType::Jsonb => Ok(ScalarType::Jsonb),
                CatalogType::Oid => Ok(ScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(ScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(ScalarType::PgLegacyName),
                // Pseudo types (e.g. anyelement) cannot be referenced directly.
                CatalogType::Pseudo => {
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(ScalarType::RegClass),
                CatalogType::RegProc => Ok(ScalarType::RegProc),
                CatalogType::RegType => Ok(ScalarType::RegType),
                CatalogType::String => Ok(ScalarType::String),
                CatalogType::Time => Ok(ScalarType::Time),
                CatalogType::Uuid => Ok(ScalarType::Uuid),
                CatalogType::Int2Vector => Ok(ScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(ScalarType::MzAclItem),
                // Modifier-bearing types were handled by the outer match.
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
/// AST visitor that extracts aggregate function calls and table function
/// calls from a `SELECT` so they can be planned separately.
struct AggregateTableFuncVisitor<'a> {
    scx: &'a StatementContext<'a>,
    // Aggregate function calls found so far (may contain duplicates until
    // `into_result` deduplicates them).
    aggs: Vec<Function<Aug>>,
    // True while visiting the arguments of an aggregate function call; used
    // to reject nested aggregates.
    within_aggregate: bool,
    // Table function calls, each mapped to the synthetic identifier that
    // replaces the call in the expression tree.
    tables: BTreeMap<Function<Aug>, String>,
    // Stack of syntactic contexts (e.g. "CASE") in which table functions are
    // disallowed; non-empty means they are currently disallowed.
    table_disallowed_context: Vec<&'static str>,
    // True while visiting a SELECT list item, where table functions may appear.
    in_select_item: bool,
    // First error encountered during the walk, reported by `into_result`.
    err: Option<PlanError>,
}
impl<'a> AggregateTableFuncVisitor<'a> {
    /// Creates a visitor with empty collection state.
    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
        AggregateTableFuncVisitor {
            scx,
            aggs: Vec::new(),
            within_aggregate: false,
            tables: BTreeMap::new(),
            table_disallowed_context: Vec::new(),
            in_select_item: false,
            err: None,
        }
    }
    /// Consumes the visitor, returning the collected aggregate calls
    /// (deduplicated, first occurrence wins) and table function calls, or
    /// the first error encountered during the walk.
    fn into_result(
        self,
    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
        if let Some(err) = self.err {
            return Err(err);
        }
        // Deduplicate aggregates while preserving first-seen order.
        let mut seen = BTreeSet::new();
        let mut aggs = Vec::new();
        for agg in self.aggs {
            if seen.insert(agg.clone()) {
                aggs.push(agg);
            }
        }
        Ok((aggs, self.tables))
    }
}
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Unresolvable names are diagnosed during planning proper.
            Err(_) => return,
        };
        match item.func() {
            // A non-windowed aggregate call: record it and descend into its
            // pieces with the "within aggregate" state set.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The FILTER clause is not part of the aggregate's arguments,
                // so visit it before entering the within-aggregate state.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");
                self.visit_function_args_mut(args);
                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            // Table functions may not be nested inside other table functions.
            Ok(Func::Table { .. }) => {
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Deliberately empty: don't descend into subqueries — their
        // aggregates and table functions belong to the subquery itself.
    }
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            // Table functions cannot appear inside short-circuiting constructs.
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // A table function in the SELECT list: record it, or error if
                // table functions are disallowed in the current context.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            visit_mut::visit_expr_mut(self, expr);
            // Only plain calls (no FILTER/OVER/DISTINCT) are replaced by a
            // reference to the synthetic column fed by the table function.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{}", Uuid::new_v4()));
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }
        visit_mut::visit_expr_mut(self, expr);
        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        // Track that we are inside a SELECT list item, where table functions
        // are permitted.
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
/// Collects window function expressions (function calls with an OVER clause)
/// encountered while visiting an AST; see the `Visit` impl below.
#[derive(Default)]
struct WindowFuncCollector {
    // Window function expressions in the order they were encountered.
    window_funcs: Vec<Expr<Aug>>,
}
impl WindowFuncCollector {
    /// Consumes the collector, returning the collected window function
    /// expressions deduplicated and in reverse collection order. Because the
    /// walk is from the back, the LAST occurrence of each duplicate is the
    /// one retained (matching the lazy `.filter(..).rev()` of the original
    /// formulation).
    fn into_result(self) -> Vec<Expr<Aug>> {
        let mut seen = BTreeSet::new();
        let mut deduped = Vec::new();
        for expr in self.window_funcs.into_iter().rev() {
            if seen.insert(expr.clone()) {
                deduped.push(expr);
            }
        }
        deduped
    }
}
impl Visit<'_, Aug> for WindowFuncCollector {
    /// Records any function call carrying an OVER clause (i.e., a window
    /// function use), then continues the default traversal.
    fn visit_expr(&mut self, expr: &Expr<Aug>) {
        if let Expr::Function(func) = expr {
            if func.over.is_some() {
                self.window_funcs.push(expr.clone());
            }
        }
        visit::visit_expr(self, expr);
    }

    /// Intentionally empty: do not descend into subqueries; their window
    /// functions are handled when those queries are planned.
    fn visit_query(&mut self, _query: &Query<Aug>) {}
}
/// How long the dataflow that sustains a query is expected to live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    OneShot,
    Index,
    MaterializedView,
    Subscribe,
    View,
    Source,
}

impl QueryLifetime {
    /// Returns whether the query is executed exactly once and then discarded.
    pub fn is_one_shot(&self) -> bool {
        let result = matches!(self, QueryLifetime::OneShot);
        // One-shot and maintained are mutually exclusive and exhaustive.
        assert_eq!(!result, self.is_maintained());
        result
    }

    /// Returns whether the query's results are continually maintained by a
    /// long-lived dataflow (everything except a one-shot query).
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Returns whether SHOW commands are permitted for this lifetime.
    pub fn allow_show(&self) -> bool {
        match self {
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
/// Description of a CTE (or CTE-like binding, e.g. a continual task input)
/// that is in scope during query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    // NOTE(review): not referenced in this chunk; presumably the name the
    // binding was declared with — confirm against callers.
    pub name: String,
    // Column names and types of the bound relation; consumed by
    // `QueryContext::resolve_table_name` to build the `Get` expression and
    // its scope.
    pub desc: RelationDesc,
}
/// The state required when planning a query.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    // The context of the containing statement.
    pub scx: &'a StatementContext<'a>,
    // The lifetime the planned query will have.
    pub lifetime: QueryLifetime,
    // Scopes of the enclosing queries, innermost first (`derived_context`
    // prepends the newest scope).
    pub outer_scopes: Vec<Scope>,
    // Relation types of the enclosing queries, parallel to `outer_scopes`.
    pub outer_relation_types: Vec<RelationType>,
    // CTE bindings in scope, keyed by their local ID.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    // Bounds recursion depth during planning (see `CheckedRecursion`).
    pub recursion_guard: RecursionGuard,
}
// Allows recursion-depth checking during planning by exposing the context's
// guard to the `CheckedRecursion` machinery.
impl CheckedRecursion for QueryContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
impl<'a> QueryContext<'a> {
    /// Creates a root (outermost) query context: no outer scopes or relation
    /// types, and no CTEs in scope.
    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
        QueryContext {
            scx,
            lifetime,
            outer_scopes: vec![],
            outer_relation_types: vec![],
            ctes: BTreeMap::new(),
            // Bound the planner's recursion depth.
            recursion_guard: RecursionGuard::with_limit(1024),
        }
    }

    /// Computes the relation type of `expr` in this context.
    fn relation_type(&self, expr: &HirRelationExpr) -> RelationType {
        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
    }

    /// Derives a context for planning a subquery: `scope` and `relation_type`
    /// become the innermost outer scope/type of the new context.
    fn derived_context(&self, scope: Scope, relation_type: RelationType) -> QueryContext<'a> {
        let ctes = self.ctes.clone();
        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
        let outer_relation_types = iter::once(relation_type)
            .chain(self.outer_relation_types.clone())
            .collect();
        QueryContext {
            scx: self.scx,
            lifetime: self.lifetime,
            outer_scopes,
            outer_relation_types,
            ctes,
            recursion_guard: self.recursion_guard.clone(),
        }
    }

    /// Derives a context whose innermost outer scope and relation type are
    /// empty, e.g. for planning in a position with no columns in scope.
    fn empty_derived_context(&self) -> QueryContext<'a> {
        let scope = Scope::empty();
        let ty = RelationType::empty();
        self.derived_context(scope, ty)
    }

    /// Resolves `object` to a `Get` expression together with the scope
    /// describing its columns.
    ///
    /// # Panics
    ///
    /// Panics if a `Cte` or `ContinualTask` ID is not present in `self.ctes`;
    /// name resolution is expected to have registered every such binding.
    pub fn resolve_table_name(
        &self,
        object: ResolvedItemName,
    ) -> Result<(HirRelationExpr, Scope), PlanError> {
        match object {
            ResolvedItemName::Item {
                id,
                full_name,
                version,
                ..
            } => {
                let name = full_name.into();
                let item = self.scx.get_item(&id).at_version(version);
                let desc = item
                    .desc(&self.scx.catalog.resolve_full_name(item.name()))?
                    .clone();
                let expr = HirRelationExpr::Get {
                    id: Id::Global(item.global_id()),
                    typ: desc.typ().clone(),
                };
                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
                Ok((expr, scope))
            }
            ResolvedItemName::Cte { id, name } => {
                let name = name.into();
                let cte = self
                    .ctes
                    .get(&id)
                    .expect("name resolution registered the CTE");
                let expr = HirRelationExpr::Get {
                    id: Id::Local(id),
                    typ: cte.desc.typ().clone(),
                };
                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
                Ok((expr, scope))
            }
            ResolvedItemName::ContinualTask { id, name } => {
                let cte = self
                    .ctes
                    .get(&id)
                    .expect("name resolution registered the continual task");
                let expr = HirRelationExpr::Get {
                    id: Id::Local(id),
                    typ: cte.desc.typ().clone(),
                };
                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
                Ok((expr, scope))
            }
            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
        }
    }

    /// Renders `typ` as a human-readable string via the statement context.
    pub fn humanize_scalar_type(&self, typ: &ScalarType) -> String {
        self.scx.humanize_scalar_type(typ)
    }
}
/// A bundle of things needed for planning scalar expressions.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    // The containing query context.
    pub qcx: &'a QueryContext<'a>,
    // A name for this context; only replaced via `with_name` in this chunk —
    // presumably used in error messages, TODO confirm.
    pub name: &'a str,
    // The scope of the current relation expression.
    pub scope: &'a Scope,
    // The type of the current relation expression.
    pub relation_type: &'a RelationType,
    // Whether aggregate calls are permitted here (enforced elsewhere).
    pub allow_aggregates: bool,
    // Whether subqueries are permitted here (enforced elsewhere).
    pub allow_subqueries: bool,
    // Whether parameters ($1, ...) are permitted here (enforced elsewhere).
    pub allow_parameters: bool,
    // Whether window functions are permitted here (enforced elsewhere).
    pub allow_windows: bool,
}
// Allows recursion-depth checking by delegating to the enclosing query
// context's guard.
impl CheckedRecursion for ExprContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
impl<'a> ExprContext<'a> {
pub fn catalog(&self) -> &dyn SessionCatalog {
self.qcx.scx.catalog
}
pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
let mut ecx = self.clone();
ecx.name = name;
ecx
}
pub fn column_type<E>(&self, expr: &E) -> E::Type
where
E: AbstractExpr,
{
expr.typ(
&self.qcx.outer_relation_types,
self.relation_type,
&self.qcx.scx.param_types.borrow(),
)
}
pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
where
E: AbstractExpr,
{
self.column_type(expr).scalar_type()
}
fn derived_query_context(&self) -> QueryContext {
let mut scope = self.scope.clone();
scope.lateral_barrier = true;
self.qcx.derived_context(scope, self.relation_type.clone())
}
pub fn require_feature_flag(&self, flag: &FeatureFlag) -> Result<(), PlanError> {
self.qcx.scx.require_feature_flag(flag)
}
pub fn param_types(&self) -> &RefCell<BTreeMap<usize, ScalarType>> {
&self.qcx.scx.param_types
}
pub fn humanize_scalar_type(&self, typ: &ScalarType) -> String {
self.qcx.scx.humanize_scalar_type(typ)
}
}