1use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::{iter, mem};
45
46use itertools::Itertools;
47use mz_expr::virtual_syntax::AlgExcept;
48use mz_expr::{
49 Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, func as expr_func,
50};
51use mz_ore::assert_none;
52use mz_ore::collections::CollectionExt;
53use mz_ore::option::FallibleMapExt;
54use mz_ore::stack::{CheckedRecursion, RecursionGuard};
55use mz_ore::str::StrExt;
56use mz_repr::adt::char::CharLength;
57use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
58use mz_repr::adt::timestamp::TimestampPrecision;
59use mz_repr::adt::varchar::VarCharMaxLength;
60use mz_repr::{
61 CatalogItemId, ColumnIndex, ColumnName, ColumnType, Datum, RelationDesc, RelationType,
62 RelationVersionSelector, Row, RowArena, ScalarType, strconv,
63};
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_sql_parser::ast::visit::Visit;
66use mz_sql_parser::ast::visit_mut::{self, VisitMut};
67use mz_sql_parser::ast::{
68 AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
69 CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
70 Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
71 JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
72 MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
73 SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
74 TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
75 WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
76};
77use mz_sql_parser::ident;
78use uuid::Uuid;
79
80use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
81use crate::func::{self, Func, FuncSpec};
82use crate::names::{
83 Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
84};
85use crate::normalize;
86use crate::plan::PlanError::InvalidWmrRecursionLimit;
87use crate::plan::error::PlanError;
88use crate::plan::hir::{
89 AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
90 BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
91 HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
92 ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
93};
94use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
95use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
96use crate::plan::statement::{StatementContext, StatementDesc, show};
97use crate::plan::typeconv::{self, CastContext};
98use crate::plan::{
99 Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
100 literal, transform_ast,
101};
102use crate::session::vars::{self, FeatureFlag};
103
/// The result of planning a top-level query.
///
/// `E` is the type of the planned expression. The planners in this file
/// instantiate it with `HirRelationExpr` (for whole queries) and with
/// `Vec<HirScalarExpr>` (for the `RETURNING` clause of an `INSERT`).
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression(s).
    pub expr: E,
    /// Column names and types of the result once `finishing` has been applied.
    pub desc: RelationDesc,
    /// Ordering, limit, offset and final projection to apply to the rows
    /// produced by `expr`.
    pub finishing: RowSetFinishing<HirScalarExpr>,
    /// The scope that was in effect when the expression was planned.
    pub scope: Scope,
}
111
112#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
121pub fn plan_root_query(
122 scx: &StatementContext,
123 mut query: Query<Aug>,
124 lifetime: QueryLifetime,
125) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
126 transform_ast::transform(scx, &mut query)?;
127 let mut qcx = QueryContext::root(scx, lifetime);
128 let PlannedQuery {
129 mut expr,
130 scope,
131 order_by,
132 limit,
133 offset,
134 project,
135 group_size_hints,
136 } = plan_query(&mut qcx, &query)?;
137
138 let mut finishing = RowSetFinishing {
139 limit,
140 offset,
141 project,
142 order_by,
143 };
144
145 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
151
152 if lifetime.is_maintained() {
153 expr.finish_maintained(&mut finishing, group_size_hints);
154 }
155
156 let typ = qcx.relation_type(&expr);
157 let typ = RelationType::new(
158 finishing
159 .project
160 .iter()
161 .map(|i| typ.column_types[*i].clone())
162 .collect(),
163 );
164 let desc = RelationDesc::new(typ, scope.column_names());
165
166 Ok(PlannedRootQuery {
167 expr,
168 desc,
169 finishing,
170 scope,
171 })
172}
173
174#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
176pub fn plan_ct_query(
177 qcx: &mut QueryContext,
178 mut query: Query<Aug>,
179) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
180 transform_ast::transform(qcx.scx, &mut query)?;
181 let PlannedQuery {
182 mut expr,
183 scope,
184 order_by,
185 limit,
186 offset,
187 project,
188 group_size_hints,
189 } = plan_query(qcx, &query)?;
190
191 let mut finishing = RowSetFinishing {
192 limit,
193 offset,
194 project,
195 order_by,
196 };
197
198 try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
204
205 expr.finish_maintained(&mut finishing, group_size_hints);
206
207 let typ = qcx.relation_type(&expr);
208 let typ = RelationType::new(
209 finishing
210 .project
211 .iter()
212 .map(|i| typ.column_types[*i].clone())
213 .collect(),
214 );
215 let desc = RelationDesc::new(typ, scope.column_names());
216
217 Ok(PlannedRootQuery {
218 expr,
219 desc,
220 finishing,
221 scope,
222 })
223}
224
225fn try_push_projection_order_by(
236 expr: &mut HirRelationExpr,
237 project: &mut Vec<usize>,
238 order_by: &mut Vec<ColumnOrder>,
239) -> bool {
240 let mut unproject = vec![None; expr.arity()];
241 for (out_i, in_i) in project.iter().copied().enumerate() {
242 unproject[in_i] = Some(out_i);
243 }
244 if order_by
245 .iter()
246 .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
247 {
248 let trivial_project = (0..project.len()).collect();
249 *expr = expr.take().project(mem::replace(project, trivial_project));
250 for ob in order_by {
251 ob.column = unproject[ob.column].unwrap();
252 }
253 true
254 } else {
255 false
256 }
257}
258
/// Plans an `INSERT` into the table named by `table_name`.
///
/// On success returns a tuple of:
///   * the ID of the target table,
///   * a relation expression producing the rows to insert, with one column
///     per table column in table order (columns not supplied by `source` are
///     filled from the table's default expressions), and
///   * the planned `RETURNING` clause, as one scalar expression per output
///     column together with its description.
///
/// # Errors
///
/// Fails if the target is not a table, is not writeable, or is a system
/// table; if `columns` names an unknown column or repeats a column; if the
/// source has more or fewer columns than the target list allows; or if a
/// source column cannot be assignment-cast to the target column type.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target: it must be a writeable, non-system table.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
    // One default expression per table column; zipped with `desc` below.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // `source_types[i]` is the type the i-th source column must be cast to.
    let mut source_types = Vec::with_capacity(columns.len());
    // `ordering[i]` is the table column index that the i-th source column
    // is written to.
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // No explicit column list: source columns map to table columns in
        // table order.
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &ColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        // Every listed column must exist in the table...
        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.as_str().quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        // ...and must be listed at most once.
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.as_str().quoted());
        }
    };

    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // A bare `VALUES` clause (no CTEs, ORDER BY, LIMIT or OFFSET)
                // is planned by `plan_values_insert`, which is given the
                // target column names and types.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                // Any other query is planned as an ordinary nested query.
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        // `DEFAULT VALUES`: a single row with no columns, so every table
        // column is filled from its default below.
        InsertSource::DefaultValues => {
            HirRelationExpr::constant(vec![vec![]], RelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Without an explicit column list the source may supply at most as many
    // columns as the table has; with one, it must supply exactly that many.
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Drop the targets that the source does not supply; those columns get
    // their default values below.
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Cast each source column to its target type, reporting a failure in
    // terms of the target column's name.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).as_str().quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Default expressions to append to the source, and the projection that
    // arranges source and default columns into table column order.
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps a table column index to the source column that supplies it.
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Not supplied by the source: plan the default and reference the
            // column it will occupy after the `map` below.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    let returning = {
        // RETURNING expressions are planned against the target table's
        // columns, qualified by the table's full name.
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), RelationType::empty())
        };
        // Aggregates, subqueries, parameters and window functions are all
        // rejected in a RETURNING clause.
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = RelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // A single select item may expand to several output columns.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            // No ordering, limit or offset; project every RETURNING column.
            finishing: RowSetFinishing {
                order_by: vec![],
                limit: None,
                offset: 0,
                project: (0..desc_arity).collect(),
            },
            scope,
        }
    };

    Ok((
        table.id(),
        // Append the defaults, then rearrange everything into table order.
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
480
/// Resolves the item and column list of a `COPY` statement.
///
/// On success returns a tuple of:
///   * the ID of the item,
///   * a `RelationDesc` for the copied columns: the item's full description
///     when `columns` is empty, otherwise only the listed columns in the
///     order given,
///   * the `ColumnIndex` of each copied column, in the same order, and
///   * for items that report writable table details, a `MapFilterProject`
///     over the listed source columns that fills the remaining columns from
///     their defaults and arranges the result in table column order;
///     `None` for all other items.
///
/// # Errors
///
/// Fails if the item cannot be resolved, if `columns` names an unknown
/// column or repeats a column, or if a default expression cannot be planned
/// or lowered.
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    let table_desc = item.desc(&fullname)?.into_owned();
    let mut ordering = Vec::with_capacity(columns.len());

    // Only items with writable table details get an MFP.
    // NOTE(review): when `columns` is empty, `source_column_names` is empty
    // as well, so the MFP built here has input arity 0 and fills every column
    // from its default — confirm that callers expect this.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // The columns supplied by the source, in the order the user listed
        // them.
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each table column, either project it from the source or append
        // an expression computing its default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                // The default is lowered before it is handed to the MFP.
                let mir = hir.lower_uncorrelated()?;
                // Defaults are appended after the source columns.
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    let source_desc = if columns.is_empty() {
        // No column list: copy every column, in table order.
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &ColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        // Every listed column must exist in the item...
        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.as_str().quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        // ...and must be listed at most once.
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.as_str().quoted());
        }

        // The description covers only the listed columns, in listed order.
        RelationDesc::new(RelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
600
601pub fn plan_copy_from(
605 scx: &StatementContext,
606 table_name: ResolvedItemName,
607 columns: Vec<Ident>,
608) -> Result<
609 (
610 CatalogItemId,
611 RelationDesc,
612 Vec<ColumnIndex>,
613 Option<MapFilterProject>,
614 ),
615 PlanError,
616> {
617 let table = scx.get_item_by_resolved_name(&table_name)?;
618
619 if table.item_type() != CatalogItemType::Table {
621 sql_bail!(
622 "cannot insert into {} '{}'",
623 table.item_type(),
624 table_name.full_name_str()
625 );
626 }
627
628 let _ = table.writable_table_details().ok_or_else(|| {
629 sql_err!(
630 "cannot insert into non-writeable table '{}'",
631 table_name.full_name_str()
632 )
633 })?;
634
635 if table.id().is_system() {
636 sql_bail!(
637 "cannot insert into system table '{}'",
638 table_name.full_name_str()
639 );
640 }
641 let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643 Ok((id, desc, ordering, mfp))
644}
645
646pub fn plan_copy_from_rows(
649 pcx: &PlanContext,
650 catalog: &dyn SessionCatalog,
651 id: CatalogItemId,
652 columns: Vec<ColumnIndex>,
653 rows: Vec<mz_repr::Row>,
654) -> Result<HirRelationExpr, PlanError> {
655 let scx = StatementContext::new(Some(pcx), catalog);
656
657 let table = catalog
659 .get_item(&id)
660 .at_version(RelationVersionSelector::Latest);
661 let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
662
663 let mut defaults = table
664 .writable_table_details()
665 .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
666 .to_vec();
667
668 for default in &mut defaults {
669 transform_ast::transform(&scx, default)?;
670 }
671
672 let column_types = columns
673 .iter()
674 .map(|x| desc.get_type(x).clone())
675 .map(|mut x| {
676 x.nullable = true;
679 x
680 })
681 .collect();
682 let typ = RelationType::new(column_types);
683 let expr = HirRelationExpr::Constant {
684 rows,
685 typ: typ.clone(),
686 };
687
688 let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
694 if columns == default {
695 return Ok(expr);
696 }
697
698 let mut map_exprs = vec![];
700 let mut project_key = Vec::with_capacity(desc.arity());
701
702 let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
704
705 let column_details = desc.iter_all().zip_eq(defaults);
706 for ((col_idx, _col_name, col_typ), default) in column_details {
707 if let Some(src_idx) = col_to_source.get(&col_idx) {
708 project_key.push(*src_idx);
709 } else {
710 let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
711 project_key.push(typ.arity() + map_exprs.len());
712 map_exprs.push(hir);
713 }
714 }
715
716 Ok(expr.map(map_exprs).project(project_key))
717}
718
/// The plan for a statement that first reads rows from a table and then
/// writes to that same table; produced here for `DELETE` and `UPDATE`.
pub struct ReadThenWritePlan {
    /// The ID of the table being mutated.
    pub id: CatalogItemId,
    /// The relation expression that selects the rows to mutate.
    pub selection: HirRelationExpr,
    /// For each assigned column (keyed by its index in the table), the
    /// expression computing its new value. Empty for `DELETE`.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// The finishing to apply to the rows produced by `selection`.
    pub finishing: RowSetFinishing,
}
731
732pub fn plan_delete_query(
733 scx: &StatementContext,
734 mut delete_stmt: DeleteStatement<Aug>,
735) -> Result<ReadThenWritePlan, PlanError> {
736 transform_ast::transform(scx, &mut delete_stmt)?;
737
738 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
739 plan_mutation_query_inner(
740 qcx,
741 delete_stmt.table_name,
742 delete_stmt.alias,
743 delete_stmt.using,
744 vec![],
745 delete_stmt.selection,
746 )
747}
748
749pub fn plan_update_query(
750 scx: &StatementContext,
751 mut update_stmt: UpdateStatement<Aug>,
752) -> Result<ReadThenWritePlan, PlanError> {
753 transform_ast::transform(scx, &mut update_stmt)?;
754
755 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
756
757 plan_mutation_query_inner(
758 qcx,
759 update_stmt.table_name,
760 update_stmt.alias,
761 vec![],
762 update_stmt.assignments,
763 update_stmt.selection,
764 )
765}
766
/// Shared planner for `DELETE` and `UPDATE`.
///
/// Builds a [`ReadThenWritePlan`] whose `selection` reads the target table
/// restricted by `selection` (and by `using`, if any) and whose
/// `assignments` map each assigned column index to the expression for its
/// new value, cast to the column's type.
///
/// # Errors
///
/// Fails if the target is not a writeable, non-system table; if the `WHERE`
/// or `SET` expressions cannot be planned or typed; or if an assignment
/// names an unknown column or names the same column twice.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Only names that resolved to a catalog item can be mutated.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Validate the target: it must be a writeable, non-system table.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Start from a plain read of the table, with the alias (if any) applied
    // to its scope.
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // Without USING, the WHERE clause is a plain filter over the table.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &ScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // With USING, the WHERE clause may also reference the USING tables.
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each `SET column = value`, keyed by the column's index.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                // Unlike the WHERE clause, SET expressions may not contain
                // subqueries.
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                // `insert` returns the previous value if the column was
                // already assigned.
                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // No ordering, limit or offset; keep every table column.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
872
/// Restricts `get` (the read of the table being mutated) according to a
/// `USING` clause and an optional `WHERE` predicate.
///
/// The `USING` tables are cross joined with each other. If `selection` is
/// present it is planned against the product of the `USING` scope and
/// `outer_scope`, and then rewritten so that it can be used as a filter on
/// the `USING` relation alone: references to the target table's columns are
/// turned into references one level out. The result is `get` filtered by
/// `EXISTS (<filtered USING relation>)`.
///
/// # Errors
///
/// Fails if the joins or the predicate cannot be planned, or if combining
/// the `USING` scope with `outer_scope` fails.
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Cross join all USING tables, starting from the join identity.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join the USING relation with the target table. `joined` is only
        // used to obtain a relation type for planning the predicate: USING
        // columns come first, followed by the target table's columns.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &ScalarType::Bool)?;

        // Columns at or past the USING relation's arity belong to the target
        // table. Re-point them one level out and shift them back to the
        // target table's own column numbering.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c) = e {
                if c.column >= using_rel_arity {
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // No predicate to plan; the product is still computed so that any
        // error it reports is surfaced.
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // Keep the target rows for which the USING relation is non-empty.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
974
/// The error returned by `cast_relation` when one column of the input
/// relation cannot be cast to its target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Index of the offending column in the input relation.
    pub(crate) column: usize,
    /// The type of that column in the input relation.
    pub(crate) source_type: ScalarType,
    /// The type the column was supposed to be cast to.
    pub(crate) target_type: ScalarType,
}
981
982pub(crate) fn cast_relation<'a, I>(
986 qcx: &QueryContext,
987 ccx: CastContext,
988 expr: HirRelationExpr,
989 target_types: I,
990) -> Result<HirRelationExpr, CastRelationError>
991where
992 I: IntoIterator<Item = &'a ScalarType>,
993{
994 let ecx = &ExprContext {
995 qcx,
996 name: "values",
997 scope: &Scope::empty(),
998 relation_type: &qcx.relation_type(&expr),
999 allow_aggregates: false,
1000 allow_subqueries: true,
1001 allow_parameters: true,
1002 allow_windows: false,
1003 };
1004 let mut map_exprs = vec![];
1005 let mut project_key = vec![];
1006 for (i, target_typ) in target_types.into_iter().enumerate() {
1007 let expr = HirScalarExpr::column(i);
1008 match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1012 Ok(cast_expr) => {
1013 if expr == cast_expr {
1014 project_key.push(i);
1016 } else {
1017 project_key.push(ecx.relation_type.arity() + map_exprs.len());
1019 map_exprs.push(cast_expr);
1020 }
1021 }
1022 Err(_) => {
1023 return Err(CastRelationError {
1024 column: i,
1025 source_type: ecx.scalar_type(&expr),
1026 target_type: target_typ.clone(),
1027 });
1028 }
1029 }
1030 }
1031 Ok(expr.map(map_exprs).project(project_key))
1032}
1033
1034pub fn plan_up_to(
1036 scx: &StatementContext,
1037 mut up_to: Expr<Aug>,
1038) -> Result<MirScalarExpr, PlanError> {
1039 let scope = Scope::empty();
1040 let desc = RelationDesc::empty();
1041 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1044 transform_ast::transform(scx, &mut up_to)?;
1045 let ecx = &ExprContext {
1046 qcx: &qcx,
1047 name: "UP TO",
1048 scope: &scope,
1049 relation_type: desc.typ(),
1050 allow_aggregates: false,
1051 allow_subqueries: false,
1052 allow_parameters: false,
1053 allow_windows: false,
1054 };
1055 plan_expr(ecx, &up_to)?
1056 .type_as_any(ecx)?
1057 .lower_uncorrelated()
1058}
1059
1060pub fn plan_as_of(
1063 scx: &StatementContext,
1064 as_of: Option<AsOf<Aug>>,
1065) -> Result<QueryWhen, PlanError> {
1066 match as_of {
1067 None => Ok(QueryWhen::Immediately),
1068 Some(mut as_of) => match as_of {
1069 AsOf::At(ref mut expr) | AsOf::AtLeast(ref mut expr) => {
1070 let scope = Scope::empty();
1071 let desc = RelationDesc::empty();
1072 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1075 transform_ast::transform(scx, expr)?;
1076 let ecx = &ExprContext {
1077 qcx: &qcx,
1078 name: "AS OF",
1079 scope: &scope,
1080 relation_type: desc.typ(),
1081 allow_aggregates: false,
1082 allow_subqueries: false,
1083 allow_parameters: false,
1084 allow_windows: false,
1085 };
1086 let expr = plan_expr(ecx, expr)?
1087 .type_as_any(ecx)?
1088 .lower_uncorrelated()?;
1089 match as_of {
1090 AsOf::At(_) => Ok(QueryWhen::AtTimestamp(expr)),
1091 AsOf::AtLeast(_) => Ok(QueryWhen::AtLeastTimestamp(expr)),
1092 }
1093 }
1094 },
1095 }
1096}
1097
1098pub fn plan_secret_as(
1100 scx: &StatementContext,
1101 mut expr: Expr<Aug>,
1102) -> Result<MirScalarExpr, PlanError> {
1103 let scope = Scope::empty();
1104 let desc = RelationDesc::empty();
1105 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1106
1107 transform_ast::transform(scx, &mut expr)?;
1108
1109 let ecx = &ExprContext {
1110 qcx: &qcx,
1111 name: "AS",
1112 scope: &scope,
1113 relation_type: desc.typ(),
1114 allow_aggregates: false,
1115 allow_subqueries: false,
1116 allow_parameters: false,
1117 allow_windows: false,
1118 };
1119 let expr = plan_expr(ecx, &expr)?
1120 .type_as(ecx, &ScalarType::Bytes)?
1121 .lower_uncorrelated()?;
1122 Ok(expr)
1123}
1124
1125pub fn plan_webhook_validate_using(
1127 scx: &StatementContext,
1128 validate_using: CreateWebhookSourceCheck<Aug>,
1129) -> Result<WebhookValidation, PlanError> {
1130 let qcx = QueryContext::root(scx, QueryLifetime::Source);
1131
1132 let CreateWebhookSourceCheck {
1133 options,
1134 using: mut expr,
1135 } = validate_using;
1136
1137 let mut column_typs = vec![];
1138 let mut column_names = vec![];
1139
1140 let (bodies, headers, secrets) = options
1141 .map(|o| (o.bodies, o.headers, o.secrets))
1142 .unwrap_or_default();
1143
1144 let mut body_tuples = vec![];
1146 for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1147 let scalar_type = use_bytes
1148 .then_some(ScalarType::Bytes)
1149 .unwrap_or(ScalarType::String);
1150 let name = alias
1151 .map(|a| a.into_string())
1152 .unwrap_or_else(|| "body".to_string());
1153
1154 column_typs.push(ColumnType {
1155 scalar_type,
1156 nullable: false,
1157 });
1158 column_names.push(name);
1159
1160 let column_idx = column_typs.len() - 1;
1162 assert_eq!(
1164 column_idx,
1165 column_names.len() - 1,
1166 "body column names and types don't match"
1167 );
1168 body_tuples.push((column_idx, use_bytes));
1169 }
1170
1171 let mut header_tuples = vec![];
1173
1174 for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1175 let value_type = use_bytes
1176 .then_some(ScalarType::Bytes)
1177 .unwrap_or(ScalarType::String);
1178 let name = alias
1179 .map(|a| a.into_string())
1180 .unwrap_or_else(|| "headers".to_string());
1181
1182 column_typs.push(ColumnType {
1183 scalar_type: ScalarType::Map {
1184 value_type: Box::new(value_type),
1185 custom_id: None,
1186 },
1187 nullable: false,
1188 });
1189 column_names.push(name);
1190
1191 let column_idx = column_typs.len() - 1;
1193 assert_eq!(
1195 column_idx,
1196 column_names.len() - 1,
1197 "header column names and types don't match"
1198 );
1199 header_tuples.push((column_idx, use_bytes));
1200 }
1201
1202 let mut validation_secrets = vec![];
1204
1205 for CreateWebhookSourceSecret {
1206 secret,
1207 alias,
1208 use_bytes,
1209 } in secrets
1210 {
1211 let scalar_type = use_bytes
1213 .then_some(ScalarType::Bytes)
1214 .unwrap_or(ScalarType::String);
1215
1216 column_typs.push(ColumnType {
1217 scalar_type,
1218 nullable: false,
1219 });
1220 let ResolvedItemName::Item {
1221 id,
1222 full_name: FullItemName { item, .. },
1223 ..
1224 } = secret
1225 else {
1226 return Err(PlanError::InvalidSecret(Box::new(secret)));
1227 };
1228
1229 let name = if let Some(alias) = alias {
1231 alias.into_string()
1232 } else {
1233 item
1234 };
1235 column_names.push(name);
1236
1237 let column_idx = column_typs.len() - 1;
1240 assert_eq!(
1242 column_idx,
1243 column_names.len() - 1,
1244 "column names and types don't match"
1245 );
1246
1247 validation_secrets.push(WebhookValidationSecret {
1248 id,
1249 column_idx,
1250 use_bytes,
1251 });
1252 }
1253
1254 let relation_typ = RelationType::new(column_typs);
1255 let desc = RelationDesc::new(relation_typ, column_names.clone());
1256 let scope = Scope::from_source(None, column_names);
1257
1258 transform_ast::transform(scx, &mut expr)?;
1259
1260 let ecx = &ExprContext {
1261 qcx: &qcx,
1262 name: "CHECK",
1263 scope: &scope,
1264 relation_type: desc.typ(),
1265 allow_aggregates: false,
1266 allow_subqueries: false,
1267 allow_parameters: false,
1268 allow_windows: false,
1269 };
1270 let expr = plan_expr(ecx, &expr)?
1271 .type_as(ecx, &ScalarType::Bool)?
1272 .lower_uncorrelated()?;
1273 let validation = WebhookValidation {
1274 expression: expr,
1275 relation_desc: desc,
1276 bodies: body_tuples,
1277 headers: header_tuples,
1278 secrets: validation_secrets,
1279 };
1280 Ok(validation)
1281}
1282
1283pub fn plan_default_expr(
1284 scx: &StatementContext,
1285 expr: &Expr<Aug>,
1286 target_ty: &ScalarType,
1287) -> Result<HirScalarExpr, PlanError> {
1288 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1289 let ecx = &ExprContext {
1290 qcx: &qcx,
1291 name: "DEFAULT expression",
1292 scope: &Scope::empty(),
1293 relation_type: &RelationType::empty(),
1294 allow_aggregates: false,
1295 allow_subqueries: false,
1296 allow_parameters: false,
1297 allow_windows: false,
1298 };
1299 let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1300 Ok(hir)
1301}
1302
1303pub fn plan_params<'a>(
1304 scx: &'a StatementContext,
1305 params: Vec<Expr<Aug>>,
1306 desc: &StatementDesc,
1307) -> Result<Params, PlanError> {
1308 if params.len() != desc.param_types.len() {
1309 sql_bail!(
1310 "expected {} params, got {}",
1311 desc.param_types.len(),
1312 params.len()
1313 );
1314 }
1315
1316 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1317 let scope = Scope::empty();
1318 let rel_type = RelationType::empty();
1319
1320 let mut datums = Row::default();
1321 let mut packer = datums.packer();
1322 let mut types = Vec::new();
1323 let temp_storage = &RowArena::new();
1324 for (mut expr, ty) in params.into_iter().zip(&desc.param_types) {
1325 transform_ast::transform(scx, &mut expr)?;
1326
1327 let ecx = &ExprContext {
1328 qcx: &qcx,
1329 name: "EXECUTE",
1330 scope: &scope,
1331 relation_type: &rel_type,
1332 allow_aggregates: false,
1333 allow_subqueries: false,
1334 allow_parameters: false,
1335 allow_windows: false,
1336 };
1337 let ex = plan_expr(ecx, &expr)?.type_as_any(ecx)?;
1338 let st = ecx.scalar_type(&ex);
1339 if st != *ty {
1340 sql_bail!(
1341 "mismatched parameter type: expected {}, got {}",
1342 ecx.humanize_scalar_type(ty, false),
1343 ecx.humanize_scalar_type(&st, false),
1344 );
1345 }
1346 let ex = ex.lower_uncorrelated()?;
1347 let evaled = ex.eval(&[], temp_storage)?;
1348 packer.push(evaled);
1349 types.push(st);
1350 }
1351 Ok(Params { datums, types })
1352}
1353
1354pub fn plan_index_exprs<'a>(
1355 scx: &'a StatementContext,
1356 on_desc: &RelationDesc,
1357 exprs: Vec<Expr<Aug>>,
1358) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1359 let scope = Scope::from_source(None, on_desc.iter_names());
1360 let qcx = QueryContext::root(scx, QueryLifetime::Index);
1361
1362 let ecx = &ExprContext {
1363 qcx: &qcx,
1364 name: "CREATE INDEX",
1365 scope: &scope,
1366 relation_type: on_desc.typ(),
1367 allow_aggregates: false,
1368 allow_subqueries: false,
1369 allow_parameters: false,
1370 allow_windows: false,
1371 };
1372 let mut out = vec![];
1373 for mut expr in exprs {
1374 transform_ast::transform(scx, &mut expr)?;
1375 let expr = plan_expr_or_col_index(ecx, &expr)?;
1376 let mut expr = expr.lower_uncorrelated()?;
1377 expr.reduce(&on_desc.typ().column_types);
1378 out.push(expr);
1379 }
1380 Ok(out)
1381}
1382
1383fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1384 match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1385 Some(column) => Ok(HirScalarExpr::column(column)),
1386 _ => plan_expr(ecx, e)?.type_as_any(ecx),
1387 }
1388}
1389
1390fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1391 match e {
1392 Expr::Value(Value::Number(n)) => {
1393 let n = n.parse::<usize>().map_err(|e| {
1394 sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1395 })?;
1396 if n < 1 || n > max {
1397 sql_bail!(
1398 "column reference {} in {} is out of range (1 - {})",
1399 n,
1400 name,
1401 max
1402 );
1403 }
1404 Ok(Some(n - 1))
1405 }
1406 _ => Ok(None),
1407 }
1408}
1409
/// The result of planning a `Query`, as returned by `plan_query`.
///
/// The ordering, limit, offset, and projection are carried alongside `expr`
/// rather than being applied to it; `plan_nested_query`, for example, applies
/// them itself via `HirRelationExpr::top_k` followed by `project`.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope describing the query's output columns.
    scope: Scope,
    /// The planned `ORDER BY` of the query.
    order_by: Vec<ColumnOrder>,
    /// The planned `LIMIT` expression, or `None` if the query has no `LIMIT`.
    limit: Option<HirScalarExpr>,
    /// The `OFFSET` of the query; `0` when none was given.
    offset: usize,
    /// The columns of `expr` to keep, in output order.
    project: Vec<usize>,
    /// Group size hints taken from the `SELECT` options; the default value
    /// when the query body is not a plain `SELECT`.
    group_size_hints: GroupSizeHints,
}
1419
/// Plans the query `q` by running `plan_query_inner` under the context's
/// `checked_recur_mut` recursion guard.
///
/// The returned `PlannedQuery` carries the ordering, limit, offset, and
/// projection separately from the relation expression.
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1423
/// Plans a query: its CTEs, `LIMIT`/`OFFSET`, body, and `ORDER BY`.
///
/// The CTEs are planned first and registered in `qcx.ctes` for the duration
/// of planning the body. Before returning, the CTE entries are removed again,
/// any entries they displaced are restored, and the planned body is wrapped
/// in the corresponding `Let`/`LetRec` bindings.
///
/// # Errors
///
/// Returns an error if any part of the query fails to plan, if a constant
/// `LIMIT` is negative or not an integer/NULL, if `FETCH ... WITH TIES` is
/// used, if `OFFSET` is not a numeric literal, or if the recursion limit
/// options of a mutually recursive CTE block are invalid.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan the CTEs. This registers them in `qcx.ctes`; each returned binding
    // carries the description it displaced (if any) so it can be put back at
    // the end of this function.
    let cte_bindings = plan_ctes(qcx, q)?;

    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            // The LIMIT expression is planned against an empty scope, so it
            // cannot refer to any column of the query.
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &RelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?;

            let limit = if limit.is_constant() {
                // Constant limits are evaluated right away so that negative
                // values can be rejected during planning.
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated()?;

                match limit.eval(&[], &arena)? {
                    d @ Datum::Int64(v) if v >= 0 => HirScalarExpr::literal(d, ScalarType::Int64),
                    d @ Datum::Null => HirScalarExpr::literal(d, ScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Non-constant LIMIT expressions are gated behind a feature flag.
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };
    // Only a numeric literal is accepted as OFFSET.
    let offset = match &q.offset {
        None => 0,
        Some(Expr::Value(Value::Number(x))) => x.parse()?,
        _ => sql_bail!("OFFSET must be an integer constant"),
    };

    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract the group size hints from the SELECT options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            // For a plain SELECT, the ORDER BY is planned together with the
            // SELECT itself.
            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            // For any other body, the ORDER BY is planned against the output
            // columns of the set expression.
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // `ecx.relation_type` is the type of `expr` before `map_exprs` is
            // applied, so this projection keeps exactly the original columns.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Unregister the CTEs of this query, restore whatever they displaced in
    // `qcx.ctes`, and wrap the planned body in the matching bindings.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Walk the bindings in reverse so that the first CTE ends up as
            // the outermost `Let`.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            // At most one of the three recursion limit options may be given.
            // Each pair is (iteration limit, whether to return at the limit).
            // A zero iteration limit is rejected.
            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
                }
            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
                return_at_limit: *return_at_limit,
            }))?;

            // All bindings of the block go into a single `LetRec`, in
            // declaration order, typed with their declared relation types.
            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.typ().clone()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1580
