// mz_sql/plan/query.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50    ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51    MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
56    TableFunc, func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
70use mz_repr::{
71    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
72    ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
73    UNKNOWN_COLUMN_NAME, strconv,
74};
75use mz_sql_parser::ast::display::AstDisplay;
76use mz_sql_parser::ast::visit::Visit;
77use mz_sql_parser::ast::visit_mut::{self, VisitMut};
78use mz_sql_parser::ast::{
79    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
80    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
81    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
82    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
83    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
84    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
85    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
86    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
87};
88use mz_sql_parser::ident;
89
90use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
91use crate::func::{self, Func, FuncSpec, TableFuncImpl};
92use crate::names::{
93    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
94};
95use crate::plan::PlanError::InvalidWmrRecursionLimit;
96use crate::plan::error::PlanError;
97use crate::plan::hir::{
98    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
99    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
100    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
101    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
102};
103use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
104use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
105use crate::plan::statement::{StatementContext, StatementDesc, show};
106use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
107use crate::plan::{
108    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
109    literal, transform_ast,
110};
111use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
112use crate::session::vars::{self, FeatureFlag};
113use crate::{ORDINALITY_COL_NAME, normalize};
114
/// The result of planning a "root" (outermost) query, as produced by
/// `plan_root_query` and friends.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression. `HirRelationExpr` for a whole query; other
    /// instantiations (e.g. `Vec<HirScalarExpr>` for an INSERT's RETURNING
    /// clause) are used where the planned output is a list of scalars.
    pub expr: E,
    /// The shape of the result set, as seen *after* `finishing` is applied.
    pub desc: RelationDesc,
    /// Post-processing (ordering, limit/offset, projection) that must be
    /// applied to `expr` before results are returned to the client.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// The scope describing the output columns.
    pub scope: Scope,
}
122
123/// Plans a top-level query, returning the `HirRelationExpr` describing the query
124/// plan, the `RelationDesc` describing the shape of the result set, a
125/// `RowSetFinishing` describing post-processing that must occur before results
126/// are sent to the client, and the types of the parameters in the query, if any
127/// were present.
128///
129/// Note that the returned `RelationDesc` describes the expression after
130/// applying the returned `RowSetFinishing`.
131#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
132pub fn plan_root_query(
133    scx: &StatementContext,
134    mut query: Query<Aug>,
135    lifetime: QueryLifetime,
136) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
137    transform_ast::transform(scx, &mut query)?;
138    let mut qcx = QueryContext::root(scx, lifetime);
139    let PlannedQuery {
140        mut expr,
141        scope,
142        order_by,
143        limit,
144        offset,
145        project,
146        group_size_hints,
147    } = plan_query(&mut qcx, &query)?;
148
149    let mut finishing = RowSetFinishing {
150        limit,
151        offset,
152        project,
153        order_by,
154    };
155
156    // Attempt to push the finishing's ordering past its projection. This allows
157    // data to be projected down on the workers rather than the coordinator. It
158    // also improves the optimizer's demand analysis, as the optimizer can only
159    // reason about demand information in `expr` (i.e., it can't see
160    // `finishing.project`).
161    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
162
163    if lifetime.is_maintained() {
164        expr.finish_maintained(&mut finishing, group_size_hints);
165    }
166
167    let typ = qcx.relation_type(&expr);
168    let typ = SqlRelationType::new(
169        finishing
170            .project
171            .iter()
172            .map(|i| typ.column_types[*i].clone())
173            .collect(),
174    );
175    let desc = RelationDesc::new(typ, scope.column_names());
176
177    Ok(PlannedRootQuery {
178        expr,
179        desc,
180        finishing,
181        scope,
182    })
183}
184
185/// TODO(ct2): Dedup this with [plan_root_query].
186#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
187pub fn plan_ct_query(
188    qcx: &mut QueryContext,
189    mut query: Query<Aug>,
190) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
191    transform_ast::transform(qcx.scx, &mut query)?;
192    let PlannedQuery {
193        mut expr,
194        scope,
195        order_by,
196        limit,
197        offset,
198        project,
199        group_size_hints,
200    } = plan_query(qcx, &query)?;
201
202    let mut finishing = RowSetFinishing {
203        limit,
204        offset,
205        project,
206        order_by,
207    };
208
209    // Attempt to push the finishing's ordering past its projection. This allows
210    // data to be projected down on the workers rather than the coordinator. It
211    // also improves the optimizer's demand analysis, as the optimizer can only
212    // reason about demand information in `expr` (i.e., it can't see
213    // `finishing.project`).
214    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
215
216    expr.finish_maintained(&mut finishing, group_size_hints);
217
218    let typ = qcx.relation_type(&expr);
219    let typ = SqlRelationType::new(
220        finishing
221            .project
222            .iter()
223            .map(|i| typ.column_types[*i].clone())
224            .collect(),
225    );
226    let desc = RelationDesc::new(typ, scope.column_names());
227
228    Ok(PlannedRootQuery {
229        expr,
230        desc,
231        finishing,
232        scope,
233    })
234}
235
236/// Attempts to push a projection through an order by.
237///
238/// The returned bool indicates whether the pushdown was successful or not.
239/// Successful pushdown requires that all the columns referenced in `order_by`
240/// are included in `project`.
241///
242/// When successful, `expr` is wrapped in a projection node, `order_by` is
243/// rewritten to account for the pushed-down projection, and `project` is
244/// replaced with the trivial projection. When unsuccessful, no changes are made
245/// to any of the inputs.
246fn try_push_projection_order_by(
247    expr: &mut HirRelationExpr,
248    project: &mut Vec<usize>,
249    order_by: &mut Vec<ColumnOrder>,
250) -> bool {
251    let mut unproject = vec![None; expr.arity()];
252    for (out_i, in_i) in project.iter().copied().enumerate() {
253        unproject[in_i] = Some(out_i);
254    }
255    if order_by
256        .iter()
257        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
258    {
259        let trivial_project = (0..project.len()).collect();
260        *expr = expr.take().project(mem::replace(project, trivial_project));
261        for ob in order_by {
262            ob.column = unproject[ob.column].unwrap();
263        }
264        true
265    } else {
266        false
267    }
268}
269
/// Plans an `INSERT INTO` statement.
///
/// Returns the [`CatalogItemId`] of the target table, the `HirRelationExpr`
/// computing the rows to insert (mapped and projected to the table's full
/// column order, with defaults filled in for omitted columns), and the planned
/// `RETURNING` clause as a `PlannedRootQuery<Vec<HirScalarExpr>>`.
///
/// Errors if the target is not a writable user table, if a named column does
/// not exist or is duplicated, if the source arity doesn't match the target
/// columns, or if a source expression cannot be assignment-cast to the target
/// column's type.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    // INSERT is always a one-shot write, never a maintained dataflow.
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert: it must be a writable, non-system table.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.relation_desc().expect("table has desc");
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    // Default expressions go through the same AST transformations as the query.
    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order.
    // `source_types[i]` is the expected type of the i-th source column;
    // `ordering[i]` is the table column index it targets.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query.
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        // Resolve each named column against the table's descriptor.
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does: plan them
                // directly against the target column names/types rather than as a
                // nested query.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        // `INSERT ... DEFAULT VALUES`: a single zero-column row; every column
        // is filled from its default below.
        InsertSource::DefaultValues => {
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared.
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero).
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query.
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_sql_scalar_type(&e.target_type, false),
            qcx.humanize_sql_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order.
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query.
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    // `zip_eq` panics on length mismatch: there is one default per table column.
    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Omitted column: append its default expression and project it into place.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against the table's own scope (the clause can
    // reference any column of the target table, not just the inserted ones).
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            // RETURNING does not allow aggregates, subqueries, or window functions.
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // Expand `*` / wildcards into individual items before planning each.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
486
/// Determines the mapping between some external data and a Materialize relation.
///
/// Returns the following:
/// * [`CatalogItemId`] for the destination table.
/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
///   since we now return a [`MapFilterProject`].
/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
///   destination table. `None` when the item has no writable-table details.
///
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    // Items without a relation desc (e.g. non-relational items) cannot be a COPY target.
    let table_desc = match item.relation_desc() {
        Some(desc) => desc.into_owned(),
        None => {
            return Err(PlanError::InvalidDependency {
                name: fullname.to_string(),
                item_type: item.item_type().to_string(),
            });
        }
    };
    let mut ordering = Vec::with_capacity(columns.len());

    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
    // should be simplified. The reason they are currently separate code paths is so we can roll
    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.

    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
    // FROM SOURCE ...`), then we generate an MFP.
    //
    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
    // so it's not always guaranteed that our `item` is a table.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        // Default expressions go through the same AST transformations as queries.
        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Fill in any omitted columns and rearrange into correct order.
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each column in the destination table, either project it from the source data, or provide
        // an expression to fill in a default value.
        // `zip_eq` panics on length mismatch: there is one default per table column.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // If a column from the table does not exist in the source data, then a default
                // value will get appended to the end of the input Row from the source data.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    // Create a mapping from input data to the table we're copying into.
    let source_desc = if columns.is_empty() {
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        // The source data should be in the same order as the table.
        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        // Resolve each named column, recording its table index, type, and name.
        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }

        // The source data is a different shape than the destination table.
        RelationDesc::new(SqlRelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
614
615/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
616///
617/// TODO(cf3): Merge this method with [`plan_copy_item`].
618pub fn plan_copy_from(
619    scx: &StatementContext,
620    table_name: ResolvedItemName,
621    columns: Vec<Ident>,
622) -> Result<
623    (
624        CatalogItemId,
625        RelationDesc,
626        Vec<ColumnIndex>,
627        Option<MapFilterProject>,
628    ),
629    PlanError,
630> {
631    let table = scx.get_item_by_resolved_name(&table_name)?;
632
633    // Validate the target of the insert.
634    if table.item_type() != CatalogItemType::Table {
635        sql_bail!(
636            "cannot insert into {} '{}'",
637            table.item_type(),
638            table_name.full_name_str()
639        );
640    }
641
642    let _ = table.writable_table_details().ok_or_else(|| {
643        sql_err!(
644            "cannot insert into non-writeable table '{}'",
645            table_name.full_name_str()
646        )
647    })?;
648
649    if table.id().is_system() {
650        sql_bail!(
651            "cannot insert into system table '{}'",
652            table_name.full_name_str()
653        );
654    }
655    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
656
657    Ok((id, desc, ordering, mfp))
658}
659
/// Builds a plan that adds the default values for the missing columns and re-orders
/// the datums in the given rows to match the order in the target table.
///
/// `columns[i]` is the table column that the i-th datum in each of `rows`
/// corresponds to. Errors if the target table was dropped or is not writable.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .try_get_item(&target_id)
        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
        .at_version(RelationVersionSelector::Latest);

    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    // Default expressions go through the same AST transformations as queries.
    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    let desc = table.relation_desc().expect("table has desc");
    let column_types = columns
        .iter()
        .map(|x| desc.get_type(x).clone())
        .map(|mut x| {
            // Null constraint is enforced later, when inserting the row in the table.
            // Without this, an assert is hit during lowering.
            x.nullable = true;
            x
        })
        .collect();
    let typ = SqlRelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Exit early with just the raw constant if we know that all columns are present
    // and in the correct order. This lets us bypass expensive downstream optimizations
    // more easily, as at every stage we know this expression is nothing more than
    // a constant (as opposed to e.g. a constant with an identity map and identity
    // projection).
    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns and rearrange into correct order.
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query.
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();

    // `zip_eq` panics on length mismatch: there is one default per table column.
    let column_details = desc.iter_all().zip_eq(defaults);
    for ((col_idx, _col_name, col_typ), default) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Omitted column: append its default expression and project it into place.
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
734
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    /// ID of the table being mutated.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Post-processing to apply to the rows produced by `selection`.
    pub finishing: RowSetFinishing,
}
747
748pub fn plan_delete_query(
749    scx: &StatementContext,
750    mut delete_stmt: DeleteStatement<Aug>,
751) -> Result<ReadThenWritePlan, PlanError> {
752    transform_ast::transform(scx, &mut delete_stmt)?;
753
754    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
755    plan_mutation_query_inner(
756        qcx,
757        delete_stmt.table_name,
758        delete_stmt.alias,
759        delete_stmt.using,
760        vec![],
761        delete_stmt.selection,
762    )
763}
764
765pub fn plan_update_query(
766    scx: &StatementContext,
767    mut update_stmt: UpdateStatement<Aug>,
768) -> Result<ReadThenWritePlan, PlanError> {
769    transform_ast::transform(scx, &mut update_stmt)?;
770
771    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
772
773    plan_mutation_query_inner(
774        qcx,
775        update_stmt.table_name,
776        update_stmt.alias,
777        vec![],
778        update_stmt.assignments,
779        update_stmt.selection,
780    )
781}
782
/// Plans the shared portion of `DELETE` and `UPDATE` statements: validates
/// that the target is a writeable, non-system user table, plans the optional
/// `USING` clause and `WHERE` filter, and plans any `SET` assignments.
///
/// Returns a [`ReadThenWritePlan`] containing the rows to read (`selection`)
/// and the per-column assignments to apply.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get ID and version of the relation desc.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Perform checks on item with given ID: it must be a table, it must be
    // writeable, and it must not be a system table.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Derive structs for operation from validated table
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.relation_desc().expect("table has desc");
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // No `USING` clause: plan the optional `WHERE` directly as a filter
        // on the target table.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // A `USING` clause requires rewriting the filter as an existential
        // subquery; see `handle_mutation_using_clause`.
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each `SET <column> = <expr>` assignment, keyed by column index.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        // Get the index and type of the column.
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                // The assigned value must be assignment-castable to the
                // column's declared type.
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // No ordering or limit; output every column of the table in order.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
888
889// Adjust `get` to perform an existential subquery on `using` accounting for
890// `selection`.
891//
892// If `USING`, we essentially want to rewrite the query as a correlated
893// existential subquery, i.e.
894// ```
895// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
896// ```
897// However, we can't do that directly because of esoteric rules w/r/t `lateral`
898// subqueries.
899// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan `USING` as a cross-joined `FROM` without knowledge of the
    // statement's `FROM` target. This prevents `lateral` subqueries from
    // "seeing" the `FROM` target.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
        // PG-like semantics e.g. expressing ambiguous column references. We put
        // `USING...` first for no real reason, but making a different decision
        // would require adjusting the column references on this relation
        // differently.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression on `FROM, USING...`.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Rewrite all columns referring to the `FROM` section of `joined` (i.e.
        // those to the right of `using_rel_expr`) to instead be correlated to
        // the outer relation, i.e. `get`.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    // Columns past the `USING` arity belong to `FROM`: bump the
                    // correlation level and re-base the column index.
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
        // relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Check that the scopes are compatible (i.e. do not double-reference
        // the same table), despite the lack of a selection.
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
    // whether any rows are returned, and not on the contents of those rows,
    // the output list of the subquery is normally unimportant.
    //
    // This means we don't need to worry about projecting/mapping any
    // additional expressions here.
    //
    // https://www.postgresql.org/docs/14/functions-subquery.html

    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
