1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50 ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51 MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
56 TableFunc, func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
70use mz_repr::{
71 CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
72 ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
73 UNKNOWN_COLUMN_NAME, strconv,
74};
75use mz_sql_parser::ast::display::AstDisplay;
76use mz_sql_parser::ast::visit::Visit;
77use mz_sql_parser::ast::visit_mut::{self, VisitMut};
78use mz_sql_parser::ast::{
79 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
80 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
81 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
82 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
83 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
84 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
85 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
86 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
87};
88use mz_sql_parser::ident;
89
90use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
91use crate::func::{self, Func, FuncSpec, TableFuncImpl};
92use crate::names::{
93 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
94};
95use crate::plan::PlanError::InvalidWmrRecursionLimit;
96use crate::plan::error::PlanError;
97use crate::plan::hir::{
98 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
99 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
100 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
101 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
102};
103use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
104use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
105use crate::plan::statement::{StatementContext, StatementDesc, show};
106use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
107use crate::plan::{
108 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
109 literal, transform_ast,
110};
111use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
112use crate::session::vars::{self, FeatureFlag};
113use crate::{ORDINALITY_COL_NAME, normalize};
114
115#[derive(Debug)]
116pub struct PlannedRootQuery<E> {
117 pub expr: E,
118 pub desc: RelationDesc,
119 pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
120 pub scope: Scope,
121}
122
123#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
132pub fn plan_root_query(
133 scx: &StatementContext,
134 mut query: Query<Aug>,
135 lifetime: QueryLifetime,
136) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
137 transform_ast::transform(scx, &mut query)?;
138 let mut qcx = QueryContext::root(scx, lifetime);
139 let PlannedQuery {
140 mut expr,
141 scope,
142 order_by,
143 limit,
144 offset,
145 project,
146 group_size_hints,
147 } = plan_query(&mut qcx, &query)?;
148
149 let mut finishing = RowSetFinishing {
150 limit,
151 offset,
152 project,
153 order_by,
154 };
155
156 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
162
163 if lifetime.is_maintained() {
164 expr.finish_maintained(&mut finishing, group_size_hints);
165 }
166
167 let typ = qcx.relation_type(&expr);
168 let typ = SqlRelationType::new(
169 finishing
170 .project
171 .iter()
172 .map(|i| typ.column_types[*i].clone())
173 .collect(),
174 );
175 let desc = RelationDesc::new(typ, scope.column_names());
176
177 Ok(PlannedRootQuery {
178 expr,
179 desc,
180 finishing,
181 scope,
182 })
183}
184
185#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
187pub fn plan_ct_query(
188 qcx: &mut QueryContext,
189 mut query: Query<Aug>,
190) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
191 transform_ast::transform(qcx.scx, &mut query)?;
192 let PlannedQuery {
193 mut expr,
194 scope,
195 order_by,
196 limit,
197 offset,
198 project,
199 group_size_hints,
200 } = plan_query(qcx, &query)?;
201
202 let mut finishing = RowSetFinishing {
203 limit,
204 offset,
205 project,
206 order_by,
207 };
208
209 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
215
216 expr.finish_maintained(&mut finishing, group_size_hints);
217
218 let typ = qcx.relation_type(&expr);
219 let typ = SqlRelationType::new(
220 finishing
221 .project
222 .iter()
223 .map(|i| typ.column_types[*i].clone())
224 .collect(),
225 );
226 let desc = RelationDesc::new(typ, scope.column_names());
227
228 Ok(PlannedRootQuery {
229 expr,
230 desc,
231 finishing,
232 scope,
233 })
234}
235
236fn try_push_projection_order_by(
247 expr: &mut HirRelationExpr,
248 project: &mut Vec<usize>,
249 order_by: &mut Vec<ColumnOrder>,
250) -> bool {
251 let mut unproject = vec![None; expr.arity()];
252 for (out_i, in_i) in project.iter().copied().enumerate() {
253 unproject[in_i] = Some(out_i);
254 }
255 if order_by
256 .iter()
257 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
258 {
259 let trivial_project = (0..project.len()).collect();
260 *expr = expr.take().project(mem::replace(project, trivial_project));
261 for ob in order_by {
262 ob.column = unproject[ob.column].unwrap();
263 }
264 true
265 } else {
266 false
267 }
268}
269
270pub fn plan_insert_query(
271 scx: &StatementContext,
272 table_name: ResolvedItemName,
273 columns: Vec<Ident>,
274 source: InsertSource<Aug>,
275 returning: Vec<SelectItem<Aug>>,
276) -> Result<
277 (
278 CatalogItemId,
279 HirRelationExpr,
280 PlannedRootQuery<Vec<HirScalarExpr>>,
281 ),
282 PlanError,
283> {
284 let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
285 let table = scx.get_item_by_resolved_name(&table_name)?;
286
287 if table.item_type() != CatalogItemType::Table {
289 sql_bail!(
290 "cannot insert into {} '{}'",
291 table.item_type(),
292 table_name.full_name_str()
293 );
294 }
295 let desc = table.relation_desc().expect("table has desc");
296 let mut defaults = table
297 .writable_table_details()
298 .ok_or_else(|| {
299 sql_err!(
300 "cannot insert into non-writeable table '{}'",
301 table_name.full_name_str()
302 )
303 })?
304 .to_vec();
305
306 for default in &mut defaults {
307 transform_ast::transform(scx, default)?;
308 }
309
310 if table.id().is_system() {
311 sql_bail!(
312 "cannot insert into system table '{}'",
313 table_name.full_name_str()
314 );
315 }
316
317 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
318
319 let mut source_types = Vec::with_capacity(columns.len());
321 let mut ordering = Vec::with_capacity(columns.len());
322
323 if columns.is_empty() {
324 source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
327 ordering.extend(0..desc.arity());
328 } else {
329 let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
330 .iter()
331 .enumerate()
332 .map(|(idx, (name, typ))| (name, (idx, typ)))
333 .collect();
334
335 for c in &columns {
336 if let Some((idx, typ)) = column_by_name.get(c) {
337 ordering.push(*idx);
338 source_types.push(&typ.scalar_type);
339 } else {
340 sql_bail!(
341 "column {} of relation {} does not exist",
342 c.quoted(),
343 table_name.full_name_str().quoted()
344 );
345 }
346 }
347 if let Some(dup) = columns.iter().duplicates().next() {
348 sql_bail!("column {} specified more than once", dup.quoted());
349 }
350 };
351
352 let expr = match source {
354 InsertSource::Query(mut query) => {
355 transform_ast::transform(scx, &mut query)?;
356
357 match query {
358 Query {
360 body: SetExpr::Values(Values(values)),
361 ctes,
362 order_by,
363 limit: None,
364 offset: None,
365 } if ctes.is_empty() && order_by.is_empty() => {
366 let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
367 plan_values_insert(&qcx, &names, &source_types, &values)?
368 }
369 _ => {
370 let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
371 expr
372 }
373 }
374 }
375 InsertSource::DefaultValues => {
376 HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
377 }
378 };
379
380 let expr_arity = expr.arity();
381
382 let max_columns = if columns.is_empty() {
385 desc.arity()
386 } else {
387 columns.len()
388 };
389 if expr_arity > max_columns {
390 sql_bail!("INSERT has more expressions than target columns");
391 }
392 if expr_arity < columns.len() {
394 sql_bail!("INSERT has more target columns than expressions");
395 }
396
397 source_types.truncate(expr_arity);
399 ordering.truncate(expr_arity);
400
401 let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
404 sql_err!(
405 "column {} is of type {} but expression is of type {}",
406 desc.get_name(ordering[e.column]).quoted(),
407 qcx.humanize_sql_scalar_type(&e.target_type, false),
408 qcx.humanize_sql_scalar_type(&e.source_type, false),
409 )
410 })?;
411
412 let mut map_exprs = vec![];
414 let mut project_key = Vec::with_capacity(desc.arity());
415
416 let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
418
419 let column_details = desc.iter_types().zip_eq(defaults).enumerate();
420 for (col_idx, (col_typ, default)) in column_details {
421 if let Some(src_idx) = col_to_source.get(&col_idx) {
422 project_key.push(*src_idx);
423 } else {
424 let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
425 project_key.push(expr_arity + map_exprs.len());
426 map_exprs.push(hir);
427 }
428 }
429
430 let returning = {
431 let (scope, typ) = if let ResolvedItemName::Item {
432 full_name,
433 version: _,
434 ..
435 } = table_name
436 {
437 let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
438 let typ = desc.typ().clone();
439 (scope, typ)
440 } else {
441 (Scope::empty(), SqlRelationType::empty())
442 };
443 let ecx = &ExprContext {
444 qcx: &qcx,
445 name: "RETURNING clause",
446 scope: &scope,
447 relation_type: &typ,
448 allow_aggregates: false,
449 allow_subqueries: false,
450 allow_parameters: true,
451 allow_windows: false,
452 };
453 let table_func_names = BTreeMap::new();
454 let mut output_columns = vec![];
455 let mut new_exprs = vec![];
456 let mut new_type = SqlRelationType::empty();
457 for mut si in returning {
458 transform_ast::transform(scx, &mut si)?;
459 for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
460 let expr = match &select_item {
461 ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
462 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
463 };
464 output_columns.push(column_name);
465 let typ = ecx.column_type(&expr);
466 new_type.column_types.push(typ);
467 new_exprs.push(expr);
468 }
469 }
470 let desc = RelationDesc::new(new_type, output_columns);
471 let desc_arity = desc.arity();
472 PlannedRootQuery {
473 expr: new_exprs,
474 desc,
475 finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
476 scope,
477 }
478 };
479
480 Ok((
481 table.id(),
482 expr.map(map_exprs).project(project_key),
483 returning,
484 ))
485}
486
487pub fn plan_copy_item(
498 scx: &StatementContext,
499 item_name: ResolvedItemName,
500 columns: Vec<Ident>,
501) -> Result<
502 (
503 CatalogItemId,
504 RelationDesc,
505 Vec<ColumnIndex>,
506 Option<MapFilterProject>,
507 ),
508 PlanError,
509> {
510 let item = scx.get_item_by_resolved_name(&item_name)?;
511 let fullname = scx.catalog.resolve_full_name(item.name());
512 let table_desc = match item.relation_desc() {
513 Some(desc) => desc.into_owned(),
514 None => {
515 return Err(PlanError::InvalidDependency {
516 name: fullname.to_string(),
517 item_type: item.item_type().to_string(),
518 });
519 }
520 };
521 let mut ordering = Vec::with_capacity(columns.len());
522
523 let mfp = if let Some(table_defaults) = item.writable_table_details() {
533 let mut table_defaults = table_defaults.to_vec();
534
535 for default in &mut table_defaults {
536 transform_ast::transform(scx, default)?;
537 }
538
539 let source_column_names: Vec<_> = columns
541 .iter()
542 .cloned()
543 .map(normalize::column_name)
544 .collect();
545
546 let mut default_exprs = Vec::new();
547 let mut project_keys = Vec::with_capacity(table_desc.arity());
548
549 let column_details = table_desc.iter().zip_eq(table_defaults);
552 for ((col_name, col_type), col_default) in column_details {
553 let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
554 if let Some(src_idx) = maybe_src_idx {
555 project_keys.push(src_idx);
556 } else {
557 let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
560 let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
561 project_keys.push(source_column_names.len() + default_exprs.len());
562 default_exprs.push(mir);
563 }
564 }
565
566 let mfp = MapFilterProject::new(source_column_names.len())
567 .map(default_exprs)
568 .project(project_keys);
569 Some(mfp)
570 } else {
571 None
572 };
573
574 let source_desc = if columns.is_empty() {
576 let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
577 ordering.extend(indexes);
578
579 table_desc
581 } else {
582 let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
583 let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
584 .iter_all()
585 .map(|(idx, name, typ)| (name, (*idx, typ)))
586 .collect();
587
588 let mut names = Vec::with_capacity(columns.len());
589 let mut source_types = Vec::with_capacity(columns.len());
590
591 for c in &columns {
592 if let Some((idx, typ)) = column_by_name.get(c) {
593 ordering.push(*idx);
594 source_types.push((*typ).clone());
595 names.push(c.clone());
596 } else {
597 sql_bail!(
598 "column {} of relation {} does not exist",
599 c.quoted(),
600 item_name.full_name_str().quoted()
601 );
602 }
603 }
604 if let Some(dup) = columns.iter().duplicates().next() {
605 sql_bail!("column {} specified more than once", dup.quoted());
606 }
607
608 RelationDesc::new(SqlRelationType::new(source_types), names)
610 };
611
612 Ok((item.id(), source_desc, ordering, mfp))
613}
614
615pub fn plan_copy_from(
619 scx: &StatementContext,
620 table_name: ResolvedItemName,
621 columns: Vec<Ident>,
622) -> Result<
623 (
624 CatalogItemId,
625 RelationDesc,
626 Vec<ColumnIndex>,
627 Option<MapFilterProject>,
628 ),
629 PlanError,
630> {
631 let table = scx.get_item_by_resolved_name(&table_name)?;
632
633 if table.item_type() != CatalogItemType::Table {
635 sql_bail!(
636 "cannot insert into {} '{}'",
637 table.item_type(),
638 table_name.full_name_str()
639 );
640 }
641
642 let _ = table.writable_table_details().ok_or_else(|| {
643 sql_err!(
644 "cannot insert into non-writeable table '{}'",
645 table_name.full_name_str()
646 )
647 })?;
648
649 if table.id().is_system() {
650 sql_bail!(
651 "cannot insert into system table '{}'",
652 table_name.full_name_str()
653 );
654 }
655 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
656
657 Ok((id, desc, ordering, mfp))
658}
659
660pub fn plan_copy_from_rows(
663 pcx: &PlanContext,
664 catalog: &dyn SessionCatalog,
665 target_id: CatalogItemId,
666 target_name: String,
667 columns: Vec<ColumnIndex>,
668 rows: Vec<mz_repr::Row>,
669) -> Result<HirRelationExpr, PlanError> {
670 let scx = StatementContext::new(Some(pcx), catalog);
671
672 let table = catalog
674 .try_get_item(&target_id)
675 .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
676 .at_version(RelationVersionSelector::Latest);
677
678 let mut defaults = table
679 .writable_table_details()
680 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
681 .to_vec();
682
683 for default in &mut defaults {
684 transform_ast::transform(&scx, default)?;
685 }
686
687 let desc = table.relation_desc().expect("table has desc");
688 let column_types = columns
689 .iter()
690 .map(|x| desc.get_type(x).clone())
691 .map(|mut x| {
692 x.nullable = true;
695 x
696 })
697 .collect();
698 let typ = SqlRelationType::new(column_types);
699 let expr = HirRelationExpr::Constant {
700 rows,
701 typ: typ.clone(),
702 };
703
704 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
710 if columns == default {
711 return Ok(expr);
712 }
713
714 let mut map_exprs = vec![];
716 let mut project_key = Vec::with_capacity(desc.arity());
717
718 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
720
721 let column_details = desc.iter_all().zip_eq(defaults);
722 for ((col_idx, _col_name, col_typ), default) in column_details {
723 if let Some(src_idx) = col_to_source.get(&col_idx) {
724 project_key.push(*src_idx);
725 } else {
726 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
727 project_key.push(typ.arity() + map_exprs.len());
728 map_exprs.push(hir);
729 }
730 }
731
732 Ok(expr.map(map_exprs).project(project_key))
733}
734
735pub struct ReadThenWritePlan {
737 pub id: CatalogItemId,
738 pub selection: HirRelationExpr,
743 pub assignments: BTreeMap<usize, HirScalarExpr>,
745 pub finishing: RowSetFinishing,
746}
747
748pub fn plan_delete_query(
749 scx: &StatementContext,
750 mut delete_stmt: DeleteStatement<Aug>,
751) -> Result<ReadThenWritePlan, PlanError> {
752 transform_ast::transform(scx, &mut delete_stmt)?;
753
754 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
755 plan_mutation_query_inner(
756 qcx,
757 delete_stmt.table_name,
758 delete_stmt.alias,
759 delete_stmt.using,
760 vec![],
761 delete_stmt.selection,
762 )
763}
764
765pub fn plan_update_query(
766 scx: &StatementContext,
767 mut update_stmt: UpdateStatement<Aug>,
768) -> Result<ReadThenWritePlan, PlanError> {
769 transform_ast::transform(scx, &mut update_stmt)?;
770
771 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
772
773 plan_mutation_query_inner(
774 qcx,
775 update_stmt.table_name,
776 update_stmt.alias,
777 vec![],
778 update_stmt.assignments,
779 update_stmt.selection,
780 )
781}
782
783pub fn plan_mutation_query_inner(
784 qcx: QueryContext,
785 table_name: ResolvedItemName,
786 alias: Option<TableAlias>,
787 using: Vec<TableWithJoins<Aug>>,
788 assignments: Vec<Assignment<Aug>>,
789 selection: Option<Expr<Aug>>,
790) -> Result<ReadThenWritePlan, PlanError> {
791 let (id, version) = match table_name {
793 ResolvedItemName::Item { id, version, .. } => (id, version),
794 _ => sql_bail!("cannot mutate non-user table"),
795 };
796
797 let item = qcx.scx.get_item(&id).at_version(version);
799 if item.item_type() != CatalogItemType::Table {
800 sql_bail!(
801 "cannot mutate {} '{}'",
802 item.item_type(),
803 table_name.full_name_str()
804 );
805 }
806 let _ = item.writable_table_details().ok_or_else(|| {
807 sql_err!(
808 "cannot mutate non-writeable table '{}'",
809 table_name.full_name_str()
810 )
811 })?;
812 if id.is_system() {
813 sql_bail!(
814 "cannot mutate system table '{}'",
815 table_name.full_name_str()
816 );
817 }
818
819 let (mut get, scope) = qcx.resolve_table_name(table_name)?;
821 let scope = plan_table_alias(scope, alias.as_ref())?;
822 let desc = item.relation_desc().expect("table has desc");
823 let relation_type = qcx.relation_type(&get);
824
825 if using.is_empty() {
826 if let Some(expr) = selection {
827 let ecx = &ExprContext {
828 qcx: &qcx,
829 name: "WHERE clause",
830 scope: &scope,
831 relation_type: &relation_type,
832 allow_aggregates: false,
833 allow_subqueries: true,
834 allow_parameters: true,
835 allow_windows: false,
836 };
837 let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
838 get = get.filter(vec![expr]);
839 }
840 } else {
841 get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
842 }
843
844 let mut sets = BTreeMap::new();
845 for Assignment { id, value } in assignments {
846 let name = normalize::column_name(id);
848 match desc.get_by_name(&name) {
849 Some((idx, typ)) => {
850 let ecx = &ExprContext {
851 qcx: &qcx,
852 name: "SET clause",
853 scope: &scope,
854 relation_type: &relation_type,
855 allow_aggregates: false,
856 allow_subqueries: false,
857 allow_parameters: true,
858 allow_windows: false,
859 };
860 let expr = plan_expr(ecx, &value)?.cast_to(
861 ecx,
862 CastContext::Assignment,
863 &typ.scalar_type,
864 )?;
865
866 if sets.insert(idx, expr).is_some() {
867 sql_bail!("column {} set twice", name)
868 }
869 }
870 None => sql_bail!("unknown column {}", name),
871 };
872 }
873
874 let finishing = RowSetFinishing {
875 order_by: vec![],
876 limit: None,
877 offset: 0,
878 project: (0..desc.arity()).collect(),
879 };
880
881 Ok(ReadThenWritePlan {
882 id,
883 selection: get,
884 finishing,
885 assignments: sets,
886 })
887}
888
889fn handle_mutation_using_clause(
901 qcx: &QueryContext,
902 selection: Option<Expr<Aug>>,
903 using: Vec<TableWithJoins<Aug>>,
904 get: HirRelationExpr,
905 outer_scope: Scope,
906) -> Result<HirRelationExpr, PlanError> {
907 let (mut using_rel_expr, using_scope) =
911 using.into_iter().try_fold(plan_join_identity(), |l, twj| {
912 let (left, left_scope) = l;
913 plan_join(
914 qcx,
915 left,
916 left_scope,
917 &Join {
918 relation: TableFactor::NestedJoin {
919 join: Box::new(twj),
920 alias: None,
921 },
922 join_operator: JoinOperator::CrossJoin,
923 },
924 )
925 })?;
926
927 if let Some(expr) = selection {
928 let on = HirScalarExpr::literal_true();
934 let joined = using_rel_expr
935 .clone()
936 .join(get.clone(), on, JoinKind::Inner);
937 let joined_scope = using_scope.product(outer_scope)?;
938 let joined_relation_type = qcx.relation_type(&joined);
939
940 let ecx = &ExprContext {
941 qcx,
942 name: "WHERE clause",
943 scope: &joined_scope,
944 relation_type: &joined_relation_type,
945 allow_aggregates: false,
946 allow_subqueries: true,
947 allow_parameters: true,
948 allow_windows: false,
949 };
950
951 let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
953
954 let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
958 use mz_expr::visit::Visit;
960 expr.visit_mut_post(&mut |e| {
961 if let HirScalarExpr::Column(c, _name) = e {
962 if c.column >= using_rel_arity {
963 c.level += 1;
964 c.column -= using_rel_arity;
965 };
966 }
967 })?;
968
969 using_rel_expr = using_rel_expr.filter(vec![expr]);
973 } else {
974 let _joined_scope = using_scope.product(outer_scope)?;
977 }
978 Ok(get.filter(vec![using_rel_expr.exists()]))
989}
990
991#[derive(Debug)]
992pub(crate) struct CastRelationError {
993 pub(crate) column: usize,
994 pub(crate) source_type: SqlScalarType,
995 pub(crate) target_type: SqlScalarType,
996}
997
998pub(crate) fn cast_relation<'a, I>(
1002 qcx: &QueryContext,
1003 ccx: CastContext,
1004 expr: HirRelationExpr,
1005 target_types: I,
1006) -> Result<HirRelationExpr, CastRelationError>
1007where
1008 I: IntoIterator<Item = &'a SqlScalarType>,
1009{
1010 let ecx = &ExprContext {
1011 qcx,
1012 name: "values",
1013 scope: &Scope::empty(),
1014 relation_type: &qcx.relation_type(&expr),
1015 allow_aggregates: false,
1016 allow_subqueries: true,
1017 allow_parameters: true,
1018 allow_windows: false,
1019 };
1020 let mut map_exprs = vec![];
1021 let mut project_key = vec![];
1022 for (i, target_typ) in target_types.into_iter().enumerate() {
1023 let expr = HirScalarExpr::column(i);
1024 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1028 Ok(cast_expr) => {
1029 if expr == cast_expr {
1030 project_key.push(i);
1032 } else {
1033 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1035 map_exprs.push(cast_expr);
1036 }
1037 }
1038 Err(_) => {
1039 return Err(CastRelationError {
1040 column: i,
1041 source_type: ecx.scalar_type(&expr),
1042 target_type: target_typ.clone(),
1043 });
1044 }
1045 }
1046 }
1047 Ok(expr.map(map_exprs).project(project_key))
1048}
1049
1050pub fn plan_as_of(
1053 scx: &StatementContext,
1054 as_of: Option<AsOf<Aug>>,
1055) -> Result<QueryWhen, PlanError> {
1056 match as_of {
1057 None => Ok(QueryWhen::Immediately),
1058 Some(as_of) => match as_of {
1059 AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1060 AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1061 },
1062 }
1063}
1064
1065pub fn plan_as_of_or_up_to(
1075 scx: &StatementContext,
1076 mut expr: Expr<Aug>,
1077) -> Result<mz_repr::Timestamp, PlanError> {
1078 let scope = Scope::empty();
1079 let desc = RelationDesc::empty();
1080 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1083 transform_ast::transform(scx, &mut expr)?;
1084 let ecx = &ExprContext {
1085 qcx: &qcx,
1086 name: "AS OF or UP TO",
1087 scope: &scope,
1088 relation_type: desc.typ(),
1089 allow_aggregates: false,
1090 allow_subqueries: false,
1091 allow_parameters: false,
1092 allow_windows: false,
1093 };
1094 let hir = plan_expr(ecx, &expr)?.cast_to(
1095 ecx,
1096 CastContext::Assignment,
1097 &SqlScalarType::MzTimestamp,
1098 )?;
1099 if hir.contains_unmaterializable() {
1100 bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1101 }
1102 let timestamp = hir
1109 .into_literal_mz_timestamp()
1110 .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1111 Ok(timestamp)
1112}
1113
1114pub fn plan_secret_as(
1116 scx: &StatementContext,
1117 mut expr: Expr<Aug>,
1118) -> Result<MirScalarExpr, PlanError> {
1119 let scope = Scope::empty();
1120 let desc = RelationDesc::empty();
1121 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1122
1123 transform_ast::transform(scx, &mut expr)?;
1124
1125 let ecx = &ExprContext {
1126 qcx: &qcx,
1127 name: "AS",
1128 scope: &scope,
1129 relation_type: desc.typ(),
1130 allow_aggregates: false,
1131 allow_subqueries: false,
1132 allow_parameters: false,
1133 allow_windows: false,
1134 };
1135 let expr = plan_expr(ecx, &expr)?
1136 .type_as(ecx, &SqlScalarType::Bytes)?
1137 .lower_uncorrelated(scx.catalog.system_vars())?;
1138 Ok(expr)
1139}
1140
1141pub fn plan_webhook_validate_using(
1143 scx: &StatementContext,
1144 validate_using: CreateWebhookSourceCheck<Aug>,
1145) -> Result<WebhookValidation, PlanError> {
1146 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1147
1148 let CreateWebhookSourceCheck {
1149 options,
1150 using: mut expr,
1151 } = validate_using;
1152
1153 let mut column_typs = vec![];
1154 let mut column_names = vec![];
1155
1156 let (bodies, headers, secrets) = options
1157 .map(|o| (o.bodies, o.headers, o.secrets))
1158 .unwrap_or_default();
1159
1160 let mut body_tuples = vec![];
1162 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1163 let scalar_type = use_bytes
1164 .then_some(SqlScalarType::Bytes)
1165 .unwrap_or(SqlScalarType::String);
1166 let name = alias
1167 .map(|a| a.into_string())
1168 .unwrap_or_else(|| "body".to_string());
1169
1170 column_typs.push(SqlColumnType {
1171 scalar_type,
1172 nullable: false,
1173 });
1174 column_names.push(name);
1175
1176 let column_idx = column_typs.len() - 1;
1178 assert_eq!(
1180 column_idx,
1181 column_names.len() - 1,
1182 "body column names and types don't match"
1183 );
1184 body_tuples.push((column_idx, use_bytes));
1185 }
1186
1187 let mut header_tuples = vec![];
1189
1190 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1191 let value_type = use_bytes
1192 .then_some(SqlScalarType::Bytes)
1193 .unwrap_or(SqlScalarType::String);
1194 let name = alias
1195 .map(|a| a.into_string())
1196 .unwrap_or_else(|| "headers".to_string());
1197
1198 column_typs.push(SqlColumnType {
1199 scalar_type: SqlScalarType::Map {
1200 value_type: Box::new(value_type),
1201 custom_id: None,
1202 },
1203 nullable: false,
1204 });
1205 column_names.push(name);
1206
1207 let column_idx = column_typs.len() - 1;
1209 assert_eq!(
1211 column_idx,
1212 column_names.len() - 1,
1213 "header column names and types don't match"
1214 );
1215 header_tuples.push((column_idx, use_bytes));
1216 }
1217
1218 let mut validation_secrets = vec![];
1220
1221 for CreateWebhookSourceSecret {
1222 secret,
1223 alias,
1224 use_bytes,
1225 } in secrets
1226 {
1227 let scalar_type = use_bytes
1229 .then_some(SqlScalarType::Bytes)
1230 .unwrap_or(SqlScalarType::String);
1231
1232 column_typs.push(SqlColumnType {
1233 scalar_type,
1234 nullable: false,
1235 });
1236 let ResolvedItemName::Item {
1237 id,
1238 full_name: FullItemName { item, .. },
1239 ..
1240 } = secret
1241 else {
1242 return Err(PlanError::InvalidSecret(Box::new(secret)));
1243 };
1244
1245 let name = if let Some(alias) = alias {
1247 alias.into_string()
1248 } else {
1249 item
1250 };
1251 column_names.push(name);
1252
1253 let column_idx = column_typs.len() - 1;
1256 assert_eq!(
1258 column_idx,
1259 column_names.len() - 1,
1260 "column names and types don't match"
1261 );
1262
1263 validation_secrets.push(WebhookValidationSecret {
1264 id,
1265 column_idx,
1266 use_bytes,
1267 });
1268 }
1269
1270 let relation_typ = SqlRelationType::new(column_typs);
1271 let desc = RelationDesc::new(relation_typ, column_names.clone());
1272 let scope = Scope::from_source(None, column_names);
1273
1274 transform_ast::transform(scx, &mut expr)?;
1275
1276 let ecx = &ExprContext {
1277 qcx: &qcx,
1278 name: "CHECK",
1279 scope: &scope,
1280 relation_type: desc.typ(),
1281 allow_aggregates: false,
1282 allow_subqueries: false,
1283 allow_parameters: false,
1284 allow_windows: false,
1285 };
1286 let expr = plan_expr(ecx, &expr)?
1287 .type_as(ecx, &SqlScalarType::Bool)?
1288 .lower_uncorrelated(scx.catalog.system_vars())?;
1289 let validation = WebhookValidation {
1290 expression: expr,
1291 relation_desc: desc,
1292 bodies: body_tuples,
1293 headers: header_tuples,
1294 secrets: validation_secrets,
1295 };
1296 Ok(validation)
1297}
1298
1299pub fn plan_default_expr(
1300 scx: &StatementContext,
1301 expr: &Expr<Aug>,
1302 target_ty: &SqlScalarType,
1303) -> Result<HirScalarExpr, PlanError> {
1304 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1305 let ecx = &ExprContext {
1306 qcx: &qcx,
1307 name: "DEFAULT expression",
1308 scope: &Scope::empty(),
1309 relation_type: &SqlRelationType::empty(),
1310 allow_aggregates: false,
1311 allow_subqueries: false,
1312 allow_parameters: false,
1313 allow_windows: false,
1314 };
1315 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1316 Ok(hir)
1317}
1318
1319pub fn plan_params<'a>(
1320 scx: &'a StatementContext,
1321 params: Vec<Expr<Aug>>,
1322 desc: &StatementDesc,
1323) -> Result<Params, PlanError> {
1324 if params.len() != desc.param_types.len() {
1325 sql_bail!(
1326 "expected {} params, got {}",
1327 desc.param_types.len(),
1328 params.len()
1329 );
1330 }
1331
1332 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1333
1334 let mut datums = Row::default();
1335 let mut packer = datums.packer();
1336 let mut actual_types = Vec::new();
1337 let temp_storage = &RowArena::new();
1338 for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1339 transform_ast::transform(scx, &mut expr)?;
1340
1341 let ecx = execute_expr_context(&qcx);
1342 let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1343 let actual_ty = ecx.scalar_type(&ex);
1344 if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1345 return Err(PlanError::WrongParameterType(
1346 i + 1,
1347 ecx.humanize_sql_scalar_type(expected_ty, false),
1348 ecx.humanize_sql_scalar_type(&actual_ty, false),
1349 ));
1350 }
1351 let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1352 let evaled = ex.eval(&[], temp_storage)?;
1353 packer.push(evaled);
1354 actual_types.push(actual_ty);
1355 }
1356 Ok(Params {
1357 datums,
1358 execute_types: actual_types,
1359 expected_types: desc.param_types.clone(),
1360 })
1361}
1362
/// Lazily initialized empty [`Scope`], borrowed by `execute_expr_context` as
/// the `scope` of the `EXECUTE` expression context.
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Lazily initialized empty [`SqlRelationType`], borrowed by
/// `execute_expr_context` as the `relation_type` of the `EXECUTE` expression
/// context.
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1365
1366pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1368 ExprContext {
1369 qcx,
1370 name: "EXECUTE",
1371 scope: &EXECUTE_CONTEXT_SCOPE,
1372 relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1373 allow_aggregates: false,
1374 allow_subqueries: false,
1375 allow_parameters: false,
1376 allow_windows: false,
1377 }
1378}
1379
/// The cast context used when checking whether an argument's type can be cast
/// to the expected parameter type (see its use with `plan_hypothetical_cast`
/// in `plan_params`). Always [`CastContext::Assignment`].
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1386
1387pub fn plan_index_exprs<'a>(
1388 scx: &'a StatementContext,
1389 on_desc: &RelationDesc,
1390 exprs: Vec<Expr<Aug>>,
1391) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1392 let scope = Scope::from_source(None, on_desc.iter_names());
1393 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1394
1395 let ecx = &ExprContext {
1396 qcx: &qcx,
1397 name: "CREATE INDEX",
1398 scope: &scope,
1399 relation_type: on_desc.typ(),
1400 allow_aggregates: false,
1401 allow_subqueries: false,
1402 allow_parameters: false,
1403 allow_windows: false,
1404 };
1405 let repr_col_types: Vec<ReprColumnType> = on_desc
1406 .typ()
1407 .column_types
1408 .iter()
1409 .map(ReprColumnType::from)
1410 .collect();
1411 let mut out = vec![];
1412 for mut expr in exprs {
1413 transform_ast::transform(scx, &mut expr)?;
1414 let expr = plan_expr_or_col_index(ecx, &expr)?;
1415 let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1416 expr.reduce(&repr_col_types);
1417 out.push(expr);
1418 }
1419 Ok(out)
1420}
1421
1422fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1423 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1424 Some(column) => Ok(HirScalarExpr::column(column)),
1425 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1426 }
1427}
1428
1429fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1430 match e {
1431 Expr::Value(Value::Number(n)) => {
1432 let n = n.parse::<usize>().map_err(|e| {
1433 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1434 })?;
1435 if n < 1 || n > max {
1436 sql_bail!(
1437 "column reference {} in {} is out of range (1 - {})",
1438 n,
1439 name,
1440 max
1441 );
1442 }
1443 Ok(Some(n - 1))
1444 }
1445 _ => Ok(None),
1446 }
1447}
1448
/// The result of planning a `Query`: a relation expression together with the
/// ordering, limit, offset, and projection that have not yet been applied to
/// it (see `plan_nested_query`, which applies them).
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope produced by planning the query body.
    scope: Scope,
    /// The planned `ORDER BY` of the query.
    order_by: Vec<ColumnOrder>,
    /// The planned `LIMIT` expression, or `None` if the query has no `LIMIT`.
    limit: Option<HirScalarExpr>,
    /// The planned `OFFSET` expression; a literal `0` when the query has no
    /// `OFFSET`.
    offset: HirScalarExpr,
    /// Column indexes of `expr` to keep; `plan_nested_query` projects onto
    /// these after applying any top-k.
    project: Vec<usize>,
    /// Group size hints extracted from the `SELECT` options, or the default
    /// hints when the body is not a plain `SELECT`.
    group_size_hints: GroupSizeHints,
}
1463
1464fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1465 qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1466}
1467
/// Plans a `Query`: its CTEs, `LIMIT`, `OFFSET`, and body, in that order.
///
/// The CTEs are planned first (which installs them in `qcx.ctes`), then the
/// `LIMIT` and `OFFSET` expressions, then the body. Finally the CTE bindings
/// are wrapped around the planned body as `Let` / `LetRec` expressions, and
/// any bindings that the CTEs shadowed are re-installed in `qcx.ctes`.
///
/// # Errors
///
/// Returns a [`PlanError`] if any part fails to plan, if a constant `LIMIT`
/// is negative or not an integer/NULL, if `FETCH ... WITH TIES` is used, if a
/// non-constant `OFFSET` contains no parameters, or if more than one (or a
/// zero) recursion limit is given for a mutually recursive CTE block.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan CTEs and introduce their bindings into `qcx.ctes`; they are removed
    // again at the bottom of this function.
    let cte_bindings = plan_ctes(qcx, q)?;

    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            // LIMIT is planned against an empty scope: it may contain
            // subqueries and parameters, but cannot reference columns.
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let limit = if limit.is_constant() {
                // Constant LIMITs are evaluated eagerly so that they can be
                // validated and replaced by a literal.
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;

                match limit.eval(&[], &arena)? {
                    // Non-negative integers and NULL are kept as literals.
                    d @ Datum::Int64(v) if v >= 0 => {
                        HirScalarExpr::literal(d, SqlScalarType::Int64)
                    }
                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Non-constant LIMIT expressions are gated behind a feature
                // flag.
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };

    let offset = match &q.offset {
        // No OFFSET clause is represented as a literal zero.
        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
        Some(offset) => {
            // Unlike LIMIT, OFFSET does not allow subqueries.
            let ecx = &ExprContext {
                qcx,
                name: "OFFSET",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: true,
                allow_windows: false,
            };
            let offset = plan_expr(ecx, offset)?;
            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let offset = if offset.is_constant() {
                // Validates the constant (non-negative) and normalizes it to a
                // literal.
                let offset_value = offset_into_value(offset)?;
                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
            } else {
                // A non-constant OFFSET is only acceptable if it contains
                // parameters; otherwise it is rejected here.
                if !offset.contains_parameters() {
                    return Err(PlanError::InvalidOffset(format!(
                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
                        offset
                    )));
                }
                offset
            };
            offset
        }
    };

    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract the group size hints from the SELECT options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            // A plain SELECT plans the query's ORDER BY together with its
            // body.
            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            // Any other body is planned as a set expression, and the ORDER BY
            // is then planned against that expression's output columns.
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // Project onto the original columns only, i.e. the arity before
            // `map_exprs` are mapped onto the expression.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Wrap the planned body in the CTE bindings and restore whatever the CTEs
    // shadowed in `qcx.ctes`.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Iterate in reverse so that the first CTE ends up as the
            // outermost `Let`.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            // At most one of the three recursion limit options may be given.
            // The second tuple element is the `return_at_limit` flag.
            let limit = match (
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
            ) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => {
                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
                }
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit(
                        "More than one recursion limit given. \
                         Please give at most one of RECURSION LIMIT, \
                         ERROR AT RECURSION LIMIT, \
                         RETURN AT RECURSION LIMIT."
                            .to_owned(),
                    ));
                }
            }
            .try_map(|(max_iters, return_at_limit)| {
                // A limit of zero is rejected via `NonZeroU64`.
                Ok::<LetRecLimit, PlanError>(LetRecLimit {
                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
                        "Recursion limit has to be greater than 0.".to_owned(),
                    ))?,
                    return_at_limit: *return_at_limit,
                })
            })?;

            // Collect all bindings into a single `LetRec`, in declaration
            // order, restoring shadowed bindings along the way.
            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1677