// Generates `MutRecBlockOptionExtracted`. `plan_query_inner` builds it from
// the options of a mutually recursive CTE block via `try_from` and reads its
// `recursion_limit`, `return_at_recursion_limit`, and
// `error_at_recursion_limit` fields, each declared here as a `u64` option.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1587
/// Plans the CTEs of `q` and registers them in `qcx.ctes`.
///
/// Returns one entry per CTE, in declaration order: the CTE's id, its planned
/// relation expression, and the description that its registration displaced
/// in `qcx.ctes`, if any. The caller is expected to remove the registrations
/// again and restore the displaced descriptions (as `plan_query_inner` does).
///
/// # Errors
///
/// Returns an error if a CTE name is bound more than once, if planning any
/// CTE fails, or, for mutually recursive CTEs, if the planned query does not
/// match the declared column types.
pub fn plan_ctes(
    qcx: &mut QueryContext,
    q: &Query<Aug>,
) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
    // Accumulates the planned CTEs, in declaration order.
    let mut result = Vec::new();
    // Descriptions displaced while registering mutually recursive CTEs,
    // keyed by CTE id.
    let mut shadowed_descs = BTreeMap::new();

    // Reject a WITH block that binds the same name more than once.
    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
        sql_bail!(
            "WITH query name {} specified more than once",
            normalize::ident_ref(ident).quoted()
        )
    }

    match &q.ctes {
        CteBlock::Simple(ctes) => {
            // Each CTE is registered only after its own query has been
            // planned, so it is in `qcx.ctes` while later CTEs are planned
            // but not while it is being planned itself.
            for cte in ctes.iter() {
                let cte_name = normalize::ident(cte.alias.name.clone());
                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
                let typ = qcx.relation_type(&val);
                let mut desc = RelationDesc::new(typ, scope.column_names());
                // Apply the column aliases given in the CTE's alias, if any.
                plan_utils::maybe_rename_columns(
                    format!("CTE {}", cte.alias.name),
                    &mut desc,
                    &cte.alias.columns,
                )?;
                // `insert` returns the entry previously stored under this
                // id, if there was one.
                let shadowed = qcx.ctes.insert(
                    cte.id,
                    CteDesc {
                        name: cte_name,
                        desc,
                    },
                );

                result.push((cte.id, val, shadowed));
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
            // First pass: register every CTE with its declared column names
            // and types (all nullable) before planning any of their queries,
            // so that every CTE of the block is in `qcx.ctes` during the
            // second pass.
            for cte in ctes.iter() {
                let cte_name = normalize::ident(cte.name.clone());
                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
                for column in cte.columns.iter() {
                    desc_columns.push((
                        normalize::column_name(column.name.clone()),
                        ColumnType {
                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
                            nullable: true,
                        },
                    ));
                }
                let desc = RelationDesc::from_names_and_types(desc_columns);
                let shadowed = qcx.ctes.insert(
                    cte.id,
                    CteDesc {
                        name: cte_name,
                        desc,
                    },
                );
                if let Some(shadowed) = shadowed {
                    // Remember the displaced entry; it is handed back to the
                    // caller with this CTE's binding in the second pass.
                    shadowed_descs.insert(cte.id, shadowed);
                }
            }

            // Second pass: plan each CTE's query and reconcile its type with
            // the declared one.
            for cte in ctes.iter() {
                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;

                let proposed_typ = qcx.ctes[&cte.id].desc.typ();

                // The declared type was built above with only nullable
                // columns, so these two checks guard internal invariants
                // (hence the "[internal error]" messages).
                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
                    sql_bail!(
                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
                    );
                }

                if !proposed_typ.keys.is_empty() {
                    sql_bail!("[internal error]: WMR CTEs do not support keys");
                }

                // The type of the query as actually planned.
                let derived_typ = qcx.relation_type(&val);

                // Builds the mismatch error from the humanized declared and
                // derived column types.
                let type_err = |proposed_typ: &RelationType, derived_typ: RelationType| {
                    let cte_name = normalize::ident(cte.name.clone());
                    let proposed_typ = proposed_typ
                        .column_types
                        .iter()
                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
                        .collect::<Vec<_>>();
                    let inferred_typ = derived_typ
                        .column_types
                        .iter()
                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
                        .collect::<Vec<_>>();
                    Err(PlanError::RecursiveTypeMismatch(
                        cte_name,
                        proposed_typ,
                        inferred_typ,
                    ))
                };

                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
                    return type_err(proposed_typ, derived_typ);
                }

                // Cast the planned query to the declared column types using
                // assignment casts. Any cast failure is reported as a type
                // mismatch rather than as the underlying cast error.
                let val = match cast_relation(
                    qcx,
                    CastContext::Assignment,
                    val,
                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
                ) {
                    Ok(val) => val,
                    Err(_) => return type_err(proposed_typ, derived_typ),
                };

                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
            }
        }
    }

    Ok(result)
}
1731
1732pub fn plan_nested_query(
1733 qcx: &mut QueryContext,
1734 q: &Query<Aug>,
1735) -> Result<(HirRelationExpr, Scope), PlanError> {
1736 let PlannedQuery {
1737 mut expr,
1738 scope,
1739 order_by,
1740 limit,
1741 offset,
1742 project,
1743 group_size_hints,
1744 } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1745 if limit.is_some() || offset > 0 {
1746 expr = HirRelationExpr::top_k(
1747 expr,
1748 vec![],
1749 order_by,
1750 limit,
1751 offset,
1752 group_size_hints.limit_input_group_size,
1753 );
1754 }
1755 Ok((expr.project(project), scope))
1756}
1757
/// Plans a set expression: a `SELECT`, a set operation (`UNION`, `EXCEPT`,
/// `INTERSECT`), a `VALUES` list, a table reference, a parenthesized query,
/// or a `SHOW` statement.
///
/// Returns the planned relation expression together with the scope that
/// describes its output columns.
///
/// # Errors
///
/// Returns an error if any constituent fails to plan, if the two sides of a
/// set operation differ in arity or have column types that cannot be
/// unified, or if a `SHOW` statement is used where the query lifetime does
/// not allow it.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            // No ORDER BY expressions are passed down here, so the resulting
            // plan is expected to carry no ordering (asserted below).
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan both operands, guarding the recursion.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Both operands must produce the same number of columns.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Expression contexts used only to plan the per-column casts
            // below; they are named after the set operator.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column position, pick a common type for the two sides
            // and plan an implicit cast of each side's column to it.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&target, false),
                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Only add a map/project when some cast is more than a bare
            // column reference. The casts are appended after the existing
            // columns, so the projection keeps columns `arity..2 * arity`,
            // i.e. only the casted ones.
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // The intersection is expressed as
                    // `lhs + -threshold(lhs + -rhs)`. Note that the left-hand
                    // expression is cloned and therefore appears twice in the
                    // resulting plan.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            // The output columns take their names from the left-hand operand.
            let scope = Scope::from_source(
                None,
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // SHOW is only permitted for query lifetimes that allow it.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Turns the result of a `SHOW CREATE ...` statement into a
            // constant relation containing the plan's single row, with the
            // type and column names taken from the statement's description.
            // Panics if the description has no relation description.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let expr = HirRelationExpr::constant(rows, desc.typ().clone());
                let scope = Scope::from_source(None, desc.iter_names());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
1999
2000fn plan_values(
2002 qcx: &QueryContext,
2003 values: &[Vec<Expr<Aug>>],
2004) -> Result<(HirRelationExpr, Scope), PlanError> {
2005 assert!(!values.is_empty());
2006
2007 let ecx = &ExprContext {
2008 qcx,
2009 name: "VALUES",
2010 scope: &Scope::empty(),
2011 relation_type: &RelationType::empty(),
2012 allow_aggregates: false,
2013 allow_subqueries: true,
2014 allow_parameters: true,
2015 allow_windows: false,
2016 };
2017
2018 let ncols = values[0].len();
2019 let nrows = values.len();
2020
2021 let mut cols = vec![vec![]; ncols];
2024 for row in values {
2025 if row.len() != ncols {
2026 sql_bail!(
2027 "VALUES expression has varying number of columns: {} vs {}",
2028 row.len(),
2029 ncols
2030 );
2031 }
2032 for (i, v) in row.iter().enumerate() {
2033 cols[i].push(v);
2034 }
2035 }
2036
2037 let mut col_iters = Vec::with_capacity(ncols);
2039 let mut col_types = Vec::with_capacity(ncols);
2040 for col in &cols {
2041 let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2042 let mut col_type = ecx.column_type(&col[0]);
2043 for val in &col[1..] {
2044 col_type = col_type.union(&ecx.column_type(val))?;
2045 }
2046 col_types.push(col_type);
2047 col_iters.push(col.into_iter());
2048 }
2049
2050 let mut exprs = vec![];
2052 for _ in 0..nrows {
2053 for i in 0..ncols {
2054 exprs.push(col_iters[i].next().unwrap());
2055 }
2056 }
2057 let out = HirRelationExpr::CallTable {
2058 func: mz_expr::TableFunc::Wrap {
2059 width: ncols,
2060 types: col_types,
2061 },
2062 exprs,
2063 };
2064
2065 let mut scope = Scope::empty();
2067 for i in 0..ncols {
2068 let name = format!("column{}", i + 1);
2069 scope.items.push(ScopeItem::from_column_name(name));
2070 }
2071
2072 Ok((out, scope))
2073}
2074
2075fn plan_values_insert(
2085 qcx: &QueryContext,
2086 target_names: &[&ColumnName],
2087 target_types: &[&ScalarType],
2088 values: &[Vec<Expr<Aug>>],
2089) -> Result<HirRelationExpr, PlanError> {
2090 assert!(!values.is_empty());
2091
2092 if !values.iter().map(|row| row.len()).all_equal() {
2093 sql_bail!("VALUES lists must all be the same length");
2094 }
2095
2096 let ecx = &ExprContext {
2097 qcx,
2098 name: "VALUES",
2099 scope: &Scope::empty(),
2100 relation_type: &RelationType::empty(),
2101 allow_aggregates: false,
2102 allow_subqueries: true,
2103 allow_parameters: true,
2104 allow_windows: false,
2105 };
2106
2107 let mut exprs = vec![];
2108 let mut types = vec![];
2109 for row in values {
2110 if row.len() > target_names.len() {
2111 sql_bail!("INSERT has more expressions than target columns");
2112 }
2113 for (column, val) in row.into_iter().enumerate() {
2114 let target_type = &target_types[column];
2115 let val = plan_expr(ecx, val)?;
2116 let val = typeconv::plan_coerce(ecx, val, target_type)?;
2117 let source_type = &ecx.scalar_type(&val);
2118 let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2119 Ok(val) => val,
2120 Err(_) => sql_bail!(
2121 "column {} is of type {} but expression is of type {}",
2122 target_names[column].as_str().quoted(),
2123 qcx.humanize_scalar_type(target_type, false),
2124 qcx.humanize_scalar_type(source_type, false),
2125 ),
2126 };
2127 if column >= types.len() {
2128 types.push(ecx.column_type(&val));
2129 } else {
2130 types[column] = types[column].union(&ecx.column_type(&val))?;
2131 }
2132 exprs.push(val);
2133 }
2134 }
2135
2136 Ok(HirRelationExpr::CallTable {
2137 func: mz_expr::TableFunc::Wrap {
2138 width: values[0].len(),
2139 types,
2140 },
2141 exprs,
2142 })
2143}
2144
2145fn plan_join_identity() -> (HirRelationExpr, Scope) {
2146 let typ = RelationType::new(vec![]);
2147 let expr = HirRelationExpr::constant(vec![vec![]], typ);
2148 let scope = Scope::empty();
2149 (expr, scope)
2150}
2151
#[derive(Debug)]
/// The result of planning a `SELECT`, as returned by `plan_select_from_where`.
///
/// The projection is carried alongside `expr` rather than applied to it;
/// callers apply it themselves (e.g. `plan_set_expr` calls
/// `plan.expr.project(plan.project)`).
struct SelectPlan {
    /// The planned relation expression, before `project` is applied.
    expr: HirRelationExpr,
    /// The scope describing the output columns of the `SELECT`.
    scope: Scope,
    /// The planned ordering; empty when no `ORDER BY` expressions were given
    /// (as `plan_set_expr` asserts).
    order_by: Vec<ColumnOrder>,
    /// The columns of `expr` that make up the output, in output order.
    project: Vec<usize>,
}
2164
// Generates `SelectOptionExtracted`. The planner builds it from the options
// of a `SELECT` via `try_from` and converts it into `GroupSizeHints`. Each of
// the four options is declared here as a `u64`.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2172
2173fn plan_select_from_where(
2191 qcx: &QueryContext,
2192 mut s: Select<Aug>,
2193 mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2194) -> Result<SelectPlan, PlanError> {
2195 let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2202 let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2203
2204 let (mut relation_expr, mut from_scope) =
2206 s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2207 let (left, left_scope) = l;
2208 plan_join(
2209 qcx,
2210 left,
2211 left_scope,
2212 &Join {
2213 relation: TableFactor::NestedJoin {
2214 join: Box::new(twj.clone()),
2215 alias: None,
2216 },
2217 join_operator: JoinOperator::CrossJoin,
2218 },
2219 )
2220 })?;
2221
2222 if let Some(selection) = &s.selection {
2224 let ecx = &ExprContext {
2225 qcx,
2226 name: "WHERE clause",
2227 scope: &from_scope,
2228 relation_type: &qcx.relation_type(&relation_expr),
2229 allow_aggregates: false,
2230 allow_subqueries: true,
2231 allow_parameters: true,
2232 allow_windows: false,
2233 };
2234 let expr = plan_expr(ecx, selection)
2235 .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2236 .type_as(ecx, &ScalarType::Bool)?;
2237 relation_expr = relation_expr.filter(vec![expr]);
2238 }
2239
2240 let (aggregates, table_funcs) = {
2243 let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2244 visitor.visit_select_mut(&mut s);
2245 for o in order_by_exprs.iter_mut() {
2246 visitor.visit_order_by_expr_mut(o);
2247 }
2248 visitor.into_result()?
2249 };
2250 let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2251 if !table_funcs.is_empty() {
2252 let (expr, scope) = plan_scalar_table_funcs(
2253 qcx,
2254 table_funcs,
2255 &mut table_func_names,
2256 &relation_expr,
2257 &from_scope,
2258 )?;
2259 relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2260 from_scope = from_scope.product(scope)?;
2261 }
2262
2263 let projection = {
2265 let ecx = &ExprContext {
2266 qcx,
2267 name: "SELECT clause",
2268 scope: &from_scope,
2269 relation_type: &qcx.relation_type(&relation_expr),
2270 allow_aggregates: true,
2271 allow_subqueries: true,
2272 allow_parameters: true,
2273 allow_windows: true,
2274 };
2275 let mut out = vec![];
2276 for si in &s.projection {
2277 if *si == SelectItem::Wildcard && s.from.is_empty() {
2278 sql_bail!("SELECT * with no tables specified is not valid");
2279 }
2280 out.extend(expand_select_item(ecx, si, &table_func_names)?);
2281 }
2282 out
2283 };
2284
2285 let (mut group_scope, select_all_mapping) = {
2289 let ecx = &ExprContext {
2291 qcx,
2292 name: "GROUP BY clause",
2293 scope: &from_scope,
2294 relation_type: &qcx.relation_type(&relation_expr),
2295 allow_aggregates: false,
2296 allow_subqueries: true,
2297 allow_parameters: true,
2298 allow_windows: false,
2299 };
2300 let mut group_key = vec![];
2301 let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2302 let mut group_hir_exprs = vec![];
2303 let mut group_scope = Scope::empty();
2304 let mut select_all_mapping = BTreeMap::new();
2305
2306 for group_expr in &s.group_by {
2307 let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2308 let new_column = group_key.len();
2309
2310 if let Some(group_expr) = group_expr {
2311 if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2315 existing_scope_item.exprs.insert(group_expr.clone());
2316 continue;
2317 }
2318 }
2319
2320 let mut scope_item = if let HirScalarExpr::Column(ColumnRef {
2321 level: 0,
2322 column: old_column,
2323 }) = &expr
2324 {
2325 select_all_mapping.insert(*old_column, new_column);
2331 let scope_item = ecx.scope.items[*old_column].clone();
2332 scope_item
2333 } else {
2334 ScopeItem::empty()
2335 };
2336
2337 if let Some(group_expr) = group_expr.cloned() {
2338 scope_item.exprs.insert(group_expr);
2339 }
2340
2341 group_key.push(from_scope.len() + group_exprs.len());
2342 group_hir_exprs.push(expr.clone());
2343 group_exprs.insert(expr, scope_item);
2344 }
2345
2346 assert_eq!(group_hir_exprs.len(), group_exprs.len());
2347 for expr in &group_hir_exprs {
2348 if let Some(scope_item) = group_exprs.remove(expr) {
2349 group_scope.items.push(scope_item);
2350 }
2351 }
2352
2353 let ecx = &ExprContext {
2355 qcx,
2356 name: "aggregate function",
2357 scope: &from_scope,
2358 relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2359 allow_aggregates: false,
2360 allow_subqueries: true,
2361 allow_parameters: true,
2362 allow_windows: false,
2363 };
2364 let mut agg_exprs = vec![];
2365 for sql_function in aggregates {
2366 if sql_function.over.is_some() {
2367 unreachable!(
2368 "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2369 );
2370 }
2371 agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2372 group_scope
2373 .items
2374 .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2375 }
2376 if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2377 relation_expr = relation_expr.map(group_hir_exprs).reduce(
2379 group_key,
2380 agg_exprs,
2381 group_size_hints.aggregate_input_group_size,
2382 );
2383
2384 for i in 0..from_scope.len() {
2390 if !select_all_mapping.contains_key(&i) {
2391 let scope_item = &ecx.scope.items[i];
2392 group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2393 table_name: scope_item.table_name.clone(),
2394 column_name: scope_item.column_name.clone(),
2395 allow_unqualified_references: scope_item.allow_unqualified_references,
2396 });
2397 }
2398 }
2399
2400 (group_scope, select_all_mapping)
2401 } else {
2402 (
2404 from_scope.clone(),
2405 (0..from_scope.len()).map(|i| (i, i)).collect(),
2406 )
2407 }
2408 };
2409
2410 if let Some(ref having) = s.having {
2412 let ecx = &ExprContext {
2413 qcx,
2414 name: "HAVING clause",
2415 scope: &group_scope,
2416 relation_type: &qcx.relation_type(&relation_expr),
2417 allow_aggregates: true,
2418 allow_subqueries: true,
2419 allow_parameters: true,
2420 allow_windows: false,
2421 };
2422 let expr = plan_expr(ecx, having)?.type_as(ecx, &ScalarType::Bool)?;
2423 relation_expr = relation_expr.filter(vec![expr]);
2424 }
2425
2426 let window_funcs = {
2439 let mut visitor = WindowFuncCollector::default();
2440 visitor.visit_select(&s);
2444 for o in order_by_exprs.iter() {
2445 visitor.visit_order_by_expr(o);
2446 }
2447 visitor.into_result()
2448 };
2449 for window_func in window_funcs {
2450 let ecx = &ExprContext {
2451 qcx,
2452 name: "window function",
2453 scope: &group_scope,
2454 relation_type: &qcx.relation_type(&relation_expr),
2455 allow_aggregates: true,
2456 allow_subqueries: true,
2457 allow_parameters: true,
2458 allow_windows: true,
2459 };
2460 relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2461 group_scope.items.push(ScopeItem::from_expr(window_func));
2462 }
2463 if let Some(ref qualify) = s.qualify {
2470 let ecx = &ExprContext {
2471 qcx,
2472 name: "QUALIFY clause",
2473 scope: &group_scope,
2474 relation_type: &qcx.relation_type(&relation_expr),
2475 allow_aggregates: true,
2476 allow_subqueries: true,
2477 allow_parameters: true,
2478 allow_windows: true,
2479 };
2480 let expr = plan_expr(ecx, qualify)?.type_as(ecx, &ScalarType::Bool)?;
2481 relation_expr = relation_expr.filter(vec![expr]);
2482 }
2483
2484 let output_columns = {
2486 let mut new_exprs = vec![];
2487 let mut new_type = qcx.relation_type(&relation_expr);
2488 let mut output_columns = vec![];
2489 for (select_item, column_name) in &projection {
2490 let ecx = &ExprContext {
2491 qcx,
2492 name: "SELECT clause",
2493 scope: &group_scope,
2494 relation_type: &new_type,
2495 allow_aggregates: true,
2496 allow_subqueries: true,
2497 allow_parameters: true,
2498 allow_windows: true,
2499 };
2500 let expr = match select_item {
2501 ExpandedSelectItem::InputOrdinal(i) => {
2502 if let Some(column) = select_all_mapping.get(i).copied() {
2503 HirScalarExpr::column(column)
2504 } else {
2505 return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2506 }
2507 }
2508 ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2509 };
2510 if let HirScalarExpr::Column(ColumnRef { level: 0, column }) = expr {
2511 output_columns.push((column, column_name));
2513 } else {
2514 let typ = ecx.column_type(&expr);
2521 new_type.column_types.push(typ);
2522 new_exprs.push(expr);
2523 output_columns.push((group_scope.len(), column_name));
2524 group_scope
2525 .items
2526 .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2527 }
2528 }
2529 relation_expr = relation_expr.map(new_exprs);
2530 output_columns
2531 };
2532 let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2533
2534 let order_by = {
2536 let relation_type = qcx.relation_type(&relation_expr);
2537 let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2538 &ExprContext {
2539 qcx,
2540 name: "ORDER BY clause",
2541 scope: &group_scope,
2542 relation_type: &relation_type,
2543 allow_aggregates: true,
2544 allow_subqueries: true,
2545 allow_parameters: true,
2546 allow_windows: true,
2547 },
2548 &order_by_exprs,
2549 &output_columns,
2550 )?;
2551
2552 match s.distinct {
2553 None => relation_expr = relation_expr.map(map_exprs),
2554 Some(Distinct::EntireRow) => {
2555 if relation_type.arity() == 0 {
2556 sql_bail!("SELECT DISTINCT must have at least one column");
2557 }
2558 if !try_push_projection_order_by(
2562 &mut relation_expr,
2563 &mut project_key,
2564 &mut order_by,
2565 ) {
2566 sql_bail!(
2567 "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2568 );
2569 }
2570 assert!(map_exprs.is_empty());
2571 relation_expr = relation_expr.distinct();
2572 }
2573 Some(Distinct::On(exprs)) => {
2574 let ecx = &ExprContext {
2575 qcx,
2576 name: "DISTINCT ON clause",
2577 scope: &group_scope,
2578 relation_type: &qcx.relation_type(&relation_expr),
2579 allow_aggregates: true,
2580 allow_subqueries: true,
2581 allow_parameters: true,
2582 allow_windows: true,
2583 };
2584
2585 let mut distinct_exprs = vec![];
2586 for expr in &exprs {
2587 let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2588 distinct_exprs.push(expr);
2589 }
2590
2591 let mut distinct_key = vec![];
2592
2593 let arity = relation_type.arity();
2603 for ord in order_by.iter().take(distinct_exprs.len()) {
2604 let mut expr = &HirScalarExpr::column(ord.column);
2607 if ord.column >= arity {
2608 expr = &map_exprs[ord.column - arity];
2609 };
2610 match distinct_exprs.iter().position(move |e| e == expr) {
2611 None => sql_bail!(
2612 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2613 ),
2614 Some(pos) => {
2615 distinct_exprs.remove(pos);
2616 }
2617 }
2618 distinct_key.push(ord.column);
2619 }
2620
2621 for expr in distinct_exprs {
2623 let column = match expr {
2626 HirScalarExpr::Column(ColumnRef { level: 0, column }) => column,
2627 _ => {
2628 map_exprs.push(expr);
2629 arity + map_exprs.len() - 1
2630 }
2631 };
2632 distinct_key.push(column);
2633 }
2634
2635 let distinct_len = distinct_key.len();
2640 relation_expr = HirRelationExpr::top_k(
2641 relation_expr.map(map_exprs),
2642 distinct_key,
2643 order_by.iter().skip(distinct_len).cloned().collect(),
2644 Some(HirScalarExpr::literal(Datum::Int64(1), ScalarType::Int64)),
2645 0,
2646 group_size_hints.distinct_on_input_group_size,
2647 );
2648 }
2649 }
2650
2651 order_by
2652 };
2653
2654 let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2659
2660 Ok(SelectPlan {
2661 expr: relation_expr,
2662 scope,
2663 order_by,
2664 project: project_key,
2665 })
2666}
2667
2668fn plan_scalar_table_funcs(
2669 qcx: &QueryContext,
2670 table_funcs: BTreeMap<Function<Aug>, String>,
2671 table_func_names: &mut BTreeMap<String, Ident>,
2672 relation_expr: &HirRelationExpr,
2673 from_scope: &Scope,
2674) -> Result<(HirRelationExpr, Scope), PlanError> {
2675 let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2676
2677 for (table_func, id) in table_funcs.iter() {
2678 table_func_names.insert(
2679 id.clone(),
2680 Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2682 );
2683 }
2684 if table_funcs.len() == 1 {
2687 let (table_func, id) = table_funcs.iter().next().unwrap();
2688 let (expr, mut scope) =
2689 plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2690
2691 let num_cols = scope.len();
2693 for i in 0..scope.len() {
2694 scope.items[i].table_name = Some(PartialItemName {
2695 database: None,
2696 schema: None,
2697 item: id.clone(),
2698 });
2699 scope.items[i].from_single_column_function = num_cols == 1;
2700 scope.items[i].allow_unqualified_references = false;
2701 }
2702 return Ok((expr, scope));
2703 }
2704 let (expr, mut scope, num_cols) =
2706 plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2707
2708 let mut i = 0;
2710 for (id, num_cols) in table_funcs.values().zip(num_cols) {
2711 for _ in 0..num_cols {
2712 scope.items[i].table_name = Some(PartialItemName {
2713 database: None,
2714 schema: None,
2715 item: id.clone(),
2716 });
2717 scope.items[i].from_single_column_function = num_cols == 1;
2718 scope.items[i].allow_unqualified_references = false;
2719 i += 1;
2720 }
2721 scope.items[i].table_name = Some(PartialItemName {
2725 database: None,
2726 schema: None,
2727 item: id.clone(),
2728 });
2729 scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2730 scope.items[i].allow_unqualified_references = false;
2731 i += 1;
2732 }
2733 scope.items[i].allow_unqualified_references = false;
2735 Ok((expr, scope))
2736}
2737
2738fn plan_group_by_expr<'a>(
2745 ecx: &ExprContext,
2746 group_expr: &'a Expr<Aug>,
2747 projection: &'a [(ExpandedSelectItem, ColumnName)],
2748) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2749 let plan_projection = |column: usize| match &projection[column].0 {
2750 ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2751 ExpandedSelectItem::Expr(expr) => {
2752 Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2753 }
2754 };
2755
2756 if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2759 return plan_projection(column);
2760 }
2761
2762 match group_expr {
2766 Expr::Identifier(names) => match plan_identifier(ecx, names) {
2767 Err(PlanError::UnknownColumn {
2768 table: None,
2769 column,
2770 similar,
2771 }) => {
2772 let mut iter = projection.iter().map(|(_expr, name)| name);
2775 if let Some(i) = iter.position(|n| *n == column) {
2776 if iter.any(|n| *n == column) {
2777 Err(PlanError::AmbiguousColumn(column))
2778 } else {
2779 plan_projection(i)
2780 }
2781 } else {
2782 Err(PlanError::UnknownColumn {
2785 table: None,
2786 column,
2787 similar,
2788 })
2789 }
2790 }
2791 res => Ok((Some(group_expr), res?)),
2792 },
2793 _ => Ok((
2794 Some(group_expr),
2795 plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2796 )),
2797 }
2798}
2799
2800pub(crate) fn plan_order_by_exprs(
2808 ecx: &ExprContext,
2809 order_by_exprs: &[OrderByExpr<Aug>],
2810 output_columns: &[(usize, &ColumnName)],
2811) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2812 let mut order_by = vec![];
2813 let mut map_exprs = vec![];
2814 for obe in order_by_exprs {
2815 let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2816 let column = match expr {
2819 HirScalarExpr::Column(ColumnRef { level: 0, column }) => column,
2820 _ => {
2821 map_exprs.push(expr);
2822 ecx.relation_type.arity() + map_exprs.len() - 1
2823 }
2824 };
2825 order_by.push(resolve_desc_and_nulls_last(obe, column));
2826 }
2827 Ok((order_by, map_exprs))
2828}
2829
2830fn plan_order_by_or_distinct_expr(
2848 ecx: &ExprContext,
2849 expr: &Expr<Aug>,
2850 output_columns: &[(usize, &ColumnName)],
2851) -> Result<HirScalarExpr, PlanError> {
2852 if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2853 return Ok(HirScalarExpr::column(output_columns[i].0));
2854 }
2855
2856 if let Expr::Identifier(names) = expr {
2857 if let [name] = &names[..] {
2858 let name = normalize::column_name(name.clone());
2859 let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2860 if let Some((i, _)) = iter.next() {
2861 match iter.next() {
2862 Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2866 _ => return Ok(HirScalarExpr::column(*i)),
2867 }
2868 }
2869 }
2870 }
2871
2872 plan_expr(ecx, expr)?.type_as_any(ecx)
2873}
2874
2875fn plan_table_with_joins(
2876 qcx: &QueryContext,
2877 table_with_joins: &TableWithJoins<Aug>,
2878) -> Result<(HirRelationExpr, Scope), PlanError> {
2879 let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2880 for join in &table_with_joins.joins {
2881 let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2882 expr = new_expr;
2883 scope = new_scope;
2884 }
2885 Ok((expr, scope))
2886}
2887
2888fn plan_table_factor(
2889 qcx: &QueryContext,
2890 table_factor: &TableFactor<Aug>,
2891) -> Result<(HirRelationExpr, Scope), PlanError> {
2892 match table_factor {
2893 TableFactor::Table { name, alias } => {
2894 let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2895 let scope = plan_table_alias(scope, alias.as_ref())?;
2896 Ok((expr, scope))
2897 }
2898
2899 TableFactor::Function {
2900 function,
2901 alias,
2902 with_ordinality,
2903 } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2904
2905 TableFactor::RowsFrom {
2906 functions,
2907 alias,
2908 with_ordinality,
2909 } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2910
2911 TableFactor::Derived {
2912 lateral,
2913 subquery,
2914 alias,
2915 } => {
2916 let mut qcx = (*qcx).clone();
2917 if !lateral {
2918 for scope in &mut qcx.outer_scopes {
2922 if scope.lateral_barrier {
2923 break;
2924 }
2925 scope.items.clear();
2926 }
2927 }
2928 qcx.outer_scopes[0].lateral_barrier = true;
2929 let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
2930 let scope = plan_table_alias(scope, alias.as_ref())?;
2931 Ok((expr, scope))
2932 }
2933
2934 TableFactor::NestedJoin { join, alias } => {
2935 let (expr, scope) = plan_table_with_joins(qcx, join)?;
2936 let scope = plan_table_alias(scope, alias.as_ref())?;
2937 Ok((expr, scope))
2938 }
2939 }
2940}
2941
2942fn plan_rows_from(
2984 qcx: &QueryContext,
2985 functions: &[Function<Aug>],
2986 alias: Option<&TableAlias>,
2987 with_ordinality: bool,
2988) -> Result<(HirRelationExpr, Scope), PlanError> {
2989 if let [function] = functions {
2992 return plan_solitary_table_function(qcx, function, alias, with_ordinality);
2993 }
2994
2995 let (expr, mut scope, num_cols) = plan_rows_from_internal(
2999 qcx,
3000 functions,
3001 Some(functions[0].name.full_item_name().clone()),
3002 )?;
3003
3004 let mut columns = Vec::new();
3006 let mut offset = 0;
3007 for (idx, cols) in num_cols.into_iter().enumerate() {
3009 for i in 0..cols {
3010 columns.push(offset + i);
3011 }
3012 offset += cols + 1;
3013
3014 scope.items.remove(offset - idx - 1);
3017 }
3018
3019 if with_ordinality {
3022 columns.push(scope.items.len());
3023 } else {
3024 scope.items.pop();
3025 }
3026
3027 let expr = expr.project(columns);
3028
3029 let scope = plan_table_alias(scope, alias)?;
3030 Ok((expr, scope))
3031}
3032
3033fn plan_rows_from_internal<'a>(
3056 qcx: &QueryContext,
3057 functions: impl IntoIterator<Item = &'a Function<Aug>>,
3058 table_name: Option<FullItemName>,
3059) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3060 let mut functions = functions.into_iter();
3061 let mut num_cols = Vec::new();
3062
3063 let (mut left_expr, mut left_scope) =
3067 plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3068 num_cols.push(left_scope.len() - 1);
3069 left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3071 left_scope
3072 .items
3073 .push(ScopeItem::from_column_name("ordinality"));
3074
3075 for function in functions {
3076 let qcx = qcx.empty_derived_context();
3078 let (right_expr, mut right_scope) =
3079 plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3080 num_cols.push(right_scope.len() - 1);
3081 let left_col = left_scope.len() - 1;
3082 let right_col = left_scope.len() + right_scope.len() - 1;
3083 let on = HirScalarExpr::CallBinary {
3084 func: BinaryFunc::Eq,
3085 expr1: Box::new(HirScalarExpr::column(left_col)),
3086 expr2: Box::new(HirScalarExpr::column(right_col)),
3087 };
3088 left_expr = left_expr
3089 .join(right_expr, on, JoinKind::FullOuter)
3090 .map(vec![HirScalarExpr::CallVariadic {
3091 func: VariadicFunc::Coalesce,
3092 exprs: vec![
3093 HirScalarExpr::column(left_col),
3094 HirScalarExpr::column(right_col),
3095 ],
3096 }]);
3097
3098 left_expr = left_expr.project(
3101 (0..left_col) .chain(left_col + 1..right_col + 2) .collect(),
3104 );
3105 right_scope.items.push(left_scope.items.pop().unwrap());
3107
3108 left_scope.items.extend(right_scope.items);
3109 }
3110
3111 Ok((left_expr, left_scope, num_cols))
3112}
3113
3114fn plan_solitary_table_function(
3118 qcx: &QueryContext,
3119 function: &Function<Aug>,
3120 alias: Option<&TableAlias>,
3121 with_ordinality: bool,
3122) -> Result<(HirRelationExpr, Scope), PlanError> {
3123 let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3124
3125 let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3126 if single_column_function {
3127 let item = &mut scope.items[0];
3128
3129 item.from_single_column_function = true;
3132
3133 if let Some(alias) = alias {
3148 if let ScopeItem {
3149 table_name: Some(table_name),
3150 column_name,
3151 ..
3152 } = item
3153 {
3154 if table_name.item.as_str() == column_name.as_str() {
3155 *column_name = normalize::column_name(alias.name.clone());
3156 }
3157 }
3158 }
3159 }
3160
3161 let scope = plan_table_alias(scope, alias)?;
3162 Ok((expr, scope))
3163}
3164
3165fn plan_table_function_internal(
3170 qcx: &QueryContext,
3171 Function {
3172 name,
3173 args,
3174 filter,
3175 over,
3176 distinct,
3177 }: &Function<Aug>,
3178 with_ordinality: bool,
3179 table_name: Option<FullItemName>,
3180) -> Result<(HirRelationExpr, Scope), PlanError> {
3181 assert_none!(filter, "cannot parse table function with FILTER");
3182 assert_none!(over, "cannot parse table function with OVER");
3183 assert!(!*distinct, "cannot parse table function with DISTINCT");
3184
3185 let ecx = &ExprContext {
3186 qcx,
3187 name: "table function arguments",
3188 scope: &Scope::empty(),
3189 relation_type: &RelationType::empty(),
3190 allow_aggregates: false,
3191 allow_subqueries: true,
3192 allow_parameters: true,
3193 allow_windows: false,
3194 };
3195
3196 let scalar_args = match args {
3197 FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3198 FunctionArgs::Args { args, order_by } => {
3199 if !order_by.is_empty() {
3200 sql_bail!(
3201 "ORDER BY specified, but {} is not an aggregate function",
3202 name
3203 );
3204 }
3205 plan_exprs(ecx, args)?
3206 }
3207 };
3208
3209 let table_name = match table_name {
3210 Some(table_name) => table_name.item,
3211 None => name.full_item_name().item.clone(),
3212 };
3213
3214 let scope_name = Some(PartialItemName {
3215 database: None,
3216 schema: None,
3217 item: table_name,
3218 });
3219
3220 let (mut expr, mut scope) = match resolve_func(ecx, name, args)? {
3221 Func::Table(impls) => {
3222 let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3223 let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3224 (tf.expr, scope)
3225 }
3226 Func::Scalar(impls) => {
3227 let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3228 let output = expr.typ(
3229 &qcx.outer_relation_types,
3230 &RelationType::new(vec![]),
3231 &qcx.scx.param_types.borrow(),
3232 );
3233
3234 let relation = RelationType::new(vec![output]);
3235
3236 let function_ident = Ident::new(name.full_item_name().item.clone())?;
3237 let column_name = normalize::column_name(function_ident);
3238 let name = column_name.to_string();
3239
3240 let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3241
3242 (
3243 HirRelationExpr::CallTable {
3244 func: mz_expr::TableFunc::TabletizedScalar { relation, name },
3245 exprs: vec![expr],
3246 },
3247 scope,
3248 )
3249 }
3250 o => sql_bail!(
3251 "{} functions are not supported in functions in FROM",
3252 o.class()
3253 ),
3254 };
3255
3256 if with_ordinality {
3257 expr = expr.map(vec![HirScalarExpr::Windowing(WindowExpr {
3258 func: WindowExprType::Scalar(ScalarWindowExpr {
3259 func: ScalarWindowFunc::RowNumber,
3260 order_by: vec![],
3261 }),
3262 partition_by: vec![],
3263 order_by: vec![],
3264 })]);
3265 scope
3266 .items
3267 .push(ScopeItem::from_name(scope_name, "ordinality"));
3268 }
3269
3270 Ok((expr, scope))
3271}
3272
3273fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3274 if let Some(TableAlias {
3275 name,
3276 columns,
3277 strict,
3278 }) = alias
3279 {
3280 if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3281 sql_bail!(
3282 "{} has {} columns available but {} columns specified",
3283 name,
3284 scope.items.len(),
3285 columns.len()
3286 );
3287 }
3288
3289 let table_name = normalize::ident(name.to_owned());
3290 for (i, item) in scope.items.iter_mut().enumerate() {
3291 item.table_name = if item.allow_unqualified_references {
3292 Some(PartialItemName {
3293 database: None,
3294 schema: None,
3295 item: table_name.clone(),
3296 })
3297 } else {
3298 None
3332 };
3333 item.column_name = columns
3334 .get(i)
3335 .map(|a| normalize::column_name(a.clone()))
3336 .unwrap_or_else(|| item.column_name.clone());
3337 }
3338 }
3339 Ok(scope)
3340}
3341
3342fn invent_column_name(
3346 ecx: &ExprContext,
3347 expr: &Expr<Aug>,
3348 table_func_names: &BTreeMap<String, Ident>,
3349) -> Option<ColumnName> {
3350 #[derive(Debug)]
3357 enum NameQuality {
3358 Low,
3359 High,
3360 }
3361
3362 fn invent(
3363 ecx: &ExprContext,
3364 expr: &Expr<Aug>,
3365 table_func_names: &BTreeMap<String, Ident>,
3366 ) -> Option<(ColumnName, NameQuality)> {
3367 match expr {
3368 Expr::Identifier(names) => {
3369 if let [name] = names.as_slice() {
3370 if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3371 return Some((
3372 normalize::column_name(table_func_name.clone()),
3373 NameQuality::High,
3374 ));
3375 }
3376 }
3377 names
3378 .last()
3379 .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3380 }
3381 Expr::Value(v) => match v {
3382 Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3385 Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3386 _ => None,
3387 },
3388 Expr::Function(func) => {
3389 let (schema, item) = match &func.name {
3390 ResolvedItemName::Item {
3391 qualifiers,
3392 full_name,
3393 ..
3394 } => (&qualifiers.schema_spec, full_name.item.clone()),
3395 _ => unreachable!(),
3396 };
3397
3398 if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3399 || schema
3400 == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3401 {
3402 None
3403 } else {
3404 Some((item.into(), NameQuality::High))
3405 }
3406 }
3407 Expr::HomogenizingFunction { function, .. } => Some((
3408 function.to_string().to_lowercase().into(),
3409 NameQuality::High,
3410 )),
3411 Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3412 Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3413 Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3414 Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3415 Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3416 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3417 _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3418 },
3419 Expr::Case { else_result, .. } => {
3420 match else_result
3421 .as_ref()
3422 .and_then(|else_result| invent(ecx, else_result, table_func_names))
3423 {
3424 Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3425 _ => Some(("case".into(), NameQuality::Low)),
3426 }
3427 }
3428 Expr::FieldAccess { field, .. } => {
3429 Some((normalize::column_name(field.clone()), NameQuality::High))
3430 }
3431 Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3432 Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3433 Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3434 let (_expr, scope) =
3438 plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3439 scope
3440 .items
3441 .first()
3442 .map(|name| (name.column_name.clone(), NameQuality::High))
3443 }
3444 Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3445 _ => None,
3446 }
3447 }
3448
3449 invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3450}
3451
3452#[derive(Debug)]
3453enum ExpandedSelectItem<'a> {
3454 InputOrdinal(usize),
3455 Expr(Cow<'a, Expr<Aug>>),
3456}
3457
3458impl ExpandedSelectItem<'_> {
3459 fn as_expr(&self) -> Option<&Expr<Aug>> {
3460 match self {
3461 ExpandedSelectItem::InputOrdinal(_) => None,
3462 ExpandedSelectItem::Expr(expr) => Some(expr),
3463 }
3464 }
3465}
3466
3467fn expand_select_item<'a>(
3468 ecx: &ExprContext,
3469 s: &'a SelectItem<Aug>,
3470 table_func_names: &BTreeMap<String, Ident>,
3471) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3472 match s {
3473 SelectItem::Expr {
3474 expr: Expr::QualifiedWildcard(table_name),
3475 alias: _,
3476 } => {
3477 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3478 let table_name =
3479 normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3480 let out: Vec<_> = ecx
3481 .scope
3482 .items
3483 .iter()
3484 .enumerate()
3485 .filter(|(_i, item)| item.is_from_table(&table_name))
3486 .map(|(i, item)| {
3487 let name = item.column_name.clone();
3488 (ExpandedSelectItem::InputOrdinal(i), name)
3489 })
3490 .collect();
3491 if out.is_empty() {
3492 sql_bail!("no table named '{}' in scope", table_name);
3493 }
3494 Ok(out)
3495 }
3496 SelectItem::Expr {
3497 expr: Expr::WildcardAccess(sql_expr),
3498 alias: _,
3499 } => {
3500 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3501 let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3507 let fields = match ecx.scalar_type(&expr) {
3508 ScalarType::Record { fields, .. } => fields,
3509 ty => sql_bail!(
3510 "type {} is not composite",
3511 ecx.humanize_scalar_type(&ty, false)
3512 ),
3513 };
3514 let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3515 if let Expr::Identifier(ident) = sql_expr.as_ref() {
3516 if let [name] = ident.as_slice() {
3517 if let Ok(items) = ecx.scope.items_from_table(
3518 &[],
3519 &PartialItemName {
3520 database: None,
3521 schema: None,
3522 item: name.as_str().to_string(),
3523 },
3524 ) {
3525 for (_, item) in items {
3526 if item
3527 .is_exists_column_for_a_table_function_that_was_in_the_target_list
3528 {
3529 skip_cols.insert(item.column_name.clone());
3530 }
3531 }
3532 }
3533 }
3534 }
3535 let items = fields
3536 .iter()
3537 .filter_map(|(name, _ty)| {
3538 if skip_cols.contains(name) {
3539 None
3540 } else {
3541 let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3542 expr: sql_expr.clone(),
3543 field: name.clone().into(),
3544 }));
3545 Some((item, name.clone()))
3546 }
3547 })
3548 .collect();
3549 Ok(items)
3550 }
3551 SelectItem::Wildcard => {
3552 *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3553 let items: Vec<_> = ecx
3554 .scope
3555 .items
3556 .iter()
3557 .enumerate()
3558 .filter(|(_i, item)| item.allow_unqualified_references)
3559 .map(|(i, item)| {
3560 let name = item.column_name.clone();
3561 (ExpandedSelectItem::InputOrdinal(i), name)
3562 })
3563 .collect();
3564
3565 Ok(items)
3566 }
3567 SelectItem::Expr { expr, alias } => {
3568 let name = alias
3569 .clone()
3570 .map(normalize::column_name)
3571 .or_else(|| invent_column_name(ecx, expr, table_func_names))
3572 .unwrap_or_else(|| "?column?".into());
3573 Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3574 }
3575 }
3576}
3577
3578fn plan_join(
3579 left_qcx: &QueryContext,
3580 left: HirRelationExpr,
3581 left_scope: Scope,
3582 join: &Join<Aug>,
3583) -> Result<(HirRelationExpr, Scope), PlanError> {
3584 const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3585 let (kind, constraint) = match &join.join_operator {
3586 JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3587 JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3588 JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3589 JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3590 JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3591 };
3592
3593 let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3594 if !kind.can_be_correlated() {
3595 for item in &mut right_qcx.outer_scopes[0].items {
3596 item.error_if_referenced =
3601 Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3602 table: table.cloned(),
3603 column: column.clone(),
3604 });
3605 }
3606 }
3607 let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3608
3609 let (expr, scope) = match constraint {
3610 JoinConstraint::On(expr) => {
3611 let product_scope = left_scope.product(right_scope)?;
3612 let ecx = &ExprContext {
3613 qcx: left_qcx,
3614 name: "ON clause",
3615 scope: &product_scope,
3616 relation_type: &RelationType::new(
3617 left_qcx
3618 .relation_type(&left)
3619 .column_types
3620 .into_iter()
3621 .chain(right_qcx.relation_type(&right).column_types)
3622 .collect(),
3623 ),
3624 allow_aggregates: false,
3625 allow_subqueries: true,
3626 allow_parameters: true,
3627 allow_windows: false,
3628 };
3629 let on = plan_expr(ecx, expr)?.type_as(ecx, &ScalarType::Bool)?;
3630 let joined = left.join(right, on, kind);
3631 (joined, product_scope)
3632 }
3633 JoinConstraint::Using { columns, alias } => {
3634 let column_names = columns
3635 .iter()
3636 .map(|ident| normalize::column_name(ident.clone()))
3637 .collect::<Vec<_>>();
3638
3639 plan_using_constraint(
3640 &column_names,
3641 left_qcx,
3642 left,
3643 left_scope,
3644 &right_qcx,
3645 right,
3646 right_scope,
3647 kind,
3648 alias.as_ref(),
3649 )?
3650 }
3651 JoinConstraint::Natural => {
3652 *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3655 *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3656 let left_column_names = left_scope.column_names();
3657 let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3658 let column_names: Vec<_> = left_column_names
3659 .filter(|col| right_column_names.contains(col))
3660 .cloned()
3661 .collect();
3662 plan_using_constraint(
3663 &column_names,
3664 left_qcx,
3665 left,
3666 left_scope,
3667 &right_qcx,
3668 right,
3669 right_scope,
3670 kind,
3671 None,
3672 )?
3673 }
3674 };
3675 Ok((expr, scope))
3676}
3677
3678#[allow(clippy::too_many_arguments)]
3680fn plan_using_constraint(
3681 column_names: &[ColumnName],
3682 left_qcx: &QueryContext,
3683 left: HirRelationExpr,
3684 left_scope: Scope,
3685 right_qcx: &QueryContext,
3686 right: HirRelationExpr,
3687 right_scope: Scope,
3688 kind: JoinKind,
3689 alias: Option<&Ident>,
3690) -> Result<(HirRelationExpr, Scope), PlanError> {
3691 let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3692
3693 let mut unique_column_names = BTreeSet::new();
3696 for c in column_names {
3697 if !unique_column_names.insert(c) {
3698 return Err(PlanError::Unsupported {
3699 feature: format!(
3700 "column name {} appears more than once in USING clause",
3701 c.as_str().quoted()
3702 ),
3703 discussion_no: None,
3704 });
3705 }
3706 }
3707
3708 let alias_item_name = alias.map(|alias| PartialItemName {
3709 database: None,
3710 schema: None,
3711 item: alias.clone().to_string(),
3712 });
3713
3714 if let Some(alias_item_name) = &alias_item_name {
3715 for partial_item_name in both_scope.table_names() {
3716 if partial_item_name.matches(alias_item_name) {
3717 sql_bail!(
3718 "table name \"{}\" specified more than once",
3719 alias_item_name
3720 )
3721 }
3722 }
3723 }
3724
3725 let ecx = &ExprContext {
3726 qcx: right_qcx,
3727 name: "USING clause",
3728 scope: &both_scope,
3729 relation_type: &RelationType::new(
3730 left_qcx
3731 .relation_type(&left)
3732 .column_types
3733 .into_iter()
3734 .chain(right_qcx.relation_type(&right).column_types)
3735 .collect(),
3736 ),
3737 allow_aggregates: false,
3738 allow_subqueries: false,
3739 allow_parameters: false,
3740 allow_windows: false,
3741 };
3742
3743 let mut join_exprs = vec![];
3744 let mut map_exprs = vec![];
3745 let mut new_items = vec![];
3746 let mut join_cols = vec![];
3747 let mut hidden_cols = vec![];
3748
3749 for column_name in column_names {
3750 let lhs = left_scope.resolve_using_column(column_name, JoinSide::Left)?;
3751 let mut rhs = right_scope.resolve_using_column(column_name, JoinSide::Right)?;
3752
3753 rhs.column += left_scope.len();
3755
3756 let mut exprs = coerce_homogeneous_exprs(
3758 &ecx.with_name(&format!(
3759 "NATURAL/USING join column {}",
3760 column_name.as_str().quoted()
3761 )),
3762 vec![
3763 CoercibleScalarExpr::Coerced(HirScalarExpr::Column(lhs)),
3764 CoercibleScalarExpr::Coerced(HirScalarExpr::Column(rhs)),
3765 ],
3766 None,
3767 )?;
3768 let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3769
3770 match kind {
3771 JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3772 join_cols.push(lhs.column);
3773 hidden_cols.push(rhs.column);
3774 }
3775 JoinKind::RightOuter => {
3776 join_cols.push(rhs.column);
3777 hidden_cols.push(lhs.column);
3778 }
3779 JoinKind::FullOuter => {
3780 join_cols.push(both_scope.items.len() + map_exprs.len());
3783 hidden_cols.push(lhs.column);
3784 hidden_cols.push(rhs.column);
3785 map_exprs.push(HirScalarExpr::CallVariadic {
3786 func: VariadicFunc::Coalesce,
3787 exprs: vec![expr1.clone(), expr2.clone()],
3788 });
3789 new_items.push(ScopeItem::from_column_name(column_name));
3790 }
3791 }
3792
3793 if alias_item_name.is_some() {
3798 let new_item_col = both_scope.items.len() + new_items.len();
3799 join_cols.push(new_item_col);
3800 hidden_cols.push(new_item_col);
3801
3802 new_items.push(ScopeItem::from_name(
3803 alias_item_name.clone(),
3804 column_name.clone().to_string(),
3805 ));
3806
3807 map_exprs.push(HirScalarExpr::Column(lhs));
3810 }
3811
3812 join_exprs.push(HirScalarExpr::CallBinary {
3813 func: BinaryFunc::Eq,
3814 expr1: Box::new(expr1),
3815 expr2: Box::new(expr2),
3816 });
3817 }
3818 both_scope.items.extend(new_items);
3819
3820 for c in hidden_cols {
3824 both_scope.items[c].allow_unqualified_references = false;
3825 }
3826
3827 let project_key = join_cols
3829 .into_iter()
3830 .chain(0..both_scope.items.len())
3831 .unique()
3832 .collect::<Vec<_>>();
3833
3834 both_scope = both_scope.project(&project_key);
3835
3836 let on = HirScalarExpr::variadic_and(join_exprs);
3837
3838 let both = left
3839 .join(right, on, kind)
3840 .map(map_exprs)
3841 .project(project_key);
3842 Ok((both, both_scope))
3843}
3844
3845pub fn plan_expr<'a>(
3846 ecx: &'a ExprContext,
3847 e: &Expr<Aug>,
3848) -> Result<CoercibleScalarExpr, PlanError> {
3849 ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3850}
3851
/// Dispatches on the shape of `e` and plans it with the matching helper.
///
/// Callers should go through [`plan_expr`]; this function does not perform
/// the recursion check itself.
///
/// # Panics
///
/// Panics (via `unreachable!`) when handed one of the expression forms that
/// are expected to have been desugared before planning: `Nested`,
/// `InSubquery`, `AnyExpr`, `AllExpr`, `AnySubquery`, `AllSubquery` and
/// `Between`.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    // If the scope already resolves this exact expression to a column, refer
    // to that column rather than planning the expression again.
    if let Some(i) = ecx.scope.resolve_expr(e) {
        return Ok(HirScalarExpr::Column(i).into());
    }

    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals, parameters and collection/record constructors. The
        // collection constructors get no type hint here; `plan_cast` is the
        // caller that supplies one.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Operators, casts and function calls.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Boolean connectives and other special syntax.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // `NULLIF(l, r)` is planned as `CASE WHEN l = r THEN NULL ELSE l END`.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        // A wildcard access is planned exactly like the expression it wraps.
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        // Forms that must have been rewritten away before planning.
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
3949
3950fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
3951 if !ecx.allow_parameters {
3952 return Err(PlanError::UnknownParameter(n));
3956 }
3957 if n == 0 || n > 65536 {
3958 return Err(PlanError::UnknownParameter(n));
3959 }
3960 if ecx.param_types().borrow().contains_key(&n) {
3961 Ok(HirScalarExpr::Parameter(n).into())
3962 } else {
3963 Ok(CoercibleScalarExpr::Parameter(n))
3964 }
3965}
3966
3967fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
3968 let mut out = vec![];
3969 for e in exprs {
3970 out.push(plan_expr(ecx, e)?);
3971 }
3972 Ok(CoercibleScalarExpr::LiteralRecord(out))
3973}
3974
3975fn plan_cast(
3976 ecx: &ExprContext,
3977 expr: &Expr<Aug>,
3978 data_type: &ResolvedDataType,
3979) -> Result<CoercibleScalarExpr, PlanError> {
3980 let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
3981 let expr = match expr {
3982 Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
3991 Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
3992 Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
3993 _ => plan_expr(ecx, expr)?,
3994 };
3995 let ecx = &ecx.with_name("CAST");
3996 let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
3997 let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
3998 Ok(expr.into())
3999}
4000
4001fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4002 let ecx = ecx.with_name("NOT argument");
4003 Ok(HirScalarExpr::CallUnary {
4004 func: UnaryFunc::Not(expr_func::Not),
4005 expr: Box::new(plan_expr(&ecx, expr)?.type_as(&ecx, &ScalarType::Bool)?),
4006 }
4007 .into())
4008}
4009
4010fn plan_and(
4011 ecx: &ExprContext,
4012 left: &Expr<Aug>,
4013 right: &Expr<Aug>,
4014) -> Result<CoercibleScalarExpr, PlanError> {
4015 let ecx = ecx.with_name("AND argument");
4016 Ok(HirScalarExpr::variadic_and(vec![
4017 plan_expr(&ecx, left)?.type_as(&ecx, &ScalarType::Bool)?,
4018 plan_expr(&ecx, right)?.type_as(&ecx, &ScalarType::Bool)?,
4019 ])
4020 .into())
4021}
4022
4023fn plan_or(
4024 ecx: &ExprContext,
4025 left: &Expr<Aug>,
4026 right: &Expr<Aug>,
4027) -> Result<CoercibleScalarExpr, PlanError> {
4028 let ecx = ecx.with_name("OR argument");
4029 Ok(HirScalarExpr::variadic_or(vec![
4030 plan_expr(&ecx, left)?.type_as(&ecx, &ScalarType::Bool)?,
4031 plan_expr(&ecx, right)?.type_as(&ecx, &ScalarType::Bool)?,
4032 ])
4033 .into())
4034}
4035
4036fn plan_in_list(
4037 ecx: &ExprContext,
4038 lhs: &Expr<Aug>,
4039 list: &Vec<Expr<Aug>>,
4040 negated: &bool,
4041) -> Result<CoercibleScalarExpr, PlanError> {
4042 let ecx = ecx.with_name("IN list");
4043 let or = HirScalarExpr::variadic_or(
4044 list.into_iter()
4045 .map(|e| {
4046 let eq = lhs.clone().equals(e.clone());
4047 plan_expr(&ecx, &eq)?.type_as(&ecx, &ScalarType::Bool)
4048 })
4049 .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4050 );
4051 Ok(if *negated {
4052 or.call_unary(UnaryFunc::Not(expr_func::Not))
4053 } else {
4054 or
4055 }
4056 .into())
4057}
4058
4059fn plan_homogenizing_function(
4060 ecx: &ExprContext,
4061 function: &HomogenizingFunction,
4062 exprs: &[Expr<Aug>],
4063) -> Result<CoercibleScalarExpr, PlanError> {
4064 assert!(!exprs.is_empty()); let expr = HirScalarExpr::CallVariadic {
4066 func: match function {
4067 HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4068 HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4069 HomogenizingFunction::Least => VariadicFunc::Least,
4070 },
4071 exprs: coerce_homogeneous_exprs(
4072 &ecx.with_name(&function.to_string().to_lowercase()),
4073 plan_exprs(ecx, exprs)?,
4074 None,
4075 )?,
4076 };
4077 Ok(expr.into())
4078}
4079
4080fn plan_field_access(
4081 ecx: &ExprContext,
4082 expr: &Expr<Aug>,
4083 field: &Ident,
4084) -> Result<CoercibleScalarExpr, PlanError> {
4085 let field = normalize::column_name(field.clone());
4086 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4087 let ty = ecx.scalar_type(&expr);
4088 let i = match &ty {
4089 ScalarType::Record { fields, .. } => fields.iter().position(|(name, _ty)| *name == field),
4090 ty => sql_bail!(
4091 "column notation applied to type {}, which is not a composite type",
4092 ecx.humanize_scalar_type(ty, false)
4093 ),
4094 };
4095 match i {
4096 None => sql_bail!(
4097 "field {} not found in data type {}",
4098 field,
4099 ecx.humanize_scalar_type(&ty, false)
4100 ),
4101 Some(i) => Ok(expr
4102 .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4103 .into()),
4104 }
4105}
4106
4107fn plan_subscript(
4108 ecx: &ExprContext,
4109 expr: &Expr<Aug>,
4110 positions: &[SubscriptPosition<Aug>],
4111) -> Result<CoercibleScalarExpr, PlanError> {
4112 assert!(
4113 !positions.is_empty(),
4114 "subscript expression must contain at least one position"
4115 );
4116
4117 let ecx = &ecx.with_name("subscripting");
4118 let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4119 let ty = ecx.scalar_type(&expr);
4120 match &ty {
4121 ScalarType::Array(..) | ScalarType::Int2Vector => plan_subscript_array(
4122 ecx,
4123 expr,
4124 positions,
4125 if ty == ScalarType::Int2Vector { 1 } else { 0 },
4129 ),
4130 ScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4131 ScalarType::List { element_type, .. } => {
4132 let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4134 let n_layers = ty.unwrap_list_n_layers();
4135 plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4136 }
4137 ty => sql_bail!(
4138 "cannot subscript type {}",
4139 ecx.humanize_scalar_type(ty, false)
4140 ),
4141 }
4142}
4143
4144fn extract_scalar_subscript_from_positions<'a>(
4148 positions: &'a [SubscriptPosition<Aug>],
4149 expr_type_name: &str,
4150) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4151 let mut scalar_subscripts = Vec::with_capacity(positions.len());
4152 for p in positions {
4153 if p.explicit_slice {
4154 sql_bail!("{} subscript does not support slices", expr_type_name);
4155 }
4156 assert!(
4157 p.end.is_none(),
4158 "index-appearing subscripts cannot have end value"
4159 );
4160 scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4161 }
4162 Ok(scalar_subscripts)
4163}
4164
4165fn plan_subscript_array(
4166 ecx: &ExprContext,
4167 expr: HirScalarExpr,
4168 positions: &[SubscriptPosition<Aug>],
4169 offset: i64,
4170) -> Result<CoercibleScalarExpr, PlanError> {
4171 let mut exprs = Vec::with_capacity(positions.len() + 1);
4172 exprs.push(expr);
4173
4174 let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4177
4178 for i in indexes {
4179 exprs.push(plan_expr(ecx, i)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?);
4180 }
4181
4182 Ok(HirScalarExpr::CallVariadic {
4183 func: VariadicFunc::ArrayIndex { offset },
4184 exprs,
4185 }
4186 .into())
4187}
4188
/// Plans a chain of subscript operations on a list value.
///
/// `positions` may freely mix index operations (`l[1]`) and slice operations
/// (`l[1:2]`). The chain is consumed left to right in alternating runs: each
/// maximal run of index operations is planned by [`plan_index_list`], and
/// each maximal run of slice operations by [`plan_slice_list`]. Every run is
/// applied to the result of the previous one.
///
/// `remaining_layers` is the number of list layers of `expr` that can still
/// be indexed into; it is replaced by the count that [`plan_index_list`]
/// returns after each index run. `elem_type_name` is only forwarded to the
/// helpers, which use it in error messages.
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    // Index of the first position that has not been planned yet.
    let mut i = 0;

    while i < positions.len() {
        // Length of the run of index operations starting at `i`: the distance
        // to the next slice, or everything that is left if there is none.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Length of the run of slice operations starting at `i`: the distance
        // to the next index operation, or everything that is left.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4237
4238fn plan_index_list(
4239 ecx: &ExprContext,
4240 expr: HirScalarExpr,
4241 indexes: &[&Expr<Aug>],
4242 n_layers: usize,
4243 elem_type_name: &str,
4244) -> Result<(usize, HirScalarExpr), PlanError> {
4245 let depth = indexes.len();
4246
4247 if depth > n_layers {
4248 if n_layers == 0 {
4249 sql_bail!("cannot subscript type {}", elem_type_name)
4250 } else {
4251 sql_bail!(
4252 "cannot index into {} layers; list only has {} layer{}",
4253 depth,
4254 n_layers,
4255 if n_layers == 1 { "" } else { "s" }
4256 )
4257 }
4258 }
4259
4260 let mut exprs = Vec::with_capacity(depth + 1);
4261 exprs.push(expr);
4262
4263 for i in indexes {
4264 exprs.push(plan_expr(ecx, i)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?);
4265 }
4266
4267 Ok((
4268 n_layers - depth,
4269 HirScalarExpr::CallVariadic {
4270 func: VariadicFunc::ListIndex,
4271 exprs,
4272 },
4273 ))
4274}
4275
4276fn plan_slice_list(
4277 ecx: &ExprContext,
4278 expr: HirScalarExpr,
4279 slices: &[SubscriptPosition<Aug>],
4280 n_layers: usize,
4281 elem_type_name: &str,
4282) -> Result<HirScalarExpr, PlanError> {
4283 if n_layers == 0 {
4284 sql_bail!("cannot subscript type {}", elem_type_name)
4285 }
4286
4287 let mut exprs = Vec::with_capacity(slices.len() + 1);
4289 exprs.push(expr);
4290 let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4292 Ok(match position {
4293 Some(p) => {
4294 plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?
4295 }
4296 None => HirScalarExpr::literal(Datum::Int64(default), ScalarType::Int64),
4297 })
4298 };
4299 for p in slices {
4300 let start = extract_position_or_default(p.start.as_ref(), 1)?;
4301 let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4302 exprs.push(start);
4303 exprs.push(end);
4304 }
4305
4306 Ok(HirScalarExpr::CallVariadic {
4307 func: VariadicFunc::ListSliceLinear,
4308 exprs,
4309 })
4310}
4311
4312fn plan_like(
4313 ecx: &ExprContext,
4314 expr: &Expr<Aug>,
4315 pattern: &Expr<Aug>,
4316 escape: Option<&Expr<Aug>>,
4317 case_insensitive: bool,
4318 not: bool,
4319) -> Result<HirScalarExpr, PlanError> {
4320 use CastContext::Implicit;
4321 let ecx = ecx.with_name("LIKE argument");
4322 let expr = plan_expr(&ecx, expr)?;
4323 let haystack = match ecx.scalar_type(&expr) {
4324 CoercibleScalarType::Coerced(ref ty @ ScalarType::Char { length }) => expr
4325 .type_as(&ecx, ty)?
4326 .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4327 _ => expr.cast_to(&ecx, Implicit, &ScalarType::String)?,
4328 };
4329 let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &ScalarType::String)?;
4330 if let Some(escape) = escape {
4331 pattern = pattern.call_binary(
4332 plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &ScalarType::String)?,
4333 BinaryFunc::LikeEscape,
4334 );
4335 }
4336 let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
4337 if not {
4338 Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4339 } else {
4340 Ok(like)
4341 }
4342}
4343
4344fn plan_subscript_jsonb(
4345 ecx: &ExprContext,
4346 expr: HirScalarExpr,
4347 positions: &[SubscriptPosition<Aug>],
4348) -> Result<CoercibleScalarExpr, PlanError> {
4349 use CastContext::Implicit;
4350 use ScalarType::{Int64, String};
4351
4352 let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4355
4356 let mut exprs = Vec::with_capacity(subscripts.len());
4357 for s in subscripts {
4358 let subscript = plan_expr(ecx, s)?;
4359 let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4360 subscript
4361 } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4362 typeconv::to_string(ecx, subscript)
4366 } else {
4367 sql_bail!("jsonb subscript type must be coercible to integer or text");
4368 };
4369 exprs.push(subscript);
4370 }
4371
4372 let expr = expr.call_binary(
4375 HirScalarExpr::CallVariadic {
4376 func: VariadicFunc::ArrayCreate {
4377 elem_type: ScalarType::String,
4378 },
4379 exprs,
4380 },
4381 BinaryFunc::JsonbGetPath,
4382 );
4383 Ok(expr.into())
4384}
4385
4386fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4387 if !ecx.allow_subqueries {
4388 sql_bail!("{} does not allow subqueries", ecx.name)
4389 }
4390 let mut qcx = ecx.derived_query_context();
4391 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4392 Ok(expr.exists().into())
4393}
4394
4395fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4396 if !ecx.allow_subqueries {
4397 sql_bail!("{} does not allow subqueries", ecx.name)
4398 }
4399 let mut qcx = ecx.derived_query_context();
4400 let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4401 let column_types = qcx.relation_type(&expr).column_types;
4402 if column_types.len() != 1 {
4403 sql_bail!(
4404 "Expected subselect to return 1 column, got {} columns",
4405 column_types.len()
4406 );
4407 }
4408 Ok(expr.select().into())
4409}
4410
4411fn plan_list_subquery(
4412 ecx: &ExprContext,
4413 query: &Query<Aug>,
4414) -> Result<CoercibleScalarExpr, PlanError> {
4415 plan_vector_like_subquery(
4416 ecx,
4417 query,
4418 |_| false,
4419 |elem_type| VariadicFunc::ListCreate { elem_type },
4420 |order_by| AggregateFunc::ListConcat { order_by },
4421 BinaryFunc::ListListConcat,
4422 |elem_type| {
4423 HirScalarExpr::literal(
4424 Datum::empty_list(),
4425 ScalarType::List {
4426 element_type: Box::new(elem_type),
4427 custom_id: None,
4428 },
4429 )
4430 },
4431 "list",
4432 )
4433}
4434
4435fn plan_array_subquery(
4436 ecx: &ExprContext,
4437 query: &Query<Aug>,
4438) -> Result<CoercibleScalarExpr, PlanError> {
4439 plan_vector_like_subquery(
4440 ecx,
4441 query,
4442 |elem_type| {
4443 matches!(
4444 elem_type,
4445 ScalarType::Char { .. }
4446 | ScalarType::Array { .. }
4447 | ScalarType::List { .. }
4448 | ScalarType::Map { .. }
4449 )
4450 },
4451 |elem_type| VariadicFunc::ArrayCreate { elem_type },
4452 |order_by| AggregateFunc::ArrayConcat { order_by },
4453 BinaryFunc::ArrayArrayConcat,
4454 |elem_type| {
4455 HirScalarExpr::literal(Datum::empty_array(), ScalarType::Array(Box::new(elem_type)))
4456 },
4457 "[]",
4458 )
4459}
4460
/// Shared implementation of subqueries that collect a single column into a
/// vector-like value (see [`plan_list_subquery`] and
/// [`plan_array_subquery`]).
///
/// The caller supplies the pieces that differ between vector kinds:
///
/// * `is_unsupported_type`: returns `true` for element types that must be
///   rejected;
/// * `vector_create`: builds the function that wraps one value of the element
///   type into a single-element vector;
/// * `aggregate_concat`: builds the aggregate that combines the per-row
///   vectors, given the ordering to apply;
/// * `binary_concat`: concatenates the aggregate result with the empty
///   literal;
/// * `empty_literal`: builds an empty vector of the element type;
/// * `vector_type_string`: appended to the element type name in the
///   "unsupported" error message.
///
/// Fails when the context does not allow subqueries, when the subquery does
/// not project exactly one column, or when `is_unsupported_type` rejects the
/// element type.
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&ScalarType) -> bool,
    F2: Fn(ScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(ScalarType) -> HirScalarExpr,
{
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // A LIMIT or OFFSET has to be applied to the subquery itself, before its
    // rows are aggregated, so it is materialized as a top-k over the query's
    // own ordering.
    if planned_query.limit.is_some() || planned_query.offset > 0 {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }

    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }

    // The element type is the type of the single projected column.
    let project_column = *planned_query.project.get(0).unwrap();
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();

    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_scalar_type(&elem_type, false),
            vector_type_string
        ));
    }

    // Input of the aggregate: the projected value wrapped into a
    // single-element vector, followed by one column per ORDER BY key. The
    // column references in here refer to columns of the planned subquery.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::CallVariadic {
        func: vector_create(elem_type.clone()),
        exprs: vec![HirScalarExpr::column(project_column)],
    })
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // The reduce below produces a single column (the aggregate), so that is
    // the column to keep.
    let aggregation_projection = vec![0];
    // The ORDER BY keys are renumbered 0, 1, ... in the order in which they
    // were appended to `aggregation_exprs`; direction and null ordering are
    // kept as they were.
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();

    // Aggregate the whole subquery (no grouping key) over a record made of
    // `aggregation_exprs`, with empty field names.
    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    exprs: aggregation_exprs,
                }),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);

    // Concatenate the aggregate result with an empty vector of the element
    // type.
    // NOTE(review): presumably this turns the result for a subquery without
    // rows into an empty vector; confirm against the semantics of
    // `binary_concat`.
    Ok(HirScalarExpr::CallBinary {
        func: binary_concat,
        expr1: Box::new(HirScalarExpr::Select(Box::new(reduced_expr))),
        expr2: Box::new(empty_literal(elem_type)),
    }
    .into())
}
4572
/// Plans `MAP(<query>)`, which builds a map from a two-column subquery.
///
/// The first projected column supplies the keys and must be of type `text`;
/// the second projected column supplies the values and determines the map's
/// value type.
///
/// Fails when the context does not allow subqueries, when the subquery does
/// not project exactly two columns, or when the key column is not `text`.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // A LIMIT or OFFSET has to be applied to the subquery itself, before its
    // rows are aggregated, so it is materialized as a top-k over the query's
    // own ordering.
    if query.limit.is_some() || query.offset > 0 {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    // First projected column: keys. Second projected column: values.
    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    if key_type != ScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // Input of the aggregate: a `(key, value)` record, followed by one column
    // per ORDER BY key. The column references in here refer to columns of the
    // planned subquery.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::CallVariadic {
        func: VariadicFunc::RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        exprs: vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    })
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // Aggregate the whole subquery (no grouping key) with `MapAgg` over a
    // record made of `aggregation_exprs`, then keep the single aggregate
    // column.
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    // The ORDER BY keys are renumbered 0, 1, ... in the order
                    // in which they were appended to `aggregation_exprs`.
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    exprs: aggregation_exprs,
                }),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // `COALESCE(<aggregate>, <empty map>)`: an empty map of the value type is
    // the fallback argument after the aggregate result.
    let expr = HirScalarExpr::CallVariadic {
        func: VariadicFunc::Coalesce,
        exprs: vec![
            HirScalarExpr::Select(Box::new(expr)),
            HirScalarExpr::literal(
                Datum::empty_map(),
                ScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    };

    Ok(expr.into())
}
4672
4673fn plan_collate(
4674 ecx: &ExprContext,
4675 expr: &Expr<Aug>,
4676 collation: &UnresolvedItemName,
4677) -> Result<CoercibleScalarExpr, PlanError> {
4678 if collation.0.len() == 2
4679 && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4680 && collation.0[1] == ident!("default")
4681 {
4682 plan_expr(ecx, expr)
4683 } else {
4684 bail_unsupported!("COLLATE");
4685 }
4686}
4687
4688fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4695where
4696 E: std::borrow::Borrow<Expr<Aug>>,
4697{
4698 let mut out = vec![];
4699 for expr in exprs {
4700 out.push(plan_expr(ecx, expr.borrow())?);
4701 }
4702 Ok(out)
4703}
4704
4705fn plan_array(
4707 ecx: &ExprContext,
4708 exprs: &[Expr<Aug>],
4709 type_hint: Option<&ScalarType>,
4710) -> Result<CoercibleScalarExpr, PlanError> {
4711 let mut out = vec![];
4713 for expr in exprs {
4714 out.push(match expr {
4715 Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4718 _ => plan_expr(ecx, expr)?,
4719 });
4720 }
4721
4722 let type_hint = match type_hint {
4724 Some(ScalarType::Array(elem_type)) => {
4729 let multidimensional = out.iter().any(|e| {
4730 matches!(
4731 ecx.scalar_type(e),
4732 CoercibleScalarType::Coerced(ScalarType::Array(_))
4733 )
4734 });
4735 if multidimensional {
4736 type_hint
4737 } else {
4738 Some(&**elem_type)
4739 }
4740 }
4741 Some(_) => None,
4745 None => None,
4747 };
4748
4749 let (elem_type, exprs) = if exprs.is_empty() {
4751 if let Some(elem_type) = type_hint {
4752 (elem_type.clone(), vec![])
4753 } else {
4754 sql_bail!("cannot determine type of empty array");
4755 }
4756 } else {
4757 let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4758 (ecx.scalar_type(&out[0]), out)
4759 };
4760
4761 if matches!(
4767 elem_type,
4768 ScalarType::Char { .. } | ScalarType::List { .. } | ScalarType::Map { .. }
4769 ) {
4770 bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4771 }
4772
4773 Ok(HirScalarExpr::CallVariadic {
4774 func: VariadicFunc::ArrayCreate { elem_type },
4775 exprs,
4776 }
4777 .into())
4778}
4779
4780fn plan_list(
4781 ecx: &ExprContext,
4782 exprs: &[Expr<Aug>],
4783 type_hint: Option<&ScalarType>,
4784) -> Result<CoercibleScalarExpr, PlanError> {
4785 let (elem_type, exprs) = if exprs.is_empty() {
4786 if let Some(ScalarType::List { element_type, .. }) = type_hint {
4787 (element_type.without_modifiers(), vec![])
4788 } else {
4789 sql_bail!("cannot determine type of empty list");
4790 }
4791 } else {
4792 let type_hint = match type_hint {
4793 Some(ScalarType::List { element_type, .. }) => Some(&**element_type),
4794 _ => None,
4795 };
4796
4797 let mut out = vec![];
4798 for expr in exprs {
4799 out.push(match expr {
4800 Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4803 _ => plan_expr(ecx, expr)?,
4804 });
4805 }
4806 let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4807 (ecx.scalar_type(&out[0]).without_modifiers(), out)
4808 };
4809
4810 if matches!(elem_type, ScalarType::Char { .. }) {
4811 bail_unsupported!("char list");
4812 }
4813
4814 Ok(HirScalarExpr::CallVariadic {
4815 func: VariadicFunc::ListCreate { elem_type },
4816 exprs,
4817 }
4818 .into())
4819}
4820
4821fn plan_map(
4822 ecx: &ExprContext,
4823 entries: &[MapEntry<Aug>],
4824 type_hint: Option<&ScalarType>,
4825) -> Result<CoercibleScalarExpr, PlanError> {
4826 let (value_type, exprs) = if entries.is_empty() {
4827 if let Some(ScalarType::Map { value_type, .. }) = type_hint {
4828 (value_type.without_modifiers(), vec![])
4829 } else {
4830 sql_bail!("cannot determine type of empty map");
4831 }
4832 } else {
4833 let type_hint = match type_hint {
4834 Some(ScalarType::Map { value_type, .. }) => Some(&**value_type),
4835 _ => None,
4836 };
4837
4838 let mut keys = vec![];
4839 let mut values = vec![];
4840 for MapEntry { key, value } in entries {
4841 let key = plan_expr(ecx, key)?.type_as(ecx, &ScalarType::String)?;
4842 let value = match value {
4843 Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
4846 _ => plan_expr(ecx, value)?,
4847 };
4848 keys.push(key);
4849 values.push(value);
4850 }
4851 let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
4852 let value_type = ecx.scalar_type(&values[0]).without_modifiers();
4853 let out = itertools::interleave(keys, values).collect();
4854 (value_type, out)
4855 };
4856
4857 if matches!(value_type, ScalarType::Char { .. }) {
4858 bail_unsupported!("char map");
4859 }
4860
4861 let expr = HirScalarExpr::CallVariadic {
4862 func: VariadicFunc::MapBuild { value_type },
4863 exprs,
4864 };
4865 Ok(expr.into())
4866}
4867
4868pub fn coerce_homogeneous_exprs(
4885 ecx: &ExprContext,
4886 exprs: Vec<CoercibleScalarExpr>,
4887 force_type: Option<&ScalarType>,
4888) -> Result<Vec<HirScalarExpr>, PlanError> {
4889 assert!(!exprs.is_empty());
4890
4891 let target_holder;
4892 let target = match force_type {
4893 Some(t) => t,
4894 None => {
4895 let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
4896 target_holder = typeconv::guess_best_common_type(ecx, &types)?;
4897 &target_holder
4898 }
4899 };
4900
4901 let mut out = Vec::new();
4903 for expr in exprs {
4904 let arg = typeconv::plan_coerce(ecx, expr, target)?;
4905 let ccx = match force_type {
4906 None => CastContext::Implicit,
4907 Some(_) => CastContext::Explicit,
4908 };
4909 match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
4910 Ok(expr) => out.push(expr),
4911 Err(_) => sql_bail!(
4912 "{} could not convert type {} to {}",
4913 ecx.name,
4914 ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
4915 ecx.humanize_scalar_type(target, false),
4916 ),
4917 }
4918 }
4919 Ok(out)
4920}
4921
4922pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
4925 obe: &OrderByExpr<T>,
4926 column: usize,
4927) -> ColumnOrder {
4928 let desc = !obe.asc.unwrap_or(true);
4929 ColumnOrder {
4930 column,
4931 desc,
4932 nulls_last: obe.nulls_last.unwrap_or(!desc),
4935 }
4936}
4937
4938fn plan_function_order_by(
4946 ecx: &ExprContext,
4947 order_by: &[OrderByExpr<Aug>],
4948) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
4949 let mut order_by_exprs = vec![];
4950 let mut col_orders = vec![];
4951 {
4952 for (i, obe) in order_by.iter().enumerate() {
4953 let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
4957 order_by_exprs.push(expr);
4958 col_orders.push(resolve_desc_and_nulls_last(obe, i));
4959 }
4960 }
4961 Ok((order_by_exprs, col_orders))
4962}
4963
/// Plans the parts of an aggregate function call that do not depend on how
/// the aggregate is used; the `OVER` clause of the call is ignored here.
///
/// Returns the `AggregateExpr` holding the selected aggregate implementation,
/// its input expression, and the `DISTINCT` flag.
///
/// Fails when a call with an empty argument list is not written with `(*)`,
/// when an argument, the `ORDER BY` or the `FILTER` cannot be planned, when no
/// implementation matches, or when the aggregate refers exclusively to outer
/// columns.
///
/// # Panics
///
/// Panics (via `unreachable!`) if the function name does not resolve to an
/// aggregate function.
fn plan_aggregate_common(
    ecx: &ExprContext,
    Function::<Aug> {
        name,
        args,
        filter,
        over: _,
        distinct,
    }: &Function<Aug>,
) -> Result<AggregateExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Aggregate(impls) => impls,
        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
    };

    // `f(*)` has no arguments and no ordering. An explicit but empty argument
    // list is rejected; the user is told to use the `(*)` form instead.
    let (args, order_by) = match &args {
        FunctionArgs::Star => (vec![], vec![]),
        FunctionArgs::Args { args, order_by } => {
            if args.is_empty() {
                sql_bail!(
                    "{}(*) must be used to call a parameterless aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("name actually resolved")
                );
            }
            let args = plan_exprs(ecx, args)?;
            (args, order_by.clone())
        }
    };

    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;

    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
    // `<agg>(<expr>) FILTER (WHERE <cond>)` is planned by replacing the input
    // with `IF <cond> THEN <expr> ELSE <identity>`, where `<identity>` is the
    // datum returned by `func.identity_datum()`.
    if let Some(filter) = &filter {
        let cond = plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &ScalarType::Bool)?;
        let expr_typ = ecx.scalar_type(&expr);
        expr = HirScalarExpr::If {
            cond: Box::new(cond),
            then: Box::new(expr),
            els: Box::new(HirScalarExpr::literal(func.identity_datum(), expr_typ)),
        };
    }

    // Classify the column references of the input: `seen_inner` is set for a
    // reference at depth 0 with level 0, `seen_outer` for a reference whose
    // level exceeds the depth at which it appears. An aggregate whose input
    // only refers to outer columns is not supported.
    let mut seen_outer = false;
    let mut seen_inner = false;
    #[allow(deprecated)]
    expr.visit_columns(0, &mut |depth, col| {
        if depth == 0 && col.level == 0 {
            seen_inner = true;
        } else if col.level > depth {
            seen_outer = true;
        }
    });
    if seen_outer && !seen_inner {
        bail_unsupported!(
            3720,
            "aggregate functions that refer exclusively to outer columns"
        );
    }

    // Order-sensitive aggregates take a record as input: the value first,
    // followed by the ORDER BY expressions (possibly none). The field names
    // are left empty.
    if func.is_order_sensitive() {
        let field_names = iter::repeat(ColumnName::from(""))
            .take(1 + order_by_exprs.len())
            .collect();
        let mut exprs = vec![expr];
        exprs.extend(order_by_exprs);
        expr = HirScalarExpr::CallVariadic {
            func: VariadicFunc::RecordCreate { field_names },
            exprs,
        };
    }

    Ok(AggregateExpr {
        func,
        expr: Box::new(expr),
        distinct: *distinct,
    })
}
5078
/// Plans a possibly qualified identifier.
///
/// Resolution proceeds as follows:
///
/// 1. A qualified name (`t.c`, `s.t.c`, ...) must refer to a column of the
///    named table.
/// 2. An unqualified name is first resolved as a column.
/// 3. If no such column exists, the name is looked up as a table name and
///    planned as a whole-row reference to that table.
///
/// Fails with `PlanError::UnknownColumn` (carrying the similar names reported
/// by the column lookup) when the name is neither a column nor a table; any
/// other resolution error is passed through.
///
/// # Panics
///
/// Panics if `names` is empty.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    // The last component is the column name; whatever precedes it, if
    // anything, names the table.
    let mut names = names.to_vec();
    let col_name = normalize::column_name(names.pop().unwrap());

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let i = ecx
            .scope
            .resolve_table_column(&ecx.qcx.outer_scopes, &table_name, &col_name)?;
        return Ok(HirScalarExpr::Column(i));
    }

    // If the name is unqualified, first check whether it refers to a column.
    // On an "unknown column" error, hold on to the similar names so that they
    // can be reported should the table lookup below fail as well.
    let similar_names = match ecx.scope.resolve_column(&ecx.qcx.outer_scopes, &col_name) {
        Ok(i) => return Ok(HirScalarExpr::Column(i)),
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name does not refer to a column. Check whether it is a whole-row
    // reference to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name does not refer to a table either.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The table is the result of a function that returned a single
        // column: refer to that column directly instead of wrapping it in a
        // record.
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::Column(*column)),
        // Any other table: build a record out of its columns.
        _ => {
            // The "exists" column of a table function that was in the target
            // list is not part of the record; it is remembered separately.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::Column(column);
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // A single remaining column that came with an "exists" column is
            // used as is rather than wrapped in a one-field record.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate { field_names },
                    exprs,
                }
            };
            // With an "exists" column present, the result is NULL whenever
            // that column is NULL.
            if let Some(has_exists_column) = has_exists_column {
                Ok(HirScalarExpr::If {
                    cond: Box::new(HirScalarExpr::CallUnary {
                        func: UnaryFunc::IsNull(mz_expr::func::IsNull),
                        expr: Box::new(HirScalarExpr::Column(has_exists_column)),
                    }),
                    then: Box::new(HirScalarExpr::literal_null(ecx.scalar_type(&expr))),
                    els: Box::new(expr),
                })
            } else {
                Ok(expr)
            }
        }
    }
}
5163
5164fn plan_op(
5165 ecx: &ExprContext,
5166 op: &str,
5167 expr1: &Expr<Aug>,
5168 expr2: Option<&Expr<Aug>>,
5169) -> Result<HirScalarExpr, PlanError> {
5170 let impls = func::resolve_op(op)?;
5171 let args = match expr2 {
5172 None => plan_exprs(ecx, &[expr1])?,
5173 Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5174 };
5175 func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5176}
5177
/// Plans a function call that appears in a scalar expression.
///
/// The function is resolved with `resolve_func` and then handled by kind:
///
/// * table functions are rejected;
/// * scalar functions must not carry an `OVER` clause and are planned from
///   their arguments at the end of this function;
/// * scalar window and value window functions are planned into
///   `HirScalarExpr::Windowing` and returned immediately;
/// * aggregate functions are only accepted here when they have an `OVER`
///   clause (window aggregates); otherwise an error is returned.
///
/// For plain scalar functions, `DISTINCT`, `FILTER`, a `*` argument and an
/// argument-level `ORDER BY` are all rejected.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            // The window frame is planned (and validated) but not used for
            // scalar window functions.
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // Report a dedicated error when arguments were supplied, before
            // `func::select_impl` gets a chance to fail.
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // IGNORE NULLS is not supported for any scalar window function.
            if ignore_nulls {
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::Windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
                plan_window_function_non_aggr(ecx, f)?;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // Only LAG and LEAD accept IGNORE NULLS.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::Windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // A non-windowed aggregate is never planned by this function;
                // pick the error message based on whether aggregates are
                // allowed in this context at all.
                if ecx.allow_aggregates {
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // Frames that combine an unbounded end with an offset end are
                // not supported for window aggregates.
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    // Every other combination is accepted.
                    (_, _) => {}
                }

                // IGNORE NULLS is not supported for window aggregates.
                if ignore_nulls {
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::Windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // Only the `Func::Scalar` arm falls through to here, and it has already
    // rejected an OVER clause.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }

    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5398
/// Feature description passed to `bail_unsupported!` when `IGNORE NULLS` is
/// requested for a window function that does not accept it.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5401
5402pub fn resolve_func(
5406 ecx: &ExprContext,
5407 name: &ResolvedItemName,
5408 args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5409) -> Result<&'static Func, PlanError> {
5410 if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5411 if let Ok(f) = i.func() {
5412 return Ok(f);
5413 }
5414 }
5415
5416 let cexprs = match args {
5419 mz_sql_parser::ast::FunctionArgs::Star => vec![],
5420 mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5421 if !order_by.is_empty() {
5422 sql_bail!(
5423 "ORDER BY specified, but {} is not an aggregate function",
5424 name
5425 );
5426 }
5427 plan_exprs(ecx, args)?
5428 }
5429 };
5430
5431 let arg_types: Vec<_> = cexprs
5432 .into_iter()
5433 .map(|ty| match ecx.scalar_type(&ty) {
5434 CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5435 CoercibleScalarType::Record(_) => "record".to_string(),
5436 CoercibleScalarType::Uncoerced => "unknown".to_string(),
5437 })
5438 .collect();
5439
5440 Err(PlanError::UnknownFunction {
5441 name: name.to_string(),
5442 arg_types,
5443 })
5444}
5445
5446fn plan_is_expr<'a>(
5447 ecx: &ExprContext,
5448 expr: &'a Expr<Aug>,
5449 construct: &IsExprConstruct<Aug>,
5450 not: bool,
5451) -> Result<HirScalarExpr, PlanError> {
5452 let expr = plan_expr(ecx, expr)?;
5453 let mut expr = match construct {
5454 IsExprConstruct::Null => {
5455 let expr = expr.type_as_any(ecx)?;
5460 expr.call_is_null()
5461 }
5462 IsExprConstruct::Unknown => {
5463 let expr = expr.type_as(ecx, &ScalarType::Bool)?;
5464 expr.call_is_null()
5465 }
5466 IsExprConstruct::True => {
5467 let expr = expr.type_as(ecx, &ScalarType::Bool)?;
5468 expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5469 }
5470 IsExprConstruct::False => {
5471 let expr = expr.type_as(ecx, &ScalarType::Bool)?;
5472 expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5473 }
5474 IsExprConstruct::DistinctFrom(expr2) => {
5475 let expr1 = expr.type_as_any(ecx)?;
5476 let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5477 let term1 = HirScalarExpr::variadic_or(vec![
5484 expr1.clone().call_binary(expr2.clone(), BinaryFunc::NotEq),
5485 expr1.clone().call_is_null(),
5486 expr2.clone().call_is_null(),
5487 ]);
5488 let term2 = HirScalarExpr::variadic_or(vec![
5489 expr1.call_is_null().not(),
5490 expr2.call_is_null().not(),
5491 ]);
5492 term1.and(term2)
5493 }
5494 };
5495 if not {
5496 expr = expr.not();
5497 }
5498 Ok(expr)
5499}
5500
5501fn plan_case<'a>(
5502 ecx: &ExprContext,
5503 operand: &'a Option<Box<Expr<Aug>>>,
5504 conditions: &'a [Expr<Aug>],
5505 results: &'a [Expr<Aug>],
5506 else_result: &'a Option<Box<Expr<Aug>>>,
5507) -> Result<HirScalarExpr, PlanError> {
5508 let mut cond_exprs = Vec::new();
5509 let mut result_exprs = Vec::new();
5510 for (c, r) in conditions.iter().zip(results) {
5511 let c = match operand {
5512 Some(operand) => operand.clone().equals(c.clone()),
5513 None => c.clone(),
5514 };
5515 let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &ScalarType::Bool)?;
5516 cond_exprs.push(cexpr);
5517 result_exprs.push(r);
5518 }
5519 result_exprs.push(match else_result {
5520 Some(else_result) => else_result,
5521 None => &Expr::Value(Value::Null),
5522 });
5523 let mut result_exprs = coerce_homogeneous_exprs(
5524 &ecx.with_name("CASE"),
5525 plan_exprs(ecx, &result_exprs)?,
5526 None,
5527 )?;
5528 let mut expr = result_exprs.pop().unwrap();
5529 assert_eq!(cond_exprs.len(), result_exprs.len());
5530 for (cexpr, rexpr) in cond_exprs.into_iter().zip(result_exprs).rev() {
5531 expr = HirScalarExpr::If {
5532 cond: Box::new(cexpr),
5533 then: Box::new(rexpr),
5534 els: Box::new(expr),
5535 }
5536 }
5537 Ok(expr)
5538}
5539
5540fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5541 let (datum, scalar_type) = match l {
5542 Value::Number(s) => {
5543 let d = strconv::parse_numeric(s.as_str())?;
5544 if !s.contains(&['E', '.'][..]) {
5545 if let Ok(n) = d.0.try_into() {
5547 (Datum::Int32(n), ScalarType::Int32)
5548 } else if let Ok(n) = d.0.try_into() {
5549 (Datum::Int64(n), ScalarType::Int64)
5550 } else {
5551 (Datum::Numeric(d), ScalarType::Numeric { max_scale: None })
5552 }
5553 } else {
5554 (Datum::Numeric(d), ScalarType::Numeric { max_scale: None })
5555 }
5556 }
5557 Value::HexString(_) => bail_unsupported!("hex string literals"),
5558 Value::Boolean(b) => match b {
5559 false => (Datum::False, ScalarType::Bool),
5560 true => (Datum::True, ScalarType::Bool),
5561 },
5562 Value::Interval(i) => {
5563 let i = literal::plan_interval(i)?;
5564 (Datum::Interval(i), ScalarType::Interval)
5565 }
5566 Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5567 Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5568 };
5569 let expr = HirScalarExpr::literal(datum, scalar_type);
5570 Ok(expr.into())
5571}
5572
5573fn plan_window_function_non_aggr<'a>(
5576 ecx: &ExprContext,
5577 Function {
5578 name,
5579 args,
5580 filter,
5581 over,
5582 distinct,
5583 }: &'a Function<Aug>,
5584) -> Result<
5585 (
5586 bool,
5587 Vec<HirScalarExpr>,
5588 Vec<ColumnOrder>,
5589 mz_expr::WindowFrame,
5590 Vec<HirScalarExpr>,
5591 Vec<CoercibleScalarExpr>,
5592 ),
5593 PlanError,
5594> {
5595 let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5596 plan_window_function_common(ecx, name, over)?;
5597
5598 if *distinct {
5599 sql_bail!(
5600 "DISTINCT specified, but {} is not an aggregate function",
5601 name
5602 );
5603 }
5604
5605 if filter.is_some() {
5606 bail_unsupported!("FILTER in non-aggregate window functions");
5607 }
5608
5609 let scalar_args = match &args {
5610 FunctionArgs::Star => {
5611 sql_bail!("* argument is invalid with non-aggregate function {}", name)
5612 }
5613 FunctionArgs::Args { args, order_by } => {
5614 if !order_by.is_empty() {
5615 sql_bail!(
5616 "ORDER BY specified, but {} is not an aggregate function",
5617 name
5618 );
5619 }
5620 plan_exprs(ecx, args)?
5621 }
5622 };
5623
5624 Ok((
5625 ignore_nulls,
5626 order_by_exprs,
5627 col_orders,
5628 window_frame,
5629 partition,
5630 scalar_args,
5631 ))
5632}
5633
5634fn plan_window_function_common(
5636 ecx: &ExprContext,
5637 name: &<Aug as AstInfo>::ItemName,
5638 over: &Option<WindowSpec<Aug>>,
5639) -> Result<
5640 (
5641 bool,
5642 Vec<HirScalarExpr>,
5643 Vec<ColumnOrder>,
5644 mz_expr::WindowFrame,
5645 Vec<HirScalarExpr>,
5646 ),
5647 PlanError,
5648> {
5649 if !ecx.allow_windows {
5650 sql_bail!(
5651 "window functions are not allowed in {} (function {})",
5652 ecx.name,
5653 name
5654 );
5655 }
5656
5657 let window_spec = match over.as_ref() {
5658 Some(over) => over,
5659 None => sql_bail!("window function {} requires an OVER clause", name),
5660 };
5661 if window_spec.ignore_nulls && window_spec.respect_nulls {
5662 sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5663 }
5664 let window_frame = match window_spec.window_frame.as_ref() {
5665 Some(frame) => plan_window_frame(frame)?,
5666 None => mz_expr::WindowFrame::default(),
5667 };
5668 let mut partition = Vec::new();
5669 for expr in &window_spec.partition_by {
5670 partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5671 }
5672
5673 let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5674
5675 Ok((
5676 window_spec.ignore_nulls,
5677 order_by_exprs,
5678 col_orders,
5679 window_frame,
5680 partition,
5681 ))
5682}
5683
5684fn plan_window_frame(
5685 WindowFrame {
5686 units,
5687 start_bound,
5688 end_bound,
5689 }: &WindowFrame,
5690) -> Result<mz_expr::WindowFrame, PlanError> {
5691 use mz_expr::WindowFrameBound::*;
5692 let units = window_frame_unit_ast_to_expr(units)?;
5693 let start_bound = window_frame_bound_ast_to_expr(start_bound);
5694 let end_bound = end_bound
5695 .as_ref()
5696 .map(window_frame_bound_ast_to_expr)
5697 .unwrap_or(CurrentRow);
5698
5699 match (&start_bound, &end_bound) {
5701 (UnboundedFollowing, _) => {
5703 sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5704 }
5705 (_, UnboundedPreceding) => {
5707 sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5708 }
5709 (CurrentRow, OffsetPreceding(_)) => {
5711 sql_bail!("frame starting from current row cannot have preceding rows")
5712 }
5713 (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5714 sql_bail!("frame starting from following row cannot have preceding rows")
5715 }
5716 (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5719 if *o1 > 1000000 || *o2 > 1000000 {
5723 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5724 }
5725 }
5726 (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5727 if *o1 > 1000000 || *o2 > 1000000 {
5728 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5729 }
5730 }
5731 (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5732 if *o1 > 1000000 || *o2 > 1000000 {
5733 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5734 }
5735 }
5736 (OffsetPreceding(o), CurrentRow) => {
5737 if *o > 1000000 {
5738 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5739 }
5740 }
5741 (CurrentRow, OffsetFollowing(o)) => {
5742 if *o > 1000000 {
5743 sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5744 }
5745 }
5746 (_, _) => (),
5748 }
5749
5750 if units == mz_expr::WindowFrameUnits::Range
5753 && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5754 {
5755 bail_unsupported!("RANGE in non-default window frames")
5756 }
5757
5758 let frame = mz_expr::WindowFrame {
5759 units,
5760 start_bound,
5761 end_bound,
5762 };
5763 Ok(frame)
5764}
5765
5766fn window_frame_unit_ast_to_expr(
5767 unit: &WindowFrameUnits,
5768) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5769 match unit {
5770 WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5771 WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5772 WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5773 }
5774}
5775
5776fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5777 match bound {
5778 WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5779 WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5780 WindowFrameBound::Preceding(Some(offset)) => {
5781 mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5782 }
5783 WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5784 WindowFrameBound::Following(Some(offset)) => {
5785 mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5786 }
5787 }
5788}
5789
5790pub fn scalar_type_from_sql(
5791 scx: &StatementContext,
5792 data_type: &ResolvedDataType,
5793) -> Result<ScalarType, PlanError> {
5794 match data_type {
5795 ResolvedDataType::AnonymousList(elem_type) => {
5796 let elem_type = scalar_type_from_sql(scx, elem_type)?;
5797 if matches!(elem_type, ScalarType::Char { .. }) {
5798 bail_unsupported!("char list");
5799 }
5800 Ok(ScalarType::List {
5801 element_type: Box::new(elem_type),
5802 custom_id: None,
5803 })
5804 }
5805 ResolvedDataType::AnonymousMap {
5806 key_type,
5807 value_type,
5808 } => {
5809 match scalar_type_from_sql(scx, key_type)? {
5810 ScalarType::String => {}
5811 other => sql_bail!(
5812 "map key type must be {}, got {}",
5813 scx.humanize_scalar_type(&ScalarType::String, false),
5814 scx.humanize_scalar_type(&other, false)
5815 ),
5816 }
5817 Ok(ScalarType::Map {
5818 value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5819 custom_id: None,
5820 })
5821 }
5822 ResolvedDataType::Named { id, modifiers, .. } => {
5823 scalar_type_from_catalog(scx.catalog, *id, modifiers)
5824 }
5825 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
5826 }
5827}
5828
/// Builds the `ScalarType` for the catalog type `id` with the given type
/// `modifiers`.
///
/// Only `numeric`, `char`, `varchar`, `timestamp` and `timestamptz` accept
/// modifiers; every other type fails if `modifiers` is non-empty. Container
/// types (array, list, map, range, record) are resolved recursively.
///
/// # Errors
///
/// Fails if `id` does not refer to a type, if a modifier is out of range or
/// there are too many modifiers, or if the type is a pseudo type.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<ScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    match &type_details.typ {
        CatalogType::Numeric => {
            // Modifiers are (precision, scale). Precision is only validated;
            // just the scale is kept in the resulting type.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            Ok(ScalarType::Numeric { max_scale: scale })
        }
        CatalogType::Char => {
            // Without a modifier the length defaults to `CharLength::ONE`.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(ScalarType::Char { length })
        }
        CatalogType::VarChar => {
            // Without a modifier no maximum length is recorded.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(ScalarType::VarChar { max_length: length })
        }
        CatalogType::Timestamp => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(ScalarType::Timestamp { precision })
        }
        CatalogType::TimestampTz => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(ScalarType::TimestampTz { precision })
        }
        // None of the remaining types accept modifiers.
        t => {
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // `modifiers` is known to be empty at this point.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(ScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                // Lists, maps and records are tagged with the ID of the
                // catalog type they were built from.
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(ScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                // The key reference and modifiers are not consulted here.
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(ScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(ScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    // Every record field is marked nullable.
                    let scalars: Box<[(ColumnName, ColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                ColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(ScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                CatalogType::AclItem => Ok(ScalarType::AclItem),
                CatalogType::Bool => Ok(ScalarType::Bool),
                CatalogType::Bytes => Ok(ScalarType::Bytes),
                CatalogType::Date => Ok(ScalarType::Date),
                CatalogType::Float32 => Ok(ScalarType::Float32),
                CatalogType::Float64 => Ok(ScalarType::Float64),
                CatalogType::Int16 => Ok(ScalarType::Int16),
                CatalogType::Int32 => Ok(ScalarType::Int32),
                CatalogType::Int64 => Ok(ScalarType::Int64),
                CatalogType::UInt16 => Ok(ScalarType::UInt16),
                CatalogType::UInt32 => Ok(ScalarType::UInt32),
                CatalogType::UInt64 => Ok(ScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(ScalarType::MzTimestamp),
                CatalogType::Interval => Ok(ScalarType::Interval),
                CatalogType::Jsonb => Ok(ScalarType::Jsonb),
                CatalogType::Oid => Ok(ScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(ScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(ScalarType::PgLegacyName),
                CatalogType::Pseudo => {
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(ScalarType::RegClass),
                CatalogType::RegProc => Ok(ScalarType::RegProc),
                CatalogType::RegType => Ok(ScalarType::RegType),
                CatalogType::String => Ok(ScalarType::String),
                CatalogType::Time => Ok(ScalarType::Time),
                CatalogType::Uuid => Ok(ScalarType::Uuid),
                CatalogType::Int2Vector => Ok(ScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(ScalarType::MzAclItem),
                // These variants are matched by the outer `match` and so can
                // never reach the catch-all arm.
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
6030
6031struct AggregateTableFuncVisitor<'a> {
6034 scx: &'a StatementContext<'a>,
6035 aggs: Vec<Function<Aug>>,
6036 within_aggregate: bool,
6037 tables: BTreeMap<Function<Aug>, String>,
6038 table_disallowed_context: Vec<&'static str>,
6039 in_select_item: bool,
6040 err: Option<PlanError>,
6041}
6042
6043impl<'a> AggregateTableFuncVisitor<'a> {
6044 fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6045 AggregateTableFuncVisitor {
6046 scx,
6047 aggs: Vec::new(),
6048 within_aggregate: false,
6049 tables: BTreeMap::new(),
6050 table_disallowed_context: Vec::new(),
6051 in_select_item: false,
6052 err: None,
6053 }
6054 }
6055
6056 fn into_result(
6057 self,
6058 ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6059 match self.err {
6060 Some(err) => Err(err),
6061 None => {
6062 let mut seen = BTreeSet::new();
6065 let aggs = self
6066 .aggs
6067 .into_iter()
6068 .filter(move |agg| seen.insert(agg.clone()))
6069 .collect();
6070 Ok((aggs, self.tables))
6071 }
6072 }
6073 }
6074}
6075
/// Traversal that records non-window aggregate calls and, inside select items,
/// replaces eligible table function calls with generated identifiers.
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Visits a function call, collecting it if it is a non-window aggregate
    /// and tracking the contexts in which table functions are disallowed.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Unresolvable names are skipped here without recording an error.
            Err(_) => return,
        };

        match item.func() {
            // Aggregates with an OVER clause are not collected.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The filter is visited before `within_aggregate` is set.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                // Restore the state that was in effect before the arguments.
                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not appear inside other table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    /// Intentionally empty: subqueries are not descended into.
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
    }

    /// Visits an expression, rejecting table functions in disallowed contexts
    /// and, within select items, swapping table function calls for identifiers.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // Remember the call if it resolves to a table function, or
                // record an error if table functions are disallowed here.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // No disallowed context is pushed here; `visit_function_mut`
                // pushes one when the table function itself is visited.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Visit the table function before it is (possibly) replaced.
            visit_mut::visit_expr_mut(self, expr);
            // Only calls without FILTER, OVER or DISTINCT are replaced.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical calls share one generated identifier.
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{}", Uuid::new_v4()));
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Visits a select item with `in_select_item` set, restoring the previous
    /// value afterwards.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6196
6197#[derive(Default)]
6198struct WindowFuncCollector {
6199 window_funcs: Vec<Expr<Aug>>,
6200}
6201
6202impl WindowFuncCollector {
6203 fn into_result(self) -> Vec<Expr<Aug>> {
6204 let mut seen = BTreeSet::new();
6206 let window_funcs_dedupped = self
6207 .window_funcs
6208 .into_iter()
6209 .filter(move |expr| seen.insert(expr.clone()))
6210 .rev()
6213 .collect();
6214 window_funcs_dedupped
6215 }
6216}
6217
6218impl Visit<'_, Aug> for WindowFuncCollector {
6219 fn visit_expr(&mut self, expr: &Expr<Aug>) {
6220 match expr {
6221 Expr::Function(func) => {
6222 if func.over.is_some() {
6223 self.window_funcs.push(expr.clone());
6224 }
6225 }
6226 _ => (),
6227 }
6228 visit::visit_expr(self, expr);
6229 }
6230
6231 fn visit_query(&mut self, _query: &Query<Aug>) {
6232 }
6234}
6235
/// Classifies the lifetime of a planned query.
///
/// `OneShot` is the only variant reported as one-shot; every other variant is
/// reported as maintained.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    OneShot,
    Index,
    MaterializedView,
    Subscribe,
    View,
    Source,
}

impl QueryLifetime {
    /// Returns `true` only for [`QueryLifetime::OneShot`].
    ///
    /// # Panics
    ///
    /// Panics if the answer is not the exact opposite of
    /// [`QueryLifetime::is_maintained`].
    pub fn is_one_shot(&self) -> bool {
        let one_shot = match *self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        // The two classifications must stay complementary.
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Returns `true` for every variant except [`QueryLifetime::OneShot`].
    pub fn is_maintained(&self) -> bool {
        match *self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Returns `true` for [`QueryLifetime::OneShot`] and
    /// [`QueryLifetime::Subscribe`], and `false` for every other variant.
    pub fn allow_show(&self) -> bool {
        match *self {
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6295
/// Description of a CTE: its name and the description of the relation it
/// produces.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// Name of the CTE.
    pub name: String,
    /// Relation description (column names and types) of the CTE.
    pub desc: RelationDesc,
}
6302
/// State shared while planning a single query.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The statement context the query is planned in.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime of the query being planned.
    pub lifetime: QueryLifetime,
    /// Scopes of the enclosing queries; derived contexts add the new scope at
    /// the front.
    pub outer_scopes: Vec<Scope>,
    /// Relation types of the enclosing queries, ordered like `outer_scopes`.
    pub outer_relation_types: Vec<RelationType>,
    /// CTE descriptions, keyed by their local ID.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// Guard returned from the `CheckedRecursion` implementation.
    pub recursion_guard: RecursionGuard,
}

impl CheckedRecursion for QueryContext<'_> {
    /// Returns the recursion guard stored in this context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6324
6325impl<'a> QueryContext<'a> {
6326 pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6327 QueryContext {
6328 scx,
6329 lifetime,
6330 outer_scopes: vec![],
6331 outer_relation_types: vec![],
6332 ctes: BTreeMap::new(),
6333 recursion_guard: RecursionGuard::with_limit(1024), }
6335 }
6336
6337 fn relation_type(&self, expr: &HirRelationExpr) -> RelationType {
6338 expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6339 }
6340
6341 fn derived_context(&self, scope: Scope, relation_type: RelationType) -> QueryContext<'a> {
6344 let ctes = self.ctes.clone();
6345 let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6346 let outer_relation_types = iter::once(relation_type)
6347 .chain(self.outer_relation_types.clone())
6348 .collect();
6349
6350 QueryContext {
6351 scx: self.scx,
6352 lifetime: self.lifetime,
6353 outer_scopes,
6354 outer_relation_types,
6355 ctes,
6356 recursion_guard: self.recursion_guard.clone(),
6357 }
6358 }
6359
6360 fn empty_derived_context(&self) -> QueryContext<'a> {
6362 let scope = Scope::empty();
6363 let ty = RelationType::empty();
6364 self.derived_context(scope, ty)
6365 }
6366
6367 pub fn resolve_table_name(
6370 &self,
6371 object: ResolvedItemName,
6372 ) -> Result<(HirRelationExpr, Scope), PlanError> {
6373 match object {
6374 ResolvedItemName::Item {
6375 id,
6376 full_name,
6377 version,
6378 ..
6379 } => {
6380 let name = full_name.into();
6381 let item = self.scx.get_item(&id).at_version(version);
6382 let desc = item
6383 .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6384 .clone();
6385 let expr = HirRelationExpr::Get {
6386 id: Id::Global(item.global_id()),
6387 typ: desc.typ().clone(),
6388 };
6389
6390 let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6391
6392 Ok((expr, scope))
6393 }
6394 ResolvedItemName::Cte { id, name } => {
6395 let name = name.into();
6396 let cte = self.ctes.get(&id).unwrap();
6397 let expr = HirRelationExpr::Get {
6398 id: Id::Local(id),
6399 typ: cte.desc.typ().clone(),
6400 };
6401
6402 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6403
6404 Ok((expr, scope))
6405 }
6406 ResolvedItemName::ContinualTask { id, name } => {
6407 let cte = self.ctes.get(&id).unwrap();
6408 let expr = HirRelationExpr::Get {
6409 id: Id::Local(id),
6410 typ: cte.desc.typ().clone(),
6411 };
6412
6413 let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6414
6415 Ok((expr, scope))
6416 }
6417 ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6418 }
6419 }
6420
6421 pub fn humanize_scalar_type(&self, typ: &ScalarType, postgres_compat: bool) -> String {
6424 self.scx.humanize_scalar_type(typ, postgres_compat)
6425 }
6426}
6427
6428#[derive(Debug, Clone)]
6430pub struct ExprContext<'a> {
6431 pub qcx: &'a QueryContext<'a>,
6432 pub name: &'a str,
6434 pub scope: &'a Scope,
6437 pub relation_type: &'a RelationType,
6440 pub allow_aggregates: bool,
6442 pub allow_subqueries: bool,
6444 pub allow_parameters: bool,
6446 pub allow_windows: bool,
6448}
6449
6450impl CheckedRecursion for ExprContext<'_> {
6451 fn recursion_guard(&self) -> &RecursionGuard {
6452 &self.qcx.recursion_guard
6453 }
6454}
6455
6456impl<'a> ExprContext<'a> {
6457 pub fn catalog(&self) -> &dyn SessionCatalog {
6458 self.qcx.scx.catalog
6459 }
6460
6461 pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6462 let mut ecx = self.clone();
6463 ecx.name = name;
6464 ecx
6465 }
6466
6467 pub fn column_type<E>(&self, expr: &E) -> E::Type
6468 where
6469 E: AbstractExpr,
6470 {
6471 expr.typ(
6472 &self.qcx.outer_relation_types,
6473 self.relation_type,
6474 &self.qcx.scx.param_types.borrow(),
6475 )
6476 }
6477
6478 pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6479 where
6480 E: AbstractExpr,
6481 {
6482 self.column_type(expr).scalar_type()
6483 }
6484
6485 fn derived_query_context(&self) -> QueryContext {
6486 let mut scope = self.scope.clone();
6487 scope.lateral_barrier = true;
6488 self.qcx.derived_context(scope, self.relation_type.clone())
6489 }
6490
6491 pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6492 self.qcx.scx.require_feature_flag(flag)
6493 }
6494
6495 pub fn param_types(&self) -> &RefCell<BTreeMap<usize, ScalarType>> {
6496 &self.qcx.scx.param_types
6497 }
6498
6499 pub fn humanize_scalar_type(&self, typ: &ScalarType, postgres_compat: bool) -> String {
6502 self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6503 }
6504}