990
/// Error returned by [`cast_relation`] when a column cannot be cast to its
/// target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Zero-based index of the column for which the cast could not be planned.
    pub(crate) column: usize,
    /// The column's inferred source type.
    pub(crate) source_type: SqlScalarType,
    /// The requested target type.
    pub(crate) target_type: SqlScalarType,
}
997
/// Cast a relation from one type to another using the specified type of cast.
///
/// The length of `target_types` must match the arity of `expr`.
///
/// Returns a [`CastRelationError`] naming the first column for which no cast
/// could be planned.
pub(crate) fn cast_relation<'a, I>(
    qcx: &QueryContext,
    ccx: CastContext,
    expr: HirRelationExpr,
    target_types: I,
) -> Result<HirRelationExpr, CastRelationError>
where
    I: IntoIterator<Item = &'a SqlScalarType>,
{
    let ecx = &ExprContext {
        qcx,
        name: "values",
        scope: &Scope::empty(),
        relation_type: &qcx.relation_type(&expr),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };
    // Casts that change the expression are appended via `map` and then
    // selected via `project`; unchanged columns are projected directly.
    let mut map_exprs = vec![];
    let mut project_key = vec![];
    for (i, target_typ) in target_types.into_iter().enumerate() {
        let expr = HirScalarExpr::column(i);
        // We plan every cast and check the evaluated expressions rather than
        // checking the types directly because of some complex casting rules
        // between types not expressed in `SqlScalarType` equality.
        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
            Ok(cast_expr) => {
                if expr == cast_expr {
                    // Cast between types was unnecessary
                    project_key.push(i);
                } else {
                    // Cast between types required
                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
                    map_exprs.push(cast_expr);
                }
            }
            Err(_) => {
                return Err(CastRelationError {
                    column: i,
                    source_type: ecx.scalar_type(&expr),
                    target_type: target_typ.clone(),
                });
            }
        }
    }
    Ok(expr.map(map_exprs).project(project_key))
}
1049
1050/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1051/// VIEW` statement.
1052pub fn plan_as_of(
1053    scx: &StatementContext,
1054    as_of: Option<AsOf<Aug>>,
1055) -> Result<QueryWhen, PlanError> {
1056    match as_of {
1057        None => Ok(QueryWhen::Immediately),
1058        Some(as_of) => match as_of {
1059            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1060            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1061        },
1062    }
1063}
1064
/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
///
/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
/// - not a constant,
/// - not castable to MzTimestamp,
/// - is null,
/// - contains an unmaterializable function,
/// - some other evaluation error occurs, e.g., a division by 0,
/// - contains aggregates, subqueries, parameters, or window function calls.
pub fn plan_as_of_or_up_to(
    scx: &StatementContext,
    mut expr: Expr<Aug>,
) -> Result<mz_repr::Timestamp, PlanError> {
    // AS OF / UP TO expressions cannot reference any columns, so plan them
    // against an empty scope and relation.
    let scope = Scope::empty();
    let desc = RelationDesc::empty();
    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
    // evaluated only once.)
    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    transform_ast::transform(scx, &mut expr)?;
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "AS OF or UP TO",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    // Cast to MzTimestamp using assignment-cast semantics.
    let hir = plan_expr(ecx, &expr)?.cast_to(
        ecx,
        CastContext::Assignment,
        &SqlScalarType::MzTimestamp,
    )?;
    if hir.contains_unmaterializable() {
        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
    }
    // At this point, we definitely have a constant expression:
    // - it can't contain any unmaterializable functions;
    // - it can't refer to any columns.
    // But the following can still fail due to a variety of reasons: most commonly, the cast can
    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
    // division by 0.
    let timestamp = hir
        .into_literal_mz_timestamp()
        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
    Ok(timestamp)
}
1113
1114/// Plans an expression in the AS position of a `CREATE SECRET`.
1115pub fn plan_secret_as(
1116    scx: &StatementContext,
1117    mut expr: Expr<Aug>,
1118) -> Result<MirScalarExpr, PlanError> {
1119    let scope = Scope::empty();
1120    let desc = RelationDesc::empty();
1121    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1122
1123    transform_ast::transform(scx, &mut expr)?;
1124
1125    let ecx = &ExprContext {
1126        qcx: &qcx,
1127        name: "AS",
1128        scope: &scope,
1129        relation_type: desc.typ(),
1130        allow_aggregates: false,
1131        allow_subqueries: false,
1132        allow_parameters: false,
1133        allow_windows: false,
1134    };
1135    let expr = plan_expr(ecx, &expr)?
1136        .type_as(ecx, &SqlScalarType::Bytes)?
1137        .lower_uncorrelated(scx.catalog.system_vars())?;
1138    Ok(expr)
1139}
1140
/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
///
/// Builds the relation that the CHECK expression is planned against — one
/// column per requested body, header map, and secret — then types the
/// expression as `bool` and lowers it to an uncorrelated MIR expression.
pub fn plan_webhook_validate_using(
    scx: &StatementContext,
    validate_using: CreateWebhookSourceCheck<Aug>,
) -> Result<WebhookValidation, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::Source);

    let CreateWebhookSourceCheck {
        options,
        using: mut expr,
    } = validate_using;

    // Column types and names of the CHECK relation, built up in lockstep.
    let mut column_typs = vec![];
    let mut column_names = vec![];

    let (bodies, headers, secrets) = options
        .map(|o| (o.bodies, o.headers, o.secrets))
        .unwrap_or_default();

    // Append all of the bodies so they can be used in the expression.
    let mut body_tuples = vec![];
    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
        // Expose the body as either raw bytes or a string.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "body".to_string());

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this body correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "body column names and types don't match"
        );
        body_tuples.push((column_idx, use_bytes));
    }

    // Append all of the headers so they can be used in the expression.
    let mut header_tuples = vec![];

    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
        let value_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "headers".to_string());

        // Headers are exposed as a map from header name to value.
        column_typs.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(value_type),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this header correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        header_tuples.push((column_idx, use_bytes));
    }

    // Append all secrets so they can be used in the expression.
    let mut validation_secrets = vec![];

    for CreateWebhookSourceSecret {
        secret,
        alias,
        use_bytes,
    } in secrets
    {
        // Either provide the secret to the validation expression as Bytes or a String.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        // Only resolved catalog items can be used as secrets.
        let ResolvedItemName::Item {
            id,
            full_name: FullItemName { item, .. },
            ..
        } = secret
        else {
            return Err(PlanError::InvalidSecret(Box::new(secret)));
        };

        // Plan the expression using the secret's alias, if one is provided.
        let name = if let Some(alias) = alias {
            alias.into_string()
        } else {
            item
        };
        column_names.push(name);

        // Get the column index that corresponds for this secret, so we can make sure to provide the
        // secrets in the correct order during evaluation.
        let column_idx = column_typs.len() - 1;
        // Double check that our column names and types match.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "column names and types don't match"
        );

        validation_secrets.push(WebhookValidationSecret {
            id,
            column_idx,
            use_bytes,
        });
    }

    let relation_typ = SqlRelationType::new(column_typs);
    let desc = RelationDesc::new(relation_typ, column_names.clone());
    let scope = Scope::from_source(None, column_names);

    transform_ast::transform(scx, &mut expr)?;

    // The CHECK expression may reference the columns built above, but no
    // aggregates, subqueries, parameters, or window functions.
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "CHECK",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    let expr = plan_expr(ecx, &expr)?
        .type_as(ecx, &SqlScalarType::Bool)?
        .lower_uncorrelated(scx.catalog.system_vars())?;
    let validation = WebhookValidation {
        expression: expr,
        relation_desc: desc,
        bodies: body_tuples,
        headers: header_tuples,
        secrets: validation_secrets,
    };
    Ok(validation)
}
1298
1299pub fn plan_default_expr(
1300    scx: &StatementContext,
1301    expr: &Expr<Aug>,
1302    target_ty: &SqlScalarType,
1303) -> Result<HirScalarExpr, PlanError> {
1304    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1305    let ecx = &ExprContext {
1306        qcx: &qcx,
1307        name: "DEFAULT expression",
1308        scope: &Scope::empty(),
1309        relation_type: &SqlRelationType::empty(),
1310        allow_aggregates: false,
1311        allow_subqueries: false,
1312        allow_parameters: false,
1313        allow_windows: false,
1314    };
1315    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1316    Ok(hir)
1317}
1318
/// Plans and evaluates the parameter values supplied to an `EXECUTE`
/// statement, checking that each value's type can (hypothetically) be cast to
/// the type the prepared statement expects.
///
/// Returns the evaluated parameter datums along with their actual and
/// expected types.
pub fn plan_params<'a>(
    scx: &'a StatementContext,
    params: Vec<Expr<Aug>>,
    desc: &StatementDesc,
) -> Result<Params, PlanError> {
    // The number of provided values must match the prepared statement's
    // parameter count.
    if params.len() != desc.param_types.len() {
        sql_bail!(
            "expected {} params, got {}",
            desc.param_types.len(),
            params.len()
        );
    }

    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);

    let mut datums = Row::default();
    let mut packer = datums.packer();
    let mut actual_types = Vec::new();
    let temp_storage = &RowArena::new();
    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
        transform_ast::transform(scx, &mut expr)?;

        // Plan the parameter in an empty EXECUTE context (no columns,
        // aggregates, subqueries, or nested parameters).
        let ecx = execute_expr_context(&qcx);
        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
        let actual_ty = ecx.scalar_type(&ex);
        // Only check that a cast *could* be planned; the value itself is
        // recorded with its actual type. Parameter numbers in the error are
        // 1-based.
        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
            return Err(PlanError::WrongParameterType(
                i + 1,
                ecx.humanize_sql_scalar_type(expected_ty, false),
                ecx.humanize_sql_scalar_type(&actual_ty, false),
            ));
        }
        // Lower to MIR and evaluate to a constant datum (no input columns).
        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
        let evaled = ex.eval(&[], temp_storage)?;
        packer.push(evaled);
        actual_types.push(actual_ty);
    }
    Ok(Params {
        datums,
        execute_types: actual_types,
        expected_types: desc.param_types.clone(),
    })
}
1362
/// Shared empty [`Scope`] used by [`execute_expr_context`].
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Shared empty relation type used by [`execute_expr_context`].
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1365
/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
///
/// The context is empty: parameter expressions may not reference columns,
/// aggregates, subqueries, parameters, or window functions.
pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
    ExprContext {
        qcx,
        name: "EXECUTE",
        scope: &EXECUTE_CONTEXT_SCOPE,
        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    }
}
1379
/// The CastContext used when matching up the types of parameters passed to EXECUTE.
///
/// This is an assignment cast also in Postgres, see
/// <https://github.com/MaterializeInc/database-issues/issues/9266>
// Consumed by `plan_params` via `plan_hypothetical_cast`.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1386
1387pub fn plan_index_exprs<'a>(
1388    scx: &'a StatementContext,
1389    on_desc: &RelationDesc,
1390    exprs: Vec<Expr<Aug>>,
1391) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1392    let scope = Scope::from_source(None, on_desc.iter_names());
1393    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1394
1395    let ecx = &ExprContext {
1396        qcx: &qcx,
1397        name: "CREATE INDEX",
1398        scope: &scope,
1399        relation_type: on_desc.typ(),
1400        allow_aggregates: false,
1401        allow_subqueries: false,
1402        allow_parameters: false,
1403        allow_windows: false,
1404    };
1405    let repr_col_types: Vec<ReprColumnType> = on_desc
1406        .typ()
1407        .column_types
1408        .iter()
1409        .map(ReprColumnType::from)
1410        .collect();
1411    let mut out = vec![];
1412    for mut expr in exprs {
1413        transform_ast::transform(scx, &mut expr)?;
1414        let expr = plan_expr_or_col_index(ecx, &expr)?;
1415        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1416        expr.reduce(&repr_col_types);
1417        out.push(expr);
1418    }
1419    Ok(out)
1420}
1421
1422fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1423    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1424        Some(column) => Ok(HirScalarExpr::column(column)),
1425        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1426    }
1427}
1428
1429fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1430    match e {
1431        Expr::Value(Value::Number(n)) => {
1432            let n = n.parse::<usize>().map_err(|e| {
1433                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1434            })?;
1435            if n < 1 || n > max {
1436                sql_bail!(
1437                    "column reference {} in {} is out of range (1 - {})",
1438                    n,
1439                    name,
1440                    max
1441                );
1442            }
1443            Ok(Some(n - 1))
1444        }
1445        _ => Ok(None),
1446    }
1447}
1448
/// The output of [`plan_query`]: a planned relation expression together with
/// the ordering, limit/offset, projection, and group-size hints that apply to
/// it.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope describing `expr`'s output columns.
    scope: Scope,
    /// The `ORDER BY` specification, referencing columns of `expr`.
    order_by: Vec<ColumnOrder>,
    /// The `LIMIT` expression, if one was given.
    limit: Option<HirScalarExpr>,
    /// `offset` is either
    /// - an Int64 literal
    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
    ///   later.)
    offset: HirScalarExpr,
    /// The columns of `expr` to output, in order.
    project: Vec<usize>,
    /// Group-size hints extracted from the query's options.
    group_size_hints: GroupSizeHints,
}
1463
/// Plans a `Query` by delegating to [`plan_query_inner`], wrapping the call
/// in `checked_recur_mut` to guard against unbounded recursion on deeply
/// nested queries.
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1467
1468fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1469    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1470    // for the identifiers, so that they can be re-installed before returning.
1471    let cte_bindings = plan_ctes(qcx, q)?;
1472
1473    let limit = match &q.limit {
1474        None => None,
1475        Some(Limit {
1476            quantity,
1477            with_ties: false,
1478        }) => {
1479            let ecx = &ExprContext {
1480                qcx,
1481                name: "LIMIT",
1482                scope: &Scope::empty(),
1483                relation_type: &SqlRelationType::empty(),
1484                allow_aggregates: false,
1485                allow_subqueries: true,
1486                allow_parameters: true,
1487                allow_windows: false,
1488            };
1489            let limit = plan_expr(ecx, quantity)?;
1490            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1491
1492            let limit = if limit.is_constant() {
1493                let arena = RowArena::new();
1494                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1495
1496                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1497                // that the error happened in a LIMIT clause, so that we have better error msg for
1498                // something like `SELECT 5 LIMIT 'aaa'`.
1499                match limit.eval(&[], &arena)? {
1500                    d @ Datum::Int64(v) if v >= 0 => {
1501                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1502                    }
1503                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1504                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1505                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1506                }
1507            } else {
1508                // Gate non-constant LIMIT expressions behind a feature flag
1509                qcx.scx
1510                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1511                limit
1512            };
1513
1514            Some(limit)
1515        }
1516        Some(Limit {
1517            quantity: _,
1518            with_ties: true,
1519        }) => bail_unsupported!("FETCH ... WITH TIES"),
1520    };
1521
1522    let offset = match &q.offset {
1523        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1524        Some(offset) => {
1525            let ecx = &ExprContext {
1526                qcx,
1527                name: "OFFSET",
1528                scope: &Scope::empty(),
1529                relation_type: &SqlRelationType::empty(),
1530                allow_aggregates: false,
1531                allow_subqueries: false,
1532                allow_parameters: true,
1533                allow_windows: false,
1534            };
1535            let offset = plan_expr(ecx, offset)?;
1536            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1537
1538            let offset = if offset.is_constant() {
1539                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1540                let offset_value = offset_into_value(offset)?;
1541                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1542            } else {
1543                // The only case when this is allowed to not be a constant is if it contains
1544                // parameters. (In which case, we'll later check that it's a constant after
1545                // parameter binding.)
1546                if !offset.contains_parameters() {
1547                    return Err(PlanError::InvalidOffset(format!(
1548                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1549                        offset
1550                    )));
1551                }
1552                offset
1553            };
1554            offset
1555        }
1556    };
1557
1558    let mut planned_query = match &q.body {
1559        SetExpr::Select(s) => {
1560            // Extract query options.
1561            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1562            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1563
1564            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1565            PlannedQuery {
1566                expr: plan.expr,
1567                scope: plan.scope,
1568                order_by: plan.order_by,
1569                project: plan.project,
1570                limit,
1571                offset,
1572                group_size_hints,
1573            }
1574        }
1575        _ => {
1576            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1577            let ecx = &ExprContext {
1578                qcx,
1579                name: "ORDER BY clause of a set expression",
1580                scope: &scope,
1581                relation_type: &qcx.relation_type(&expr),
1582                allow_aggregates: false,
1583                allow_subqueries: true,
1584                allow_parameters: true,
1585                allow_windows: false,
1586            };
1587            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1588            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1589            let project = (0..ecx.relation_type.arity()).collect();
1590            PlannedQuery {
1591                expr: expr.map(map_exprs),
1592                scope,
1593                order_by,
1594                limit,
1595                project,
1596                offset,
1597                group_size_hints: GroupSizeHints::default(),
1598            }
1599        }
1600    };
1601
1602    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1603    match &q.ctes {
1604        CteBlock::Simple(_) => {
1605            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1606                if let Some(cte) = qcx.ctes.remove(&id) {
1607                    planned_query.expr = HirRelationExpr::Let {
1608                        name: cte.name,
1609                        id: id.clone(),
1610                        value: Box::new(value),
1611                        body: Box::new(planned_query.expr),
1612                    };
1613                }
1614                if let Some(shadowed_val) = shadowed_val {
1615                    qcx.ctes.insert(id, shadowed_val);
1616                }
1617            }
1618        }
1619        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1620            let MutRecBlockOptionExtracted {
1621                recursion_limit,
1622                return_at_recursion_limit,
1623                error_at_recursion_limit,
1624                seen: _,
1625            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1626            let limit = match (
1627                recursion_limit,
1628                return_at_recursion_limit,
1629                error_at_recursion_limit,
1630            ) {
1631                (None, None, None) => None,
1632                (Some(max_iters), None, None) => {
1633                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1634                }
1635                (None, Some(max_iters), None) => Some((max_iters, true)),
1636                (None, None, Some(max_iters)) => Some((max_iters, false)),
1637                _ => {
1638                    return Err(InvalidWmrRecursionLimit(
1639                        "More than one recursion limit given. \
1640                         Please give at most one of RECURSION LIMIT, \
1641                         ERROR AT RECURSION LIMIT, \
1642                         RETURN AT RECURSION LIMIT."
1643                            .to_owned(),
1644                    ));
1645                }
1646            }
1647            .try_map(|(max_iters, return_at_limit)| {
1648                Ok::<LetRecLimit, PlanError>(LetRecLimit {
1649                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1650                        "Recursion limit has to be greater than 0.".to_owned(),
1651                    ))?,
1652                    return_at_limit: *return_at_limit,
1653                })
1654            })?;
1655
1656            let mut bindings = Vec::new();
1657            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1658                if let Some(cte) = qcx.ctes.remove(&id) {
1659                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1660                }
1661                if let Some(shadowed_val) = shadowed_val {
1662                    qcx.ctes.insert(id, shadowed_val);
1663                }
1664            }
1665            if !bindings.is_empty() {
1666                planned_query.expr = HirRelationExpr::LetRec {
1667                    limit,
1668                    bindings,
1669                    body: Box::new(planned_query.expr),
1670                }
1671            }
1672        }
1673    }
1674
1675    Ok(planned_query)
1676}
1677
1678/// Converts an OFFSET expression into a value.
1679pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1680    let offset = offset
1681        .try_into_literal_int64()
1682        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1683    if offset < 0 {
1684        return Err(PlanError::InvalidOffset(format!(
1685            "must not be negative, got {}",
1686            offset
1687        )));
1688    }
1689    Ok(offset)
1690}
1691
// Extracts the options accepted on a `WITH MUTUALLY RECURSIVE` block. The
// three recursion-limit variants are mutually exclusive; `plan_query`
// rejects combinations of more than one.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1698
1699/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1700///
1701/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1702/// shadowed value that can be reinstalled once the planning has completed.
1703pub fn plan_ctes(
1704    qcx: &mut QueryContext,
1705    q: &Query<Aug>,
1706) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1707    // Accumulate planned expressions and shadowed descriptions.
1708    let mut result = Vec::new();
1709    // Retain the old descriptions of CTE bindings so that we can restore them
1710    // after we're done planning this SELECT.
1711    let mut shadowed_descs = BTreeMap::new();
1712
1713    // A reused identifier indicates a reused name.
1714    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1715        sql_bail!(
1716            "WITH query name {} specified more than once",
1717            normalize::ident_ref(ident).quoted()
1718        )
1719    }
1720
1721    match &q.ctes {
1722        CteBlock::Simple(ctes) => {
1723            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1724            for cte in ctes.iter() {
1725                let cte_name = normalize::ident(cte.alias.name.clone());
1726                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1727                let typ = qcx.relation_type(&val);
1728                let mut desc = RelationDesc::new(typ, scope.column_names());
1729                plan_utils::maybe_rename_columns(
1730                    format!("CTE {}", cte.alias.name),
1731                    &mut desc,
1732                    &cte.alias.columns,
1733                )?;
1734                // Capture the prior value if it exists, so that it can be re-installed.
1735                let shadowed = qcx.ctes.insert(
1736                    cte.id,
1737                    CteDesc {
1738                        name: cte_name,
1739                        desc,
1740                    },
1741                );
1742
1743                result.push((cte.id, val, shadowed));
1744            }
1745        }
1746        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1747            // Insert column types into `qcx.ctes` first for recursive bindings.
1748            for cte in ctes.iter() {
1749                let cte_name = normalize::ident(cte.name.clone());
1750                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1751                for column in cte.columns.iter() {
1752                    desc_columns.push((
1753                        normalize::column_name(column.name.clone()),
1754                        SqlColumnType {
1755                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1756                            nullable: true,
1757                        },
1758                    ));
1759                }
1760                let desc = RelationDesc::from_names_and_types(desc_columns);
1761                let shadowed = qcx.ctes.insert(
1762                    cte.id,
1763                    CteDesc {
1764                        name: cte_name,
1765                        desc,
1766                    },
1767                );
1768                // Capture the prior value if it exists, so that it can be re-installed.
1769                if let Some(shadowed) = shadowed {
1770                    shadowed_descs.insert(cte.id, shadowed);
1771                }
1772            }
1773
1774            // Plan all CTEs and validate the proposed types.
1775            for cte in ctes.iter() {
1776                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1777
1778                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1779
1780                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1781                    // Once WMR CTEs support NOT NULL constraints, check that
1782                    // nullability of derived column types are compatible.
1783                    sql_bail!(
1784                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1785                    );
1786                }
1787
1788                if !proposed_typ.keys.is_empty() {
1789                    // Once WMR CTEs support keys, check that keys exactly
1790                    // overlap.
1791                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1792                }
1793
1794                // Validate that the derived and proposed types are the same.
1795                let derived_typ = qcx.relation_type(&val);
1796
1797                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1798                    let cte_name = normalize::ident(cte.name.clone());
1799                    let proposed_typ = proposed_typ
1800                        .column_types
1801                        .iter()
1802                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1803                        .collect::<Vec<_>>();
1804                    let inferred_typ = derived_typ
1805                        .column_types
1806                        .iter()
1807                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1808                        .collect::<Vec<_>>();
1809                    Err(PlanError::RecursiveTypeMismatch(
1810                        cte_name,
1811                        proposed_typ,
1812                        inferred_typ,
1813                    ))
1814                };
1815
1816                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1817                    return type_err(proposed_typ, derived_typ);
1818                }
1819
1820                // Cast derived types to proposed types or error.
1821                let val = match cast_relation(
1822                    qcx,
1823                    // Choose `CastContext::Assignment`` because the user has
1824                    // been explicit about the types they expect. Choosing
1825                    // `CastContext::Implicit` is not "strong" enough to impose
1826                    // typmods from proposed types onto values.
1827                    CastContext::Assignment,
1828                    val,
1829                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1830                ) {
1831                    Ok(val) => val,
1832                    Err(_) => return type_err(proposed_typ, derived_typ),
1833                };
1834
1835                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1836            }
1837        }
1838    }
1839
1840    Ok(result)
1841}
1842
1843pub fn plan_nested_query(
1844    qcx: &mut QueryContext,
1845    q: &Query<Aug>,
1846) -> Result<(HirRelationExpr, Scope), PlanError> {
1847    let PlannedQuery {
1848        mut expr,
1849        scope,
1850        order_by,
1851        limit,
1852        offset,
1853        project,
1854        group_size_hints,
1855    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1856    if limit.is_some()
1857        || !offset
1858            .clone()
1859            .try_into_literal_int64()
1860            .is_ok_and(|offset| offset == 0)
1861    {
1862        expr = HirRelationExpr::top_k(
1863            expr,
1864            vec![],
1865            order_by,
1866            limit,
1867            offset,
1868            group_size_hints.limit_input_group_size,
1869        );
1870    }
1871    Ok((expr.project(project), scope))
1872}
1873
/// Plans a `SetExpr`: a `SELECT` body, a set operation (`UNION`, `EXCEPT`,
/// `INTERSECT`), a `VALUES` list, a named table, a nested query, or a `SHOW`
/// statement, producing an `HirRelationExpr` and its `Scope`.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
            // should not have planned any ordering.
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan the LHS and RHS.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Validate that the LHS and RHS are the same width.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Match the types of the corresponding columns on the LHS and RHS
            // using the normal type coercion rules. This is equivalent to
            // `coerce_homogeneous_exprs`, but implemented in terms of
            // `HirRelationExpr` rather than `HirScalarExpr`.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column pair, find the best common type and plan an
            // implicit cast of both sides to it; an impossible cast on either
            // side is a user-facing type-mismatch error.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_sql_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&target, false),
                        qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Install the casts only when at least one of them is not the
            // identity: map the cast expressions onto the end of the relation,
            // then project away the original (uncast) columns.
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
                    // Though we believe that render() does The Right Thing (TM)
                    // Also note that we do *not* need another threshold() at the end of the method chain
                    // because the right-hand side of the outer union only produces existing records,
                    // i.e., the record counts for differential data flow definitely remain non-negative.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            let scope = Scope::from_source(
                None,
                // Column names are taken from the left, as in Postgres.
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // The create SQL definition of involving this query, will have the explicit `SHOW`
            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
            // current schema of the executing user. When Materialize restarts and tries to re-plan
            // these queries, it will only have access to the raw `SHOW` command and have no idea
            // what schema to use. As a result Materialize will fail to boot.
            //
            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
            // However, banning show commands in views gives us more flexibility to change their
            // output.
            //
            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
            // with all show commands expanded into their equivalent SELECT statements.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Some SHOW statements are a SELECT query. Others produces Rows
            // directly. Convert both of these to the needed Hir and Scope.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let scope = Scope::from_source(None, desc.iter_names());
                let expr = HirRelationExpr::constant(rows, desc.into_typ());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateType(stmt) => to_hirscope(
                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
                    show::describe_show_create_type(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2119
2120/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2121fn plan_values(
2122    qcx: &QueryContext,
2123    values: &[Vec<Expr<Aug>>],
2124) -> Result<(HirRelationExpr, Scope), PlanError> {
2125    assert!(!values.is_empty());
2126
2127    let ecx = &ExprContext {
2128        qcx,
2129        name: "VALUES",
2130        scope: &Scope::empty(),
2131        relation_type: &SqlRelationType::empty(),
2132        allow_aggregates: false,
2133        allow_subqueries: true,
2134        allow_parameters: true,
2135        allow_windows: false,
2136    };
2137
2138    let ncols = values[0].len();
2139    let nrows = values.len();
2140
2141    // Arrange input expressions by columns, not rows, so that we can
2142    // call `coerce_homogeneous_exprs` on each column.
2143    let mut cols = vec![vec![]; ncols];
2144    for row in values {
2145        if row.len() != ncols {
2146            sql_bail!(
2147                "VALUES expression has varying number of columns: {} vs {}",
2148                row.len(),
2149                ncols
2150            );
2151        }
2152        for (i, v) in row.iter().enumerate() {
2153            cols[i].push(v);
2154        }
2155    }
2156
2157    // Plan each column.
2158    let mut col_iters = Vec::with_capacity(ncols);
2159    let mut col_types = Vec::with_capacity(ncols);
2160    for col in &cols {
2161        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2162        let mut col_type = ecx.column_type(&col[0]);
2163        for val in &col[1..] {
2164            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2165        }
2166        col_types.push(col_type);
2167        col_iters.push(col.into_iter());
2168    }
2169
2170    // Build constant relation.
2171    let mut exprs = vec![];
2172    for _ in 0..nrows {
2173        for i in 0..ncols {
2174            exprs.push(col_iters[i].next().unwrap());
2175        }
2176    }
2177    let out = HirRelationExpr::CallTable {
2178        func: TableFunc::Wrap {
2179            width: ncols,
2180            types: col_types,
2181        },
2182        exprs,
2183    };
2184
2185    // Build column names.
2186    let mut scope = Scope::empty();
2187    for i in 0..ncols {
2188        let name = format!("column{}", i + 1);
2189        scope.items.push(ScopeItem::from_column_name(name));
2190    }
2191
2192    Ok((out, scope))
2193}
2194
2195/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2196/// statement.
2197///
2198/// This is special-cased in PostgreSQL and different enough from `plan_values`
2199/// that it is easier to use a separate function entirely. Unlike a normal
2200/// `VALUES` clause, each value is coerced to the type of the target table
2201/// via an assignment cast.
2202///
2203/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2204fn plan_values_insert(
2205    qcx: &QueryContext,
2206    target_names: &[&ColumnName],
2207    target_types: &[&SqlScalarType],
2208    values: &[Vec<Expr<Aug>>],
2209) -> Result<HirRelationExpr, PlanError> {
2210    assert!(!values.is_empty());
2211
2212    if !values.iter().map(|row| row.len()).all_equal() {
2213        sql_bail!("VALUES lists must all be the same length");
2214    }
2215
2216    let ecx = &ExprContext {
2217        qcx,
2218        name: "VALUES",
2219        scope: &Scope::empty(),
2220        relation_type: &SqlRelationType::empty(),
2221        allow_aggregates: false,
2222        allow_subqueries: true,
2223        allow_parameters: true,
2224        allow_windows: false,
2225    };
2226
2227    let mut exprs = vec![];
2228    let mut types = vec![];
2229    for row in values {
2230        if row.len() > target_names.len() {
2231            sql_bail!("INSERT has more expressions than target columns");
2232        }
2233        for (column, val) in row.into_iter().enumerate() {
2234            let target_type = &target_types[column];
2235            let val = plan_expr(ecx, val)?;
2236            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2237            let source_type = &ecx.scalar_type(&val);
2238            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2239                Ok(val) => val,
2240                Err(_) => sql_bail!(
2241                    "column {} is of type {} but expression is of type {}",
2242                    target_names[column].quoted(),
2243                    qcx.humanize_sql_scalar_type(target_type, false),
2244                    qcx.humanize_sql_scalar_type(source_type, false),
2245                ),
2246            };
2247            if column >= types.len() {
2248                types.push(ecx.column_type(&val));
2249            } else {
2250                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2251            }
2252            exprs.push(val);
2253        }
2254    }
2255
2256    Ok(HirRelationExpr::CallTable {
2257        func: TableFunc::Wrap {
2258            width: values[0].len(),
2259            types,
2260        },
2261        exprs,
2262    })
2263}
2264
2265fn plan_join_identity() -> (HirRelationExpr, Scope) {
2266    let typ = SqlRelationType::new(vec![]);
2267    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2268    let scope = Scope::empty();
2269    (expr, scope)
2270}
2271
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The relation expression producing the full, pre-projection rows.
    expr: HirRelationExpr,
    /// The columns visible after `project` has been applied to `expr`.
    scope: Scope,
    /// Row ordering, expressed over the pre-projection columns of `expr`.
    order_by: Vec<ColumnOrder>,
    /// Indices of the columns of `expr` to output, in output order.
    project: Vec<usize>,
}
2284
// Extracts the `OPTIONS (...)` accepted on a SELECT: group size hints that
// are converted into `GroupSizeHints` during select planning.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2292
2293/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2294///
2295/// Normally, the ORDER BY clause occurs after the columns specified in the
2296/// SELECT list have been projected. In a query like
2297///
2298///   CREATE TABLE (a int, b int)
2299///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2300///
2301/// it is valid to refer to `a`, because it is explicitly selected, but it would
2302/// not be valid to refer to unselected column `b`.
2303///
2304/// But PostgreSQL extends the standard to permit queries like
2305///
2306///   SELECT a FROM t ORDER BY b
2307///
2308/// where expressions in the ORDER BY clause can refer to *both* input columns
2309/// and output columns.
2310fn plan_select_from_where(
2311    qcx: &QueryContext,
2312    mut s: Select<Aug>,
2313    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2314) -> Result<SelectPlan, PlanError> {
2315    // TODO: Both `s` and `order_by_exprs` are not references because the
2316    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2317    // table function support (the UUID mapping). Attempt to change this so callers
2318    // don't need to clone the Select.
2319
2320    // Extract query options.
2321    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2322    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2323
2324    // Step 1. Handle FROM clause, including joins.
2325    let (mut relation_expr, mut from_scope) =
2326        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2327            let (left, left_scope) = l;
2328            plan_join(
2329                qcx,
2330                left,
2331                left_scope,
2332                &Join {
2333                    relation: TableFactor::NestedJoin {
2334                        join: Box::new(twj.clone()),
2335                        alias: None,
2336                    },
2337                    join_operator: JoinOperator::CrossJoin,
2338                },
2339            )
2340        })?;
2341
2342    // Step 2. Handle WHERE clause.
2343    if let Some(selection) = &s.selection {
2344        let ecx = &ExprContext {
2345            qcx,
2346            name: "WHERE clause",
2347            scope: &from_scope,
2348            relation_type: &qcx.relation_type(&relation_expr),
2349            allow_aggregates: false,
2350            allow_subqueries: true,
2351            allow_parameters: true,
2352            allow_windows: false,
2353        };
2354        let expr = plan_expr(ecx, selection)
2355            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2356            .type_as(ecx, &SqlScalarType::Bool)?;
2357        relation_expr = relation_expr.filter(vec![expr]);
2358    }
2359
2360    // Step 3. Gather aggregates and table functions.
2361    // (But skip window aggregates.)
2362    let (aggregates, table_funcs) = {
2363        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2364        visitor.visit_select_mut(&mut s);
2365        for o in order_by_exprs.iter_mut() {
2366            visitor.visit_order_by_expr_mut(o);
2367        }
2368        visitor.into_result()?
2369    };
2370    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2371    if !table_funcs.is_empty() {
2372        let (expr, scope) = plan_scalar_table_funcs(
2373            qcx,
2374            table_funcs,
2375            &mut table_func_names,
2376            &relation_expr,
2377            &from_scope,
2378        )?;
2379        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2380        from_scope = from_scope.product(scope)?;
2381    }
2382
2383    // Step 4. Expand SELECT clause.
2384    let projection = {
2385        let ecx = &ExprContext {
2386            qcx,
2387            name: "SELECT clause",
2388            scope: &from_scope,
2389            relation_type: &qcx.relation_type(&relation_expr),
2390            allow_aggregates: true,
2391            allow_subqueries: true,
2392            allow_parameters: true,
2393            allow_windows: true,
2394        };
2395        let mut out = vec![];
2396        for si in &s.projection {
2397            if *si == SelectItem::Wildcard && s.from.is_empty() {
2398                sql_bail!("SELECT * with no tables specified is not valid");
2399            }
2400            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2401        }
2402        out
2403    };
2404
2405    // Step 5. Handle GROUP BY clause.
2406    // This will also plan the aggregates gathered in Step 3.
2407    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2408    let (mut group_scope, select_all_mapping) = {
2409        // Compute GROUP BY expressions.
2410        let ecx = &ExprContext {
2411            qcx,
2412            name: "GROUP BY clause",
2413            scope: &from_scope,
2414            relation_type: &qcx.relation_type(&relation_expr),
2415            allow_aggregates: false,
2416            allow_subqueries: true,
2417            allow_parameters: true,
2418            allow_windows: false,
2419        };
2420        let mut group_key = vec![];
2421        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2422        let mut group_hir_exprs = vec![];
2423        let mut group_scope = Scope::empty();
2424        let mut select_all_mapping = BTreeMap::new();
2425
2426        for group_expr in &s.group_by {
2427            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2428            let new_column = group_key.len();
2429
2430            if let Some(group_expr) = group_expr {
2431                // Multiple AST expressions can map to the same HIR expression.
2432                // If we already have a ScopeItem for this HIR, we can add this
2433                // next AST expression to its set
2434                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2435                    existing_scope_item.exprs.insert(group_expr.clone());
2436                    continue;
2437                }
2438            }
2439
2440            let mut scope_item = if let HirScalarExpr::Column(
2441                ColumnRef {
2442                    level: 0,
2443                    column: old_column,
2444                },
2445                _name,
2446            ) = &expr
2447            {
2448                // If we later have `SELECT foo.*` then we have to find all
2449                // the `foo` items in `from_scope` and figure out where they
2450                // ended up in `group_scope`. This is really hard to do
2451                // right using SQL name resolution, so instead we just track
2452                // the movement here.
2453                select_all_mapping.insert(*old_column, new_column);
2454                let scope_item = ecx.scope.items[*old_column].clone();
2455                scope_item
2456            } else {
2457                ScopeItem::empty()
2458            };
2459
2460            if let Some(group_expr) = group_expr.cloned() {
2461                scope_item.exprs.insert(group_expr);
2462            }
2463
2464            group_key.push(from_scope.len() + group_exprs.len());
2465            group_hir_exprs.push(expr.clone());
2466            group_exprs.insert(expr, scope_item);
2467        }
2468
2469        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2470        for expr in &group_hir_exprs {
2471            if let Some(scope_item) = group_exprs.remove(expr) {
2472                group_scope.items.push(scope_item);
2473            }
2474        }
2475
2476        // Plan aggregates.
2477        let ecx = &ExprContext {
2478            qcx,
2479            name: "aggregate function",
2480            scope: &from_scope,
2481            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2482            allow_aggregates: false,
2483            allow_subqueries: true,
2484            allow_parameters: true,
2485            allow_windows: false,
2486        };
2487        let mut agg_exprs = vec![];
2488        for sql_function in aggregates {
2489            if sql_function.over.is_some() {
2490                unreachable!(
2491                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2492                );
2493            }
2494            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2495            group_scope
2496                .items
2497                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2498        }
2499        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2500            // apply GROUP BY / aggregates
2501            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2502                group_key,
2503                agg_exprs,
2504                group_size_hints.aggregate_input_group_size,
2505            );
2506
2507            // For every old column that wasn't a group key, add a scope item
2508            // that errors when referenced. We can't simply drop these items
2509            // from scope. These items need to *exist* because they might shadow
2510            // variables in outer scopes that would otherwise be valid to
2511            // reference, but accessing them needs to produce an error.
2512            for i in 0..from_scope.len() {
2513                if !select_all_mapping.contains_key(&i) {
2514                    let scope_item = &ecx.scope.items[i];
2515                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2516                        table_name: scope_item.table_name.clone(),
2517                        column_name: scope_item.column_name.clone(),
2518                        allow_unqualified_references: scope_item.allow_unqualified_references,
2519                    });
2520                }
2521            }
2522
2523            (group_scope, select_all_mapping)
2524        } else {
2525            // if no GROUP BY, aggregates or having then all columns remain in scope
2526            (
2527                from_scope.clone(),
2528                (0..from_scope.len()).map(|i| (i, i)).collect(),
2529            )
2530        }
2531    };
2532
2533    // Step 6. Handle HAVING clause.
2534    if let Some(ref having) = s.having {
2535        let ecx = &ExprContext {
2536            qcx,
2537            name: "HAVING clause",
2538            scope: &group_scope,
2539            relation_type: &qcx.relation_type(&relation_expr),
2540            allow_aggregates: true,
2541            allow_subqueries: true,
2542            allow_parameters: true,
2543            allow_windows: false,
2544        };
2545        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2546        relation_expr = relation_expr.filter(vec![expr]);
2547    }
2548
2549    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2550    // (This includes window aggregations.)
2551    //
2552    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2553    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2554    //
2555    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2556    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2557    // and can't be part of a bigger expression.
2558    // See https://www.postgresql.org/docs/current/queries-order.html:
2559    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2560    // expression"
2561    let window_funcs = {
2562        let mut visitor = WindowFuncCollector::default();
2563        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2564        // window functions are excluded from other things by `allow_windows` being false when
2565        // planning those before this code).
2566        visitor.visit_select(&s);
2567        for o in order_by_exprs.iter() {
2568            visitor.visit_order_by_expr(o);
2569        }
2570        visitor.into_result()
2571    };
2572    for window_func in window_funcs {
2573        let ecx = &ExprContext {
2574            qcx,
2575            name: "window function",
2576            scope: &group_scope,
2577            relation_type: &qcx.relation_type(&relation_expr),
2578            allow_aggregates: true,
2579            allow_subqueries: true,
2580            allow_parameters: true,
2581            allow_windows: true,
2582        };
2583        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2584        group_scope.items.push(ScopeItem::from_expr(window_func));
2585    }
2586    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2587    // been already planned now. However, we should still set `allow_windows: true` for the
2588    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2589    // msg if an OVER clause is missing from a window function.
2590
2591    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2592    if let Some(ref qualify) = s.qualify {
2593        let ecx = &ExprContext {
2594            qcx,
2595            name: "QUALIFY clause",
2596            scope: &group_scope,
2597            relation_type: &qcx.relation_type(&relation_expr),
2598            allow_aggregates: true,
2599            allow_subqueries: true,
2600            allow_parameters: true,
2601            allow_windows: true,
2602        };
2603        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2604        relation_expr = relation_expr.filter(vec![expr]);
2605    }
2606
2607    // Step 9. Handle SELECT clause.
2608    let output_columns = {
2609        let mut new_exprs = vec![];
2610        let mut new_type = qcx.relation_type(&relation_expr);
2611        let mut output_columns = vec![];
2612        for (select_item, column_name) in &projection {
2613            let ecx = &ExprContext {
2614                qcx,
2615                name: "SELECT clause",
2616                scope: &group_scope,
2617                relation_type: &new_type,
2618                allow_aggregates: true,
2619                allow_subqueries: true,
2620                allow_parameters: true,
2621                allow_windows: true,
2622            };
2623            let expr = match select_item {
2624                ExpandedSelectItem::InputOrdinal(i) => {
2625                    if let Some(column) = select_all_mapping.get(i).copied() {
2626                        HirScalarExpr::column(column)
2627                    } else {
2628                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2629                    }
2630                }
2631                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2632            };
2633            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2634                // Simple column reference; no need to map on a new expression.
2635                output_columns.push((column, column_name));
2636            } else {
2637                // Complicated expression that requires a map expression. We
2638                // update `group_scope` as we go so that future expressions that
2639                // are textually identical to this one can reuse it. This
2640                // duplicate detection is required for proper determination of
2641                // ambiguous column references with SQL92-style `ORDER BY`
2642                // items. See `plan_order_by_or_distinct_expr` for more.
2643                let typ = ecx.column_type(&expr);
2644                new_type.column_types.push(typ);
2645                new_exprs.push(expr);
2646                output_columns.push((group_scope.len(), column_name));
2647                group_scope
2648                    .items
2649                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2650            }
2651        }
2652        relation_expr = relation_expr.map(new_exprs);
2653        output_columns
2654    };
2655    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2656
2657    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2658    let order_by = {
2659        let relation_type = qcx.relation_type(&relation_expr);
2660        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2661            &ExprContext {
2662                qcx,
2663                name: "ORDER BY clause",
2664                scope: &group_scope,
2665                relation_type: &relation_type,
2666                allow_aggregates: true,
2667                allow_subqueries: true,
2668                allow_parameters: true,
2669                allow_windows: true,
2670            },
2671            &order_by_exprs,
2672            &output_columns,
2673        )?;
2674
2675        match s.distinct {
2676            None => relation_expr = relation_expr.map(map_exprs),
2677            Some(Distinct::EntireRow) => {
2678                if relation_type.arity() == 0 {
2679                    sql_bail!("SELECT DISTINCT must have at least one column");
2680                }
2681                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2682                // list, so we can't proceed if `ORDER BY` has introduced any
2683                // columns for arbitrary expressions. This matches PostgreSQL.
2684                if !try_push_projection_order_by(
2685                    &mut relation_expr,
2686                    &mut project_key,
2687                    &mut order_by,
2688                ) {
2689                    sql_bail!(
2690                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2691                    );
2692                }
2693                assert!(map_exprs.is_empty());
2694                relation_expr = relation_expr.distinct();
2695            }
2696            Some(Distinct::On(exprs)) => {
2697                let ecx = &ExprContext {
2698                    qcx,
2699                    name: "DISTINCT ON clause",
2700                    scope: &group_scope,
2701                    relation_type: &qcx.relation_type(&relation_expr),
2702                    allow_aggregates: true,
2703                    allow_subqueries: true,
2704                    allow_parameters: true,
2705                    allow_windows: true,
2706                };
2707
2708                let mut distinct_exprs = vec![];
2709                for expr in &exprs {
2710                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2711                    distinct_exprs.push(expr);
2712                }
2713
2714                let mut distinct_key = vec![];
2715
2716                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2717                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2718                // expressions, though the order of `DISTINCT ON` expressions
2719                // does not matter. This matches PostgreSQL and leaves the door
2720                // open to a future optimization where the `DISTINCT ON` and
2721                // `ORDER BY` operations happen in one pass.
2722                //
2723                // On the bright side, any columns that have already been
2724                // computed by `ORDER BY` can be reused in the distinct key.
2725                let arity = relation_type.arity();
2726                for ord in order_by.iter().take(distinct_exprs.len()) {
2727                    // The unusual construction of `expr` here is to ensure the
2728                    // temporary column expression lives long enough.
2729                    let mut expr = &HirScalarExpr::column(ord.column);
2730                    if ord.column >= arity {
2731                        expr = &map_exprs[ord.column - arity];
2732                    };
2733                    match distinct_exprs.iter().position(move |e| e == expr) {
2734                        None => sql_bail!(
2735                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2736                        ),
2737                        Some(pos) => {
2738                            distinct_exprs.remove(pos);
2739                        }
2740                    }
2741                    distinct_key.push(ord.column);
2742                }
2743
2744                // Add any remaining `DISTINCT ON` expressions to the key.
2745                for expr in distinct_exprs {
2746                    // If the expression is a reference to an existing column,
2747                    // do not introduce a new column to support it.
2748                    let column = match expr {
2749                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2750                        _ => {
2751                            map_exprs.push(expr);
2752                            arity + map_exprs.len() - 1
2753                        }
2754                    };
2755                    distinct_key.push(column);
2756                }
2757
2758                // `DISTINCT ON` is semantically a TopK with limit 1. The
2759                // columns in `ORDER BY` that are not part of the distinct key,
2760                // if there are any, determine the ordering within each group,
2761                // per PostgreSQL semantics.
2762                let distinct_len = distinct_key.len();
2763                relation_expr = HirRelationExpr::top_k(
2764                    relation_expr.map(map_exprs),
2765                    distinct_key,
2766                    order_by.iter().skip(distinct_len).cloned().collect(),
2767                    Some(HirScalarExpr::literal(
2768                        Datum::Int64(1),
2769                        SqlScalarType::Int64,
2770                    )),
2771                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2772                    group_size_hints.distinct_on_input_group_size,
2773                );
2774            }
2775        }
2776
2777        order_by
2778    };
2779
2780    // Construct a clean scope to expose outwards, where all of the state that
2781    // accumulated in the scope during planning of this SELECT is erased. The
2782    // clean scope has at most one name for each column, and the names are not
2783    // associated with any table.
2784    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2785
2786    Ok(SelectPlan {
2787        expr: relation_expr,
2788        scope,
2789        order_by,
2790        project: project_key,
2791    })
2792}
2793
/// Plans table functions that appeared in scalar position in the `SELECT`
/// list, emulating PostgreSQL's `ROWS FROM` behavior when more than one is
/// present.
///
/// `table_funcs` maps each table function call to the generated id string the
/// planner substituted for it in the query (see the UUID-mapping note at the
/// top of `plan_select_from_where` — assumes ids are those substitutions;
/// confirm against `AggregateTableFuncVisitor`). `table_func_names` is filled
/// in with a mapping from those id strings back to each function's item name.
///
/// Returns a relation expression producing the table functions' output and a
/// scope describing its columns; the caller joins this onto the `FROM`
/// relation.
fn plan_scalar_table_funcs(
    qcx: &QueryContext,
    table_funcs: BTreeMap<Function<Aug>, String>,
    table_func_names: &mut BTreeMap<String, Ident>,
    relation_expr: &HirRelationExpr,
    from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Plan the table functions in a context derived from the FROM clause, so
    // they may refer to columns of the input relation.
    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));

    // Record, for each generated id, the human-readable name of the function
    // it stands for.
    for (table_func, id) in table_funcs.iter() {
        table_func_names.insert(
            id.clone(),
            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
        );
    }
    // If there's only a single table function, we can skip generating
    // ordinality columns.
    if table_funcs.len() == 1 {
        let (table_func, id) = table_funcs.iter().next().unwrap();
        let (expr, mut scope) =
            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;

        // A single table-function might return several columns as a record
        let num_cols = scope.len();
        for i in 0..scope.len() {
            // Qualify every output column with the generated id so later
            // references resolve against it, and require qualification.
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
        }
        return Ok((expr, scope));
    }
    if table_funcs.keys().any(is_repeat_row) {
        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
        // error message would be misleading, because it would refer to WITH ORDINALITY.
        bail_unsupported!(format!(
            "{} in a SELECT clause with multiple table functions",
            REPEAT_ROW_NAME
        ));
    }
    // Otherwise, plan as usual, emulating the ROWS FROM behavior
    let (expr, mut scope, num_cols) =
        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;

    // Munge the scope so table names match with the generated ids.
    // `num_cols` gives, per table function, how many output columns it
    // contributed; `i` walks the scope in lockstep: each function's columns
    // are followed by its ordinality column.
    let mut i = 0;
    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
        for _ in 0..num_cols {
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
            i += 1;
        }
        // Ordinality column. This doubles as the
        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
        // because it only needs to be NULL or not.
        scope.items[i].table_name = Some(PartialItemName {
            database: None,
            schema: None,
            item: id.clone(),
        });
        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
        scope.items[i].allow_unqualified_references = false;
        i += 1;
    }
    // Coalesced ordinality column.
    scope.items[i].allow_unqualified_references = false;
    Ok((expr, scope))
}
2871
2872/// Plans an expression in a `GROUP BY` clause.
2873///
2874/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2875/// names/expressions defined in the `SELECT` clause. These special cases are
2876/// handled by this function; see comments within the implementation for
2877/// details.
2878fn plan_group_by_expr<'a>(
2879    ecx: &ExprContext,
2880    group_expr: &'a Expr<Aug>,
2881    projection: &'a [(ExpandedSelectItem, ColumnName)],
2882) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2883    let plan_projection = |column: usize| match &projection[column].0 {
2884        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2885        ExpandedSelectItem::Expr(expr) => {
2886            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2887        }
2888    };
2889
2890    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2891    // a special case that means to use the ith item in the SELECT clause.
2892    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2893        return plan_projection(column);
2894    }
2895
2896    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2897    // The `foo` can refer to *either* an input column or an output column. If
2898    // both exist, the input column is preferred.
2899    match group_expr {
2900        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2901            Err(PlanError::UnknownColumn {
2902                table: None,
2903                column,
2904                similar,
2905            }) => {
2906                // The expression was a simple identifier that did not match an
2907                // input column. See if it matches an output column.
2908                let mut iter = projection.iter().map(|(_expr, name)| name);
2909                if let Some(i) = iter.position(|n| *n == column) {
2910                    if iter.any(|n| *n == column) {
2911                        Err(PlanError::AmbiguousColumn(column))
2912                    } else {
2913                        plan_projection(i)
2914                    }
2915                } else {
2916                    // The name didn't match an output column either. Return the
2917                    // "unknown column" error.
2918                    Err(PlanError::UnknownColumn {
2919                        table: None,
2920                        column,
2921                        similar,
2922                    })
2923                }
2924            }
2925            res => Ok((Some(group_expr), res?)),
2926        },
2927        _ => Ok((
2928            Some(group_expr),
2929            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2930        )),
2931    }
2932}
2933
2934/// Plans a slice of `ORDER BY` expressions.
2935///
2936/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2937/// parameter.
2938///
2939/// Returns the determined column orderings and a list of scalar expressions
2940/// that must be mapped onto the underlying relation expression.
2941pub(crate) fn plan_order_by_exprs(
2942    ecx: &ExprContext,
2943    order_by_exprs: &[OrderByExpr<Aug>],
2944    output_columns: &[(usize, &ColumnName)],
2945) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2946    let mut order_by = vec![];
2947    let mut map_exprs = vec![];
2948    for obe in order_by_exprs {
2949        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2950        // If the expression is a reference to an existing column,
2951        // do not introduce a new column to support it.
2952        let column = match expr {
2953            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2954            _ => {
2955                map_exprs.push(expr);
2956                ecx.relation_type.arity() + map_exprs.len() - 1
2957            }
2958        };
2959        order_by.push(resolve_desc_and_nulls_last(obe, column));
2960    }
2961    Ok((order_by, map_exprs))
2962}
2963
2964/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2965///
2966/// The `output_columns` parameter describes, in order, the physical index and
2967/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2968/// corresponds to a `SELECT` list with a single entry named "a" that can be
2969/// found at index 3 in the underlying relation expression.
2970///
2971/// There are three cases to handle.
2972///
2973///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2974///       reference to the specified output column.
2975///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2976///       an output column, if it exists; otherwise it is a reference to an
2977///       input column.
2978///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2979///       arbitrary expressions exclusively refer to input columns, never output
2980///       columns.
2981fn plan_order_by_or_distinct_expr(
2982    ecx: &ExprContext,
2983    expr: &Expr<Aug>,
2984    output_columns: &[(usize, &ColumnName)],
2985) -> Result<HirScalarExpr, PlanError> {
2986    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2987        return Ok(HirScalarExpr::column(output_columns[i].0));
2988    }
2989
2990    if let Expr::Identifier(names) = expr {
2991        if let [name] = &names[..] {
2992            let name = normalize::column_name(name.clone());
2993            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2994            if let Some((i, _)) = iter.next() {
2995                match iter.next() {
2996                    // Per SQL92, names are not considered ambiguous if they
2997                    // refer to identical target list expressions, as in
2998                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2999                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
3000                    _ => return Ok(HirScalarExpr::column(*i)),
3001                }
3002            }
3003        }
3004    }
3005
3006    plan_expr(ecx, expr)?.type_as_any(ecx)
3007}
3008
3009fn plan_table_with_joins(
3010    qcx: &QueryContext,
3011    table_with_joins: &TableWithJoins<Aug>,
3012) -> Result<(HirRelationExpr, Scope), PlanError> {
3013    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
3014    for join in &table_with_joins.joins {
3015        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
3016        expr = new_expr;
3017        scope = new_scope;
3018    }
3019    Ok((expr, scope))
3020}
3021
3022fn plan_table_factor(
3023    qcx: &QueryContext,
3024    table_factor: &TableFactor<Aug>,
3025) -> Result<(HirRelationExpr, Scope), PlanError> {
3026    match table_factor {
3027        TableFactor::Table { name, alias } => {
3028            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
3029            let scope = plan_table_alias(scope, alias.as_ref())?;
3030            Ok((expr, scope))
3031        }
3032
3033        TableFactor::Function {
3034            function,
3035            alias,
3036            with_ordinality,
3037        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3038
3039        TableFactor::RowsFrom {
3040            functions,
3041            alias,
3042            with_ordinality,
3043        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3044
3045        TableFactor::Derived {
3046            lateral,
3047            subquery,
3048            alias,
3049        } => {
3050            let mut qcx = (*qcx).clone();
3051            if !lateral {
3052                // Since this derived table was not marked as `LATERAL`,
3053                // make elements in outer scopes invisible until we reach the
3054                // next lateral barrier.
3055                for scope in &mut qcx.outer_scopes {
3056                    if scope.lateral_barrier {
3057                        break;
3058                    }
3059                    scope.items.clear();
3060                }
3061            }
3062            qcx.outer_scopes[0].lateral_barrier = true;
3063            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3064            let scope = plan_table_alias(scope, alias.as_ref())?;
3065            Ok((expr, scope))
3066        }
3067
3068        TableFactor::NestedJoin { join, alias } => {
3069            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3070            let scope = plan_table_alias(scope, alias.as_ref())?;
3071            Ok((expr, scope))
3072        }
3073    }
3074}
3075
/// Plans a `ROWS FROM` expression.
///
/// `ROWS FROM` concatenates table functions into a single table, filling in
/// `NULL`s in places where one table function has fewer rows than another. We
/// can achieve this by augmenting each table function with a row number, doing
/// a `FULL JOIN` between each table function on the row number and eventually
/// projecting away the row number columns. Concretely, the following query
/// using `ROWS FROM`
///
/// ```sql
/// SELECT
///     *
/// FROM
///     ROWS FROM (
///         generate_series(1, 2),
///         information_schema._pg_expandarray(ARRAY[9]),
///         generate_series(3, 6)
///     );
/// ```
///
/// is equivalent to the following query that does not use `ROWS FROM`:
///
/// ```sql
/// SELECT
///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
/// FROM
///     generate_series(1, 2) WITH ORDINALITY AS gs1
///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
///         ON gs1.ordinality = expand.ordinality
///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
/// ```
///
/// Note the call to `coalesce` in the last join condition, which ensures that
/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
///
/// This function creates a HirRelationExpr that follows the structure of the
/// latter query.
///
/// `with_ordinality` can be used to have the output expression contain a
/// single coalesced ordinality column at the end of the entire expression.
fn plan_rows_from(
    qcx: &QueryContext,
    functions: &[Function<Aug>],
    alias: Option<&TableAlias>,
    with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // The `repeat_row` function is not supported in ROWS FROM.
    if functions.iter().any(is_repeat_row) {
        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
        // error message would be misleading, because it would refer to WITH ORDINALITY instead of
        // ROWS FROM.
        bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
    }

    // If there's only a single table function, planning proceeds as if `ROWS
    // FROM` hadn't been written at all.
    if let [function] = functions {
        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
    }

    // Per PostgreSQL, all scope items take the name of the first function
    // (unless aliased).
    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
    let (expr, mut scope, num_cols) = plan_rows_from_internal(
        qcx,
        functions,
        Some(functions[0].name.full_item_name().clone()),
    )?;

    // Columns tracks the set of columns we will keep in the projection.
    let mut columns = Vec::new();
    // `offset` always points just past the current function's columns
    // (including its trailing ordinality column) in the output of
    // `plan_rows_from_internal`.
    let mut offset = 0;
    // Retain table function's non-ordinality columns.
    for (idx, cols) in num_cols.into_iter().enumerate() {
        for i in 0..cols {
            columns.push(offset + i);
        }
        offset += cols + 1;

        // Remove the ordinality column from the scope, accounting for previous scope
        // changes from this loop. (Function `idx`'s ordinality column originally
        // sits at `offset - 1`; the `idx` ordinality columns removed by earlier
        // iterations shift it left by `idx`.)
        scope.items.remove(offset - idx - 1);
    }

    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
    // column. Otherwise remove it from the scope. (After the loop, `offset`
    // is the position of the final coalesced ordinality column.)
    if with_ordinality {
        columns.push(offset);
    } else {
        scope.items.pop();
    }

    let expr = expr.project(columns);

    let scope = plan_table_alias(scope, alias)?;
    Ok((expr, scope))
}
3174
3175fn is_repeat_row(f: &Function<Aug>) -> bool {
3176    f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3177}
3178
/// Plans an expression coalescing multiple table functions. Each table
/// function is followed by its row ordinality. The entire expression is
/// followed by the coalesced row ordinality.
///
/// The returned Scope will set all item's table_name's to the `table_name`
/// parameter if it is `Some`. If `None`, they will be the name of each table
/// function.
///
/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
/// each table function.
///
/// For example, with table functions tf1 returning 1 column (a) and tf2
/// returning 2 columns (b, c), this function will return an expr 6 columns:
///
/// - tf1.a
/// - tf1.ordinality
/// - tf2.b
/// - tf2.c
/// - tf2.ordinality
/// - coalesced_ordinality
///
/// And a `Vec<usize>` of `[1, 2]`.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Join together each of the table functions in turn. The last column is
    // always the column to join against and is maintained to be the coalescence
    // of the row number column for all prior functions.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    num_cols.push(left_scope.len() - 1);
    // Create the coalesced ordinality column. (For the first function this is
    // simply a copy of its own ordinality column, which is the last column.)
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));

    for function in functions {
        // The right hand side of a join must be planned in a new scope.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // `left_col` is the coalesced ordinality accumulated so far;
        // `right_col` is the new function's ordinality column, positioned
        // after all left-hand columns in the join's output.
        let left_col = left_scope.len() - 1;
        let right_col = left_scope.len() + right_scope.len() - 1;
        let on = HirScalarExpr::call_binary(
            HirScalarExpr::column(left_col),
            HirScalarExpr::column(right_col),
            expr_func::Eq,
        );
        // FULL JOIN on the ordinality columns, then append a fresh coalesced
        // ordinality column so subsequent functions can align against
        // whichever side has more rows.
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::call_variadic(
                Coalesce,
                vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            )]);

        // Project off the previous iteration's coalesced column, but keep both of this
        // iteration's ordinality columns.
        left_expr = left_expr.project(
            (0..left_col) // non-coalesced ordinality columns from left function
                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
                .collect(),
        );
        // Move the coalesced ordinality column. (The old coalesced column's
        // scope item is reused for the new coalesced column, which sits after
        // the right-hand function's columns.)
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3259
3260/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3261/// FROM` clause that contains other table functions. Special aliasing rules
3262/// apply.
3263fn plan_solitary_table_function(
3264    qcx: &QueryContext,
3265    function: &Function<Aug>,
3266    alias: Option<&TableAlias>,
3267    with_ordinality: bool,
3268) -> Result<(HirRelationExpr, Scope), PlanError> {
3269    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3270
3271    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3272    if single_column_function {
3273        let item = &mut scope.items[0];
3274
3275        // Mark that the function only produced a single column. This impacts
3276        // whole-row references.
3277        item.from_single_column_function = true;
3278
3279        // Strange special case for solitary table functions that output one
3280        // column whose name matches the name of the table function. If a table
3281        // alias is provided, the column name is changed to the table alias's
3282        // name. Concretely, the following query returns a column named `x`
3283        // rather than a column named `generate_series`:
3284        //
3285        //     SELECT * FROM generate_series(1, 5) AS x
3286        //
3287        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3288        // since its output column is explicitly named `value`, not
3289        // `jsonb_array_elements`.
3290        //
3291        // Note also that we may (correctly) change the column name again when
3292        // we plan the table alias below if the `alias.columns` is non-empty.
3293        if let Some(alias) = alias {
3294            if let ScopeItem {
3295                table_name: Some(table_name),
3296                column_name,
3297                ..
3298            } = item
3299            {
3300                if table_name.item.as_str() == column_name.as_str() {
3301                    *column_name = normalize::column_name(alias.name.clone());
3302                }
3303            }
3304        }
3305    }
3306
3307    let scope = plan_table_alias(scope, alias)?;
3308    Ok((expr, scope))
3309}
3310
/// Plans a table function.
///
/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
/// instead to get the appropriate aliasing behavior.
///
/// If `with_ordinality` is set, a trailing `ordinality` column is added to the
/// output. If `table_name` is `Some`, scope items are named after it (used by
/// `ROWS FROM`); otherwise they are named after the function itself.
fn plan_table_function_internal(
    qcx: &QueryContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &Function<Aug>,
    with_ordinality: bool,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Per the assertion messages, FILTER/OVER/DISTINCT should never survive
    // parsing for a table function; hitting these is a bug, not a user error.
    assert_none!(filter, "cannot parse table function with FILTER");
    assert_none!(over, "cannot parse table function with OVER");
    assert!(!*distinct, "cannot parse table function with DISTINCT");

    // Arguments are planned against an empty scope: they cannot reference
    // columns of the current query level, nor use aggregates or window
    // functions, though subqueries and parameters are permitted.
    let ecx = &ExprContext {
        qcx,
        name: "table function arguments",
        scope: &Scope::empty(),
        relation_type: &SqlRelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let scalar_args = match args {
        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    // Prefer the caller-supplied table name (e.g. the first function of a
    // `ROWS FROM`); otherwise fall back to the function's own name.
    let table_name = match table_name {
        Some(table_name) => table_name.item,
        None => name.full_item_name().item.clone(),
    };

    let scope_name = Some(PartialItemName {
        database: None,
        schema: None,
        item: table_name,
    });

    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
        Func::Table(impls) => {
            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
            let expr = match tf.imp {
                // Natively implemented table function: emit a `CallTable`,
                // wrapping the function with its ordinality extension if
                // requested (not every `TableFunc` supports one).
                TableFuncImpl::CallTable { mut func, exprs } => {
                    if with_ordinality {
                        func = TableFunc::with_ordinality(func.clone()).ok_or(
                            PlanError::Unsupported {
                                feature: format!("WITH ORDINALITY on {}", func),
                                discussion_no: None,
                            },
                        )?;
                    }
                    HirRelationExpr::CallTable { func, exprs }
                }
                TableFuncImpl::Expr(expr) => {
                    if !with_ordinality {
                        expr
                    } else {
                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
                        // back to the legacy implementation or error out the query.
                        if qcx
                            .scx
                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
                        {
                            // Note that this can give an incorrect ordering, and also has an extreme
                            // performance problem in some cases. See the doc comment of
                            // `TableFuncImpl`.
                            tracing::error!(
                                %name,
                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
                            );
                            // Emulate ordinality with a `row_number()` scalar
                            // window function over an empty ORDER BY.
                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
                                func: WindowExprType::Scalar(ScalarWindowExpr {
                                    func: ScalarWindowFunc::RowNumber,
                                    order_by: vec![],
                                }),
                                partition_by: vec![],
                                order_by: vec![],
                            })])
                        } else {
                            bail_unsupported!(format!(
                                "WITH ORDINALITY or ROWS FROM with {}",
                                name
                            ));
                        }
                    }
                }
            };
            (expr, scope)
        }
        // A scalar function in FROM is "tabletized": it becomes a
        // single-column table holding the function's result.
        Func::Scalar(impls) => {
            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let output = expr.typ(
                &qcx.outer_relation_types,
                &SqlRelationType::new(vec![]),
                &qcx.scx.param_types.borrow(),
            );

            let relation = SqlRelationType::new(vec![output]);

            let function_ident = Ident::new(name.full_item_name().item.clone())?;
            let column_name = normalize::column_name(function_ident);
            let name = column_name.to_string();

            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);

            let mut func = TableFunc::TabletizedScalar { relation, name };
            if with_ordinality {
                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
                    feature: format!("WITH ORDINALITY on {}", func),
                    discussion_no: None,
                })?;
            }
            (
                HirRelationExpr::CallTable {
                    func,
                    exprs: vec![expr],
                },
                scope,
            )
        }
        o => sql_bail!(
            "{} functions are not supported in functions in FROM",
            o.class()
        ),
    };

    // The ordinality column, if requested, is always the last scope item.
    if with_ordinality {
        scope
            .items
            .push(ScopeItem::from_name(scope_name, "ordinality"));
    }

    Ok((expr, scope))
}
3464
3465fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3466    if let Some(TableAlias {
3467        name,
3468        columns,
3469        strict,
3470    }) = alias
3471    {
3472        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3473            sql_bail!(
3474                "{} has {} columns available but {} columns specified",
3475                name,
3476                scope.items.len(),
3477                columns.len()
3478            );
3479        }
3480
3481        let table_name = normalize::ident(name.to_owned());
3482        for (i, item) in scope.items.iter_mut().enumerate() {
3483            item.table_name = if item.allow_unqualified_references {
3484                Some(PartialItemName {
3485                    database: None,
3486                    schema: None,
3487                    item: table_name.clone(),
3488                })
3489            } else {
3490                // Columns that prohibit unqualified references are special
3491                // columns from the output of a NATURAL or USING join that can
3492                // only be referenced by their full, pre-join name. Applying an
3493                // alias to the output of that join renders those columns
3494                // inaccessible, which we accomplish here by setting the
3495                // table name to `None`.
3496                //
3497                // Concretely, consider:
3498                //
3499                //      CREATE TABLE t1 (a int);
3500                //      CREATE TABLE t2 (a int);
3501                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3502                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3503                //
3504                // In (1), the join has no alias. The underlying columns from
3505                // either side of the join can be referenced as `t1.a` and
3506                // `t2.a`, respectively, and the unqualified name `a` refers to
3507                // a column whose value is `coalesce(t1.a, t2.a)`.
3508                //
3509                // In (2), the join is aliased as `t`. The columns from either
3510                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3511                // the coalesced column can be named as either `a` or `t.a`.
3512                //
3513                // We previously had a bug [0] that mishandled this subtle
3514                // logic.
3515                //
3516                // NOTE(benesch): We could in theory choose to project away
3517                // those inaccessible columns and drop them from the scope
3518                // entirely, but that would require that this function also
3519                // take and return the `HirRelationExpr` that is being aliased,
3520                // which is a rather large refactor.
3521                //
3522                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3523                None
3524            };
3525            item.column_name = columns
3526                .get(i)
3527                .map(|a| normalize::column_name(a.clone()))
3528                .unwrap_or_else(|| item.column_name.clone());
3529        }
3530    }
3531    Ok(scope)
3532}
3533
/// Attempts to invent an output column name for `expr`, following
/// PostgreSQL's naming rules. Returns `None` if no name can be invented.
///
/// `table_func_names` is a mapping from a UUID to the original function
/// name. The UUIDs are identifiers that have been rewritten from some table
/// function expression, and this mapping restores the original names.
fn invent_column_name(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Option<ColumnName> {
    // We follow PostgreSQL exactly here, which has some complicated rules
    // around "high" and "low" quality names. Low quality names override other
    // low quality names but not high quality names.
    //
    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728

    #[derive(Debug)]
    enum NameQuality {
        Low,
        High,
    }

    // Recursive worker. Returns the invented name together with its quality
    // so enclosing expressions (casts, CASE) can decide whether to keep it or
    // substitute their own fallback name.
    fn invent(
        ecx: &ExprContext,
        expr: &Expr<Aug>,
        table_func_names: &BTreeMap<String, Ident>,
    ) -> Option<(ColumnName, NameQuality)> {
        match expr {
            Expr::Identifier(names) => {
                // A bare identifier that is a rewritten table-function UUID
                // takes the original function's name.
                if let [name] = names.as_slice() {
                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
                        return Some((
                            normalize::column_name(table_func_name.clone()),
                            NameQuality::High,
                        ));
                    }
                }
                // Otherwise use the final component (the column) of the
                // possibly-qualified identifier.
                names
                    .last()
                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
            }
            Expr::Value(v) => match v {
                // Per PostgreSQL, `bool` and `interval` literals take on the name
                // of their type, but not other literal types.
                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
                _ => None,
            },
            Expr::Function(func) => {
                let (schema, item) = match &func.name {
                    ResolvedItemName::Item {
                        qualifiers,
                        full_name,
                        ..
                    } => (&qualifiers.schema_spec, full_name.item.clone()),
                    _ => unreachable!(),
                };

                // Functions in the internal/unsafe schemas do not lend their
                // names to output columns.
                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
                    || schema
                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
                {
                    None
                } else {
                    Some((item.into(), NameQuality::High))
                }
            }
            Expr::HomogenizingFunction { function, .. } => Some((
                function.to_string().to_lowercase().into(),
                NameQuality::High,
            )),
            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
            // A cast keeps a high-quality name invented for its operand;
            // otherwise the target type's name serves as a low-quality
            // fallback.
            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
            },
            // A CASE takes a high-quality name from its ELSE branch when one
            // exists, falling back to the low-quality name "case".
            Expr::Case { else_result, .. } => {
                match else_result
                    .as_ref()
                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
                {
                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                    _ => Some(("case".into(), NameQuality::Low)),
                }
            }
            Expr::FieldAccess { field, .. } => {
                Some((normalize::column_name(field.clone()), NameQuality::High))
            }
            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
            // Subscripting is transparent: name after the subscripted expression.
            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
                // A bit silly to have to plan the query here just to get its column
                // name, since we throw away the planned expression, but fixing this
                // requires a separate semantic analysis phase.
                let (_expr, scope) =
                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
                scope
                    .items
                    .first()
                    .map(|name| (name.column_name.clone(), NameQuality::High))
            }
            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
            _ => None,
        }
    }

    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
}
3643
/// A `SELECT` list item after wildcard expansion: either a reference to an
/// existing input column, or an expression that still needs to be planned.
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// References the input column at this ordinal position.
    InputOrdinal(usize),
    /// An expression that must be planned to produce the output column.
    Expr(Cow<'a, Expr<Aug>>),
}
3649
3650impl ExpandedSelectItem<'_> {
3651    fn as_expr(&self) -> Option<&Expr<Aug>> {
3652        match self {
3653            ExpandedSelectItem::InputOrdinal(_) => None,
3654            ExpandedSelectItem::Expr(expr) => Some(expr),
3655        }
3656    }
3657}
3658
/// Expands a single `SELECT` list item into the list of output columns it
/// produces, pairing each with its column name.
///
/// Wildcard forms (`t.*`, `(expr).*`, and bare `*`) expand to zero or more
/// columns; an ordinary expression expands to exactly one, named by its
/// alias, an invented name, or a catch-all unknown-column name.
fn expand_select_item<'a>(
    ecx: &ExprContext,
    s: &'a SelectItem<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
    match s {
        // `SELECT t.*`: expand to every scope column belonging to table `t`.
        SelectItem::Expr {
            expr: Expr::QualifiedWildcard(table_name),
            alias: _,
        } => {
            // Record that this statement used a wildcard expansion.
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let table_name =
                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
            let out: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.is_from_table(&table_name))
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            if out.is_empty() {
                sql_bail!("no table named '{}' in scope", table_name);
            }
            Ok(out)
        }
        // `SELECT (expr).*`: expand to one field access per field of the
        // record-typed expression.
        SelectItem::Expr {
            expr: Expr::WildcardAccess(sql_expr),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            // A bit silly to have to plan the expression here just to get its
            // type, since we throw away the planned expression, but fixing this
            // requires a separate semantic analysis phase. Luckily this is an
            // uncommon operation and the PostgreSQL docs have a warning that
            // this operation is slow in Postgres too.
            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
            let fields = match ecx.scalar_type(&expr) {
                SqlScalarType::Record { fields, .. } => fields,
                ty => sql_bail!(
                    "type {} is not composite",
                    ecx.humanize_sql_scalar_type(&ty, false)
                ),
            };
            // Collect synthetic "exists" columns (introduced for table
            // functions that appeared in the target list) so that they can be
            // excluded from the expansion below.
            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
            if let Expr::Identifier(ident) = sql_expr.as_ref() {
                if let [name] = ident.as_slice() {
                    if let Ok(items) = ecx.scope.items_from_table(
                        &[],
                        &PartialItemName {
                            database: None,
                            schema: None,
                            item: name.as_str().to_string(),
                        },
                    ) {
                        for (_, item) in items {
                            if item
                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
                            {
                                skip_cols.insert(item.column_name.clone());
                            }
                        }
                    }
                }
            }
            let items = fields
                .iter()
                .filter_map(|(name, _ty)| {
                    if skip_cols.contains(name) {
                        None
                    } else {
                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
                            expr: sql_expr.clone(),
                            field: name.clone().into(),
                        }));
                        Some((item, name.clone()))
                    }
                })
                .collect();
            Ok(items)
        }
        // `SELECT *`: expand to every column that allows unqualified
        // references (e.g., excluding the hidden pre-join columns of
        // NATURAL/USING joins).
        SelectItem::Wildcard => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let items: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.allow_unqualified_references)
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();

            Ok(items)
        }
        // An ordinary expression: named by its alias if present, otherwise by
        // an invented name, otherwise by the unknown-column placeholder.
        SelectItem::Expr { expr, alias } => {
            let name = alias
                .clone()
                .map(normalize::column_name)
                .or_else(|| invent_column_name(ecx, expr, table_func_names))
                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
        }
    }
}
3769
/// Plans a single `JOIN`, combining an already-planned left-hand relation
/// with the join's right-hand table factor according to the join operator
/// and its constraint.
///
/// Returns the joined relation together with the scope describing its
/// output columns.
fn plan_join(
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    join: &Join<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // A CROSS JOIN is planned as an inner join whose predicate is `TRUE`.
    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
    let (kind, constraint) = match &join.join_operator {
        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
    };

    // The right-hand side is planned with the left-hand side's scope as its
    // innermost outer scope (`outer_scopes[0]`), which is what permits a
    // correlated (e.g. LATERAL) right-hand side to reference the left-hand
    // side's columns.
    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
    if !kind.can_be_correlated() {
        for item in &mut right_qcx.outer_scopes[0].items {
            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
            // these items from scope. These items need to *exist* because they
            // might shadow variables in outer scopes that would otherwise be
            // valid to reference, but accessing them needs to produce an error.
            item.error_if_referenced =
                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
                    table: table.cloned(),
                    column: column.clone(),
                });
        }
    }
    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;

    let (expr, scope) = match constraint {
        // ON: plan the predicate against the concatenated left/right scope
        // and require it to have type `bool`.
        JoinConstraint::On(expr) => {
            let product_scope = left_scope.product(right_scope)?;
            let ecx = &ExprContext {
                qcx: left_qcx,
                name: "ON clause",
                scope: &product_scope,
                relation_type: &SqlRelationType::new(
                    left_qcx
                        .relation_type(&left)
                        .column_types
                        .into_iter()
                        .chain(right_qcx.relation_type(&right).column_types)
                        .collect(),
                ),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            let joined = left.join(right, on, kind);
            (joined, product_scope)
        }
        // USING: equate the explicitly listed columns on both sides.
        JoinConstraint::Using { columns, alias } => {
            let column_names = columns
                .iter()
                .map(|ident| normalize::column_name(ident.clone()))
                .collect::<Vec<_>>();

            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                alias.as_ref(),
            )?
        }
        // NATURAL: equate every column name that appears on both sides.
        JoinConstraint::Natural => {
            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
            // have the same scx. However, it doesn't hurt to be safe.
            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
            let left_column_names = left_scope.column_names();
            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
            let column_names: Vec<_> = left_column_names
                .filter(|col| right_column_names.contains(col))
                .cloned()
                .collect();
            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                None,
            )?
        }
    };
    Ok((expr, scope))
}
3869
// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
/// Plans a `USING (<columns>)` or `NATURAL` join constraint.
///
/// For each join column the left and right instances are coerced to a common
/// type and equated; the secondary side's copy is hidden from unqualified
/// references and `SELECT *`, and the merged join columns are reprojected to
/// the front of the output. An optional `join_using_alias` additionally
/// exposes each join column under a table-qualified alias without hiding the
/// joined relations' own names.
#[allow(clippy::too_many_arguments)]
fn plan_using_constraint(
    column_names: &[ColumnName],
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    right_qcx: &QueryContext,
    right: HirRelationExpr,
    right_scope: Scope,
    kind: JoinKind,
    alias: Option<&Ident>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let mut both_scope = left_scope.clone().product(right_scope.clone())?;

    // Cargo culting PG here; no discernable reason this must fail, but PG does
    // so we do, as well.
    let mut unique_column_names = BTreeSet::new();
    for c in column_names {
        if !unique_column_names.insert(c) {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "column name {} appears more than once in USING clause",
                    c.quoted()
                ),
                discussion_no: None,
            });
        }
    }

    let alias_item_name = alias.map(|alias| PartialItemName {
        database: None,
        schema: None,
        item: alias.clone().to_string(),
    });

    // The alias must not collide with any table name already in scope.
    if let Some(alias_item_name) = &alias_item_name {
        for partial_item_name in both_scope.table_names() {
            if partial_item_name.matches(alias_item_name) {
                sql_bail!(
                    "table name \"{}\" specified more than once",
                    alias_item_name
                )
            }
        }
    }

    let ecx = &ExprContext {
        qcx: right_qcx,
        name: "USING clause",
        scope: &both_scope,
        relation_type: &SqlRelationType::new(
            left_qcx
                .relation_type(&left)
                .column_types
                .into_iter()
                .chain(right_qcx.relation_type(&right).column_types)
                .collect(),
        ),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    // Equality predicates, one per join column.
    let mut join_exprs = vec![];
    // Extra output columns appended to the join via `.map(...)` below.
    let mut map_exprs = vec![];
    // Scope items backing the columns in `map_exprs`.
    let mut new_items = vec![];
    // Output positions of the merged join columns.
    let mut join_cols = vec![];
    // Positions to hide from unqualified references and `SELECT *`.
    let mut hidden_cols = vec![];

    for column_name in column_names {
        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
        let (lhs, lhs_name) = left_scope.resolve_using_column(
            column_name,
            JoinSide::Left,
            &mut left_qcx.name_manager.borrow_mut(),
        )?;
        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
            column_name,
            JoinSide::Right,
            &mut right_qcx.name_manager.borrow_mut(),
        )?;

        // Adjust the RHS reference to its post-join location.
        rhs.column += left_scope.len();

        // Join keys must be resolved to same type.
        let mut exprs = coerce_homogeneous_exprs(
            &ecx.with_name(&format!(
                "NATURAL/USING join column {}",
                column_name.quoted()
            )),
            vec![
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    lhs,
                    Arc::clone(&lhs_name),
                )),
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    rhs,
                    Arc::clone(&rhs_name),
                )),
            ],
            None,
        )?;
        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));

        // Decide which side's column represents the merged join column in the
        // output; the other side's copy is hidden.
        match kind {
            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
                join_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
            }
            JoinKind::RightOuter => {
                join_cols.push(rhs.column);
                hidden_cols.push(lhs.column);
            }
            JoinKind::FullOuter => {
                // Create a new column that will be the coalesced value of left
                // and right.
                join_cols.push(both_scope.items.len() + map_exprs.len());
                hidden_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
                map_exprs.push(HirScalarExpr::call_variadic(
                    Coalesce,
                    vec![expr1.clone(), expr2.clone()],
                ));
                new_items.push(ScopeItem::from_column_name(column_name));
            }
        }

        // If a `join_using_alias` is present, add a new scope item that accepts
        // only table-qualified references for each specified join column.
        // Unlike regular table aliases, a `join_using_alias` should not hide the
        // names of the joined relations.
        if alias_item_name.is_some() {
            let new_item_col = both_scope.items.len() + new_items.len();
            join_cols.push(new_item_col);
            hidden_cols.push(new_item_col);

            new_items.push(ScopeItem::from_name(
                alias_item_name.clone(),
                column_name.clone().to_string(),
            ));

            // Should be safe to use either `lhs` or `rhs` here since the column
            // is available in both scopes and must have the same type of the new item.
            // We (arbitrarily) choose the left name.
            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
        }

        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
    }
    both_scope.items.extend(new_items);

    // The columns from the secondary side of the join remain accessible by
    // their table-qualified name, but not by their column name alone. They are
    // also excluded from `SELECT *`.
    for c in hidden_cols {
        both_scope.items[c].allow_unqualified_references = false;
    }

    // Reproject all returned elements to the front of the list.
    let project_key = join_cols
        .into_iter()
        .chain(0..both_scope.items.len())
        .unique()
        .collect::<Vec<_>>();

    both_scope = both_scope.project(&project_key);

    let on = HirScalarExpr::variadic_and(join_exprs);

    let both = left
        .join(right, on, kind)
        .map(map_exprs)
        .project(project_key);
    Ok((both, both_scope))
}
4048
/// Plans an arbitrary SQL expression as a coercible scalar expression.
///
/// This is the public entry point for scalar expression planning. It wraps
/// `plan_expr_inner` in `checked_recur` to guard against stack exhaustion
/// when planning deeply nested expression trees.
pub fn plan_expr<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
}
4055
/// Plans a single expression node, dispatching on the AST variant.
///
/// Expressions that the current scope has already planned are resolved to a
/// column reference rather than being re-planned. Several surface forms
/// (`BETWEEN`, `IN (<subquery>)`, `ANY`/`ALL`, nested parentheses) are
/// desugared before planning and are therefore unreachable here.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
        // We've already calculated this expression.
        return Ok(HirScalarExpr::named_column(
            i,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )
        .into());
    }

    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Generalized functions, operators, and casts.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Special functions and operators.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // NULLIF(l, r) is equivalent to CASE WHEN l = r THEN NULL ELSE l END.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