1678pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1680 let offset = offset
1681 .try_into_literal_int64()
1682 .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1683 if offset < 0 {
1684 return Err(PlanError::InvalidOffset(format!(
1685 "must not be negative, got {}",
1686 offset
1687 )));
1688 }
1689 Ok(offset)
1690}
1691
// Generates `MutRecBlockOptionExtracted` for the options of a mutually
// recursive CTE block. `plan_query_inner` builds it with `try_from` and reads
// the optional `recursion_limit`, `return_at_recursion_limit`, and
// `error_at_recursion_limit` fields, each holding a `u64` iteration count.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1698
1699pub fn plan_ctes(
1704 qcx: &mut QueryContext,
1705 q: &Query<Aug>,
1706) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1707 let mut result = Vec::new();
1709 let mut shadowed_descs = BTreeMap::new();
1712
1713 if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1715 sql_bail!(
1716 "WITH query name {} specified more than once",
1717 normalize::ident_ref(ident).quoted()
1718 )
1719 }
1720
1721 match &q.ctes {
1722 CteBlock::Simple(ctes) => {
1723 for cte in ctes.iter() {
1725 let cte_name = normalize::ident(cte.alias.name.clone());
1726 let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1727 let typ = qcx.relation_type(&val);
1728 let mut desc = RelationDesc::new(typ, scope.column_names());
1729 plan_utils::maybe_rename_columns(
1730 format!("CTE {}", cte.alias.name),
1731 &mut desc,
1732 &cte.alias.columns,
1733 )?;
1734 let shadowed = qcx.ctes.insert(
1736 cte.id,
1737 CteDesc {
1738 name: cte_name,
1739 desc,
1740 },
1741 );
1742
1743 result.push((cte.id, val, shadowed));
1744 }
1745 }
1746 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1747 for cte in ctes.iter() {
1749 let cte_name = normalize::ident(cte.name.clone());
1750 let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1751 for column in cte.columns.iter() {
1752 desc_columns.push((
1753 normalize::column_name(column.name.clone()),
1754 SqlColumnType {
1755 scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1756 nullable: true,
1757 },
1758 ));
1759 }
1760 let desc = RelationDesc::from_names_and_types(desc_columns);
1761 let shadowed = qcx.ctes.insert(
1762 cte.id,
1763 CteDesc {
1764 name: cte_name,
1765 desc,
1766 },
1767 );
1768 if let Some(shadowed) = shadowed {
1770 shadowed_descs.insert(cte.id, shadowed);
1771 }
1772 }
1773
1774 for cte in ctes.iter() {
1776 let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1777
1778 let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1779
1780 if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1781 sql_bail!(
1784 "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1785 );
1786 }
1787
1788 if !proposed_typ.keys.is_empty() {
1789 sql_bail!("[internal error]: WMR CTEs do not support keys");
1792 }
1793
1794 let derived_typ = qcx.relation_type(&val);
1796
1797 let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1798 let cte_name = normalize::ident(cte.name.clone());
1799 let proposed_typ = proposed_typ
1800 .column_types
1801 .iter()
1802 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1803 .collect::<Vec<_>>();
1804 let inferred_typ = derived_typ
1805 .column_types
1806 .iter()
1807 .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1808 .collect::<Vec<_>>();
1809 Err(PlanError::RecursiveTypeMismatch(
1810 cte_name,
1811 proposed_typ,
1812 inferred_typ,
1813 ))
1814 };
1815
1816 if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1817 return type_err(proposed_typ, derived_typ);
1818 }
1819
1820 let val = match cast_relation(
1822 qcx,
1823 CastContext::Assignment,
1828 val,
1829 proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1830 ) {
1831 Ok(val) => val,
1832 Err(_) => return type_err(proposed_typ, derived_typ),
1833 };
1834
1835 result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1836 }
1837 }
1838 }
1839
1840 Ok(result)
1841}
1842
1843pub fn plan_nested_query(
1844 qcx: &mut QueryContext,
1845 q: &Query<Aug>,
1846) -> Result<(HirRelationExpr, Scope), PlanError> {
1847 let PlannedQuery {
1848 mut expr,
1849 scope,
1850 order_by,
1851 limit,
1852 offset,
1853 project,
1854 group_size_hints,
1855 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1856 if limit.is_some()
1857 || !offset
1858 .clone()
1859 .try_into_literal_int64()
1860 .is_ok_and(|offset| offset == 0)
1861 {
1862 expr = HirRelationExpr::top_k(
1863 expr,
1864 vec![],
1865 order_by,
1866 limit,
1867 offset,
1868 group_size_hints.limit_input_group_size,
1869 );
1870 }
1871 Ok((expr.project(project), scope))
1872}
1873
/// Plans a set expression: a `SELECT`, a set operation (`UNION`, `EXCEPT`,
/// `INTERSECT`), a `VALUES` list, a `TABLE` reference, a parenthesized query,
/// or a `SHOW` statement used as a relation.
///
/// Returns the planned relation expression and the scope naming its output
/// columns.
///
/// # Errors
///
/// Returns a [`PlanError`] if any operand fails to plan, if the operands of a
/// set operation have different arities or column types that cannot be
/// reconciled, or if a `SHOW` statement appears where the query lifetime does
/// not allow it or is of an unsupported kind.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            // No ORDER BY expressions are passed here, so the resulting plan
            // is expected to carry no ordering.
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan both operands, each under the recursion check.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Both operands must produce the same number of columns.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Expression contexts for planning casts of each operand's
            // columns. Both are named after the operator and disallow
            // aggregates, subqueries, parameters, and window functions.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column position, pick a common target type and plan an
            // implicit cast of that column on both sides.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_sql_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&target, false),
                        qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Only add a map + project when at least one cast is not the
            // identity column reference. The projection keeps columns
            // `arity..2 * arity`, i.e. the mapped cast expressions (this
            // presumes `map` places its outputs after the input columns).
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            let relation_expr = match op {
                SetOperator::Union => {
                    // Without ALL, duplicates are removed from the union.
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // Expressed as `lhs - threshold(lhs - rhs)`. Note that the
                    // left-hand expression is duplicated (cloned) to build
                    // this.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            // The output columns take their names from the left operand.
            let scope = Scope::from_source(
                None,
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // SHOW is only permitted when the query's lifetime allows it.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Turns the single-row result of a `SHOW CREATE ...` plan into a
            // constant relation, using the statement description for the
            // column names and types.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let scope = Scope::from_source(None, desc.iter_names());
                let expr = HirRelationExpr::constant(rows, desc.into_typ());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateType(stmt) => to_hirscope(
                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
                    show::describe_show_create_type(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2119
2120fn plan_values(
2122 qcx: &QueryContext,
2123 values: &[Vec<Expr<Aug>>],
2124) -> Result<(HirRelationExpr, Scope), PlanError> {
2125 assert!(!values.is_empty());
2126
2127 let ecx = &ExprContext {
2128 qcx,
2129 name: "VALUES",
2130 scope: &Scope::empty(),
2131 relation_type: &SqlRelationType::empty(),
2132 allow_aggregates: false,
2133 allow_subqueries: true,
2134 allow_parameters: true,
2135 allow_windows: false,
2136 };
2137
2138 let ncols = values[0].len();
2139 let nrows = values.len();
2140
2141 let mut cols = vec![vec![]; ncols];
2144 for row in values {
2145 if row.len() != ncols {
2146 sql_bail!(
2147 "VALUES expression has varying number of columns: {} vs {}",
2148 row.len(),
2149 ncols
2150 );
2151 }
2152 for (i, v) in row.iter().enumerate() {
2153 cols[i].push(v);
2154 }
2155 }
2156
2157 let mut col_iters = Vec::with_capacity(ncols);
2159 let mut col_types = Vec::with_capacity(ncols);
2160 for col in &cols {
2161 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2162 let mut col_type = ecx.column_type(&col[0]);
2163 for val in &col[1..] {
2164 col_type = col_type.sql_union(&ecx.column_type(val))?; }
2166 col_types.push(col_type);
2167 col_iters.push(col.into_iter());
2168 }
2169
2170 let mut exprs = vec![];
2172 for _ in 0..nrows {
2173 for i in 0..ncols {
2174 exprs.push(col_iters[i].next().unwrap());
2175 }
2176 }
2177 let out = HirRelationExpr::CallTable {
2178 func: TableFunc::Wrap {
2179 width: ncols,
2180 types: col_types,
2181 },
2182 exprs,
2183 };
2184
2185 let mut scope = Scope::empty();
2187 for i in 0..ncols {
2188 let name = format!("column{}", i + 1);
2189 scope.items.push(ScopeItem::from_column_name(name));
2190 }
2191
2192 Ok((out, scope))
2193}
2194
2195fn plan_values_insert(
2205 qcx: &QueryContext,
2206 target_names: &[&ColumnName],
2207 target_types: &[&SqlScalarType],
2208 values: &[Vec<Expr<Aug>>],
2209) -> Result<HirRelationExpr, PlanError> {
2210 assert!(!values.is_empty());
2211
2212 if !values.iter().map(|row| row.len()).all_equal() {
2213 sql_bail!("VALUES lists must all be the same length");
2214 }
2215
2216 let ecx = &ExprContext {
2217 qcx,
2218 name: "VALUES",
2219 scope: &Scope::empty(),
2220 relation_type: &SqlRelationType::empty(),
2221 allow_aggregates: false,
2222 allow_subqueries: true,
2223 allow_parameters: true,
2224 allow_windows: false,
2225 };
2226
2227 let mut exprs = vec![];
2228 let mut types = vec![];
2229 for row in values {
2230 if row.len() > target_names.len() {
2231 sql_bail!("INSERT has more expressions than target columns");
2232 }
2233 for (column, val) in row.into_iter().enumerate() {
2234 let target_type = &target_types[column];
2235 let val = plan_expr(ecx, val)?;
2236 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2237 let source_type = &ecx.scalar_type(&val);
2238 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2239 Ok(val) => val,
2240 Err(_) => sql_bail!(
2241 "column {} is of type {} but expression is of type {}",
2242 target_names[column].quoted(),
2243 qcx.humanize_sql_scalar_type(target_type, false),
2244 qcx.humanize_sql_scalar_type(source_type, false),
2245 ),
2246 };
2247 if column >= types.len() {
2248 types.push(ecx.column_type(&val));
2249 } else {
2250 types[column] = types[column].sql_union(&ecx.column_type(&val))?; }
2252 exprs.push(val);
2253 }
2254 }
2255
2256 Ok(HirRelationExpr::CallTable {
2257 func: TableFunc::Wrap {
2258 width: values[0].len(),
2259 types,
2260 },
2261 exprs,
2262 })
2263}
2264
2265fn plan_join_identity() -> (HirRelationExpr, Scope) {
2266 let typ = SqlRelationType::new(vec![]);
2267 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2268 let scope = Scope::empty();
2269 (expr, scope)
2270}
2271
/// The result of planning a `SELECT` with `plan_select_from_where`: a relation
/// expression plus the ordering and projection that the caller still has to
/// apply.
#[derive(Debug)]
struct SelectPlan {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope that accompanies `expr`.
    scope: Scope,
    /// The planned ordering for the `ORDER BY` expressions passed to the
    /// planner.
    order_by: Vec<ColumnOrder>,
    /// Column indexes of `expr` that callers project onto (see
    /// `plan_set_expr`, which calls `plan.expr.project(plan.project)`).
    project: Vec<usize>,
}
2284
// Generates `SelectOptionExtracted` for the options of a `SELECT`. It is
// built from `Select::options` with `try_from` and then converted into
// `GroupSizeHints`. Each listed option carries a `u64` value.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2292
2293fn plan_select_from_where(
2311 qcx: &QueryContext,
2312 mut s: Select<Aug>,
2313 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2314) -> Result<SelectPlan, PlanError> {
2315 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2322 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2323
2324 let (mut relation_expr, mut from_scope) =
2326 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2327 let (left, left_scope) = l;
2328 plan_join(
2329 qcx,
2330 left,
2331 left_scope,
2332 &Join {
2333 relation: TableFactor::NestedJoin {
2334 join: Box::new(twj.clone()),
2335 alias: None,
2336 },
2337 join_operator: JoinOperator::CrossJoin,
2338 },
2339 )
2340 })?;
2341
2342 if let Some(selection) = &s.selection {
2344 let ecx = &ExprContext {
2345 qcx,
2346 name: "WHERE clause",
2347 scope: &from_scope,
2348 relation_type: &qcx.relation_type(&relation_expr),
2349 allow_aggregates: false,
2350 allow_subqueries: true,
2351 allow_parameters: true,
2352 allow_windows: false,
2353 };
2354 let expr = plan_expr(ecx, selection)
2355 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2356 .type_as(ecx, &SqlScalarType::Bool)?;
2357 relation_expr = relation_expr.filter(vec![expr]);
2358 }
2359
2360 let (aggregates, table_funcs) = {
2363 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2364 visitor.visit_select_mut(&mut s);
2365 for o in order_by_exprs.iter_mut() {
2366 visitor.visit_order_by_expr_mut(o);
2367 }
2368 visitor.into_result()?
2369 };
2370 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2371 if !table_funcs.is_empty() {
2372 let (expr, scope) = plan_scalar_table_funcs(
2373 qcx,
2374 table_funcs,
2375 &mut table_func_names,
2376 &relation_expr,
2377 &from_scope,
2378 )?;
2379 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2380 from_scope = from_scope.product(scope)?;
2381 }
2382
2383 let projection = {
2385 let ecx = &ExprContext {
2386 qcx,
2387 name: "SELECT clause",
2388 scope: &from_scope,
2389 relation_type: &qcx.relation_type(&relation_expr),
2390 allow_aggregates: true,
2391 allow_subqueries: true,
2392 allow_parameters: true,
2393 allow_windows: true,
2394 };
2395 let mut out = vec![];
2396 for si in &s.projection {
2397 if *si == SelectItem::Wildcard && s.from.is_empty() {
2398 sql_bail!("SELECT * with no tables specified is not valid");
2399 }
2400 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2401 }
2402 out
2403 };
2404
2405 let (mut group_scope, select_all_mapping) = {
2409 let ecx = &ExprContext {
2411 qcx,
2412 name: "GROUP BY clause",
2413 scope: &from_scope,
2414 relation_type: &qcx.relation_type(&relation_expr),
2415 allow_aggregates: false,
2416 allow_subqueries: true,
2417 allow_parameters: true,
2418 allow_windows: false,
2419 };
2420 let mut group_key = vec![];
2421 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2422 let mut group_hir_exprs = vec![];
2423 let mut group_scope = Scope::empty();
2424 let mut select_all_mapping = BTreeMap::new();
2425
2426 for group_expr in &s.group_by {
2427 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2428 let new_column = group_key.len();
2429
2430 if let Some(group_expr) = group_expr {
2431 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2435 existing_scope_item.exprs.insert(group_expr.clone());
2436 continue;
2437 }
2438 }
2439
2440 let mut scope_item = if let HirScalarExpr::Column(
2441 ColumnRef {
2442 level: 0,
2443 column: old_column,
2444 },
2445 _name,
2446 ) = &expr
2447 {
2448 select_all_mapping.insert(*old_column, new_column);
2454 let scope_item = ecx.scope.items[*old_column].clone();
2455 scope_item
2456 } else {
2457 ScopeItem::empty()
2458 };
2459
2460 if let Some(group_expr) = group_expr.cloned() {
2461 scope_item.exprs.insert(group_expr);
2462 }
2463
2464 group_key.push(from_scope.len() + group_exprs.len());
2465 group_hir_exprs.push(expr.clone());
2466 group_exprs.insert(expr, scope_item);
2467 }
2468
2469 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2470 for expr in &group_hir_exprs {
2471 if let Some(scope_item) = group_exprs.remove(expr) {
2472 group_scope.items.push(scope_item);
2473 }
2474 }
2475
2476 let ecx = &ExprContext {
2478 qcx,
2479 name: "aggregate function",
2480 scope: &from_scope,
2481 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2482 allow_aggregates: false,
2483 allow_subqueries: true,
2484 allow_parameters: true,
2485 allow_windows: false,
2486 };
2487 let mut agg_exprs = vec![];
2488 for sql_function in aggregates {
2489 if sql_function.over.is_some() {
2490 unreachable!(
2491 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2492 );
2493 }
2494 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2495 group_scope
2496 .items
2497 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2498 }
2499 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2500 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2502 group_key,
2503 agg_exprs,
2504 group_size_hints.aggregate_input_group_size,
2505 );
2506
2507 for i in 0..from_scope.len() {
2513 if !select_all_mapping.contains_key(&i) {
2514 let scope_item = &ecx.scope.items[i];
2515 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2516 table_name: scope_item.table_name.clone(),
2517 column_name: scope_item.column_name.clone(),
2518 allow_unqualified_references: scope_item.allow_unqualified_references,
2519 });
2520 }
2521 }
2522
2523 (group_scope, select_all_mapping)
2524 } else {
2525 (
2527 from_scope.clone(),
2528 (0..from_scope.len()).map(|i| (i, i)).collect(),
2529 )
2530 }
2531 };
2532
2533 if let Some(ref having) = s.having {
2535 let ecx = &ExprContext {
2536 qcx,
2537 name: "HAVING clause",
2538 scope: &group_scope,
2539 relation_type: &qcx.relation_type(&relation_expr),
2540 allow_aggregates: true,
2541 allow_subqueries: true,
2542 allow_parameters: true,
2543 allow_windows: false,
2544 };
2545 let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2546 relation_expr = relation_expr.filter(vec![expr]);
2547 }
2548
2549 let window_funcs = {
2562 let mut visitor = WindowFuncCollector::default();
2563 visitor.visit_select(&s);
2567 for o in order_by_exprs.iter() {
2568 visitor.visit_order_by_expr(o);
2569 }
2570 visitor.into_result()
2571 };
2572 for window_func in window_funcs {
2573 let ecx = &ExprContext {
2574 qcx,
2575 name: "window function",
2576 scope: &group_scope,
2577 relation_type: &qcx.relation_type(&relation_expr),
2578 allow_aggregates: true,
2579 allow_subqueries: true,
2580 allow_parameters: true,
2581 allow_windows: true,
2582 };
2583 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2584 group_scope.items.push(ScopeItem::from_expr(window_func));
2585 }
2586 if let Some(ref qualify) = s.qualify {
2593 let ecx = &ExprContext {
2594 qcx,
2595 name: "QUALIFY clause",
2596 scope: &group_scope,
2597 relation_type: &qcx.relation_type(&relation_expr),
2598 allow_aggregates: true,
2599 allow_subqueries: true,
2600 allow_parameters: true,
2601 allow_windows: true,
2602 };
2603 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2604 relation_expr = relation_expr.filter(vec![expr]);
2605 }
2606
2607 let output_columns = {
2609 let mut new_exprs = vec![];
2610 let mut new_type = qcx.relation_type(&relation_expr);
2611 let mut output_columns = vec![];
2612 for (select_item, column_name) in &projection {
2613 let ecx = &ExprContext {
2614 qcx,
2615 name: "SELECT clause",
2616 scope: &group_scope,
2617 relation_type: &new_type,
2618 allow_aggregates: true,
2619 allow_subqueries: true,
2620 allow_parameters: true,
2621 allow_windows: true,
2622 };
2623 let expr = match select_item {
2624 ExpandedSelectItem::InputOrdinal(i) => {
2625 if let Some(column) = select_all_mapping.get(i).copied() {
2626 HirScalarExpr::column(column)
2627 } else {
2628 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2629 }
2630 }
2631 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2632 };
2633 if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2634 output_columns.push((column, column_name));
2636 } else {
2637 let typ = ecx.column_type(&expr);
2644 new_type.column_types.push(typ);
2645 new_exprs.push(expr);
2646 output_columns.push((group_scope.len(), column_name));
2647 group_scope
2648 .items
2649 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2650 }
2651 }
2652 relation_expr = relation_expr.map(new_exprs);
2653 output_columns
2654 };
2655 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2656
2657 let order_by = {
2659 let relation_type = qcx.relation_type(&relation_expr);
2660 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2661 &ExprContext {
2662 qcx,
2663 name: "ORDER BY clause",
2664 scope: &group_scope,
2665 relation_type: &relation_type,
2666 allow_aggregates: true,
2667 allow_subqueries: true,
2668 allow_parameters: true,
2669 allow_windows: true,
2670 },
2671 &order_by_exprs,
2672 &output_columns,
2673 )?;
2674
2675 match s.distinct {
2676 None => relation_expr = relation_expr.map(map_exprs),
2677 Some(Distinct::EntireRow) => {
2678 if relation_type.arity() == 0 {
2679 sql_bail!("SELECT DISTINCT must have at least one column");
2680 }
2681 if !try_push_projection_order_by(
2685 &mut relation_expr,
2686 &mut project_key,
2687 &mut order_by,
2688 ) {
2689 sql_bail!(
2690 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2691 );
2692 }
2693 assert!(map_exprs.is_empty());
2694 relation_expr = relation_expr.distinct();
2695 }
2696 Some(Distinct::On(exprs)) => {
2697 let ecx = &ExprContext {
2698 qcx,
2699 name: "DISTINCT ON clause",
2700 scope: &group_scope,
2701 relation_type: &qcx.relation_type(&relation_expr),
2702 allow_aggregates: true,
2703 allow_subqueries: true,
2704 allow_parameters: true,
2705 allow_windows: true,
2706 };
2707
2708 let mut distinct_exprs = vec![];
2709 for expr in &exprs {
2710 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2711 distinct_exprs.push(expr);
2712 }
2713
2714 let mut distinct_key = vec![];
2715
2716 let arity = relation_type.arity();
2726 for ord in order_by.iter().take(distinct_exprs.len()) {
2727 let mut expr = &HirScalarExpr::column(ord.column);
2730 if ord.column >= arity {
2731 expr = &map_exprs[ord.column - arity];
2732 };
2733 match distinct_exprs.iter().position(move |e| e == expr) {
2734 None => sql_bail!(
2735 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2736 ),
2737 Some(pos) => {
2738 distinct_exprs.remove(pos);
2739 }
2740 }
2741 distinct_key.push(ord.column);
2742 }
2743
2744 for expr in distinct_exprs {
2746 let column = match expr {
2749 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2750 _ => {
2751 map_exprs.push(expr);
2752 arity + map_exprs.len() - 1
2753 }
2754 };
2755 distinct_key.push(column);
2756 }
2757
2758 let distinct_len = distinct_key.len();
2763 relation_expr = HirRelationExpr::top_k(
2764 relation_expr.map(map_exprs),
2765 distinct_key,
2766 order_by.iter().skip(distinct_len).cloned().collect(),
2767 Some(HirScalarExpr::literal(
2768 Datum::Int64(1),
2769 SqlScalarType::Int64,
2770 )),
2771 HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2772 group_size_hints.distinct_on_input_group_size,
2773 );
2774 }
2775 }
2776
2777 order_by
2778 };
2779
2780 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2785
2786 Ok(SelectPlan {
2787 expr: relation_expr,
2788 scope,
2789 order_by,
2790 project: project_key,
2791 })
2792}
2793
2794fn plan_scalar_table_funcs(
2795 qcx: &QueryContext,
2796 table_funcs: BTreeMap<Function<Aug>, String>,
2797 table_func_names: &mut BTreeMap<String, Ident>,
2798 relation_expr: &HirRelationExpr,
2799 from_scope: &Scope,
2800) -> Result<(HirRelationExpr, Scope), PlanError> {
2801 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2802
2803 for (table_func, id) in table_funcs.iter() {
2804 table_func_names.insert(
2805 id.clone(),
2806 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2808 );
2809 }
2810 if table_funcs.len() == 1 {
2813 let (table_func, id) = table_funcs.iter().next().unwrap();
2814 let (expr, mut scope) =
2815 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2816
2817 let num_cols = scope.len();
2819 for i in 0..scope.len() {
2820 scope.items[i].table_name = Some(PartialItemName {
2821 database: None,
2822 schema: None,
2823 item: id.clone(),
2824 });
2825 scope.items[i].from_single_column_function = num_cols == 1;
2826 scope.items[i].allow_unqualified_references = false;
2827 }
2828 return Ok((expr, scope));
2829 }
2830 if table_funcs.keys().any(is_repeat_row) {
2831 bail_unsupported!(format!(
2834 "{} in a SELECT clause with multiple table functions",
2835 REPEAT_ROW_NAME
2836 ));
2837 }
2838 let (expr, mut scope, num_cols) =
2840 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2841
2842 let mut i = 0;
2844 for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2845 for _ in 0..num_cols {
2846 scope.items[i].table_name = Some(PartialItemName {
2847 database: None,
2848 schema: None,
2849 item: id.clone(),
2850 });
2851 scope.items[i].from_single_column_function = num_cols == 1;
2852 scope.items[i].allow_unqualified_references = false;
2853 i += 1;
2854 }
2855 scope.items[i].table_name = Some(PartialItemName {
2859 database: None,
2860 schema: None,
2861 item: id.clone(),
2862 });
2863 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2864 scope.items[i].allow_unqualified_references = false;
2865 i += 1;
2866 }
2867 scope.items[i].allow_unqualified_references = false;
2869 Ok((expr, scope))
2870}
2871
2872fn plan_group_by_expr<'a>(
2879 ecx: &ExprContext,
2880 group_expr: &'a Expr<Aug>,
2881 projection: &'a [(ExpandedSelectItem, ColumnName)],
2882) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2883 let plan_projection = |column: usize| match &projection[column].0 {
2884 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2885 ExpandedSelectItem::Expr(expr) => {
2886 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2887 }
2888 };
2889
2890 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2893 return plan_projection(column);
2894 }
2895
2896 match group_expr {
2900 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2901 Err(PlanError::UnknownColumn {
2902 table: None,
2903 column,
2904 similar,
2905 }) => {
2906 let mut iter = projection.iter().map(|(_expr, name)| name);
2909 if let Some(i) = iter.position(|n| *n == column) {
2910 if iter.any(|n| *n == column) {
2911 Err(PlanError::AmbiguousColumn(column))
2912 } else {
2913 plan_projection(i)
2914 }
2915 } else {
2916 Err(PlanError::UnknownColumn {
2919 table: None,
2920 column,
2921 similar,
2922 })
2923 }
2924 }
2925 res => Ok((Some(group_expr), res?)),
2926 },
2927 _ => Ok((
2928 Some(group_expr),
2929 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2930 )),
2931 }
2932}
2933
2934pub(crate) fn plan_order_by_exprs(
2942 ecx: &ExprContext,
2943 order_by_exprs: &[OrderByExpr<Aug>],
2944 output_columns: &[(usize, &ColumnName)],
2945) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2946 let mut order_by = vec![];
2947 let mut map_exprs = vec![];
2948 for obe in order_by_exprs {
2949 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2950 let column = match expr {
2953 HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2954 _ => {
2955 map_exprs.push(expr);
2956 ecx.relation_type.arity() + map_exprs.len() - 1
2957 }
2958 };
2959 order_by.push(resolve_desc_and_nulls_last(obe, column));
2960 }
2961 Ok((order_by, map_exprs))
2962}
2963
2964fn plan_order_by_or_distinct_expr(
2982 ecx: &ExprContext,
2983 expr: &Expr<Aug>,
2984 output_columns: &[(usize, &ColumnName)],
2985) -> Result<HirScalarExpr, PlanError> {
2986 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2987 return Ok(HirScalarExpr::column(output_columns[i].0));
2988 }
2989
2990 if let Expr::Identifier(names) = expr {
2991 if let [name] = &names[..] {
2992 let name = normalize::column_name(name.clone());
2993 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2994 if let Some((i, _)) = iter.next() {
2995 match iter.next() {
2996 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
3000 _ => return Ok(HirScalarExpr::column(*i)),
3001 }
3002 }
3003 }
3004 }
3005
3006 plan_expr(ecx, expr)?.type_as_any(ecx)
3007}
3008
3009fn plan_table_with_joins(
3010 qcx: &QueryContext,
3011 table_with_joins: &TableWithJoins<Aug>,
3012) -> Result<(HirRelationExpr, Scope), PlanError> {
3013 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
3014 for join in &table_with_joins.joins {
3015 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
3016 expr = new_expr;
3017 scope = new_scope;
3018 }
3019 Ok((expr, scope))
3020}
3021
3022fn plan_table_factor(
3023 qcx: &QueryContext,
3024 table_factor: &TableFactor<Aug>,
3025) -> Result<(HirRelationExpr, Scope), PlanError> {
3026 match table_factor {
3027 TableFactor::Table { name, alias } => {
3028 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
3029 let scope = plan_table_alias(scope, alias.as_ref())?;
3030 Ok((expr, scope))
3031 }
3032
3033 TableFactor::Function {
3034 function,
3035 alias,
3036 with_ordinality,
3037 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3038
3039 TableFactor::RowsFrom {
3040 functions,
3041 alias,
3042 with_ordinality,
3043 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3044
3045 TableFactor::Derived {
3046 lateral,
3047 subquery,
3048 alias,
3049 } => {
3050 let mut qcx = (*qcx).clone();
3051 if !lateral {
3052 for scope in &mut qcx.outer_scopes {
3056 if scope.lateral_barrier {
3057 break;
3058 }
3059 scope.items.clear();
3060 }
3061 }
3062 qcx.outer_scopes[0].lateral_barrier = true;
3063 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3064 let scope = plan_table_alias(scope, alias.as_ref())?;
3065 Ok((expr, scope))
3066 }
3067
3068 TableFactor::NestedJoin { join, alias } => {
3069 let (expr, scope) = plan_table_with_joins(qcx, join)?;
3070 let scope = plan_table_alias(scope, alias.as_ref())?;
3071 Ok((expr, scope))
3072 }
3073 }
3074}
3075
3076fn plan_rows_from(
3118 qcx: &QueryContext,
3119 functions: &[Function<Aug>],
3120 alias: Option<&TableAlias>,
3121 with_ordinality: bool,
3122) -> Result<(HirRelationExpr, Scope), PlanError> {
3123 if functions.iter().any(is_repeat_row) {
3125 bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
3129 }
3130
3131 if let [function] = functions {
3134 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3135 }
3136
3137 let (expr, mut scope, num_cols) = plan_rows_from_internal(
3141 qcx,
3142 functions,
3143 Some(functions[0].name.full_item_name().clone()),
3144 )?;
3145
3146 let mut columns = Vec::new();
3148 let mut offset = 0;
3149 for (idx, cols) in num_cols.into_iter().enumerate() {
3151 for i in 0..cols {
3152 columns.push(offset + i);
3153 }
3154 offset += cols + 1;
3155
3156 scope.items.remove(offset - idx - 1);
3159 }
3160
3161 if with_ordinality {
3164 columns.push(offset);
3165 } else {
3166 scope.items.pop();
3167 }
3168
3169 let expr = expr.project(columns);
3170
3171 let scope = plan_table_alias(scope, alias)?;
3172 Ok((expr, scope))
3173}
3174
3175fn is_repeat_row(f: &Function<Aug>) -> bool {
3176 f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3177}
3178
3179fn plan_rows_from_internal<'a>(
3202 qcx: &QueryContext,
3203 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3204 table_name: Option<FullItemName>,
3205) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3206 let mut functions = functions.into_iter();
3207 let mut num_cols = Vec::new();
3208
3209 let (mut left_expr, mut left_scope) =
3213 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3214 num_cols.push(left_scope.len() - 1);
3215 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3217 left_scope
3218 .items
3219 .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3220
3221 for function in functions {
3222 let qcx = qcx.empty_derived_context();
3224 let (right_expr, mut right_scope) =
3225 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3226 num_cols.push(right_scope.len() - 1);
3227 let left_col = left_scope.len() - 1;
3228 let right_col = left_scope.len() + right_scope.len() - 1;
3229 let on = HirScalarExpr::call_binary(
3230 HirScalarExpr::column(left_col),
3231 HirScalarExpr::column(right_col),
3232 expr_func::Eq,
3233 );
3234 left_expr = left_expr
3235 .join(right_expr, on, JoinKind::FullOuter)
3236 .map(vec![HirScalarExpr::call_variadic(
3237 Coalesce,
3238 vec![
3239 HirScalarExpr::column(left_col),
3240 HirScalarExpr::column(right_col),
3241 ],
3242 )]);
3243
3244 left_expr = left_expr.project(
3247 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3250 );
3251 right_scope.items.push(left_scope.items.pop().unwrap());
3253
3254 left_scope.items.extend(right_scope.items);
3255 }
3256
3257 Ok((left_expr, left_scope, num_cols))
3258}
3259
3260fn plan_solitary_table_function(
3264 qcx: &QueryContext,
3265 function: &Function<Aug>,
3266 alias: Option<&TableAlias>,
3267 with_ordinality: bool,
3268) -> Result<(HirRelationExpr, Scope), PlanError> {
3269 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3270
3271 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3272 if single_column_function {
3273 let item = &mut scope.items[0];
3274
3275 item.from_single_column_function = true;
3278
3279 if let Some(alias) = alias {
3294 if let ScopeItem {
3295 table_name: Some(table_name),
3296 column_name,
3297 ..
3298 } = item
3299 {
3300 if table_name.item.as_str() == column_name.as_str() {
3301 *column_name = normalize::column_name(alias.name.clone());
3302 }
3303 }
3304 }
3305 }
3306
3307 let scope = plan_table_alias(scope, alias)?;
3308 Ok((expr, scope))
3309}
3310
3311fn plan_table_function_internal(
3316 qcx: &QueryContext,
3317 Function {
3318 name,
3319 args,
3320 filter,
3321 over,
3322 distinct,
3323 }: &Function<Aug>,
3324 with_ordinality: bool,
3325 table_name: Option<FullItemName>,
3326) -> Result<(HirRelationExpr, Scope), PlanError> {
3327 assert_none!(filter, "cannot parse table function with FILTER");
3328 assert_none!(over, "cannot parse table function with OVER");
3329 assert!(!*distinct, "cannot parse table function with DISTINCT");
3330
3331 let ecx = &ExprContext {
3332 qcx,
3333 name: "table function arguments",
3334 scope: &Scope::empty(),
3335 relation_type: &SqlRelationType::empty(),
3336 allow_aggregates: false,
3337 allow_subqueries: true,
3338 allow_parameters: true,
3339 allow_windows: false,
3340 };
3341
3342 let scalar_args = match args {
3343 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3344 FunctionArgs::Args { args, order_by } => {
3345 if !order_by.is_empty() {
3346 sql_bail!(
3347 "ORDER BY specified, but {} is not an aggregate function",
3348 name
3349 );
3350 }
3351 plan_exprs(ecx, args)?
3352 }
3353 };
3354
3355 let table_name = match table_name {
3356 Some(table_name) => table_name.item,
3357 None => name.full_item_name().item.clone(),
3358 };
3359
3360 let scope_name = Some(PartialItemName {
3361 database: None,
3362 schema: None,
3363 item: table_name,
3364 });
3365
3366 let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3367 Func::Table(impls) => {
3368 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3369 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3370 let expr = match tf.imp {
3371 TableFuncImpl::CallTable { mut func, exprs } => {
3372 if with_ordinality {
3373 func = TableFunc::with_ordinality(func.clone()).ok_or(
3374 PlanError::Unsupported {
3375 feature: format!("WITH ORDINALITY on {}", func),
3376 discussion_no: None,
3377 },
3378 )?;
3379 }
3380 HirRelationExpr::CallTable { func, exprs }
3381 }
3382 TableFuncImpl::Expr(expr) => {
3383 if !with_ordinality {
3384 expr
3385 } else {
3386 if qcx
3390 .scx
3391 .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3392 {
3393 tracing::error!(
3397 %name,
3398 "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3399 );
3400 expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3401 func: WindowExprType::Scalar(ScalarWindowExpr {
3402 func: ScalarWindowFunc::RowNumber,
3403 order_by: vec![],
3404 }),
3405 partition_by: vec![],
3406 order_by: vec![],
3407 })])
3408 } else {
3409 bail_unsupported!(format!(
3410 "WITH ORDINALITY or ROWS FROM with {}",
3411 name
3412 ));
3413 }
3414 }
3415 }
3416 };
3417 (expr, scope)
3418 }
3419 Func::Scalar(impls) => {
3420 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3421 let output = expr.typ(
3422 &qcx.outer_relation_types,
3423 &SqlRelationType::new(vec![]),
3424 &qcx.scx.param_types.borrow(),
3425 );
3426
3427 let relation = SqlRelationType::new(vec![output]);
3428
3429 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3430 let column_name = normalize::column_name(function_ident);
3431 let name = column_name.to_string();
3432
3433 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3434
3435 let mut func = TableFunc::TabletizedScalar { relation, name };
3436 if with_ordinality {
3437 func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3438 feature: format!("WITH ORDINALITY on {}", func),
3439 discussion_no: None,
3440 })?;
3441 }
3442 (
3443 HirRelationExpr::CallTable {
3444 func,
3445 exprs: vec![expr],
3446 },
3447 scope,
3448 )
3449 }
3450 o => sql_bail!(
3451 "{} functions are not supported in functions in FROM",
3452 o.class()
3453 ),
3454 };
3455
3456 if with_ordinality {
3457 scope
3458 .items
3459 .push(ScopeItem::from_name(scope_name, "ordinality"));
3460 }
3461
3462 Ok((expr, scope))
3463}
3464
3465fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3466 if let Some(TableAlias {
3467 name,
3468 columns,
3469 strict,
3470 }) = alias
3471 {
3472 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3473 sql_bail!(
3474 "{} has {} columns available but {} columns specified",
3475 name,
3476 scope.items.len(),
3477 columns.len()
3478 );
3479 }
3480
3481 let table_name = normalize::ident(name.to_owned());
3482 for (i, item) in scope.items.iter_mut().enumerate() {
3483 item.table_name = if item.allow_unqualified_references {
3484 Some(PartialItemName {
3485 database: None,
3486 schema: None,
3487 item: table_name.clone(),
3488 })
3489 } else {
3490 None
3524 };
3525 item.column_name = columns
3526 .get(i)
3527 .map(|a| normalize::column_name(a.clone()))
3528 .unwrap_or_else(|| item.column_name.clone());
3529 }
3530 }
3531 Ok(scope)
3532}
3533
3534fn invent_column_name(
3538 ecx: &ExprContext,
3539 expr: &Expr<Aug>,
3540 table_func_names: &BTreeMap<String, Ident>,
3541) -> Option<ColumnName> {
3542 #[derive(Debug)]
3549 enum NameQuality {
3550 Low,
3551 High,
3552 }
3553
3554 fn invent(
3555 ecx: &ExprContext,
3556 expr: &Expr<Aug>,
3557 table_func_names: &BTreeMap<String, Ident>,
3558 ) -> Option<(ColumnName, NameQuality)> {
3559 match expr {
3560 Expr::Identifier(names) => {
3561 if let [name] = names.as_slice() {
3562 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3563 return Some((
3564 normalize::column_name(table_func_name.clone()),
3565 NameQuality::High,
3566 ));
3567 }
3568 }
3569 names
3570 .last()
3571 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3572 }
3573 Expr::Value(v) => match v {
3574 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3577 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3578 _ => None,
3579 },
3580 Expr::Function(func) => {
3581 let (schema, item) = match &func.name {
3582 ResolvedItemName::Item {
3583 qualifiers,
3584 full_name,
3585 ..
3586 } => (&qualifiers.schema_spec, full_name.item.clone()),
3587 _ => unreachable!(),
3588 };
3589
3590 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3591 || schema
3592 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3593 {
3594 None
3595 } else {
3596 Some((item.into(), NameQuality::High))
3597 }
3598 }
3599 Expr::HomogenizingFunction { function, .. } => Some((
3600 function.to_string().to_lowercase().into(),
3601 NameQuality::High,
3602 )),
3603 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3604 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3605 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3606 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3607 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3608 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3609 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3610 },
3611 Expr::Case { else_result, .. } => {
3612 match else_result
3613 .as_ref()
3614 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3615 {
3616 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3617 _ => Some(("case".into(), NameQuality::Low)),
3618 }
3619 }
3620 Expr::FieldAccess { field, .. } => {
3621 Some((normalize::column_name(field.clone()), NameQuality::High))
3622 }
3623 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3624 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3625 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3626 let (_expr, scope) =
3630 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3631 scope
3632 .items
3633 .first()
3634 .map(|name| (name.column_name.clone(), NameQuality::High))
3635 }
3636 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3637 _ => None,
3638 }
3639 }
3640
3641 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3642}
3643
3644#[derive(Debug)]
3645enum ExpandedSelectItem<'a> {
3646 InputOrdinal(usize),
3647 Expr(Cow<'a, Expr<Aug>>),
3648}
3649
3650impl ExpandedSelectItem<'_> {
3651 fn as_expr(&self) -> Option<&Expr<Aug>> {
3652 match self {
3653 ExpandedSelectItem::InputOrdinal(_) => None,
3654 ExpandedSelectItem::Expr(expr) => Some(expr),
3655 }
3656 }
3657}
3658
3659fn expand_select_item<'a>(
3660 ecx: &ExprContext,
3661 s: &'a SelectItem<Aug>,
3662 table_func_names: &BTreeMap<String, Ident>,
3663) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3664 match s {
3665 SelectItem::Expr {
3666 expr: Expr::QualifiedWildcard(table_name),
3667 alias: _,
3668 } => {
3669 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3670 let table_name =
3671 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3672 let out: Vec<_> = ecx
3673 .scope
3674 .items
3675 .iter()
3676 .enumerate()
3677 .filter(|(_i, item)| item.is_from_table(&table_name))
3678 .map(|(i, item)| {
3679 let name = item.column_name.clone();
3680 (ExpandedSelectItem::InputOrdinal(i), name)
3681 })
3682 .collect();
3683 if out.is_empty() {
3684 sql_bail!("no table named '{}' in scope", table_name);
3685 }
3686 Ok(out)
3687 }
3688 SelectItem::Expr {
3689 expr: Expr::WildcardAccess(sql_expr),
3690 alias: _,
3691 } => {
3692 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3693 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3699 let fields = match ecx.scalar_type(&expr) {
3700 SqlScalarType::Record { fields, .. } => fields,
3701 ty => sql_bail!(
3702 "type {} is not composite",
3703 ecx.humanize_sql_scalar_type(&ty, false)
3704 ),
3705 };
3706 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3707 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3708 if let [name] = ident.as_slice() {
3709 if let Ok(items) = ecx.scope.items_from_table(
3710 &[],
3711 &PartialItemName {
3712 database: None,
3713 schema: None,
3714 item: name.as_str().to_string(),
3715 },
3716 ) {
3717 for (_, item) in items {
3718 if item
3719 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3720 {
3721 skip_cols.insert(item.column_name.clone());
3722 }
3723 }
3724 }
3725 }
3726 }
3727 let items = fields
3728 .iter()
3729 .filter_map(|(name, _ty)| {
3730 if skip_cols.contains(name) {
3731 None
3732 } else {
3733 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3734 expr: sql_expr.clone(),
3735 field: name.clone().into(),
3736 }));
3737 Some((item, name.clone()))
3738 }
3739 })
3740 .collect();
3741 Ok(items)
3742 }
3743 SelectItem::Wildcard => {
3744 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3745 let items: Vec<_> = ecx
3746 .scope
3747 .items
3748 .iter()
3749 .enumerate()
3750 .filter(|(_i, item)| item.allow_unqualified_references)
3751 .map(|(i, item)| {
3752 let name = item.column_name.clone();
3753 (ExpandedSelectItem::InputOrdinal(i), name)
3754 })
3755 .collect();
3756
3757 Ok(items)
3758 }
3759 SelectItem::Expr { expr, alias } => {
3760 let name = alias
3761 .clone()
3762 .map(normalize::column_name)
3763 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3764 .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3765 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3766 }
3767 }
3768}
3769
3770fn plan_join(
3771 left_qcx: &QueryContext,
3772 left: HirRelationExpr,
3773 left_scope: Scope,
3774 join: &Join<Aug>,
3775) -> Result<(HirRelationExpr, Scope), PlanError> {
3776 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3777 let (kind, constraint) = match &join.join_operator {
3778 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3779 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3780 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3781 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3782 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3783 };
3784
3785 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3786 if !kind.can_be_correlated() {
3787 for item in &mut right_qcx.outer_scopes[0].items {
3788 item.error_if_referenced =
3793 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3794 table: table.cloned(),
3795 column: column.clone(),
3796 });
3797 }
3798 }
3799 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3800
3801 let (expr, scope) = match constraint {
3802 JoinConstraint::On(expr) => {
3803 let product_scope = left_scope.product(right_scope)?;
3804 let ecx = &ExprContext {
3805 qcx: left_qcx,
3806 name: "ON clause",
3807 scope: &product_scope,
3808 relation_type: &SqlRelationType::new(
3809 left_qcx
3810 .relation_type(&left)
3811 .column_types
3812 .into_iter()
3813 .chain(right_qcx.relation_type(&right).column_types)
3814 .collect(),
3815 ),
3816 allow_aggregates: false,
3817 allow_subqueries: true,
3818 allow_parameters: true,
3819 allow_windows: false,
3820 };
3821 let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3822 let joined = left.join(right, on, kind);
3823 (joined, product_scope)
3824 }
3825 JoinConstraint::Using { columns, alias } => {
3826 let column_names = columns
3827 .iter()
3828 .map(|ident| normalize::column_name(ident.clone()))
3829 .collect::<Vec<_>>();
3830
3831 plan_using_constraint(
3832 &column_names,
3833 left_qcx,
3834 left,
3835 left_scope,
3836 &right_qcx,
3837 right,
3838 right_scope,
3839 kind,
3840 alias.as_ref(),
3841 )?
3842 }
3843 JoinConstraint::Natural => {
3844 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3847 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3848 let left_column_names = left_scope.column_names();
3849 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3850 let column_names: Vec<_> = left_column_names
3851 .filter(|col| right_column_names.contains(col))
3852 .cloned()
3853 .collect();
3854 plan_using_constraint(
3855 &column_names,
3856 left_qcx,
3857 left,
3858 left_scope,
3859 &right_qcx,
3860 right,
3861 right_scope,
3862 kind,
3863 None,
3864 )?
3865 }
3866 };
3867 Ok((expr, scope))
3868}
3869
3870#[allow(clippy::too_many_arguments)]
3872fn plan_using_constraint(
3873 column_names: &[ColumnName],
3874 left_qcx: &QueryContext,
3875 left: HirRelationExpr,
3876 left_scope: Scope,
3877 right_qcx: &QueryContext,
3878 right: HirRelationExpr,
3879 right_scope: Scope,
3880 kind: JoinKind,
3881 alias: Option<&Ident>,
3882) -> Result<(HirRelationExpr, Scope), PlanError> {
3883 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3884
3885 let mut unique_column_names = BTreeSet::new();
3888 for c in column_names {
3889 if !unique_column_names.insert(c) {
3890 return Err(PlanError::Unsupported {
3891 feature: format!(
3892 "column name {} appears more than once in USING clause",
3893 c.quoted()
3894 ),
3895 discussion_no: None,
3896 });
3897 }
3898 }
3899
3900 let alias_item_name = alias.map(|alias| PartialItemName {
3901 database: None,
3902 schema: None,
3903 item: alias.clone().to_string(),
3904 });
3905
3906 if let Some(alias_item_name) = &alias_item_name {
3907 for partial_item_name in both_scope.table_names() {
3908 if partial_item_name.matches(alias_item_name) {
3909 sql_bail!(
3910 "table name \"{}\" specified more than once",
3911 alias_item_name
3912 )
3913 }
3914 }
3915 }
3916
3917 let ecx = &ExprContext {
3918 qcx: right_qcx,
3919 name: "USING clause",
3920 scope: &both_scope,
3921 relation_type: &SqlRelationType::new(
3922 left_qcx
3923 .relation_type(&left)
3924 .column_types
3925 .into_iter()
3926 .chain(right_qcx.relation_type(&right).column_types)
3927 .collect(),
3928 ),
3929 allow_aggregates: false,
3930 allow_subqueries: false,
3931 allow_parameters: false,
3932 allow_windows: false,
3933 };
3934
3935 let mut join_exprs = vec![];
3936 let mut map_exprs = vec![];
3937 let mut new_items = vec![];
3938 let mut join_cols = vec![];
3939 let mut hidden_cols = vec![];
3940
3941 for column_name in column_names {
3942 let (lhs, lhs_name) = left_scope.resolve_using_column(
3944 column_name,
3945 JoinSide::Left,
3946 &mut left_qcx.name_manager.borrow_mut(),
3947 )?;
3948 let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3949 column_name,
3950 JoinSide::Right,
3951 &mut right_qcx.name_manager.borrow_mut(),
3952 )?;
3953
3954 rhs.column += left_scope.len();
3956
3957 let mut exprs = coerce_homogeneous_exprs(
3959 &ecx.with_name(&format!(
3960 "NATURAL/USING join column {}",
3961 column_name.quoted()
3962 )),
3963 vec![
3964 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3965 lhs,
3966 Arc::clone(&lhs_name),
3967 )),
3968 CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3969 rhs,
3970 Arc::clone(&rhs_name),
3971 )),
3972 ],
3973 None,
3974 )?;
3975 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3976
3977 match kind {
3978 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3979 join_cols.push(lhs.column);
3980 hidden_cols.push(rhs.column);
3981 }
3982 JoinKind::RightOuter => {
3983 join_cols.push(rhs.column);
3984 hidden_cols.push(lhs.column);
3985 }
3986 JoinKind::FullOuter => {
3987 join_cols.push(both_scope.items.len() + map_exprs.len());
3990 hidden_cols.push(lhs.column);
3991 hidden_cols.push(rhs.column);
3992 map_exprs.push(HirScalarExpr::call_variadic(
3993 Coalesce,
3994 vec![expr1.clone(), expr2.clone()],
3995 ));
3996 new_items.push(ScopeItem::from_column_name(column_name));
3997 }
3998 }
3999
4000 if alias_item_name.is_some() {
4005 let new_item_col = both_scope.items.len() + new_items.len();
4006 join_cols.push(new_item_col);
4007 hidden_cols.push(new_item_col);
4008
4009 new_items.push(ScopeItem::from_name(
4010 alias_item_name.clone(),
4011 column_name.clone().to_string(),
4012 ));
4013
4014 map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
4018 }
4019
4020 join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4021 }
4022 both_scope.items.extend(new_items);
4023
4024 for c in hidden_cols {
4028 both_scope.items[c].allow_unqualified_references = false;
4029 }
4030
4031 let project_key = join_cols
4033 .into_iter()
4034 .chain(0..both_scope.items.len())
4035 .unique()
4036 .collect::<Vec<_>>();
4037
4038 both_scope = both_scope.project(&project_key);
4039
4040 let on = HirScalarExpr::variadic_and(join_exprs);
4041
4042 let both = left
4043 .join(right, on, kind)
4044 .map(map_exprs)
4045 .project(project_key);
4046 Ok((both, both_scope))
4047}
4048
4049pub fn plan_expr<'a>(
4050 ecx: &'a ExprContext,
4051 e: &Expr<Aug>,
4052) -> Result<CoercibleScalarExpr, PlanError> {
4053 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4054}
4055
4056fn plan_expr_inner<'a>(
4057 ecx: &'a ExprContext,
4058 e: &Expr<Aug>,
4059) -> Result<CoercibleScalarExpr, PlanError> {
4060 if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4061 return Ok(HirScalarExpr::named_column(
4063 i,
4064 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4065 )
4066 .into());
4067 }
4068
4069 match e {
4070 Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4072 Ok(plan_identifier(ecx, names)?.into())
4073 }
4074
4075 Expr::Value(val) => plan_literal(val),
4077 Expr::Parameter(n) => plan_parameter(ecx, *n),
4078 Expr::Array(exprs) => plan_array(ecx, exprs, None),
4079 Expr::List(exprs) => plan_list(ecx, exprs, None),
4080 Expr::Map(exprs) => plan_map(ecx, exprs, None),
4081 Expr::Row { exprs } => plan_row(ecx, exprs),
4082
4083 Expr::Op { op, expr1, expr2 } => {
4085 Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4086 }
4087 Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4088 Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4089
4090 Expr::Not { expr } => plan_not(ecx, expr),
4092 Expr::And { left, right } => plan_and(ecx, left, right),
4093 Expr::Or { left, right } => plan_or(ecx, left, right),
4094 Expr::IsExpr {
4095 expr,
4096 construct,
4097 negated,
4098 } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4099 Expr::Case {
4100 operand,
4101 conditions,
4102 results,
4103 else_result,
4104 } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4105 Expr::HomogenizingFunction { function, exprs } => {
4106 plan_homogenizing_function(ecx, function, exprs)
4107 }
4108 Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4109 ecx,
4110 &None,
4111 &[l_expr.clone().equals(*r_expr.clone())],
4112 &[Expr::null()],
4113 &Some(Box::new(*l_expr.clone())),
4114 )?
4115 .into()),
4116 Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4117 Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4118 Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4119 Expr::Like {
4120 expr,
4121 pattern,
4122 escape,
4123 case_insensitive,
4124 negated,
4125 } => Ok(plan_like(
4126 ecx,
4127 expr,
4128 pattern,
4129 escape.as_deref(),
4130 *case_insensitive,
4131 *negated,
4132 )?
4133 .into()),
4134
4135 Expr::InList {
4136 expr,
4137 list,
4138 negated,
4139 } => plan_in_list(ecx, expr, list, negated),
4140
4141 Expr::Exists(query) => plan_exists(ecx, query),
4143 Expr::Subquery(query) => plan_subquery(ecx, query),
4144 Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4145 Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4146 Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4147 Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4148 Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4149 Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4150 Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4151 Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4152 Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4153 Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4154 Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4155 }
4156}
4157
4158fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4159 if !ecx.allow_parameters {
4160 return Err(PlanError::UnknownParameter(n));
4164 }
4165 if n == 0 || n > 65536 {
4166 return Err(PlanError::UnknownParameter(n));
4167 }
4168 if ecx.param_types().borrow().contains_key(&n) {
4169 Ok(HirScalarExpr::parameter(n).into())
4170 } else {
4171 Ok(CoercibleScalarExpr::Parameter(n))
4172 }
4173}
4174
4175fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4176 let mut out = vec![];
4177 for e in exprs {
4178 out.push(plan_expr(ecx, e)?);
4179 }
4180 Ok(CoercibleScalarExpr::LiteralRecord(out))
4181}
4182
4183fn plan_cast(
4184 ecx: &ExprContext,
4185 expr: &Expr<Aug>,
4186 data_type: &ResolvedDataType,
4187) -> Result<CoercibleScalarExpr, PlanError> {
4188 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4189 let expr = match expr {
4190 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4199 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4200 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4201 _ => plan_expr(ecx, expr)?,
4202 };
4203 let ecx = &ecx.with_name("CAST");
4204 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4205 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4206 Ok(expr.into())
4207}
4208
4209fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4210 let ecx = ecx.with_name("NOT argument");
4211 Ok(plan_expr(&ecx, expr)?
4212 .type_as(&ecx, &SqlScalarType::Bool)?
4213 .call_unary(UnaryFunc::Not(expr_func::Not))
4214 .into())
4215}
4216
4217fn plan_and(
4218 ecx: &ExprContext,
4219 left: &Expr<Aug>,
4220 right: &Expr<Aug>,
4221) -> Result<CoercibleScalarExpr, PlanError> {
4222 let ecx = ecx.with_name("AND argument");
4223 Ok(HirScalarExpr::variadic_and(vec![
4224 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4225 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4226 ])
4227 .into())
4228}
4229
4230fn plan_or(
4231 ecx: &ExprContext,
4232 left: &Expr<Aug>,
4233 right: &Expr<Aug>,
4234) -> Result<CoercibleScalarExpr, PlanError> {
4235 let ecx = ecx.with_name("OR argument");
4236 Ok(HirScalarExpr::variadic_or(vec![
4237 plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4238 plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4239 ])
4240 .into())
4241}
4242
4243fn plan_in_list(
4244 ecx: &ExprContext,
4245 lhs: &Expr<Aug>,
4246 list: &Vec<Expr<Aug>>,
4247 negated: &bool,
4248) -> Result<CoercibleScalarExpr, PlanError> {
4249 let ecx = ecx.with_name("IN list");
4250 let or = HirScalarExpr::variadic_or(
4251 list.into_iter()
4252 .map(|e| {
4253 let eq = lhs.clone().equals(e.clone());
4254 plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4255 })
4256 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4257 );
4258 Ok(if *negated {
4259 or.call_unary(UnaryFunc::Not(expr_func::Not))
4260 } else {
4261 or
4262 }
4263 .into())
4264}
4265
4266fn plan_homogenizing_function(
4267 ecx: &ExprContext,
4268 function: &HomogenizingFunction,
4269 exprs: &[Expr<Aug>],
4270) -> Result<CoercibleScalarExpr, PlanError> {
4271 assert!(!exprs.is_empty()); let expr = HirScalarExpr::call_variadic(
4273 match function {
4274 HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4275 HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4276 HomogenizingFunction::Least => VariadicFunc::from(Least),
4277 },
4278 coerce_homogeneous_exprs(
4279 &ecx.with_name(&function.to_string().to_lowercase()),
4280 plan_exprs(ecx, exprs)?,
4281 None,
4282 )?,
4283 );
4284 Ok(expr.into())
4285}
4286
4287fn plan_field_access(
4288 ecx: &ExprContext,
4289 expr: &Expr<Aug>,
4290 field: &Ident,
4291) -> Result<CoercibleScalarExpr, PlanError> {
4292 let field = normalize::column_name(field.clone());
4293 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4294 let ty = ecx.scalar_type(&expr);
4295 let i = match &ty {
4296 SqlScalarType::Record { fields, .. } => {
4297 fields.iter().position(|(name, _ty)| *name == field)
4298 }
4299 ty => sql_bail!(
4300 "column notation applied to type {}, which is not a composite type",
4301 ecx.humanize_sql_scalar_type(ty, false)
4302 ),
4303 };
4304 match i {
4305 None => sql_bail!(
4306 "field {} not found in data type {}",
4307 field,
4308 ecx.humanize_sql_scalar_type(&ty, false)
4309 ),
4310 Some(i) => Ok(expr
4311 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4312 .into()),
4313 }
4314}
4315
4316fn plan_subscript(
4317 ecx: &ExprContext,
4318 expr: &Expr<Aug>,
4319 positions: &[SubscriptPosition<Aug>],
4320) -> Result<CoercibleScalarExpr, PlanError> {
4321 assert!(
4322 !positions.is_empty(),
4323 "subscript expression must contain at least one position"
4324 );
4325
4326 let ecx = &ecx.with_name("subscripting");
4327 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4328 let ty = ecx.scalar_type(&expr);
4329 match &ty {
4330 SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4331 ecx,
4332 expr,
4333 positions,
4334 if ty == SqlScalarType::Int2Vector {
4338 1
4339 } else {
4340 0
4341 },
4342 ),
4343 SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4344 SqlScalarType::List { element_type, .. } => {
4345 let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4347 let n_layers = ty.unwrap_list_n_layers();
4348 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4349 }
4350 ty => sql_bail!(
4351 "cannot subscript type {}",
4352 ecx.humanize_sql_scalar_type(ty, false)
4353 ),
4354 }
4355}
4356
4357fn extract_scalar_subscript_from_positions<'a>(
4361 positions: &'a [SubscriptPosition<Aug>],
4362 expr_type_name: &str,
4363) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4364 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4365 for p in positions {
4366 if p.explicit_slice {
4367 sql_bail!("{} subscript does not support slices", expr_type_name);
4368 }
4369 assert!(
4370 p.end.is_none(),
4371 "index-appearing subscripts cannot have end value"
4372 );
4373 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4374 }
4375 Ok(scalar_subscripts)
4376}
4377
4378fn plan_subscript_array(
4379 ecx: &ExprContext,
4380 expr: HirScalarExpr,
4381 positions: &[SubscriptPosition<Aug>],
4382 offset: i64,
4383) -> Result<CoercibleScalarExpr, PlanError> {
4384 let mut exprs = Vec::with_capacity(positions.len() + 1);
4385 exprs.push(expr);
4386
4387 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4390
4391 for i in indexes {
4392 exprs.push(plan_expr(ecx, i)?.cast_to(
4393 ecx,
4394 CastContext::Explicit,
4395 &SqlScalarType::Int64,
4396 )?);
4397 }
4398
4399 Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4400}
4401
4402fn plan_subscript_list(
4403 ecx: &ExprContext,
4404 mut expr: HirScalarExpr,
4405 positions: &[SubscriptPosition<Aug>],
4406 mut remaining_layers: usize,
4407 elem_type_name: &str,
4408) -> Result<CoercibleScalarExpr, PlanError> {
4409 let mut i = 0;
4410
4411 while i < positions.len() {
4412 let j = positions[i..]
4414 .iter()
4415 .position(|p| p.explicit_slice)
4416 .unwrap_or(positions.len() - i);
4417 if j != 0 {
4418 let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4419 let (n, e) = plan_index_list(
4420 ecx,
4421 expr,
4422 indexes.as_slice(),
4423 remaining_layers,
4424 elem_type_name,
4425 )?;
4426 remaining_layers = n;
4427 expr = e;
4428 i += j;
4429 }
4430
4431 let j = positions[i..]
4433 .iter()
4434 .position(|p| !p.explicit_slice)
4435 .unwrap_or(positions.len() - i);
4436 if j != 0 {
4437 expr = plan_slice_list(
4438 ecx,
4439 expr,
4440 &positions[i..i + j],
4441 remaining_layers,
4442 elem_type_name,
4443 )?;
4444 i += j;
4445 }
4446 }
4447
4448 Ok(expr.into())
4449}
4450
4451fn plan_index_list(
4452 ecx: &ExprContext,
4453 expr: HirScalarExpr,
4454 indexes: &[&Expr<Aug>],
4455 n_layers: usize,
4456 elem_type_name: &str,
4457) -> Result<(usize, HirScalarExpr), PlanError> {
4458 let depth = indexes.len();
4459
4460 if depth > n_layers {
4461 if n_layers == 0 {
4462 sql_bail!("cannot subscript type {}", elem_type_name)
4463 } else {
4464 sql_bail!(
4465 "cannot index into {} layers; list only has {} layer{}",
4466 depth,
4467 n_layers,
4468 if n_layers == 1 { "" } else { "s" }
4469 )
4470 }
4471 }
4472
4473 let mut exprs = Vec::with_capacity(depth + 1);
4474 exprs.push(expr);
4475
4476 for i in indexes {
4477 exprs.push(plan_expr(ecx, i)?.cast_to(
4478 ecx,
4479 CastContext::Explicit,
4480 &SqlScalarType::Int64,
4481 )?);
4482 }
4483
4484 Ok((
4485 n_layers - depth,
4486 HirScalarExpr::call_variadic(ListIndex, exprs),
4487 ))
4488}
4489
4490fn plan_slice_list(
4491 ecx: &ExprContext,
4492 expr: HirScalarExpr,
4493 slices: &[SubscriptPosition<Aug>],
4494 n_layers: usize,
4495 elem_type_name: &str,
4496) -> Result<HirScalarExpr, PlanError> {
4497 if n_layers == 0 {
4498 sql_bail!("cannot subscript type {}", elem_type_name)
4499 }
4500
4501 let mut exprs = Vec::with_capacity(slices.len() + 1);
4503 exprs.push(expr);
4504 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4506 Ok(match position {
4507 Some(p) => {
4508 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4509 }
4510 None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4511 })
4512 };
4513 for p in slices {
4514 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4515 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4516 exprs.push(start);
4517 exprs.push(end);
4518 }
4519
4520 Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4521}
4522
4523fn plan_like(
4524 ecx: &ExprContext,
4525 expr: &Expr<Aug>,
4526 pattern: &Expr<Aug>,
4527 escape: Option<&Expr<Aug>>,
4528 case_insensitive: bool,
4529 not: bool,
4530) -> Result<HirScalarExpr, PlanError> {
4531 use CastContext::Implicit;
4532 let ecx = ecx.with_name("LIKE argument");
4533 let expr = plan_expr(&ecx, expr)?;
4534 let haystack = match ecx.scalar_type(&expr) {
4535 CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4536 .type_as(&ecx, ty)?
4537 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4538 _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4539 };
4540 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4541 if let Some(escape) = escape {
4542 pattern = pattern.call_binary(
4543 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4544 expr_func::LikeEscape,
4545 );
4546 }
4547 let func: BinaryFunc = if case_insensitive {
4548 expr_func::IsLikeMatchCaseInsensitive.into()
4549 } else {
4550 expr_func::IsLikeMatchCaseSensitive.into()
4551 };
4552 let like = haystack.call_binary(pattern, func);
4553 if not {
4554 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4555 } else {
4556 Ok(like)
4557 }
4558}
4559
4560fn plan_subscript_jsonb(
4561 ecx: &ExprContext,
4562 expr: HirScalarExpr,
4563 positions: &[SubscriptPosition<Aug>],
4564) -> Result<CoercibleScalarExpr, PlanError> {
4565 use CastContext::Implicit;
4566 use SqlScalarType::{Int64, String};
4567
4568 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4571
4572 let mut exprs = Vec::with_capacity(subscripts.len());
4573 for s in subscripts {
4574 let subscript = plan_expr(ecx, s)?;
4575 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4576 subscript
4577 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4578 typeconv::to_string(ecx, subscript)
4582 } else {
4583 sql_bail!("jsonb subscript type must be coercible to integer or text");
4584 };
4585 exprs.push(subscript);
4586 }
4587
4588 let expr = expr.call_binary(
4591 HirScalarExpr::call_variadic(
4592 ArrayCreate {
4593 elem_type: SqlScalarType::String,
4594 },
4595 exprs,
4596 ),
4597 expr_func::JsonbGetPath,
4598 );
4599 Ok(expr.into())
4600}
4601
4602fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4603 if !ecx.allow_subqueries {
4604 sql_bail!("{} does not allow subqueries", ecx.name)
4605 }
4606 let mut qcx = ecx.derived_query_context();
4607 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4608 Ok(expr.exists().into())
4609}
4610
4611fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4612 if !ecx.allow_subqueries {
4613 sql_bail!("{} does not allow subqueries", ecx.name)
4614 }
4615 let mut qcx = ecx.derived_query_context();
4616 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4617 let column_types = qcx.relation_type(&expr).column_types;
4618 if column_types.len() != 1 {
4619 sql_bail!(
4620 "Expected subselect to return 1 column, got {} columns",
4621 column_types.len()
4622 );
4623 }
4624 Ok(expr.select().into())
4625}
4626
4627fn plan_list_subquery(
4628 ecx: &ExprContext,
4629 query: &Query<Aug>,
4630) -> Result<CoercibleScalarExpr, PlanError> {
4631 plan_vector_like_subquery(
4632 ecx,
4633 query,
4634 |_| false,
4635 |elem_type| ListCreate { elem_type }.into(),
4636 |order_by| AggregateFunc::ListConcat { order_by },
4637 expr_func::ListListConcat.into(),
4638 |elem_type| {
4639 HirScalarExpr::literal(
4640 Datum::empty_list(),
4641 SqlScalarType::List {
4642 element_type: Box::new(elem_type),
4643 custom_id: None,
4644 },
4645 )
4646 },
4647 "list",
4648 )
4649}
4650
4651fn plan_array_subquery(
4652 ecx: &ExprContext,
4653 query: &Query<Aug>,
4654) -> Result<CoercibleScalarExpr, PlanError> {
4655 plan_vector_like_subquery(
4656 ecx,
4657 query,
4658 |elem_type| {
4659 matches!(
4660 elem_type,
4661 SqlScalarType::Char { .. }
4662 | SqlScalarType::Array { .. }
4663 | SqlScalarType::List { .. }
4664 | SqlScalarType::Map { .. }
4665 )
4666 },
4667 |elem_type| ArrayCreate { elem_type }.into(),
4668 |order_by| AggregateFunc::ArrayConcat { order_by },
4669 expr_func::ArrayArrayConcat.into(),
4670 |elem_type| {
4671 HirScalarExpr::literal(
4672 Datum::empty_array(),
4673 SqlScalarType::Array(Box::new(elem_type)),
4674 )
4675 },
4676 "[]",
4677 )
4678}
4679
4680fn plan_vector_like_subquery<F1, F2, F3, F4>(
4682 ecx: &ExprContext,
4683 query: &Query<Aug>,
4684 is_unsupported_type: F1,
4685 vector_create: F2,
4686 aggregate_concat: F3,
4687 binary_concat: BinaryFunc,
4688 empty_literal: F4,
4689 vector_type_string: &str,
4690) -> Result<CoercibleScalarExpr, PlanError>
4691where
4692 F1: Fn(&SqlScalarType) -> bool,
4693 F2: Fn(SqlScalarType) -> VariadicFunc,
4694 F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4695 F4: Fn(SqlScalarType) -> HirScalarExpr,
4696{
4697 if !ecx.allow_subqueries {
4698 sql_bail!("{} does not allow subqueries", ecx.name)
4699 }
4700
4701 let mut qcx = ecx.derived_query_context();
4702 let mut planned_query = plan_query(&mut qcx, query)?;
4703 if planned_query.limit.is_some()
4704 || !planned_query
4705 .offset
4706 .clone()
4707 .try_into_literal_int64()
4708 .is_ok_and(|offset| offset == 0)
4709 {
4710 planned_query.expr = HirRelationExpr::top_k(
4711 planned_query.expr,
4712 vec![],
4713 planned_query.order_by.clone(),
4714 planned_query.limit,
4715 planned_query.offset,
4716 planned_query.group_size_hints.limit_input_group_size,
4717 );
4718 }
4719
4720 if planned_query.project.len() != 1 {
4721 sql_bail!(
4722 "Expected subselect to return 1 column, got {} columns",
4723 planned_query.project.len()
4724 );
4725 }
4726
4727 let project_column = *planned_query.project.get(0).unwrap();
4728 let elem_type = qcx
4729 .relation_type(&planned_query.expr)
4730 .column_types
4731 .get(project_column)
4732 .cloned()
4733 .unwrap()
4734 .scalar_type();
4735
4736 if is_unsupported_type(&elem_type) {
4737 bail_unsupported!(format!(
4738 "cannot build array from subquery because return type {}{}",
4739 ecx.humanize_sql_scalar_type(&elem_type, false),
4740 vector_type_string
4741 ));
4742 }
4743
4744 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4747 vector_create(elem_type.clone()),
4748 vec![HirScalarExpr::column(project_column)],
4749 ))
4750 .chain(
4751 planned_query
4752 .order_by
4753 .iter()
4754 .map(|co| HirScalarExpr::column(co.column)),
4755 )
4756 .collect();
4757
4758 let aggregation_projection = vec![0];
4762 let aggregation_order_by = planned_query
4763 .order_by
4764 .into_iter()
4765 .enumerate()
4766 .map(|(i, order)| ColumnOrder { column: i, ..order })
4767 .collect();
4768
4769 let reduced_expr = planned_query
4770 .expr
4771 .reduce(
4772 vec![],
4773 vec![AggregateExpr {
4774 func: aggregate_concat(aggregation_order_by),
4775 expr: Box::new(HirScalarExpr::call_variadic(
4776 RecordCreate {
4777 field_names: iter::repeat(ColumnName::from(""))
4778 .take(aggregation_exprs.len())
4779 .collect(),
4780 },
4781 aggregation_exprs,
4782 )),
4783 distinct: false,
4784 }],
4785 None,
4786 )
4787 .project(aggregation_projection);
4788
4789 Ok(reduced_expr
4791 .select()
4792 .call_binary(empty_literal(elem_type), binary_concat)
4793 .into())
4794}
4795
4796fn plan_map_subquery(
4797 ecx: &ExprContext,
4798 query: &Query<Aug>,
4799) -> Result<CoercibleScalarExpr, PlanError> {
4800 if !ecx.allow_subqueries {
4801 sql_bail!("{} does not allow subqueries", ecx.name)
4802 }
4803
4804 let mut qcx = ecx.derived_query_context();
4805 let mut query = plan_query(&mut qcx, query)?;
4806 if query.limit.is_some()
4807 || !query
4808 .offset
4809 .clone()
4810 .try_into_literal_int64()
4811 .is_ok_and(|offset| offset == 0)
4812 {
4813 query.expr = HirRelationExpr::top_k(
4814 query.expr,
4815 vec![],
4816 query.order_by.clone(),
4817 query.limit,
4818 query.offset,
4819 query.group_size_hints.limit_input_group_size,
4820 );
4821 }
4822 if query.project.len() != 2 {
4823 sql_bail!(
4824 "expected map subquery to return 2 columns, got {} columns",
4825 query.project.len()
4826 );
4827 }
4828
4829 let query_types = qcx.relation_type(&query.expr).column_types;
4830 let key_column = query.project[0];
4831 let key_type = query_types[key_column].clone().scalar_type();
4832 let value_column = query.project[1];
4833 let value_type = query_types[value_column].clone().scalar_type();
4834
4835 if key_type != SqlScalarType::String {
4836 sql_bail!("cannot build map from subquery because first column is not of type text");
4837 }
4838
4839 let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4840 RecordCreate {
4841 field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4842 },
4843 vec![
4844 HirScalarExpr::column(key_column),
4845 HirScalarExpr::column(value_column),
4846 ],
4847 ))
4848 .chain(
4849 query
4850 .order_by
4851 .iter()
4852 .map(|co| HirScalarExpr::column(co.column)),
4853 )
4854 .collect();
4855
4856 let expr = query
4857 .expr
4858 .reduce(
4859 vec![],
4860 vec![AggregateExpr {
4861 func: AggregateFunc::MapAgg {
4862 order_by: query
4863 .order_by
4864 .into_iter()
4865 .enumerate()
4866 .map(|(i, order)| ColumnOrder { column: i, ..order })
4867 .collect(),
4868 value_type: value_type.clone(),
4869 },
4870 expr: Box::new(HirScalarExpr::call_variadic(
4871 RecordCreate {
4872 field_names: iter::repeat(ColumnName::from(""))
4873 .take(aggregation_exprs.len())
4874 .collect(),
4875 },
4876 aggregation_exprs,
4877 )),
4878 distinct: false,
4879 }],
4880 None,
4881 )
4882 .project(vec![0]);
4883
4884 let expr = HirScalarExpr::call_variadic(
4886 Coalesce,
4887 vec![
4888 expr.select(),
4889 HirScalarExpr::literal(
4890 Datum::empty_map(),
4891 SqlScalarType::Map {
4892 value_type: Box::new(value_type),
4893 custom_id: None,
4894 },
4895 ),
4896 ],
4897 );
4898
4899 Ok(expr.into())
4900}
4901
4902fn plan_collate(
4903 ecx: &ExprContext,
4904 expr: &Expr<Aug>,
4905 collation: &UnresolvedItemName,
4906) -> Result<CoercibleScalarExpr, PlanError> {
4907 if collation.0.len() == 2
4908 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4909 && collation.0[1] == ident!("default")
4910 {
4911 plan_expr(ecx, expr)
4912 } else {
4913 bail_unsupported!("COLLATE");
4914 }
4915}
4916
4917fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4924where
4925 E: std::borrow::Borrow<Expr<Aug>>,
4926{
4927 let mut out = vec![];
4928 for expr in exprs {
4929 out.push(plan_expr(ecx, expr.borrow())?);
4930 }
4931 Ok(out)
4932}
4933
4934fn plan_array(
4936 ecx: &ExprContext,
4937 exprs: &[Expr<Aug>],
4938 type_hint: Option<&SqlScalarType>,
4939) -> Result<CoercibleScalarExpr, PlanError> {
4940 let mut out = vec![];
4942 for expr in exprs {
4943 out.push(match expr {
4944 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4947 _ => plan_expr(ecx, expr)?,
4948 });
4949 }
4950
4951 let type_hint = match type_hint {
4953 Some(SqlScalarType::Array(elem_type)) => {
4958 let multidimensional = out.iter().any(|e| {
4959 matches!(
4960 ecx.scalar_type(e),
4961 CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4962 )
4963 });
4964 if multidimensional {
4965 type_hint
4966 } else {
4967 Some(&**elem_type)
4968 }
4969 }
4970 Some(_) => None,
4974 None => None,
4976 };
4977
4978 let (elem_type, exprs) = if exprs.is_empty() {
4980 if let Some(elem_type) = type_hint {
4981 (elem_type.clone(), vec![])
4982 } else {
4983 sql_bail!("cannot determine type of empty array");
4984 }
4985 } else {
4986 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4987 (ecx.scalar_type(&out[0]), out)
4988 };
4989
4990 if matches!(
4996 elem_type,
4997 SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4998 ) {
4999 bail_unsupported!(format!(
5000 "{}[]",
5001 ecx.humanize_sql_scalar_type(&elem_type, false)
5002 ));
5003 }
5004
5005 Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
5006}
5007
5008fn plan_list(
5009 ecx: &ExprContext,
5010 exprs: &[Expr<Aug>],
5011 type_hint: Option<&SqlScalarType>,
5012) -> Result<CoercibleScalarExpr, PlanError> {
5013 let (elem_type, exprs) = if exprs.is_empty() {
5014 if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
5015 (element_type.without_modifiers(), vec![])
5016 } else {
5017 sql_bail!("cannot determine type of empty list");
5018 }
5019 } else {
5020 let type_hint = match type_hint {
5021 Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5022 _ => None,
5023 };
5024
5025 let mut out = vec![];
5026 for expr in exprs {
5027 out.push(match expr {
5028 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5031 _ => plan_expr(ecx, expr)?,
5032 });
5033 }
5034 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5035 (ecx.scalar_type(&out[0]).without_modifiers(), out)
5036 };
5037
5038 if matches!(elem_type, SqlScalarType::Char { .. }) {
5039 bail_unsupported!("char list");
5040 }
5041
5042 Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5043}
5044
5045fn plan_map(
5046 ecx: &ExprContext,
5047 entries: &[MapEntry<Aug>],
5048 type_hint: Option<&SqlScalarType>,
5049) -> Result<CoercibleScalarExpr, PlanError> {
5050 let (value_type, exprs) = if entries.is_empty() {
5051 if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5052 (value_type.without_modifiers(), vec![])
5053 } else {
5054 sql_bail!("cannot determine type of empty map");
5055 }
5056 } else {
5057 let type_hint = match type_hint {
5058 Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5059 _ => None,
5060 };
5061
5062 let mut keys = vec![];
5063 let mut values = vec![];
5064 for MapEntry { key, value } in entries {
5065 let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5066 let value = match value {
5067 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5070 _ => plan_expr(ecx, value)?,
5071 };
5072 keys.push(key);
5073 values.push(value);
5074 }
5075 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5076 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5077 let out = itertools::interleave(keys, values).collect();
5078 (value_type, out)
5079 };
5080
5081 if matches!(value_type, SqlScalarType::Char { .. }) {
5082 bail_unsupported!("char map");
5083 }
5084
5085 let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5086 Ok(expr.into())
5087}
5088
5089pub fn coerce_homogeneous_exprs(
5106 ecx: &ExprContext,
5107 exprs: Vec<CoercibleScalarExpr>,
5108 force_type: Option<&SqlScalarType>,
5109) -> Result<Vec<HirScalarExpr>, PlanError> {
5110 assert!(!exprs.is_empty());
5111
5112 let target_holder;
5113 let target = match force_type {
5114 Some(t) => t,
5115 None => {
5116 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5117 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5118 &target_holder
5119 }
5120 };
5121
5122 let mut out = Vec::new();
5124 for expr in exprs {
5125 let arg = typeconv::plan_coerce(ecx, expr, target)?;
5126 let ccx = match force_type {
5127 None => CastContext::Implicit,
5128 Some(_) => CastContext::Explicit,
5129 };
5130 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5131 Ok(expr) => out.push(expr),
5132 Err(_) => sql_bail!(
5133 "{} could not convert type {} to {}",
5134 ecx.name,
5135 ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5136 ecx.humanize_sql_scalar_type(target, false),
5137 ),
5138 }
5139 }
5140 Ok(out)
5141}
5142
5143pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5146 obe: &OrderByExpr<T>,
5147 column: usize,
5148) -> ColumnOrder {
5149 let desc = !obe.asc.unwrap_or(true);
5150 ColumnOrder {
5151 column,
5152 desc,
5153 nulls_last: obe.nulls_last.unwrap_or(!desc),
5156 }
5157}
5158
5159fn plan_function_order_by(
5167 ecx: &ExprContext,
5168 order_by: &[OrderByExpr<Aug>],
5169) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5170 let mut order_by_exprs = vec![];
5171 let mut col_orders = vec![];
5172 {
5173 for (i, obe) in order_by.iter().enumerate() {
5174 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5178 order_by_exprs.push(expr);
5179 col_orders.push(resolve_desc_and_nulls_last(obe, i));
5180 }
5181 }
5182 Ok((order_by_exprs, col_orders))
5183}
5184
5185fn plan_aggregate_common(
5187 ecx: &ExprContext,
5188 Function::<Aug> {
5189 name,
5190 args,
5191 filter,
5192 over: _,
5193 distinct,
5194 }: &Function<Aug>,
5195) -> Result<AggregateExpr, PlanError> {
5196 let impls = match resolve_func(ecx, name, args)? {
5211 Func::Aggregate(impls) => impls,
5212 _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5213 };
5214
5215 let (args, order_by) = match &args {
5224 FunctionArgs::Star => (vec![], vec![]),
5225 FunctionArgs::Args { args, order_by } => {
5226 if args.is_empty() {
5227 sql_bail!(
5228 "{}(*) must be used to call a parameterless aggregate function",
5229 ecx.qcx
5230 .scx
5231 .humanize_resolved_name(name)
5232 .expect("name actually resolved")
5233 );
5234 }
5235 let args = plan_exprs(ecx, args)?;
5236 (args, order_by.clone())
5237 }
5238 };
5239
5240 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5241
5242 let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5243 if let Some(filter) = &filter {
5244 let cond =
5254 plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5255 let expr_typ = ecx.scalar_type(&expr);
5256 expr = HirScalarExpr::if_then_else(
5257 cond,
5258 expr,
5259 HirScalarExpr::literal(func.identity_datum(), expr_typ),
5260 );
5261 }
5262
5263 let mut seen_outer = false;
5264 let mut seen_inner = false;
5265 #[allow(deprecated)]
5266 expr.visit_columns(0, &mut |depth, col| {
5267 if depth == 0 && col.level == 0 {
5268 seen_inner = true;
5269 } else if col.level > depth {
5270 seen_outer = true;
5271 }
5272 });
5273 if seen_outer && !seen_inner {
5274 bail_unsupported!(
5275 3720,
5276 "aggregate functions that refer exclusively to outer columns"
5277 );
5278 }
5279
5280 if func.is_order_sensitive() {
5283 let field_names = iter::repeat(ColumnName::from(""))
5284 .take(1 + order_by_exprs.len())
5285 .collect();
5286 let mut exprs = vec![expr];
5287 exprs.extend(order_by_exprs);
5288 expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5289 }
5290
5291 Ok(AggregateExpr {
5292 func,
5293 expr: Box::new(expr),
5294 distinct: *distinct,
5295 })
5296}
5297
5298fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5299 let mut names = names.to_vec();
5300 let col_name = normalize::column_name(names.pop().unwrap());
5301
5302 if !names.is_empty() {
5304 let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5305 let (i, i_name) = ecx.scope.resolve_table_column(
5306 &ecx.qcx.outer_scopes,
5307 &table_name,
5308 &col_name,
5309 &mut ecx.qcx.name_manager.borrow_mut(),
5310 )?;
5311 return Ok(HirScalarExpr::named_column(i, i_name));
5312 }
5313
5314 let similar_names = match ecx.scope.resolve_column(
5317 &ecx.qcx.outer_scopes,
5318 &col_name,
5319 &mut ecx.qcx.name_manager.borrow_mut(),
5320 ) {
5321 Ok((i, i_name)) => {
5322 return Ok(HirScalarExpr::named_column(i, i_name));
5323 }
5324 Err(PlanError::UnknownColumn { similar, .. }) => similar,
5325 Err(e) => return Err(e),
5326 };
5327
5328 let items = ecx.scope.items_from_table(
5331 &ecx.qcx.outer_scopes,
5332 &PartialItemName {
5333 database: None,
5334 schema: None,
5335 item: col_name.as_str().to_owned(),
5336 },
5337 )?;
5338 match items.as_slice() {
5339 [] => Err(PlanError::UnknownColumn {
5341 table: None,
5342 column: col_name,
5343 similar: similar_names,
5344 }),
5345 [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5350 *column,
5351 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5352 )),
5353 _ => {
5356 let mut has_exists_column = None;
5357 let (exprs, field_names): (Vec<_>, Vec<_>) = items
5358 .into_iter()
5359 .filter_map(|(column, item)| {
5360 if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5361 has_exists_column = Some(column);
5362 None
5363 } else {
5364 let expr = HirScalarExpr::named_column(
5365 column,
5366 ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5367 );
5368 let name = item.column_name.clone();
5369 Some((expr, name))
5370 }
5371 })
5372 .unzip();
5373 let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5375 exprs.into_element()
5376 } else {
5377 HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
5378 };
5379 if let Some(has_exists_column) = has_exists_column {
5380 Ok(HirScalarExpr::if_then_else(
5381 HirScalarExpr::unnamed_column(has_exists_column)
5382 .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5383 HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5384 expr,
5385 ))
5386 } else {
5387 Ok(expr)
5388 }
5389 }
5390 }
5391}
5392
5393fn plan_op(
5394 ecx: &ExprContext,
5395 op: &str,
5396 expr1: &Expr<Aug>,
5397 expr2: Option<&Expr<Aug>>,
5398) -> Result<HirScalarExpr, PlanError> {
5399 let impls = func::resolve_op(op)?;
5400 let args = match expr2 {
5401 None => plan_exprs(ecx, &[expr1])?,
5402 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5403 };
5404 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5405}
5406
/// Plans a function call that appears in a scalar expression.
///
/// The handling depends on what `name` resolves to:
///
/// * table functions are rejected;
/// * scalar functions are planned from their arguments and may not carry
///   `OVER`, `DISTINCT`, `FILTER`, a `*` argument or an argument `ORDER BY`;
/// * scalar window and value window functions are planned into a windowing
///   expression;
/// * aggregate functions are planned here only when they carry an `OVER`
///   clause; without one an error is returned.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    // Every arm except `Func::Scalar` either bails or returns from the
    // function; only scalar functions fall through to the code below.
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            // The window frame is planned but not used for scalar window functions.
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // Report a dedicated error when arguments were supplied, naming
            // the function by its item name.
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // IGNORE NULLS is never accepted for scalar window functions.
            if ignore_nulls {
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let window_plan = plan_window_function_non_aggr(ecx, f)?;
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
                window_plan;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;

            // IGNORE NULLS is accepted for LAG and LEAD only.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // A non-windowed aggregate is never planned by this function:
                // it is reported as an internal error where aggregates are
                // allowed, and as a user error where they are not.
                if ecx.allow_aggregates {
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                // Window aggregate.
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // Frames that combine an unbounded bound with an offset bound
                // are rejected for window aggregates.
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    (_, _) => {}
                }

                if ignore_nulls {
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // From here on the function is a plain scalar function; its arm above
    // already bailed when an OVER clause was present.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }

    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5628
/// Feature description passed to `bail_unsupported!` when `IGNORE NULLS` is
/// requested for a window function that does not accept it.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5631
5632pub fn resolve_func(
5636 ecx: &ExprContext,
5637 name: &ResolvedItemName,
5638 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5639) -> Result<&'static Func, PlanError> {
5640 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5641 if let Ok(f) = i.func() {
5642 return Ok(f);
5643 }
5644 }
5645
5646 let cexprs = match args {
5649 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5650 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5651 if !order_by.is_empty() {
5652 sql_bail!(
5653 "ORDER BY specified, but {} is not an aggregate function",
5654 name
5655 );
5656 }
5657 plan_exprs(ecx, args)?
5658 }
5659 };
5660
5661 let arg_types: Vec<_> = cexprs
5662 .into_iter()
5663 .map(|ty| match ecx.scalar_type(&ty) {
5664 CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5665 CoercibleScalarType::Record(_) => "record".to_string(),
5666 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5667 })
5668 .collect();
5669
5670 Err(PlanError::UnknownFunction {
5671 name: name.to_string(),
5672 arg_types,
5673 })
5674}
5675
5676fn plan_is_expr<'a>(
5677 ecx: &ExprContext,
5678 expr: &'a Expr<Aug>,
5679 construct: &IsExprConstruct<Aug>,
5680 not: bool,
5681) -> Result<HirScalarExpr, PlanError> {
5682 let expr_hir = plan_expr(ecx, expr)?;
5683
5684 let mut result = match construct {
5685 IsExprConstruct::Null => {
5686 expr_hir.type_as_any(ecx)?.call_is_null()
5691 }
5692 IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5693 IsExprConstruct::True => expr_hir
5694 .type_as(ecx, &SqlScalarType::Bool)?
5695 .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5696 IsExprConstruct::False => expr_hir
5697 .type_as(ecx, &SqlScalarType::Bool)?
5698 .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5699 IsExprConstruct::DistinctFrom(expr2) => {
5700 let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5711 let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5712
5713 let expr1_hir = expr_hir.type_as_any(ecx)?;
5714 let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5715
5716 let term1 = HirScalarExpr::variadic_or(vec![
5717 ne_hir,
5718 expr1_hir.clone().call_is_null(),
5719 expr2_hir.clone().call_is_null(),
5720 ]);
5721 let term2 = HirScalarExpr::variadic_or(vec![
5722 expr1_hir.call_is_null().not(),
5723 expr2_hir.call_is_null().not(),
5724 ]);
5725 term1.and(term2)
5726 }
5727 };
5728 if not {
5729 result = result.not();
5730 }
5731 Ok(result)
5732}
5733
5734fn plan_case<'a>(
5735 ecx: &ExprContext,
5736 operand: &'a Option<Box<Expr<Aug>>>,
5737 conditions: &'a [Expr<Aug>],
5738 results: &'a [Expr<Aug>],
5739 else_result: &'a Option<Box<Expr<Aug>>>,
5740) -> Result<HirScalarExpr, PlanError> {
5741 let mut cond_exprs = Vec::new();
5742 let mut result_exprs = Vec::new();
5743 for (c, r) in conditions.iter().zip_eq(results) {
5744 let c = match operand {
5745 Some(operand) => operand.clone().equals(c.clone()),
5746 None => c.clone(),
5747 };
5748 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5749 cond_exprs.push(cexpr);
5750 result_exprs.push(r);
5751 }
5752 result_exprs.push(match else_result {
5753 Some(else_result) => else_result,
5754 None => &Expr::Value(Value::Null),
5755 });
5756 let mut result_exprs = coerce_homogeneous_exprs(
5757 &ecx.with_name("CASE"),
5758 plan_exprs(ecx, &result_exprs)?,
5759 None,
5760 )?;
5761 let mut expr = result_exprs.pop().unwrap();
5762 assert_eq!(cond_exprs.len(), result_exprs.len());
5763 for (cexpr, rexpr) in cond_exprs
5764 .into_iter()
5765 .rev()
5766 .zip_eq(result_exprs.into_iter().rev())
5767 {
5768 expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5769 }
5770 Ok(expr)
5771}
5772
5773fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5774 let (datum, scalar_type) = match l {
5775 Value::Number(s) => {
5776 let d = strconv::parse_numeric(s.as_str())?;
5777 if !s.contains(&['E', '.'][..]) {
5778 if let Ok(n) = d.0.try_into() {
5780 (Datum::Int32(n), SqlScalarType::Int32)
5781 } else if let Ok(n) = d.0.try_into() {
5782 (Datum::Int64(n), SqlScalarType::Int64)
5783 } else {
5784 (
5785 Datum::Numeric(d),
5786 SqlScalarType::Numeric { max_scale: None },
5787 )
5788 }
5789 } else {
5790 (
5791 Datum::Numeric(d),
5792 SqlScalarType::Numeric { max_scale: None },
5793 )
5794 }
5795 }
5796 Value::HexString(_) => bail_unsupported!("hex string literals"),
5797 Value::Boolean(b) => match b {
5798 false => (Datum::False, SqlScalarType::Bool),
5799 true => (Datum::True, SqlScalarType::Bool),
5800 },
5801 Value::Interval(i) => {
5802 let i = literal::plan_interval(i)?;
5803 (Datum::Interval(i), SqlScalarType::Interval)
5804 }
5805 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5806 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5807 };
5808 let expr = HirScalarExpr::literal(datum, scalar_type);
5809 Ok(expr.into())
5810}
5811
5812fn plan_window_function_non_aggr<'a>(
5815 ecx: &ExprContext,
5816 Function {
5817 name,
5818 args,
5819 filter,
5820 over,
5821 distinct,
5822 }: &'a Function<Aug>,
5823) -> Result<
5824 (
5825 bool,
5826 Vec<HirScalarExpr>,
5827 Vec<ColumnOrder>,
5828 mz_expr::WindowFrame,
5829 Vec<HirScalarExpr>,
5830 Vec<CoercibleScalarExpr>,
5831 ),
5832 PlanError,
5833> {
5834 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5835 plan_window_function_common(ecx, name, over)?;
5836
5837 if *distinct {
5838 sql_bail!(
5839 "DISTINCT specified, but {} is not an aggregate function",
5840 name
5841 );
5842 }
5843
5844 if filter.is_some() {
5845 bail_unsupported!("FILTER in non-aggregate window functions");
5846 }
5847
5848 let scalar_args = match &args {
5849 FunctionArgs::Star => {
5850 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5851 }
5852 FunctionArgs::Args { args, order_by } => {
5853 if !order_by.is_empty() {
5854 sql_bail!(
5855 "ORDER BY specified, but {} is not an aggregate function",
5856 name
5857 );
5858 }
5859 plan_exprs(ecx, args)?
5860 }
5861 };
5862
5863 Ok((
5864 ignore_nulls,
5865 order_by_exprs,
5866 col_orders,
5867 window_frame,
5868 partition,
5869 scalar_args,
5870 ))
5871}
5872
5873fn plan_window_function_common(
5875 ecx: &ExprContext,
5876 name: &<Aug as AstInfo>::ItemName,
5877 over: &Option<WindowSpec<Aug>>,
5878) -> Result<
5879 (
5880 bool,
5881 Vec<HirScalarExpr>,
5882 Vec<ColumnOrder>,
5883 mz_expr::WindowFrame,
5884 Vec<HirScalarExpr>,
5885 ),
5886 PlanError,
5887> {
5888 if !ecx.allow_windows {
5889 sql_bail!(
5890 "window functions are not allowed in {} (function {})",
5891 ecx.name,
5892 name
5893 );
5894 }
5895
5896 let window_spec = match over.as_ref() {
5897 Some(over) => over,
5898 None => sql_bail!("window function {} requires an OVER clause", name),
5899 };
5900 if window_spec.ignore_nulls && window_spec.respect_nulls {
5901 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5902 }
5903 let window_frame = match window_spec.window_frame.as_ref() {
5904 Some(frame) => plan_window_frame(frame)?,
5905 None => mz_expr::WindowFrame::default(),
5906 };
5907 let mut partition = Vec::new();
5908 for expr in &window_spec.partition_by {
5909 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5910 }
5911
5912 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5913
5914 Ok((
5915 window_spec.ignore_nulls,
5916 order_by_exprs,
5917 col_orders,
5918 window_frame,
5919 partition,
5920 ))
5921}
5922
5923fn plan_window_frame(
5924 WindowFrame {
5925 units,
5926 start_bound,
5927 end_bound,
5928 }: &WindowFrame,
5929) -> Result<mz_expr::WindowFrame, PlanError> {
5930 use mz_expr::WindowFrameBound::*;
5931 let units = window_frame_unit_ast_to_expr(units)?;
5932 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5933 let end_bound = end_bound
5934 .as_ref()
5935 .map(window_frame_bound_ast_to_expr)
5936 .unwrap_or(CurrentRow);
5937
5938 match (&start_bound, &end_bound) {
5940 (UnboundedFollowing, _) => {
5942 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5943 }
5944 (_, UnboundedPreceding) => {
5946 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5947 }
5948 (CurrentRow, OffsetPreceding(_)) => {
5950 sql_bail!("frame starting from current row cannot have preceding rows")
5951 }
5952 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5953 sql_bail!("frame starting from following row cannot have preceding rows")
5954 }
5955 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5958 if *o1 > 1000000 || *o2 > 1000000 {
5962 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5963 }
5964 }
5965 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5966 if *o1 > 1000000 || *o2 > 1000000 {
5967 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5968 }
5969 }
5970 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5971 if *o1 > 1000000 || *o2 > 1000000 {
5972 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5973 }
5974 }
5975 (OffsetPreceding(o), CurrentRow) => {
5976 if *o > 1000000 {
5977 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5978 }
5979 }
5980 (CurrentRow, OffsetFollowing(o)) => {
5981 if *o > 1000000 {
5982 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5983 }
5984 }
5985 (_, _) => (),
5987 }
5988
5989 if units == mz_expr::WindowFrameUnits::Range
5992 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5993 {
5994 bail_unsupported!("RANGE in non-default window frames")
5995 }
5996
5997 let frame = mz_expr::WindowFrame {
5998 units,
5999 start_bound,
6000 end_bound,
6001 };
6002 Ok(frame)
6003}
6004
6005fn window_frame_unit_ast_to_expr(
6006 unit: &WindowFrameUnits,
6007) -> Result<mz_expr::WindowFrameUnits, PlanError> {
6008 match unit {
6009 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
6010 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
6011 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
6012 }
6013}
6014
6015fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
6016 match bound {
6017 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
6018 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
6019 WindowFrameBound::Preceding(Some(offset)) => {
6020 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6021 }
6022 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6023 WindowFrameBound::Following(Some(offset)) => {
6024 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6025 }
6026 }
6027}
6028
6029pub fn scalar_type_from_sql(
6030 scx: &StatementContext,
6031 data_type: &ResolvedDataType,
6032) -> Result<SqlScalarType, PlanError> {
6033 match data_type {
6034 ResolvedDataType::AnonymousList(elem_type) => {
6035 let elem_type = scalar_type_from_sql(scx, elem_type)?;
6036 if matches!(elem_type, SqlScalarType::Char { .. }) {
6037 bail_unsupported!("char list");
6038 }
6039 Ok(SqlScalarType::List {
6040 element_type: Box::new(elem_type),
6041 custom_id: None,
6042 })
6043 }
6044 ResolvedDataType::AnonymousMap {
6045 key_type,
6046 value_type,
6047 } => {
6048 match scalar_type_from_sql(scx, key_type)? {
6049 SqlScalarType::String => {}
6050 other => sql_bail!(
6051 "map key type must be {}, got {}",
6052 scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6053 scx.humanize_sql_scalar_type(&other, false)
6054 ),
6055 }
6056 Ok(SqlScalarType::Map {
6057 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6058 custom_id: None,
6059 })
6060 }
6061 ResolvedDataType::Named { id, modifiers, .. } => {
6062 scalar_type_from_catalog(scx.catalog, *id, modifiers)
6063 }
6064 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6065 }
6066}
6067
6068pub fn scalar_type_from_catalog(
6069 catalog: &dyn SessionCatalog,
6070 id: CatalogItemId,
6071 modifiers: &[i64],
6072) -> Result<SqlScalarType, PlanError> {
6073 let entry = catalog.get_item(&id);
6074 let type_details = match entry.type_details() {
6075 Some(type_details) => type_details,
6076 None => {
6077 sql_bail!(
6080 "internal error: {} does not refer to a type",
6081 catalog.resolve_full_name(entry.name()).to_string().quoted()
6082 );
6083 }
6084 };
6085 match &type_details.typ {
6086 CatalogType::Numeric => {
6087 let mut modifiers = modifiers.iter().fuse();
6088 let precision = match modifiers.next() {
6089 Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6090 sql_bail!(
6091 "precision for type numeric must be between 1 and {}",
6092 NUMERIC_DATUM_MAX_PRECISION,
6093 );
6094 }
6095 Some(p) => Some(*p),
6096 None => None,
6097 };
6098 let scale = match modifiers.next() {
6099 Some(scale) => {
6100 if let Some(precision) = precision {
6101 if *scale > precision {
6102 sql_bail!(
6103 "scale for type numeric must be between 0 and precision {}",
6104 precision
6105 );
6106 }
6107 }
6108 Some(NumericMaxScale::try_from(*scale)?)
6109 }
6110 None => None,
6111 };
6112 if modifiers.next().is_some() {
6113 sql_bail!("type numeric supports at most two type modifiers");
6114 }
6115 Ok(SqlScalarType::Numeric { max_scale: scale })
6116 }
6117 CatalogType::Char => {
6118 let mut modifiers = modifiers.iter().fuse();
6119 let length = match modifiers.next() {
6120 Some(l) => Some(CharLength::try_from(*l)?),
6121 None => Some(CharLength::ONE),
6122 };
6123 if modifiers.next().is_some() {
6124 sql_bail!("type character supports at most one type modifier");
6125 }
6126 Ok(SqlScalarType::Char { length })
6127 }
6128 CatalogType::VarChar => {
6129 let mut modifiers = modifiers.iter().fuse();
6130 let length = match modifiers.next() {
6131 Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6132 None => None,
6133 };
6134 if modifiers.next().is_some() {
6135 sql_bail!("type character varying supports at most one type modifier");
6136 }
6137 Ok(SqlScalarType::VarChar { max_length: length })
6138 }
6139 CatalogType::Timestamp => {
6140 let mut modifiers = modifiers.iter().fuse();
6141 let precision = match modifiers.next() {
6142 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6143 None => None,
6144 };
6145 if modifiers.next().is_some() {
6146 sql_bail!("type timestamp supports at most one type modifier");
6147 }
6148 Ok(SqlScalarType::Timestamp { precision })
6149 }
6150 CatalogType::TimestampTz => {
6151 let mut modifiers = modifiers.iter().fuse();
6152 let precision = match modifiers.next() {
6153 Some(p) => Some(TimestampPrecision::try_from(*p)?),
6154 None => None,
6155 };
6156 if modifiers.next().is_some() {
6157 sql_bail!("type timestamp with time zone supports at most one type modifier");
6158 }
6159 Ok(SqlScalarType::TimestampTz { precision })
6160 }
6161 t => {
6162 if !modifiers.is_empty() {
6163 sql_bail!(
6164 "{} does not support type modifiers",
6165 catalog.resolve_full_name(entry.name()).to_string()
6166 );
6167 }
6168 match t {
6169 CatalogType::Array {
6170 element_reference: element_id,
6171 } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6172 catalog,
6173 *element_id,
6174 modifiers,
6175 )?))),
6176 CatalogType::List {
6177 element_reference: element_id,
6178 element_modifiers,
6179 } => Ok(SqlScalarType::List {
6180 element_type: Box::new(scalar_type_from_catalog(
6181 catalog,
6182 *element_id,
6183 element_modifiers,
6184 )?),
6185 custom_id: Some(id),
6186 }),
6187 CatalogType::Map {
6188 key_reference: _,
6189 key_modifiers: _,
6190 value_reference: value_id,
6191 value_modifiers,
6192 } => Ok(SqlScalarType::Map {
6193 value_type: Box::new(scalar_type_from_catalog(
6194 catalog,
6195 *value_id,
6196 value_modifiers,
6197 )?),
6198 custom_id: Some(id),
6199 }),
6200 CatalogType::Range {
6201 element_reference: element_id,
6202 } => Ok(SqlScalarType::Range {
6203 element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6204 }),
6205 CatalogType::Record { fields } => {
6206 let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6207 .iter()
6208 .map(|f| {
6209 let scalar_type = scalar_type_from_catalog(
6210 catalog,
6211 f.type_reference,
6212 &f.type_modifiers,
6213 )?;
6214 Ok((
6215 f.name.clone(),
6216 SqlColumnType {
6217 scalar_type,
6218 nullable: true,
6219 },
6220 ))
6221 })
6222 .collect::<Result<Box<_>, PlanError>>()?;
6223 Ok(SqlScalarType::Record {
6224 fields: scalars,
6225 custom_id: Some(id),
6226 })
6227 }
6228 CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6229 CatalogType::Bool => Ok(SqlScalarType::Bool),
6230 CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6231 CatalogType::Date => Ok(SqlScalarType::Date),
6232 CatalogType::Float32 => Ok(SqlScalarType::Float32),
6233 CatalogType::Float64 => Ok(SqlScalarType::Float64),
6234 CatalogType::Int16 => Ok(SqlScalarType::Int16),
6235 CatalogType::Int32 => Ok(SqlScalarType::Int32),
6236 CatalogType::Int64 => Ok(SqlScalarType::Int64),
6237 CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6238 CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6239 CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6240 CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6241 CatalogType::Interval => Ok(SqlScalarType::Interval),
6242 CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6243 CatalogType::Oid => Ok(SqlScalarType::Oid),
6244 CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6245 CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6246 CatalogType::Pseudo => {
6247 sql_bail!(
6248 "cannot reference pseudo type {}",
6249 catalog.resolve_full_name(entry.name()).to_string()
6250 )
6251 }
6252 CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6253 CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6254 CatalogType::RegType => Ok(SqlScalarType::RegType),
6255 CatalogType::String => Ok(SqlScalarType::String),
6256 CatalogType::Time => Ok(SqlScalarType::Time),
6257 CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6258 CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6259 CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6260 CatalogType::Numeric => unreachable!("handled above"),
6261 CatalogType::Char => unreachable!("handled above"),
6262 CatalogType::VarChar => unreachable!("handled above"),
6263 CatalogType::Timestamp => unreachable!("handled above"),
6264 CatalogType::TimestampTz => unreachable!("handled above"),
6265 }
6266 }
6267 }
6268}
6269
/// AST visitor that collects non-windowed aggregate function calls and
/// replaces table function calls found in select items with generated
/// identifiers.
struct AggregateTableFuncVisitor<'a> {
    /// Statement context used to look up the functions that names refer to.
    scx: &'a StatementContext<'a>,
    /// Aggregate calls without an `OVER` clause, in the order encountered.
    /// May contain duplicates; `into_result` removes them.
    aggs: Vec<Function<Aug>>,
    /// Whether the arguments of an aggregate call are currently being visited.
    within_aggregate: bool,
    /// Replaced table function calls, mapped to the identifier that replaced them.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of descriptions of the enclosing contexts in which table
    /// functions are rejected; the innermost one is reported in the error.
    table_disallowed_context: Vec<&'static str>,
    /// Whether a select item is currently being visited.
    in_select_item: bool,
    /// Source of the ids used in generated `table_func_<id>` identifiers.
    id_gen: IdGen,
    /// Error recorded during the visit, if any.
    err: Option<PlanError>,
}
6282
6283impl<'a> AggregateTableFuncVisitor<'a> {
6284 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6285 AggregateTableFuncVisitor {
6286 scx,
6287 aggs: Vec::new(),
6288 within_aggregate: false,
6289 tables: BTreeMap::new(),
6290 table_disallowed_context: Vec::new(),
6291 in_select_item: false,
6292 id_gen: Default::default(),
6293 err: None,
6294 }
6295 }
6296
6297 fn into_result(
6298 self,
6299 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6300 match self.err {
6301 Some(err) => Err(err),
6302 None => {
6303 let mut seen = BTreeSet::new();
6306 let aggs = self
6307 .aggs
6308 .into_iter()
6309 .filter(move |agg| seen.insert(agg.clone()))
6310 .collect();
6311 Ok((aggs, self.tables))
6312 }
6313 }
6314 }
6315}
6316
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Visits a function call, collecting it if it is an aggregate without an
    /// `OVER` clause.
    ///
    /// Calls whose name cannot be resolved are not descended into.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            Err(_) => return,
        };

        match item.func() {
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The filter is visited before `within_aggregate` is set, so
                // an aggregate inside it is not reported as nested.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                // The arguments are visited as "within an aggregate" and with
                // table functions disallowed; both are undone afterwards.
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions are disallowed anywhere inside a table
                // function call.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    /// Does nothing, so queries nested in the visited AST are not traversed.
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
    }

    /// Visits an expression, replacing table function calls inside select
    /// items and tracking the contexts in which table functions are rejected.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        // `disallowed_context` names a context in which table functions are
        // rejected; `func` is a copy of the call when `expr` is a table
        // function call inside a select item.
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        // A table function inside a disallowed context is an
                        // error; the innermost context is the one reported.
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Descend into the call before it is (possibly) replaced.
            visit_mut::visit_expr_mut(self, expr);
            // Only calls without FILTER, OVER and DISTINCT are replaced.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // An id is allocated for every occurrence, but an identical
                // call that was already recorded keeps its existing identifier.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        // Descend into `expr`, which by now may be the replacement identifier.
        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Visits a select item with `in_select_item` set, restoring the previous
    /// value afterwards.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6438
6439#[derive(Default)]
6440struct WindowFuncCollector {
6441 window_funcs: Vec<Expr<Aug>>,
6442}
6443
6444impl WindowFuncCollector {
6445 fn into_result(self) -> Vec<Expr<Aug>> {
6446 let mut seen = BTreeSet::new();
6448 let window_funcs_dedupped = self
6449 .window_funcs
6450 .into_iter()
6451 .filter(move |expr| seen.insert(expr.clone()))
6452 .rev()
6455 .collect();
6456 window_funcs_dedupped
6457 }
6458}
6459
6460impl Visit<'_, Aug> for WindowFuncCollector {
6461 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6462 match expr {
6463 Expr::Function(func) => {
6464 if func.over.is_some() {
6465 self.window_funcs.push(expr.clone());
6466 }
6467 }
6468 _ => (),
6469 }
6470 visit::visit_expr(self, expr);
6471 }
6472
6473 fn visit_query(&mut self, _query: &Query<Aug>) {
6474 }
6476}
6477
/// The kind of lifetime a planned query has.
///
/// NOTE(review): the variants appear to name the statement kind the query is
/// planned for; confirm against the callers.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The only lifetime that is one-shot rather than maintained.
    OneShot,
    /// Maintained lifetime named after indexes.
    Index,
    /// Maintained lifetime named after materialized views.
    MaterializedView,
    /// Maintained lifetime named after subscribes; the only maintained one
    /// for which `allow_show` is true.
    Subscribe,
    /// Maintained lifetime named after views.
    View,
    /// Maintained lifetime named after sources.
    Source,
}