4157
4158fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4159    if !ecx.allow_parameters {
4160        // It might be clearer to return an error like "cannot use parameter
4161        // here", but this is how PostgreSQL does it, and so for now we follow
4162        // PostgreSQL.
4163        return Err(PlanError::UnknownParameter(n));
4164    }
4165    if n == 0 || n > 65536 {
4166        return Err(PlanError::UnknownParameter(n));
4167    }
4168    if ecx.param_types().borrow().contains_key(&n) {
4169        Ok(HirScalarExpr::parameter(n).into())
4170    } else {
4171        Ok(CoercibleScalarExpr::Parameter(n))
4172    }
4173}
4174
4175fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4176    let mut out = vec![];
4177    for e in exprs {
4178        out.push(plan_expr(ecx, e)?);
4179    }
4180    Ok(CoercibleScalarExpr::LiteralRecord(out))
4181}
4182
4183fn plan_cast(
4184    ecx: &ExprContext,
4185    expr: &Expr<Aug>,
4186    data_type: &ResolvedDataType,
4187) -> Result<CoercibleScalarExpr, PlanError> {
4188    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4189    let expr = match expr {
4190        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4191        // we can pass in the target type as a type hint. This is
4192        // a limited form of the coercion that we do for string literals
4193        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4194        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4195        // PostgreSQL compatibility trouble.
4196        //
4197        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4198        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4199        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4200        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4201        _ => plan_expr(ecx, expr)?,
4202    };
4203    let ecx = &ecx.with_name("CAST");
4204    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4205    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4206    Ok(expr.into())
4207}
4208
4209fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4210    let ecx = ecx.with_name("NOT argument");
4211    Ok(plan_expr(&ecx, expr)?
4212        .type_as(&ecx, &SqlScalarType::Bool)?
4213        .call_unary(UnaryFunc::Not(expr_func::Not))
4214        .into())
4215}
4216
4217fn plan_and(
4218    ecx: &ExprContext,
4219    left: &Expr<Aug>,
4220    right: &Expr<Aug>,
4221) -> Result<CoercibleScalarExpr, PlanError> {
4222    let ecx = ecx.with_name("AND argument");
4223    Ok(HirScalarExpr::variadic_and(vec![
4224        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4225        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4226    ])
4227    .into())
4228}
4229
4230fn plan_or(
4231    ecx: &ExprContext,
4232    left: &Expr<Aug>,
4233    right: &Expr<Aug>,
4234) -> Result<CoercibleScalarExpr, PlanError> {
4235    let ecx = ecx.with_name("OR argument");
4236    Ok(HirScalarExpr::variadic_or(vec![
4237        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4238        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4239    ])
4240    .into())
4241}
4242
4243fn plan_in_list(
4244    ecx: &ExprContext,
4245    lhs: &Expr<Aug>,
4246    list: &Vec<Expr<Aug>>,
4247    negated: &bool,
4248) -> Result<CoercibleScalarExpr, PlanError> {
4249    let ecx = ecx.with_name("IN list");
4250    let or = HirScalarExpr::variadic_or(
4251        list.into_iter()
4252            .map(|e| {
4253                let eq = lhs.clone().equals(e.clone());
4254                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4255            })
4256            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4257    );
4258    Ok(if *negated {
4259        or.call_unary(UnaryFunc::Not(expr_func::Not))
4260    } else {
4261        or
4262    }
4263    .into())
4264}
4265
4266fn plan_homogenizing_function(
4267    ecx: &ExprContext,
4268    function: &HomogenizingFunction,
4269    exprs: &[Expr<Aug>],
4270) -> Result<CoercibleScalarExpr, PlanError> {
4271    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4272    let expr = HirScalarExpr::call_variadic(
4273        match function {
4274            HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4275            HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4276            HomogenizingFunction::Least => VariadicFunc::from(Least),
4277        },
4278        coerce_homogeneous_exprs(
4279            &ecx.with_name(&function.to_string().to_lowercase()),
4280            plan_exprs(ecx, exprs)?,
4281            None,
4282        )?,
4283    );
4284    Ok(expr.into())
4285}
4286
4287fn plan_field_access(
4288    ecx: &ExprContext,
4289    expr: &Expr<Aug>,
4290    field: &Ident,
4291) -> Result<CoercibleScalarExpr, PlanError> {
4292    let field = normalize::column_name(field.clone());
4293    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4294    let ty = ecx.scalar_type(&expr);
4295    let i = match &ty {
4296        SqlScalarType::Record { fields, .. } => {
4297            fields.iter().position(|(name, _ty)| *name == field)
4298        }
4299        ty => sql_bail!(
4300            "column notation applied to type {}, which is not a composite type",
4301            ecx.humanize_sql_scalar_type(ty, false)
4302        ),
4303    };
4304    match i {
4305        None => sql_bail!(
4306            "field {} not found in data type {}",
4307            field,
4308            ecx.humanize_sql_scalar_type(&ty, false)
4309        ),
4310        Some(i) => Ok(expr
4311            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4312            .into()),
4313    }
4314}
4315
4316fn plan_subscript(
4317    ecx: &ExprContext,
4318    expr: &Expr<Aug>,
4319    positions: &[SubscriptPosition<Aug>],
4320) -> Result<CoercibleScalarExpr, PlanError> {
4321    assert!(
4322        !positions.is_empty(),
4323        "subscript expression must contain at least one position"
4324    );
4325
4326    let ecx = &ecx.with_name("subscripting");
4327    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4328    let ty = ecx.scalar_type(&expr);
4329    match &ty {
4330        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4331            ecx,
4332            expr,
4333            positions,
4334            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4335            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4336            // track in its backing data).
4337            if ty == SqlScalarType::Int2Vector {
4338                1
4339            } else {
4340                0
4341            },
4342        ),
4343        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4344        SqlScalarType::List { element_type, .. } => {
4345            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4346            let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4347            let n_layers = ty.unwrap_list_n_layers();
4348            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4349        }
4350        ty => sql_bail!(
4351            "cannot subscript type {}",
4352            ecx.humanize_sql_scalar_type(ty, false)
4353        ),
4354    }
4355}
4356
4357// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4358// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4359// any were slices (i.e. included colon).
4360fn extract_scalar_subscript_from_positions<'a>(
4361    positions: &'a [SubscriptPosition<Aug>],
4362    expr_type_name: &str,
4363) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4364    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4365    for p in positions {
4366        if p.explicit_slice {
4367            sql_bail!("{} subscript does not support slices", expr_type_name);
4368        }
4369        assert!(
4370            p.end.is_none(),
4371            "index-appearing subscripts cannot have end value"
4372        );
4373        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4374    }
4375    Ok(scalar_subscripts)
4376}
4377
4378fn plan_subscript_array(
4379    ecx: &ExprContext,
4380    expr: HirScalarExpr,
4381    positions: &[SubscriptPosition<Aug>],
4382    offset: i64,
4383) -> Result<CoercibleScalarExpr, PlanError> {
4384    let mut exprs = Vec::with_capacity(positions.len() + 1);
4385    exprs.push(expr);
4386
4387    // Subscripting arrays doesn't yet support slicing, so we always want to
4388    // extract scalars or error.
4389    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4390
4391    for i in indexes {
4392        exprs.push(plan_expr(ecx, i)?.cast_to(
4393            ecx,
4394            CastContext::Explicit,
4395            &SqlScalarType::Int64,
4396        )?);
4397    }
4398
4399    Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4400}
4401
/// Plans subscripting of a list value, which may freely mix index operations
/// (`l[1]`) and slice operations (`l[1:2]`).
///
/// The positions are consumed left to right in alternating runs: each run of
/// contiguous index operations is planned via `plan_index_list` (which also
/// tracks how many list layers remain), and each run of contiguous slice
/// operations via `plan_slice_list`.
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    let mut i = 0;

    while i < positions.len() {
        // Take all contiguous index operations, i.e. find next slice operation.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            // Empty type name: these positions are known to be indexes, so
            // the "does not support slices" error cannot fire here.
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Take all contiguous slice operations, i.e. find next index operation.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4450
4451fn plan_index_list(
4452    ecx: &ExprContext,
4453    expr: HirScalarExpr,
4454    indexes: &[&Expr<Aug>],
4455    n_layers: usize,
4456    elem_type_name: &str,
4457) -> Result<(usize, HirScalarExpr), PlanError> {
4458    let depth = indexes.len();
4459
4460    if depth > n_layers {
4461        if n_layers == 0 {
4462            sql_bail!("cannot subscript type {}", elem_type_name)
4463        } else {
4464            sql_bail!(
4465                "cannot index into {} layers; list only has {} layer{}",
4466                depth,
4467                n_layers,
4468                if n_layers == 1 { "" } else { "s" }
4469            )
4470        }
4471    }
4472
4473    let mut exprs = Vec::with_capacity(depth + 1);
4474    exprs.push(expr);
4475
4476    for i in indexes {
4477        exprs.push(plan_expr(ecx, i)?.cast_to(
4478            ecx,
4479            CastContext::Explicit,
4480            &SqlScalarType::Int64,
4481        )?);
4482    }
4483
4484    Ok((
4485        n_layers - depth,
4486        HirScalarExpr::call_variadic(ListIndex, exprs),
4487    ))
4488}
4489
4490fn plan_slice_list(
4491    ecx: &ExprContext,
4492    expr: HirScalarExpr,
4493    slices: &[SubscriptPosition<Aug>],
4494    n_layers: usize,
4495    elem_type_name: &str,
4496) -> Result<HirScalarExpr, PlanError> {
4497    if n_layers == 0 {
4498        sql_bail!("cannot subscript type {}", elem_type_name)
4499    }
4500
4501    // first arg will be list
4502    let mut exprs = Vec::with_capacity(slices.len() + 1);
4503    exprs.push(expr);
4504    // extract (start, end) parts from collected slices
4505    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4506        Ok(match position {
4507            Some(p) => {
4508                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4509            }
4510            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4511        })
4512    };
4513    for p in slices {
4514        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4515        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4516        exprs.push(start);
4517        exprs.push(end);
4518    }
4519
4520    Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4521}
4522
4523fn plan_like(
4524    ecx: &ExprContext,
4525    expr: &Expr<Aug>,
4526    pattern: &Expr<Aug>,
4527    escape: Option<&Expr<Aug>>,
4528    case_insensitive: bool,
4529    not: bool,
4530) -> Result<HirScalarExpr, PlanError> {
4531    use CastContext::Implicit;
4532    let ecx = ecx.with_name("LIKE argument");
4533    let expr = plan_expr(&ecx, expr)?;
4534    let haystack = match ecx.scalar_type(&expr) {
4535        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4536            .type_as(&ecx, ty)?
4537            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4538        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4539    };
4540    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4541    if let Some(escape) = escape {
4542        pattern = pattern.call_binary(
4543            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4544            expr_func::LikeEscape,
4545        );
4546    }
4547    let func: BinaryFunc = if case_insensitive {
4548        expr_func::IsLikeMatchCaseInsensitive.into()
4549    } else {
4550        expr_func::IsLikeMatchCaseSensitive.into()
4551    };
4552    let like = haystack.call_binary(pattern, func);
4553    if not {
4554        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4555    } else {
4556        Ok(like)
4557    }
4558}
4559
4560fn plan_subscript_jsonb(
4561    ecx: &ExprContext,
4562    expr: HirScalarExpr,
4563    positions: &[SubscriptPosition<Aug>],
4564) -> Result<CoercibleScalarExpr, PlanError> {
4565    use CastContext::Implicit;
4566    use SqlScalarType::{Int64, String};
4567
4568    // JSONB doesn't support the slicing syntax, so simply error if you
4569    // encounter any explicit slices.
4570    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4571
4572    let mut exprs = Vec::with_capacity(subscripts.len());
4573    for s in subscripts {
4574        let subscript = plan_expr(ecx, s)?;
4575        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4576            subscript
4577        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4578            // Integers are converted to a string here and then re-parsed as an
4579            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4580            // do it.
4581            typeconv::to_string(ecx, subscript)
4582        } else {
4583            sql_bail!("jsonb subscript type must be coercible to integer or text");
4584        };
4585        exprs.push(subscript);
4586    }
4587
4588    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4589    // `expr->subscript` as you might expect.
4590    let expr = expr.call_binary(
4591        HirScalarExpr::call_variadic(
4592            ArrayCreate {
4593                elem_type: SqlScalarType::String,
4594            },
4595            exprs,
4596        ),
4597        expr_func::JsonbGetPath,
4598    );
4599    Ok(expr.into())
4600}
4601
4602fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4603    if !ecx.allow_subqueries {
4604        sql_bail!("{} does not allow subqueries", ecx.name)
4605    }
4606    let mut qcx = ecx.derived_query_context();
4607    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4608    Ok(expr.exists().into())
4609}
4610
4611fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4612    if !ecx.allow_subqueries {
4613        sql_bail!("{} does not allow subqueries", ecx.name)
4614    }
4615    let mut qcx = ecx.derived_query_context();
4616    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4617    let column_types = qcx.relation_type(&expr).column_types;
4618    if column_types.len() != 1 {
4619        sql_bail!(
4620            "Expected subselect to return 1 column, got {} columns",
4621            column_types.len()
4622        );
4623    }
4624    Ok(expr.select().into())
4625}
4626
4627fn plan_list_subquery(
4628    ecx: &ExprContext,
4629    query: &Query<Aug>,
4630) -> Result<CoercibleScalarExpr, PlanError> {
4631    plan_vector_like_subquery(
4632        ecx,
4633        query,
4634        |_| false,
4635        |elem_type| ListCreate { elem_type }.into(),
4636        |order_by| AggregateFunc::ListConcat { order_by },
4637        expr_func::ListListConcat.into(),
4638        |elem_type| {
4639            HirScalarExpr::literal(
4640                Datum::empty_list(),
4641                SqlScalarType::List {
4642                    element_type: Box::new(elem_type),
4643                    custom_id: None,
4644                },
4645            )
4646        },
4647        "list",
4648    )
4649}
4650
4651fn plan_array_subquery(
4652    ecx: &ExprContext,
4653    query: &Query<Aug>,
4654) -> Result<CoercibleScalarExpr, PlanError> {
4655    plan_vector_like_subquery(
4656        ecx,
4657        query,
4658        |elem_type| {
4659            matches!(
4660                elem_type,
4661                SqlScalarType::Char { .. }
4662                    | SqlScalarType::Array { .. }
4663                    | SqlScalarType::List { .. }
4664                    | SqlScalarType::Map { .. }
4665            )
4666        },
4667        |elem_type| ArrayCreate { elem_type }.into(),
4668        |order_by| AggregateFunc::ArrayConcat { order_by },
4669        expr_func::ArrayArrayConcat.into(),
4670        |elem_type| {
4671            HirScalarExpr::literal(
4672                Datum::empty_array(),
4673                SqlScalarType::Array(Box::new(elem_type)),
4674            )
4675        },
4676        "[]",
4677    )
4678}
4679
/// Generic function used to plan both array subqueries and list subqueries
///
/// Plans `query`, constrains it to a single output column, and reduces it
/// with an order-sensitive concatenation aggregate so the resulting vector
/// value respects the subquery's `ORDER BY`.
///
/// The vector-specific pieces are supplied by the caller:
/// * `is_unsupported_type`: rejects element types the vector cannot hold.
/// * `vector_create`: builds the variadic function wrapping one element.
/// * `aggregate_concat`: builds the concatenation aggregate (with ORDER BY).
/// * `binary_concat`: concatenates two vectors; used to splice in the empty
///   value below.
/// * `empty_literal`: the empty vector of a given element type, returned
///   when the subquery yields no rows.
/// * `vector_type_string`: suffix used in the unsupported-type error message.
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&SqlScalarType) -> bool,
    F2: Fn(SqlScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(SqlScalarType) -> HirScalarExpr,
{
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // Apply LIMIT/OFFSET via a TopK when a limit is present or the offset is
    // not the literal 0; the reduce below would otherwise see every row.
    if planned_query.limit.is_some()
        || !planned_query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }

    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }

    // Exactly one projected column exists (checked above); it provides the
    // element type of the resulting vector.
    let project_column = *planned_query.project.get(0).unwrap();
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();

    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_sql_scalar_type(&elem_type, false),
            vector_type_string
        ));
    }

    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
    // subquery above.
    //
    // The first expression wraps the projected element; the remaining
    // expressions carry the ORDER BY key columns along into the aggregate.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        vector_create(elem_type.clone()),
        vec![HirScalarExpr::column(project_column)],
    ))
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // However, column references for `aggregation_projection` and `aggregation_order_by`
    // are with reference to the `exprs` of the aggregation expression.  Here that is
    // `aggregation_exprs`.
    let aggregation_projection = vec![0];
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();

    // Reduce to a single row containing the concatenated vector. The
    // aggregate's input is a record of (element, ORDER BY keys).
    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);

    // If `expr` has no rows, return an empty array/list rather than NULL.
    Ok(reduced_expr
        .select()
        .call_binary(empty_literal(elem_type), binary_concat)
        .into())
}
4795
/// Plans a map subquery: a two-column (key, value) subquery aggregated into a
/// single map value. The first column must be of type `text`.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // Apply LIMIT/OFFSET via a TopK when a limit is present or the offset is
    // not the literal 0; the reduce below would otherwise see every row.
    if query.limit.is_some()
        || !query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    // First projected column is the key, second is the value.
    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    if key_type != SqlScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // The aggregate's input record carries the (key, value) pair first,
    // followed by the ORDER BY key columns. Column references here point at
    // the columns produced by planning the subquery above.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    ))
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // Reduce all rows into a single map with `MapAgg`. The ORDER BY column
    // references are re-pointed into the record built above (hence the
    // `enumerate`-based renumbering).
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // If `expr` has no rows, return an empty map rather than NULL.
    let expr = HirScalarExpr::call_variadic(
        Coalesce,
        vec![
            expr.select(),
            HirScalarExpr::literal(
                Datum::empty_map(),
                SqlScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    );

    Ok(expr.into())
}
4901
4902fn plan_collate(
4903    ecx: &ExprContext,
4904    expr: &Expr<Aug>,
4905    collation: &UnresolvedItemName,
4906) -> Result<CoercibleScalarExpr, PlanError> {
4907    if collation.0.len() == 2
4908        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4909        && collation.0[1] == ident!("default")
4910    {
4911        plan_expr(ecx, expr)
4912    } else {
4913        bail_unsupported!("COLLATE");
4914    }
4915}
4916
4917/// Plans a slice of expressions.
4918///
4919/// This function is a simple convenience function for mapping [`plan_expr`]
4920/// over a slice of expressions. The planned expressions are returned in the
4921/// same order as the input. If any of the expressions fail to plan, returns an
4922/// error instead.
4923fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4924where
4925    E: std::borrow::Borrow<Expr<Aug>>,
4926{
4927    let mut out = vec![];
4928    for expr in exprs {
4929        out.push(plan_expr(ecx, expr.borrow())?);
4930    }
4931    Ok(out)
4932}
4933
4934/// Plans an `ARRAY` expression.
4935fn plan_array(
4936    ecx: &ExprContext,
4937    exprs: &[Expr<Aug>],
4938    type_hint: Option<&SqlScalarType>,
4939) -> Result<CoercibleScalarExpr, PlanError> {
4940    // Plan each element expression.
4941    let mut out = vec![];
4942    for expr in exprs {
4943        out.push(match expr {
4944            // Special case nested ARRAY expressions so we can plumb
4945            // the type hint through.
4946            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4947            _ => plan_expr(ecx, expr)?,
4948        });
4949    }
4950
4951    // Attempt to make use of the type hint.
4952    let type_hint = match type_hint {
4953        // The user has provided an explicit cast to an array type. We know the
4954        // element type to coerce to. Need to be careful, though: if there's
4955        // evidence that any of the array elements are themselves arrays, we
4956        // want to coerce to the array type, not the element type.
4957        Some(SqlScalarType::Array(elem_type)) => {
4958            let multidimensional = out.iter().any(|e| {
4959                matches!(
4960                    ecx.scalar_type(e),
4961                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4962                )
4963            });
4964            if multidimensional {
4965                type_hint
4966            } else {
4967                Some(&**elem_type)
4968            }
4969        }
4970        // The user provided an explicit cast to a non-array type. We'll have to
4971        // guess what the correct type for the array. Our caller will then
4972        // handle converting that array type to the desired non-array type.
4973        Some(_) => None,
4974        // No type hint. We'll have to guess the correct type for the array.
4975        None => None,
4976    };
4977
4978    // Coerce all elements to the same type.
4979    let (elem_type, exprs) = if exprs.is_empty() {
4980        if let Some(elem_type) = type_hint {
4981            (elem_type.clone(), vec![])
4982        } else {
4983            sql_bail!("cannot determine type of empty array");
4984        }
4985    } else {
4986        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4987        (ecx.scalar_type(&out[0]), out)
4988    };
4989
4990    // Arrays of `char` type are disallowed due to a known limitation:
4991    // https://github.com/MaterializeInc/database-issues/issues/2360.
4992    //
4993    // Arrays of `list` and `map` types are disallowed due to mind-bending
4994    // semantics.
4995    if matches!(
4996        elem_type,
4997        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4998    ) {
4999        bail_unsupported!(format!(
5000            "{}[]",
5001            ecx.humanize_sql_scalar_type(&elem_type, false)
5002        ));
5003    }
5004
5005    Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
5006}
5007
5008fn plan_list(
5009    ecx: &ExprContext,
5010    exprs: &[Expr<Aug>],
5011    type_hint: Option<&SqlScalarType>,
5012) -> Result<CoercibleScalarExpr, PlanError> {
5013    let (elem_type, exprs) = if exprs.is_empty() {
5014        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
5015            (element_type.without_modifiers(), vec![])
5016        } else {
5017            sql_bail!("cannot determine type of empty list");
5018        }
5019    } else {
5020        let type_hint = match type_hint {
5021            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5022            _ => None,
5023        };
5024
5025        let mut out = vec![];
5026        for expr in exprs {
5027            out.push(match expr {
5028                // Special case nested LIST expressions so we can plumb
5029                // the type hint through.
5030                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5031                _ => plan_expr(ecx, expr)?,
5032            });
5033        }
5034        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5035        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5036    };
5037
5038    if matches!(elem_type, SqlScalarType::Char { .. }) {
5039        bail_unsupported!("char list");
5040    }
5041
5042    Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5043}
5044
5045fn plan_map(
5046    ecx: &ExprContext,
5047    entries: &[MapEntry<Aug>],
5048    type_hint: Option<&SqlScalarType>,
5049) -> Result<CoercibleScalarExpr, PlanError> {
5050    let (value_type, exprs) = if entries.is_empty() {
5051        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5052            (value_type.without_modifiers(), vec![])
5053        } else {
5054            sql_bail!("cannot determine type of empty map");
5055        }
5056    } else {
5057        let type_hint = match type_hint {
5058            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5059            _ => None,
5060        };
5061
5062        let mut keys = vec![];
5063        let mut values = vec![];
5064        for MapEntry { key, value } in entries {
5065            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5066            let value = match value {
5067                // Special case nested MAP expressions so we can plumb
5068                // the type hint through.
5069                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5070                _ => plan_expr(ecx, value)?,
5071            };
5072            keys.push(key);
5073            values.push(value);
5074        }
5075        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5076        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5077        let out = itertools::interleave(keys, values).collect();
5078        (value_type, out)
5079    };
5080
5081    if matches!(value_type, SqlScalarType::Char { .. }) {
5082        bail_unsupported!("char map");
5083    }
5084
5085    let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5086    Ok(expr.into())
5087}
5088
5089/// Coerces a list of expressions such that all input expressions will be cast
5090/// to the same type. If successful, returns a new list of expressions in the
5091/// same order as the input, where each expression has the appropriate casts to
5092/// make them all of a uniform type.
5093///
5094/// If `force_type` is `Some`, the expressions are forced to the specified type
5095/// via an explicit cast. Otherwise the best common type is guessed via
5096/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5097/// implicit casts
5098///
5099/// Note that this is our implementation of Postgres' type conversion for
5100/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5101/// isn't yet used in all of those cases.
5102///
5103/// [union-type-conv]:
5104/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5105pub fn coerce_homogeneous_exprs(
5106    ecx: &ExprContext,
5107    exprs: Vec<CoercibleScalarExpr>,
5108    force_type: Option<&SqlScalarType>,
5109) -> Result<Vec<HirScalarExpr>, PlanError> {
5110    assert!(!exprs.is_empty());
5111
5112    let target_holder;
5113    let target = match force_type {
5114        Some(t) => t,
5115        None => {
5116            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5117            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5118            &target_holder
5119        }
5120    };
5121
5122    // Try to cast all expressions to `target`.
5123    let mut out = Vec::new();
5124    for expr in exprs {
5125        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5126        let ccx = match force_type {
5127            None => CastContext::Implicit,
5128            Some(_) => CastContext::Explicit,
5129        };
5130        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5131            Ok(expr) => out.push(expr),
5132            Err(_) => sql_bail!(
5133                "{} could not convert type {} to {}",
5134                ecx.name,
5135                ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5136                ecx.humanize_sql_scalar_type(target, false),
5137            ),
5138        }
5139    }
5140    Ok(out)
5141}
5142
5143/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5144/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5145pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5146    obe: &OrderByExpr<T>,
5147    column: usize,
5148) -> ColumnOrder {
5149    let desc = !obe.asc.unwrap_or(true);
5150    ColumnOrder {
5151        column,
5152        desc,
5153        // https://www.postgresql.org/docs/14/queries-order.html
5154        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5155        nulls_last: obe.nulls_last.unwrap_or(!desc),
5156    }
5157}
5158
5159/// Plans the ORDER BY clause of a window function.
5160///
5161/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5162/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5163/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5164/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5165/// `Vec<HirScalarExpr>` that we return.
5166fn plan_function_order_by(
5167    ecx: &ExprContext,
5168    order_by: &[OrderByExpr<Aug>],
5169) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5170    let mut order_by_exprs = vec![];
5171    let mut col_orders = vec![];
5172    {
5173        for (i, obe) in order_by.iter().enumerate() {
5174            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5175            // do not support ordinal references in PostgreSQL. So we use
5176            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5177            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5178            order_by_exprs.push(expr);
5179            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5180        }
5181    }
5182    Ok((order_by_exprs, col_orders))
5183}
5184
5185/// Common part of the planning of windowed and non-windowed aggregation functions.
5186fn plan_aggregate_common(
5187    ecx: &ExprContext,
5188    Function::<Aug> {
5189        name,
5190        args,
5191        filter,
5192        over: _,
5193        distinct,
5194    }: &Function<Aug>,
5195) -> Result<AggregateExpr, PlanError> {
5196    // Normal aggregate functions, like `sum`, expect as input a single expression
5197    // which yields the datum to aggregate. Order sensitive aggregate functions,
5198    // like `jsonb_agg`, are special, and instead expect a Record whose first
5199    // element yields the datum to aggregate and whose successive elements yield
5200    // keys to order by. This expectation is hard coded within the implementation
5201    // of each of the order-sensitive aggregates. The specification of how many
5202    // order by keys to consider, and in what order, is passed via the `order_by`
5203    // field on the `AggregateFunc` variant.
5204
5205    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5206    // most, so explicitly drop it if the function doesn't care about order. This
5207    // prevents the projection into Record below from triggering on unsupported
5208    // functions.
5209
5210    let impls = match resolve_func(ecx, name, args)? {
5211        Func::Aggregate(impls) => impls,
5212        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5213    };
5214
5215    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5216    // generalized function selection framework. The rule is simple: the user
5217    // must type `count(*)`, but the function selection framework sees an empty
5218    // parameter list, as if the user had typed `count()`. But if the user types
5219    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5220    // rules to all aggregates, not just `count`, since we may one day support
5221    // user-defined aggregates, including user-defined aggregates that take no
5222    // parameters.
5223    let (args, order_by) = match &args {
5224        FunctionArgs::Star => (vec![], vec![]),
5225        FunctionArgs::Args { args, order_by } => {
5226            if args.is_empty() {
5227                sql_bail!(
5228                    "{}(*) must be used to call a parameterless aggregate function",
5229                    ecx.qcx
5230                        .scx
5231                        .humanize_resolved_name(name)
5232                        .expect("name actually resolved")
5233                );
5234            }
5235            let args = plan_exprs(ecx, args)?;
5236            (args, order_by.clone())
5237        }
5238    };
5239
5240    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5241
5242    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5243    if let Some(filter) = &filter {
5244        // If a filter is present, as in
5245        //
5246        //     <agg>(<expr>) FILTER (WHERE <cond>)
5247        //
5248        // we plan it by essentially rewriting the expression to
5249        //
5250        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5251        //
5252        // where <identity> is the identity input for <agg>.
5253        let cond =
5254            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5255        let expr_typ = ecx.scalar_type(&expr);
5256        expr = HirScalarExpr::if_then_else(
5257            cond,
5258            expr,
5259            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5260        );
5261    }
5262
5263    let mut seen_outer = false;
5264    let mut seen_inner = false;
5265    #[allow(deprecated)]
5266    expr.visit_columns(0, &mut |depth, col| {
5267        if depth == 0 && col.level == 0 {
5268            seen_inner = true;
5269        } else if col.level > depth {
5270            seen_outer = true;
5271        }
5272    });
5273    if seen_outer && !seen_inner {
5274        bail_unsupported!(
5275            3720,
5276            "aggregate functions that refer exclusively to outer columns"
5277        );
5278    }
5279
5280    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5281    // map the needed expressions into the aggregate datum.
5282    if func.is_order_sensitive() {
5283        let field_names = iter::repeat(ColumnName::from(""))
5284            .take(1 + order_by_exprs.len())
5285            .collect();
5286        let mut exprs = vec![expr];
5287        exprs.extend(order_by_exprs);
5288        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5289    }
5290
5291    Ok(AggregateExpr {
5292        func,
5293        expr: Box::new(expr),
5294        distinct: *distinct,
5295    })
5296}
5297
/// Plans a (possibly qualified) identifier as a scalar expression.
///
/// Resolution order: a qualified name must name a table column; an
/// unqualified name is first tried as a column, then as a whole-row
/// reference to a table in scope.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    let col_name = normalize::column_name(names.pop().unwrap());

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let (i, i_name) = ecx.scope.resolve_table_column(
            &ecx.qcx.outer_scopes,
            &table_name,
            &col_name,
            &mut ecx.qcx.name_manager.borrow_mut(),
        )?;
        return Ok(HirScalarExpr::named_column(i, i_name));
    }

    // If the name is unqualified, first check if it refers to a column. Track any similar names
    // that might exist for a better error message.
    let similar_names = match ecx.scope.resolve_column(
        &ecx.qcx.outer_scopes,
        &col_name,
        &mut ecx.qcx.name_manager.borrow_mut(),
    ) {
        Ok((i, i_name)) => {
            return Ok(HirScalarExpr::named_column(i, i_name));
        }
        // Remember the suggestions; they are reused if the whole-row lookup
        // below also fails.
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name doesn't refer to a column. Check if it is a whole-row reference
    // to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name doesn't refer to a table either. Return an error.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The name refers to a table that is the result of a function that
        // returned a single column. Per PostgreSQL, this is a special case
        // that returns the value directly.
        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
            *column,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )),
        // The name refers to a normal table. Return a record containing all the
        // columns of the table.
        _ => {
            // A synthetic "exists" column (flagged on the scope item) is
            // excluded from the record but remembered, so the result can be
            // NULLed out below when that column is NULL.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::named_column(
                            column,
                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
                        );
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // For the special case of a table function with a single column, the single column is instead not wrapped.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
            };
            if let Some(has_exists_column) = has_exists_column {
                // When the exists column is NULL, yield NULL for the whole-row
                // value instead of a record of NULLs.
                Ok(HirScalarExpr::if_then_else(
                    HirScalarExpr::unnamed_column(has_exists_column)
                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
                    expr,
                ))
            } else {
                Ok(expr)
            }
        }
    }
}
5392
5393fn plan_op(
5394    ecx: &ExprContext,
5395    op: &str,
5396    expr1: &Expr<Aug>,
5397    expr2: Option<&Expr<Aug>>,
5398) -> Result<HirScalarExpr, PlanError> {
5399    let impls = func::resolve_op(op)?;
5400    let args = match expr2 {
5401        None => plan_exprs(ecx, &[expr1])?,
5402        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5403    };
5404    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5405}
5406
/// Plans a function call in a scalar context.
///
/// Handles plain scalar functions, scalar window functions, value window
/// functions, and window aggregates (aggregates with an OVER clause). Table
/// functions are rejected here, as are non-windowed aggregates (which must
/// have been planned earlier by the aggregate machinery). All window-function
/// cases return early from inside the match; only plain scalar functions fall
/// through to the common argument planning at the bottom.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    // Dispatch on the kind of function; all window-function arms return early.
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            // Table functions must be planned by the FROM-clause machinery,
            // not as scalar expressions.
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
            // msg there is less informative.)
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
            // accept a window frame here without an error msg. (Postgres also does this.)
            // TODO: maybe we should give a notice

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            if ignore_nulls {
                // If we ever add a scalar window function that supports ignore, then don't forget
                // to also update HIR EXPLAIN.
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let window_plan = plan_window_function_non_aggr(ecx, f)?;
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
                window_plan;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;

            // IGNORE NULLS is only implemented for LAG and LEAD.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls, // (RESPECT NULLS is the default)
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // Not a window aggregate. Something is wrong.
                if ecx.allow_aggregates {
                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    // scope.resolve_expr didn't catch it because we have not yet planned it,
                    // because it was in an unsupported context.
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // Frames that mix an unbounded bound with an offset bound are
                // not supported for window aggregates.
                // https://github.com/MaterializeInc/database-issues/issues/6720
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    (_, _) => {} // other cases are ok
                }

                if ignore_nulls {
                    // https://github.com/MaterializeInc/database-issues/issues/6722
                    // If we ever add support for ignore_nulls for a window aggregate, then don't
                    // forget to also update HIR EXPLAIN.
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    // https://github.com/MaterializeInc/database-issues/issues/6626
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // From here on we are planning an ordinary (non-windowed, non-aggregate)
    // scalar function call.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }

    // DISTINCT and FILTER are only meaningful on aggregates.
    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5628
/// Feature fragment passed to `bail_unsupported!` when IGNORE NULLS or
/// RESPECT NULLS is specified for a window function that does not support it
/// (in this file, only LAG and LEAD accept IGNORE NULLS).
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5631
5632/// Resolves the name to a set of function implementations.
5633///
5634/// If the name does not specify a known built-in function, returns an error.
5635pub fn resolve_func(
5636    ecx: &ExprContext,
5637    name: &ResolvedItemName,
5638    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5639) -> Result<&'static Func, PlanError> {
5640    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5641        if let Ok(f) = i.func() {
5642            return Ok(f);
5643        }
5644    }
5645
5646    // Couldn't resolve function with this name, so generate verbose error
5647    // message.
5648    let cexprs = match args {
5649        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5650        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5651            if !order_by.is_empty() {
5652                sql_bail!(
5653                    "ORDER BY specified, but {} is not an aggregate function",
5654                    name
5655                );
5656            }
5657            plan_exprs(ecx, args)?
5658        }
5659    };
5660
5661    let arg_types: Vec<_> = cexprs
5662        .into_iter()
5663        .map(|ty| match ecx.scalar_type(&ty) {
5664            CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5665            CoercibleScalarType::Record(_) => "record".to_string(),
5666            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5667        })
5668        .collect();
5669
5670    Err(PlanError::UnknownFunction {
5671        name: name.to_string(),
5672        arg_types,
5673    })
5674}
5675
/// Plans `<expr> IS [NOT] NULL | UNKNOWN | TRUE | FALSE | DISTINCT FROM <expr2>`.
///
/// `not` selects the `IS NOT ...` form: the planned predicate is negated as a
/// final step.
fn plan_is_expr<'a>(
    ecx: &ExprContext,
    expr: &'a Expr<Aug>,
    construct: &IsExprConstruct<Aug>,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    let expr_hir = plan_expr(ecx, expr)?;

    let mut result = match construct {
        IsExprConstruct::Null => {
            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
            // at odds with our type coercion rules, which treat `NULL` literals
            // and unconstrained parameters identically. Providing a type hint
            // of string means we wind up supporting both.
            expr_hir.type_as_any(ecx)?.call_is_null()
        }
        // IS UNKNOWN requires a boolean operand; it is otherwise IS NULL.
        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
        IsExprConstruct::True => expr_hir
            .type_as(ecx, &SqlScalarType::Bool)?
            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
        IsExprConstruct::False => expr_hir
            .type_as(ecx, &SqlScalarType::Bool)?
            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
        IsExprConstruct::DistinctFrom(expr2) => {
            // There are three cases:
            // 1. Both terms are non-null, in which case the result should be `a != b`.
            // 2. Exactly one term is null, in which case the result should be true.
            // 3. Both terms are null, in which case the result should be false.
            //
            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)

            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
            // construct an AST expression for `expr != expr2` and plan it to get proper type
            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;

            let expr1_hir = expr_hir.type_as_any(ecx)?;
            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;

            // `(a != b OR a IS NULL OR b IS NULL)`
            let term1 = HirScalarExpr::variadic_or(vec![
                ne_hir,
                expr1_hir.clone().call_is_null(),
                expr2_hir.clone().call_is_null(),
            ]);
            // `(a IS NOT NULL OR b IS NOT NULL)`
            let term2 = HirScalarExpr::variadic_or(vec![
                expr1_hir.call_is_null().not(),
                expr2_hir.call_is_null().not(),
            ]);
            term1.and(term2)
        }
    };
    // The IS NOT forms simply negate the result of the IS form.
    if not {
        result = result.not();
    }
    Ok(result)
}
5733
5734fn plan_case<'a>(
5735    ecx: &ExprContext,
5736    operand: &'a Option<Box<Expr<Aug>>>,
5737    conditions: &'a [Expr<Aug>],
5738    results: &'a [Expr<Aug>],
5739    else_result: &'a Option<Box<Expr<Aug>>>,
5740) -> Result<HirScalarExpr, PlanError> {
5741    let mut cond_exprs = Vec::new();
5742    let mut result_exprs = Vec::new();
5743    for (c, r) in conditions.iter().zip_eq(results) {
5744        let c = match operand {
5745            Some(operand) => operand.clone().equals(c.clone()),
5746            None => c.clone(),
5747        };
5748        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5749        cond_exprs.push(cexpr);
5750        result_exprs.push(r);
5751    }
5752    result_exprs.push(match else_result {
5753        Some(else_result) => else_result,
5754        None => &Expr::Value(Value::Null),
5755    });
5756    let mut result_exprs = coerce_homogeneous_exprs(
5757        &ecx.with_name("CASE"),
5758        plan_exprs(ecx, &result_exprs)?,
5759        None,
5760    )?;
5761    let mut expr = result_exprs.pop().unwrap();
5762    assert_eq!(cond_exprs.len(), result_exprs.len());
5763    for (cexpr, rexpr) in cond_exprs
5764        .into_iter()
5765        .rev()
5766        .zip_eq(result_exprs.into_iter().rev())
5767    {
5768        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5769    }
5770    Ok(expr)
5771}
5772
/// Plans a SQL literal into a (possibly still-coercible) scalar expression.
///
/// String and NULL literals are returned as coercible expressions so that the
/// surrounding context can later assign them a type; every other literal is
/// given a concrete type immediately.
fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
    let (datum, scalar_type) = match l {
        Value::Number(s) => {
            let d = strconv::parse_numeric(s.as_str())?;
            // Literals without an exponent or decimal point may be planned as
            // integers; everything else stays numeric.
            if !s.contains(&['E', '.'][..]) {
                // Maybe representable as an int?
                // The `try_into` target types are inferred from the `Datum`
                // constructors: first try i32, then fall back to i64, and
                // finally to numeric if neither fits.
                if let Ok(n) = d.0.try_into() {
                    (Datum::Int32(n), SqlScalarType::Int32)
                } else if let Ok(n) = d.0.try_into() {
                    (Datum::Int64(n), SqlScalarType::Int64)
                } else {
                    (
                        Datum::Numeric(d),
                        SqlScalarType::Numeric { max_scale: None },
                    )
                }
            } else {
                (
                    Datum::Numeric(d),
                    SqlScalarType::Numeric { max_scale: None },
                )
            }
        }
        Value::HexString(_) => bail_unsupported!("hex string literals"),
        Value::Boolean(b) => match b {
            false => (Datum::False, SqlScalarType::Bool),
            true => (Datum::True, SqlScalarType::Bool),
        },
        Value::Interval(i) => {
            let i = literal::plan_interval(i)?;
            (Datum::Interval(i), SqlScalarType::Interval)
        }
        // Strings and NULLs remain coercible; their type is decided later.
        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
    };
    let expr = HirScalarExpr::literal(datum, scalar_type);
    Ok(expr.into())
}
5811
5812/// The common part of the planning of non-aggregate window functions, i.e.,
5813/// scalar window functions and value window functions.
5814fn plan_window_function_non_aggr<'a>(
5815    ecx: &ExprContext,
5816    Function {
5817        name,
5818        args,
5819        filter,
5820        over,
5821        distinct,
5822    }: &'a Function<Aug>,
5823) -> Result<
5824    (
5825        bool,
5826        Vec<HirScalarExpr>,
5827        Vec<ColumnOrder>,
5828        mz_expr::WindowFrame,
5829        Vec<HirScalarExpr>,
5830        Vec<CoercibleScalarExpr>,
5831    ),
5832    PlanError,
5833> {
5834    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5835        plan_window_function_common(ecx, name, over)?;
5836
5837    if *distinct {
5838        sql_bail!(
5839            "DISTINCT specified, but {} is not an aggregate function",
5840            name
5841        );
5842    }
5843
5844    if filter.is_some() {
5845        bail_unsupported!("FILTER in non-aggregate window functions");
5846    }
5847
5848    let scalar_args = match &args {
5849        FunctionArgs::Star => {
5850            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5851        }
5852        FunctionArgs::Args { args, order_by } => {
5853            if !order_by.is_empty() {
5854                sql_bail!(
5855                    "ORDER BY specified, but {} is not an aggregate function",
5856                    name
5857                );
5858            }
5859            plan_exprs(ecx, args)?
5860        }
5861    };
5862
5863    Ok((
5864        ignore_nulls,
5865        order_by_exprs,
5866        col_orders,
5867        window_frame,
5868        partition,
5869        scalar_args,
5870    ))
5871}
5872
5873/// The common part of the planning of all window functions.
5874fn plan_window_function_common(
5875    ecx: &ExprContext,
5876    name: &<Aug as AstInfo>::ItemName,
5877    over: &Option<WindowSpec<Aug>>,
5878) -> Result<
5879    (
5880        bool,
5881        Vec<HirScalarExpr>,
5882        Vec<ColumnOrder>,
5883        mz_expr::WindowFrame,
5884        Vec<HirScalarExpr>,
5885    ),
5886    PlanError,
5887> {
5888    if !ecx.allow_windows {
5889        sql_bail!(
5890            "window functions are not allowed in {} (function {})",
5891            ecx.name,
5892            name
5893        );
5894    }
5895
5896    let window_spec = match over.as_ref() {
5897        Some(over) => over,
5898        None => sql_bail!("window function {} requires an OVER clause", name),
5899    };
5900    if window_spec.ignore_nulls && window_spec.respect_nulls {
5901        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5902    }
5903    let window_frame = match window_spec.window_frame.as_ref() {
5904        Some(frame) => plan_window_frame(frame)?,
5905        None => mz_expr::WindowFrame::default(),
5906    };
5907    let mut partition = Vec::new();
5908    for expr in &window_spec.partition_by {
5909        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5910    }
5911
5912    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5913
5914    Ok((
5915        window_spec.ignore_nulls,
5916        order_by_exprs,
5917        col_orders,
5918        window_frame,
5919        partition,
5920    ))
5921}
5922
5923fn plan_window_frame(
5924    WindowFrame {
5925        units,
5926        start_bound,
5927        end_bound,
5928    }: &WindowFrame,
5929) -> Result<mz_expr::WindowFrame, PlanError> {
5930    use mz_expr::WindowFrameBound::*;
5931    let units = window_frame_unit_ast_to_expr(units)?;
5932    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5933    let end_bound = end_bound
5934        .as_ref()
5935        .map(window_frame_bound_ast_to_expr)
5936        .unwrap_or(CurrentRow);
5937
5938    // Validate bounds according to Postgres rules
5939    match (&start_bound, &end_bound) {
5940        // Start bound can't be UNBOUNDED FOLLOWING
5941        (UnboundedFollowing, _) => {
5942            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5943        }
5944        // End bound can't be UNBOUNDED PRECEDING
5945        (_, UnboundedPreceding) => {
5946            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5947        }
5948        // Start bound should come before end bound in the list of bound definitions
5949        (CurrentRow, OffsetPreceding(_)) => {
5950            sql_bail!("frame starting from current row cannot have preceding rows")
5951        }
5952        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5953            sql_bail!("frame starting from following row cannot have preceding rows")
5954        }
5955        // The above rules are adopted from Postgres.
5956        // The following rules are Materialize-specific.
5957        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5958            // Note that the only hard limit is that partition size + offset should fit in i64, so
5959            // in theory, we could support much larger offsets than this. But for our current
5960            // performance, even 1000000 is quite big.
5961            if *o1 > 1000000 || *o2 > 1000000 {
5962                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5963            }
5964        }
5965        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5966            if *o1 > 1000000 || *o2 > 1000000 {
5967                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5968            }
5969        }
5970        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5971            if *o1 > 1000000 || *o2 > 1000000 {
5972                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5973            }
5974        }
5975        (OffsetPreceding(o), CurrentRow) => {
5976            if *o > 1000000 {
5977                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5978            }
5979        }
5980        (CurrentRow, OffsetFollowing(o)) => {
5981            if *o > 1000000 {
5982                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5983            }
5984        }
5985        // Other bounds are valid
5986        (_, _) => (),
5987    }
5988
5989    // RANGE is only supported in the default frame
5990    // https://github.com/MaterializeInc/database-issues/issues/6585
5991    if units == mz_expr::WindowFrameUnits::Range
5992        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5993    {
5994        bail_unsupported!("RANGE in non-default window frames")
5995    }
5996
5997    let frame = mz_expr::WindowFrame {
5998        units,
5999        start_bound,
6000        end_bound,
6001    };
6002    Ok(frame)
6003}
6004
6005fn window_frame_unit_ast_to_expr(
6006    unit: &WindowFrameUnits,
6007) -> Result<mz_expr::WindowFrameUnits, PlanError> {
6008    match unit {
6009        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
6010        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
6011        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
6012    }
6013}
6014
6015fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
6016    match bound {
6017        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
6018        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
6019        WindowFrameBound::Preceding(Some(offset)) => {
6020            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6021        }
6022        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6023        WindowFrameBound::Following(Some(offset)) => {
6024            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6025        }
6026    }
6027}
6028
6029pub fn scalar_type_from_sql(
6030    scx: &StatementContext,
6031    data_type: &ResolvedDataType,
6032) -> Result<SqlScalarType, PlanError> {
6033    match data_type {
6034        ResolvedDataType::AnonymousList(elem_type) => {
6035            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6036            if matches!(elem_type, SqlScalarType::Char { .. }) {
6037                bail_unsupported!("char list");
6038            }
6039            Ok(SqlScalarType::List {
6040                element_type: Box::new(elem_type),
6041                custom_id: None,
6042            })
6043        }
6044        ResolvedDataType::AnonymousMap {
6045            key_type,
6046            value_type,
6047        } => {
6048            match scalar_type_from_sql(scx, key_type)? {
6049                SqlScalarType::String => {}
6050                other => sql_bail!(
6051                    "map key type must be {}, got {}",
6052                    scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6053                    scx.humanize_sql_scalar_type(&other, false)
6054                ),
6055            }
6056            Ok(SqlScalarType::Map {
6057                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6058                custom_id: None,
6059            })
6060        }
6061        ResolvedDataType::Named { id, modifiers, .. } => {
6062            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6063        }
6064        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6065    }
6066}
6067
6068pub fn scalar_type_from_catalog(
6069    catalog: &dyn SessionCatalog,
6070    id: CatalogItemId,
6071    modifiers: &[i64],
6072) -> Result<SqlScalarType, PlanError> {
6073    let entry = catalog.get_item(&id);
6074    let type_details = match entry.type_details() {
6075        Some(type_details) => type_details,
6076        None => {
6077            // Resolution should never produce a `ResolvedDataType::Named` with
6078            // an ID of a non-type, but we error gracefully just in case.
6079            sql_bail!(
6080                "internal error: {} does not refer to a type",
6081                catalog.resolve_full_name(entry.name()).to_string().quoted()
6082            );
6083        }
6084    };
6085    match &type_details.typ {
6086        CatalogType::Numeric => {
6087            let mut modifiers = modifiers.iter().fuse();
6088            let precision = match modifiers.next() {
6089                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6090                    sql_bail!(
6091                        "precision for type numeric must be between 1 and {}",
6092                        NUMERIC_DATUM_MAX_PRECISION,
6093                    );
6094                }
6095                Some(p) => Some(*p),
6096                None => None,
6097            };
6098            let scale = match modifiers.next() {
6099                Some(scale) => {
6100                    if let Some(precision) = precision {
6101                        if *scale > precision {
6102                            sql_bail!(
6103                                "scale for type numeric must be between 0 and precision {}",
6104                                precision
6105                            );
6106                        }
6107                    }
6108                    Some(NumericMaxScale::try_from(*scale)?)
6109                }
6110                None => None,
6111            };
6112            if modifiers.next().is_some() {
6113                sql_bail!("type numeric supports at most two type modifiers");
6114            }
6115            Ok(SqlScalarType::Numeric { max_scale: scale })
6116        }
6117        CatalogType::Char => {
6118            let mut modifiers = modifiers.iter().fuse();
6119            let length = match modifiers.next() {
6120                Some(l) => Some(CharLength::try_from(*l)?),
6121                None => Some(CharLength::ONE),
6122            };
6123            if modifiers.next().is_some() {
6124                sql_bail!("type character supports at most one type modifier");
6125            }
6126            Ok(SqlScalarType::Char { length })
6127        }
6128        CatalogType::VarChar => {
6129            let mut modifiers = modifiers.iter().fuse();
6130            let length = match modifiers.next() {
6131                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6132                None => None,
6133            };
6134            if modifiers.next().is_some() {
6135                sql_bail!("type character varying supports at most one type modifier");
6136            }
6137            Ok(SqlScalarType::VarChar { max_length: length })
6138        }
6139        CatalogType::Timestamp => {
6140            let mut modifiers = modifiers.iter().fuse();
6141            let precision = match modifiers.next() {
6142                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6143                None => None,
6144            };
6145            if modifiers.next().is_some() {
6146                sql_bail!("type timestamp supports at most one type modifier");
6147            }
6148            Ok(SqlScalarType::Timestamp { precision })
6149        }
6150        CatalogType::TimestampTz => {
6151            let mut modifiers = modifiers.iter().fuse();
6152            let precision = match modifiers.next() {
6153                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6154                None => None,
6155            };
6156            if modifiers.next().is_some() {
6157                sql_bail!("type timestamp with time zone supports at most one type modifier");
6158            }
6159            Ok(SqlScalarType::TimestampTz { precision })
6160        }
6161        t => {
6162            if !modifiers.is_empty() {
6163                sql_bail!(
6164                    "{} does not support type modifiers",
6165                    catalog.resolve_full_name(entry.name()).to_string()
6166                );
6167            }
6168            match t {
6169                CatalogType::Array {
6170                    element_reference: element_id,
6171                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6172                    catalog,
6173                    *element_id,
6174                    modifiers,
6175                )?))),
6176                CatalogType::List {
6177                    element_reference: element_id,
6178                    element_modifiers,
6179                } => Ok(SqlScalarType::List {
6180                    element_type: Box::new(scalar_type_from_catalog(
6181                        catalog,
6182                        *element_id,
6183                        element_modifiers,
6184                    )?),
6185                    custom_id: Some(id),
6186                }),
6187                CatalogType::Map {
6188                    key_reference: _,
6189                    key_modifiers: _,
6190                    value_reference: value_id,
6191                    value_modifiers,
6192                } => Ok(SqlScalarType::Map {
6193                    value_type: Box::new(scalar_type_from_catalog(
6194                        catalog,
6195                        *value_id,
6196                        value_modifiers,
6197                    )?),
6198                    custom_id: Some(id),
6199                }),
6200                CatalogType::Range {
6201                    element_reference: element_id,
6202                } => Ok(SqlScalarType::Range {
6203                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6204                }),
6205                CatalogType::Record { fields } => {
6206                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6207                        .iter()
6208                        .map(|f| {
6209                            let scalar_type = scalar_type_from_catalog(
6210                                catalog,
6211                                f.type_reference,
6212                                &f.type_modifiers,
6213                            )?;
6214                            Ok((
6215                                f.name.clone(),
6216                                SqlColumnType {
6217                                    scalar_type,
6218                                    nullable: true,
6219                                },
6220                            ))
6221                        })
6222                        .collect::<Result<Box<_>, PlanError>>()?;
6223                    Ok(SqlScalarType::Record {
6224                        fields: scalars,
6225                        custom_id: Some(id),
6226                    })
6227                }
6228                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6229                CatalogType::Bool => Ok(SqlScalarType::Bool),
6230                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6231                CatalogType::Date => Ok(SqlScalarType::Date),
6232                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6233                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6234                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6235                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6236                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6237                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6238                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6239                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6240                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6241                CatalogType::Interval => Ok(SqlScalarType::Interval),
6242                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6243                CatalogType::Oid => Ok(SqlScalarType::Oid),
6244                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6245                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6246                CatalogType::Pseudo => {
6247                    sql_bail!(
6248                        "cannot reference pseudo type {}",
6249                        catalog.resolve_full_name(entry.name()).to_string()
6250                    )
6251                }
6252                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6253                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6254                CatalogType::RegType => Ok(SqlScalarType::RegType),
6255                CatalogType::String => Ok(SqlScalarType::String),
6256                CatalogType::Time => Ok(SqlScalarType::Time),
6257                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6258                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6259                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6260                CatalogType::Numeric => unreachable!("handled above"),
6261                CatalogType::Char => unreachable!("handled above"),
6262                CatalogType::VarChar => unreachable!("handled above"),
6263                CatalogType::Timestamp => unreachable!("handled above"),
6264                CatalogType::TimestampTz => unreachable!("handled above"),
6265            }
6266        }
6267    }
6268}
6269
/// This is used to collect aggregates and table functions from within an `Expr`.
/// See the explanation of aggregate handling at the top of the file for more details.
struct AggregateTableFuncVisitor<'a> {
    /// Statement context, used to resolve function names to catalog items.
    scx: &'a StatementContext<'a>,
    /// Aggregate function calls collected so far; deduplicated in `into_result`.
    aggs: Vec<Function<Aug>>,
    /// Whether we are currently inside an aggregate function's arguments; used
    /// to reject nested aggregate calls.
    within_aggregate: bool,
    /// Table function calls found in the SELECT list, mapped to the synthetic
    /// identifier that replaces each of them in the expression.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of syntactic contexts (e.g. "CASE") in which table functions are
    /// disallowed; table functions are rejected while this is non-empty.
    table_disallowed_context: Vec<&'static str>,
    /// Whether we are currently inside a SELECT list item.
    in_select_item: bool,
    /// Source of unique ids for the synthetic table function identifiers.
    id_gen: IdGen,
    /// The first error encountered, if any; surfaced by `into_result`.
    err: Option<PlanError>,
}
6282
6283impl<'a> AggregateTableFuncVisitor<'a> {
6284    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6285        AggregateTableFuncVisitor {
6286            scx,
6287            aggs: Vec::new(),
6288            within_aggregate: false,
6289            tables: BTreeMap::new(),
6290            table_disallowed_context: Vec::new(),
6291            in_select_item: false,
6292            id_gen: Default::default(),
6293            err: None,
6294        }
6295    }
6296
6297    fn into_result(
6298        self,
6299    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6300        match self.err {
6301            Some(err) => Err(err),
6302            None => {
6303                // Dedup while preserving the order. We don't care what the order is, but it
6304                // has to be reproducible so that EXPLAIN PLAN tests work.
6305                let mut seen = BTreeSet::new();
6306                let aggs = self
6307                    .aggs
6308                    .into_iter()
6309                    .filter(move |agg| seen.insert(agg.clone()))
6310                    .collect();
6311                Ok((aggs, self.tables))
6312            }
6313        }
6314    }
6315}
6316
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                // Record the aggregate call so it can be precomputed in `Reduce`
                // (see the aggregate discussion at the top of the file).
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // Visit the FILTER clause before entering the aggregate context:
                // expressions in FILTER are not treated as nested within the aggregate.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not appear nested inside other table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        // Determine whether `expr` introduces a context in which table functions
        // are disallowed, and whether it is itself a table function call that
        // must be extracted from the SELECT list.
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a uuid identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                // We know this is okay because id is 11 characters + <=20 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        // NOTE(review): if `expr` was replaced by an identifier above, this second
        // visit sees only the replacement identifier, which collects nothing further.
        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        // Track that we're inside a SELECT list item so that table functions
        // encountered there can be extracted (see `visit_expr_mut`).
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6438
/// Collects window function call expressions from an `Expr`, without
/// descending into subqueries.
#[derive(Default)]
struct WindowFuncCollector {
    /// The window function call expressions collected so far, in visit order;
    /// deduplicated and reversed by `into_result`.
    window_funcs: Vec<Expr<Aug>>,
}
6443
6444impl WindowFuncCollector {
6445    fn into_result(self) -> Vec<Expr<Aug>> {
6446        // Dedup while preserving the order.
6447        let mut seen = BTreeSet::new();
6448        let window_funcs_dedupped = self
6449            .window_funcs
6450            .into_iter()
6451            .filter(move |expr| seen.insert(expr.clone()))
6452            // Reverse the order, so that in case of a nested window function call, the
6453            // inner one is evaluated first.
6454            .rev()
6455            .collect();
6456        window_funcs_dedupped
6457    }
6458}
6459
6460impl Visit<'_, Aug> for WindowFuncCollector {
6461    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6462        match expr {
6463            Expr::Function(func) => {
6464                if func.over.is_some() {
6465                    self.window_funcs.push(expr.clone());
6466                }
6467            }
6468            _ => (),
6469        }
6470        visit::visit_expr(self, expr);
6471    }
6472
6473    fn visit_query(&mut self, _query: &Query<Aug>) {
6474        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6475    }
6476}
6477
/// Specifies how long a query will live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The query's (or the expression's) result will be computed at one point in time.
    OneShot,
    /// The query (or expression) is used in a dataflow that maintains an index.
    Index,
    /// The query (or expression) is used in a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) is part of a (non-materialized) view.
    View,
    /// The expression is part of a source definition.
    Source,
}