impl QueryLifetime {
    /// Returns `true` only for [`QueryLifetime::OneShot`].
    ///
    /// # Panics
    ///
    /// Asserts on every call that the answer is the exact opposite of
    /// [`QueryLifetime::is_maintained`].
    pub fn is_one_shot(&self) -> bool {
        let one_shot = match self {
            Self::OneShot => true,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => false,
        };
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Returns `true` for every lifetime except [`QueryLifetime::OneShot`].
    pub fn is_maintained(&self) -> bool {
        match self {
            Self::OneShot => false,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => true,
        }
    }

    /// Returns `true` for [`QueryLifetime::OneShot`] and
    /// [`QueryLifetime::Subscribe`], and `false` for all other lifetimes.
    pub fn allow_show(&self) -> bool {
        match self {
            Self::OneShot | Self::Subscribe => true,
            Self::Index | Self::MaterializedView | Self::View | Self::Source => false,
        }
    }
}
6537
/// The name and relation description recorded for a CTE.
///
/// `QueryContext::ctes` stores these keyed by `LocalId`.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The CTE's name.
    /// NOTE(review): presumably the name as written in the query; confirm at the construction site.
    pub name: String,
    /// The CTE's relation description; `QueryContext::resolve_table_name`
    /// reads its relation type and column names.
    pub desc: RelationDesc,
}
6544
/// State carried while planning a query.
///
/// `root` builds the outermost context; `derived_context` builds nested ones
/// by copying this state and adding one more outer scope and relation type.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// Context of the enclosing statement; used in this file for catalog item
    /// lookups, parameter types and type humanization.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime the query is planned for.
    pub lifetime: QueryLifetime,
    /// Scopes of the enclosing contexts; `derived_context` places the newest
    /// one at index 0.
    pub outer_scopes: Vec<Scope>,
    /// Relation types of the enclosing contexts, ordered like `outer_scopes`.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// Descriptions of the CTEs known to this context, keyed by local id.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// String interner; derived contexts share it through the `Rc`.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// Guard exposed through the `CheckedRecursion` implementation.
    pub recursion_guard: RecursionGuard,
}
6562
impl CheckedRecursion for QueryContext<'_> {
    /// Returns the guard stored in this context's `recursion_guard` field.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6568
6569impl<'a> QueryContext<'a> {
6570 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6571 QueryContext {
6572 scx,
6573 lifetime,
6574 outer_scopes: vec![],
6575 outer_relation_types: vec![],
6576 ctes: BTreeMap::new(),
6577 name_manager: Rc::new(RefCell::new(NameManager::new())),
6578 recursion_guard: RecursionGuard::with_limit(1024), }
6580 }
6581
6582 fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6583 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6584 }
6585
6586 fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6589 let ctes = self.ctes.clone();
6590 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6591 let outer_relation_types = iter::once(relation_type)
6592 .chain(self.outer_relation_types.clone())
6593 .collect();
6594 let name_manager = Rc::clone(&self.name_manager);
6596
6597 QueryContext {
6598 scx: self.scx,
6599 lifetime: self.lifetime,
6600 outer_scopes,
6601 outer_relation_types,
6602 ctes,
6603 name_manager,
6604 recursion_guard: self.recursion_guard.clone(),
6605 }
6606 }
6607
6608 fn empty_derived_context(&self) -> QueryContext<'a> {
6610 let scope = Scope::empty();
6611 let ty = SqlRelationType::empty();
6612 self.derived_context(scope, ty)
6613 }
6614
6615 pub fn resolve_table_name(
6618 &self,
6619 object: ResolvedItemName,
6620 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6621 match object {
6622 ResolvedItemName::Item {
6623 id,
6624 full_name,
6625 version,
6626 ..
6627 } => {
6628 let item = self.scx.get_item(&id).at_version(version);
6629 let desc = match item.relation_desc() {
6630 Some(desc) => desc.clone(),
6631 None => {
6632 return Err(PlanError::InvalidDependency {
6633 name: full_name.to_string(),
6634 item_type: item.item_type().to_string(),
6635 });
6636 }
6637 };
6638 let expr = HirRelationExpr::Get {
6639 id: Id::Global(item.global_id()),
6640 typ: desc.typ().clone(),
6641 };
6642
6643 let name = full_name.into();
6644 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6645
6646 Ok((expr, scope))
6647 }
6648 ResolvedItemName::Cte { id, name } => {
6649 let name = name.into();
6650 let cte = self.ctes.get(&id).unwrap();
6651 let expr = HirRelationExpr::Get {
6652 id: Id::Local(id),
6653 typ: cte.desc.typ().clone(),
6654 };
6655
6656 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6657
6658 Ok((expr, scope))
6659 }
6660 ResolvedItemName::ContinualTask { id, name } => {
6661 let cte = self.ctes.get(&id).unwrap();
6662 let expr = HirRelationExpr::Get {
6663 id: Id::Local(id),
6664 typ: cte.desc.typ().clone(),
6665 };
6666
6667 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6668
6669 Ok((expr, scope))
6670 }
6671 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6672 }
6673 }
6674
6675 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6678 self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6679 }
6680}
6681
/// The values bundled together for planning an expression.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// Context of the query that this expression context belongs to.
    pub qcx: &'a QueryContext<'a>,
    /// A name for this context; `with_name` produces a copy with a different one.
    /// NOTE(review): presumably describes the clause being planned for use in
    /// error messages; confirm against the callers.
    pub name: &'a str,
    /// The current scope; `derived_query_context` clones it as the new
    /// innermost outer scope.
    pub scope: &'a Scope,
    /// The current relation type; passed to `AbstractExpr::typ` by `column_type`.
    pub relation_type: &'a SqlRelationType,
    /// Whether aggregate functions are allowed (enforced outside this section).
    pub allow_aggregates: bool,
    /// Whether subqueries are allowed (enforced outside this section).
    pub allow_subqueries: bool,
    /// Whether parameters are allowed (enforced outside this section).
    pub allow_parameters: bool,
    /// Whether window functions are allowed (enforced outside this section).
    pub allow_windows: bool,
}
6703
impl CheckedRecursion for ExprContext<'_> {
    /// Returns the guard owned by the query context, so expression planning
    /// uses the same guard as the query it belongs to.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6709