impl QueryLifetime {
    /// Reports whether the result is computed at a single point in time, i.e.,
    /// the query is not backed by a maintained dataflow.
    ///
    /// (This used to impact whether the query is allowed to reason about the time at which it is
    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
    /// mechanism, see `ExprPrepStyle`.)
    pub fn is_one_shot(&self) -> bool {
        // Match exhaustively, so adding a variant forces a decision here.
        let one_shot = match self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        // A lifetime is either one-shot or maintained — never both, never neither.
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
    /// turned into a `TopK`.
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
    pub fn allow_show(&self) -> bool {
        match self {
            // SUBSCRIBE allows SHOW commands!
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6537
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The name of the CTE.
    pub name: String,
    /// The relation description (column names and types) of the CTE's output.
    pub desc: RelationDesc,
}
6544
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// A name manager, for interning column names that will be stored in HIR and MIR.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// Bounds recursion depth during planning, to avoid stack overflow on
    /// deeply nested queries (see `CheckedRecursion`).
    pub recursion_guard: RecursionGuard,
}
6562
impl CheckedRecursion for QueryContext<'_> {
    /// Returns the guard that bounds recursion depth while planning with this context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6568
6569impl<'a> QueryContext<'a> {
6570    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6571        QueryContext {
6572            scx,
6573            lifetime,
6574            outer_scopes: vec![],
6575            outer_relation_types: vec![],
6576            ctes: BTreeMap::new(),
6577            name_manager: Rc::new(RefCell::new(NameManager::new())),
6578            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6579        }
6580    }
6581
6582    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6583        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6584    }
6585
6586    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6587    /// `self`.
6588    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6589        let ctes = self.ctes.clone();
6590        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6591        let outer_relation_types = iter::once(relation_type)
6592            .chain(self.outer_relation_types.clone())
6593            .collect();
6594        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6595        let name_manager = Rc::clone(&self.name_manager);
6596
6597        QueryContext {
6598            scx: self.scx,
6599            lifetime: self.lifetime,
6600            outer_scopes,
6601            outer_relation_types,
6602            ctes,
6603            name_manager,
6604            recursion_guard: self.recursion_guard.clone(),
6605        }
6606    }
6607
6608    /// Derives a `QueryContext` for a scope that contains no columns.
6609    fn empty_derived_context(&self) -> QueryContext<'a> {
6610        let scope = Scope::empty();
6611        let ty = SqlRelationType::empty();
6612        self.derived_context(scope, ty)
6613    }
6614
6615    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6616    /// CTE.
6617    pub fn resolve_table_name(
6618        &self,
6619        object: ResolvedItemName,
6620    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6621        match object {
6622            ResolvedItemName::Item {
6623                id,
6624                full_name,
6625                version,
6626                ..
6627            } => {
6628                let item = self.scx.get_item(&id).at_version(version);
6629                let desc = match item.relation_desc() {
6630                    Some(desc) => desc.clone(),
6631                    None => {
6632                        return Err(PlanError::InvalidDependency {
6633                            name: full_name.to_string(),
6634                            item_type: item.item_type().to_string(),
6635                        });
6636                    }
6637                };
6638                let expr = HirRelationExpr::Get {
6639                    id: Id::Global(item.global_id()),
6640                    typ: desc.typ().clone(),
6641                };
6642
6643                let name = full_name.into();
6644                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6645
6646                Ok((expr, scope))
6647            }
6648            ResolvedItemName::Cte { id, name } => {
6649                let name = name.into();
6650                let cte = self.ctes.get(&id).unwrap();
6651                let expr = HirRelationExpr::Get {
6652                    id: Id::Local(id),
6653                    typ: cte.desc.typ().clone(),
6654                };
6655
6656                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6657
6658                Ok((expr, scope))
6659            }
6660            ResolvedItemName::ContinualTask { id, name } => {
6661                let cte = self.ctes.get(&id).unwrap();
6662                let expr = HirRelationExpr::Get {
6663                    id: Id::Local(id),
6664                    typ: cte.desc.typ().clone(),
6665                };
6666
6667                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6668
6669                Ok((expr, scope))
6670            }
6671            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6672        }
6673    }
6674
6675    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6676    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6677    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6678        self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6679    }
6680}
6681
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a SqlRelationType,
    /// Are aggregate functions allowed in this context
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context.
    pub allow_parameters: bool,
    /// Are window functions allowed in this context
    pub allow_windows: bool,
}
6703
impl CheckedRecursion for ExprContext<'_> {
    /// Returns the recursion guard of the enclosing query context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6709