6710impl<'a> ExprContext<'a> {
6711 pub fn catalog(&self) -> &dyn SessionCatalog {
6712 self.qcx.scx.catalog
6713 }
6714
6715 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6716 let mut ecx = self.clone();
6717 ecx.name = name;
6718 ecx
6719 }
6720
6721 pub fn column_type<E>(&self, expr: &E) -> E::Type
6722 where
6723 E: AbstractExpr,
6724 {
6725 expr.typ(
6726 &self.qcx.outer_relation_types,
6727 self.relation_type,
6728 &self.qcx.scx.param_types.borrow(),
6729 )
6730 }
6731
6732 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6733 where
6734 E: AbstractExpr,
6735 {
6736 self.column_type(expr).scalar_type()
6737 }
6738
6739 fn derived_query_context(&self) -> QueryContext<'_> {
6740 let mut scope = self.scope.clone();
6741 scope.lateral_barrier = true;
6742 self.qcx.derived_context(scope, self.relation_type.clone())
6743 }
6744
6745 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6746 self.qcx.scx.require_feature_flag(flag)
6747 }
6748
6749 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6750 &self.qcx.scx.param_types
6751 }
6752
6753 pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6756 self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6757 }
6758
6759 pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6760 self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6761 }
6762}
6763
6764#[derive(Debug, Clone)]
6770pub struct NameManager(BTreeSet<Arc<str>>);
6771
6772impl NameManager {
6773 pub fn new() -> Self {
6775 Self(BTreeSet::new())
6776 }
6777
6778 fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6781 let s = s.as_ref();
6782 if let Some(interned) = self.0.get(s) {
6783 Arc::clone(interned)
6784 } else {
6785 let interned: Arc<str> = Arc::from(s);
6786 self.0.insert(Arc::clone(&interned));
6787 interned
6788 }
6789 }
6790
6791 pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6794 self.intern(item.column_name.as_str())
6810 }
6811}
6812
#[cfg(test)]
mod test {
    use super::*;

    // Checks that `NameManager::intern` hands back the same allocation for
    // equal strings, whatever storage the input string itself lives in.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut nm = NameManager::new();

        let orig_hi = "hi";
        let hi = nm.intern(orig_hi);
        let hello = nm.intern("hello");

        // Different contents must be backed by different allocations.
        assert_ne!(hi.as_ptr(), hello.as_ptr());

        // Interning an equal literal again must return the first allocation.
        let hi2 = nm.intern("hi");
        assert_eq!(hi.as_ptr(), hi2.as_ptr());

        // Assemble "hi" at runtime so that the input is a separately allocated
        // `String` rather than the literal used above.
        let s = format!(
            "{}{}",
            hi.chars().nth(0).unwrap(),
            hi2.chars().nth(1).unwrap()
        );
        // Guard: the runtime-built string must not share storage with the literal.
        assert_ne!(orig_hi.as_ptr(), s.as_ptr());

        // String equality, not the input's address, selects the returned allocation.
        let hi3 = nm.intern(s);
        assert_eq!(hi.as_ptr(), hi3.as_ptr());
    }
}