impl<'a> ExprContext<'a> {
    /// Returns the session catalog from the enclosing statement context.
    pub fn catalog(&self) -> &dyn SessionCatalog {
        self.qcx.scx.catalog
    }

    /// Returns a copy of this context with `name` replaced; the name is used
    /// only for error messages.
    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
        let mut ecx = self.clone();
        ecx.name = name;
        ecx
    }

    /// Computes the column type of `expr` in this context.
    pub fn column_type<E>(&self, expr: &E) -> E::Type
    where
        E: AbstractExpr,
    {
        expr.typ(
            &self.qcx.outer_relation_types,
            self.relation_type,
            &self.qcx.scx.param_types.borrow(),
        )
    }

    /// Computes the scalar type of `expr` in this context.
    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
    where
        E: AbstractExpr,
    {
        self.column_type(expr).scalar_type()
    }

    /// Derives a `QueryContext` for planning subqueries of this expression,
    /// with the current scope (marked with a `lateral_barrier`) as the closest
    /// outer scope.
    fn derived_query_context(&self) -> QueryContext<'_> {
        let mut scope = self.scope.clone();
        scope.lateral_barrier = true;
        self.qcx.derived_context(scope, self.relation_type.clone())
    }

    /// Delegates feature-flag checking to the statement context.
    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
        self.qcx.scx.require_feature_flag(flag)
    }

    /// Returns the parameter type map of the statement being planned.
    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
        &self.qcx.scx.param_types
    }

    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
    }

    /// Interns the column name of `item`, returning a shared pointer to the
    /// interned string.
    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
    }
}
6763
/// Manages column names, doing lightweight string internment.
///
/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
/// to ensure maximal sharing.
///
/// The inner set holds one canonical `Arc<str>` per distinct name; `intern`
/// hands out clones of these.
#[derive(Debug, Clone)]
pub struct NameManager(BTreeSet<Arc<str>>);
6771
6772impl NameManager {
6773    /// Creates a new `NameManager`, with no interned names
6774    pub fn new() -> Self {
6775        Self(BTreeSet::new())
6776    }
6777
6778    /// Interns a string, returning a reference-counted pointer to the interned
6779    /// string.
6780    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6781        let s = s.as_ref();
6782        if let Some(interned) = self.0.get(s) {
6783            Arc::clone(interned)
6784        } else {
6785            let interned: Arc<str> = Arc::from(s);
6786            self.0.insert(Arc::clone(&interned));
6787            interned
6788        }
6789    }
6790
6791    /// Interns a string representing a reference to a `ScopeItem`, returning a
6792    /// reference-counted pointer to the interned string.
6793    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6794        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6795        //
6796        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6797        // name information. Note that as of 2025-04-09, we don't support column
6798        // renames.
6799        //
6800        // A few bad alternatives:
6801        //
6802        // (1) Store it but don't write it down. This fails because the expression
6803        //     cache will erase our names on restart.
6804        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6805        //     get the right names. But the world now and the world when we made
6806        //     those objects may be different.
6807        // (3) Just don't write down the table name. Nothing fails... for now.
6808
6809        self.intern(item.column_name.as_str())
6810    }
6811}
6812
#[cfg(test)]
mod test {
    use super::*;

    /// Ensure that `NameManager`'s string interning works as expected.
    ///
    /// In particular, structurally but not referentially identical strings should
    /// be interned to the same `Arc`ed pointer.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut nm = NameManager::new();

        let orig_hi = "hi";
        let hi = nm.intern(orig_hi);
        let hello = nm.intern("hello");

        // Distinct strings must intern to distinct allocations.
        assert_ne!(hi.as_ptr(), hello.as_ptr());

        // this static string is _likely_ the same as `orig_hi`
        let hi2 = nm.intern("hi");
        assert_eq!(hi.as_ptr(), hi2.as_ptr());

        // generate a "hi" string that doesn't get optimized to the same static string
        let s = format!(
            "{}{}",
            hi.chars().nth(0).unwrap(),
            hi2.chars().nth(1).unwrap()
        );
        // make sure that we're testing with a fresh string!
        assert_ne!(orig_hi.as_ptr(), s.as_ptr());

        let hi3 = nm.intern(s);
        assert_eq!(hi.as_ptr(), hi3.as_ptr());
    }
}