Skip to main content

mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50    ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51    MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55    Eval, Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME,
56    RowSetFinishing, TableFunc, func as expr_func,
57};
58use mz_ore::collections::CollectionExt;
59use mz_ore::error::ErrorExt;
60use mz_ore::id_gen::IdGen;
61use mz_ore::option::FallibleMapExt;
62use mz_ore::stack::{CheckedRecursion, RecursionGuard};
63use mz_ore::str::StrExt;
64use mz_repr::adt::char::CharLength;
65use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
66use mz_repr::adt::timestamp::TimestampPrecision;
67use mz_repr::adt::varchar::VarCharMaxLength;
68use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
69use mz_repr::{
70    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
71    ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
72    UNKNOWN_COLUMN_NAME, strconv,
73};
74use mz_sql_parser::ast::display::AstDisplay;
75use mz_sql_parser::ast::visit::Visit;
76use mz_sql_parser::ast::visit_mut::{self, VisitMut};
77use mz_sql_parser::ast::{
78    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
79    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
80    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
81    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
82    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
83    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
84    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
85    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
86};
87use mz_sql_parser::ident;
88
89use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
90use crate::func::{self, Func, FuncSpec, TableFuncImpl};
91use crate::names::{
92    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
93};
94use crate::plan::PlanError::InvalidWmrRecursionLimit;
95use crate::plan::error::PlanError;
96use crate::plan::hir::{
97    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
98    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
99    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
100    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
101};
102use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
103use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
104use crate::plan::statement::{StatementContext, StatementDesc, show};
105use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
106use crate::plan::{
107    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
108    literal, transform_ast,
109};
110use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
111use crate::session::vars::{self, FeatureFlag};
112use crate::{ORDINALITY_COL_NAME, normalize};
113
114#[derive(Debug)]
115pub struct PlannedRootQuery<E> {
116    pub expr: E,
117    pub desc: RelationDesc,
118    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
119    pub scope: Scope,
120}
121
122/// Plans a top-level query, returning the `HirRelationExpr` describing the query
123/// plan, the `RelationDesc` describing the shape of the result set, a
124/// `RowSetFinishing` describing post-processing that must occur before results
125/// are sent to the client, and the types of the parameters in the query, if any
126/// were present.
127///
128/// Note that the returned `RelationDesc` describes the expression after
129/// applying the returned `RowSetFinishing`.
130#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
131pub fn plan_root_query(
132    scx: &StatementContext,
133    mut query: Query<Aug>,
134    lifetime: QueryLifetime,
135) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
136    transform_ast::transform(scx, &mut query)?;
137    let mut qcx = QueryContext::root(scx, lifetime);
138    let PlannedQuery {
139        mut expr,
140        scope,
141        order_by,
142        limit,
143        offset,
144        project,
145        group_size_hints,
146    } = plan_query(&mut qcx, &query)?;
147
148    let mut finishing = RowSetFinishing {
149        limit,
150        offset,
151        project,
152        order_by,
153    };
154
155    // Attempt to push the finishing's ordering past its projection. This allows
156    // data to be projected down on the workers rather than the coordinator. It
157    // also improves the optimizer's demand analysis, as the optimizer can only
158    // reason about demand information in `expr` (i.e., it can't see
159    // `finishing.project`).
160    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
161
162    if lifetime.is_maintained() {
163        expr.finish_maintained(&mut finishing, group_size_hints);
164    }
165
166    let typ = qcx.relation_type(&expr);
167    let typ = SqlRelationType::new(
168        finishing
169            .project
170            .iter()
171            .map(|i| typ.column_types[*i].clone())
172            .collect(),
173    );
174    let desc = RelationDesc::new(typ, scope.column_names());
175
176    Ok(PlannedRootQuery {
177        expr,
178        desc,
179        finishing,
180        scope,
181    })
182}
183
184/// Attempts to push a projection through an order by.
185///
186/// The returned bool indicates whether the pushdown was successful or not.
187/// Successful pushdown requires that all the columns referenced in `order_by`
188/// are included in `project`.
189///
190/// When successful, `expr` is wrapped in a projection node, `order_by` is
191/// rewritten to account for the pushed-down projection, and `project` is
192/// replaced with the trivial projection. When unsuccessful, no changes are made
193/// to any of the inputs.
194fn try_push_projection_order_by(
195    expr: &mut HirRelationExpr,
196    project: &mut Vec<usize>,
197    order_by: &mut Vec<ColumnOrder>,
198) -> bool {
199    let mut unproject = vec![None; expr.arity()];
200    for (out_i, in_i) in project.iter().copied().enumerate() {
201        unproject[in_i] = Some(out_i);
202    }
203    if order_by
204        .iter()
205        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
206    {
207        let trivial_project = (0..project.len()).collect();
208        *expr = expr.take().project(mem::replace(project, trivial_project));
209        for ob in order_by {
210            ob.column = unproject[ob.column].unwrap();
211        }
212        true
213    } else {
214        false
215    }
216}
217
218pub fn plan_insert_query(
219    scx: &StatementContext,
220    table_name: ResolvedItemName,
221    columns: Vec<Ident>,
222    source: InsertSource<Aug>,
223    returning: Vec<SelectItem<Aug>>,
224) -> Result<
225    (
226        CatalogItemId,
227        HirRelationExpr,
228        PlannedRootQuery<Vec<HirScalarExpr>>,
229    ),
230    PlanError,
231> {
232    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
233    let table = scx.get_item_by_resolved_name(&table_name)?;
234
235    // Validate the target of the insert.
236    if table.item_type() != CatalogItemType::Table {
237        sql_bail!(
238            "cannot insert into {} '{}'",
239            table.item_type(),
240            table_name.full_name_str()
241        );
242    }
243    let desc = table
244        .relation_desc()
245        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
246    let mut defaults = table
247        .writable_table_details()
248        .ok_or_else(|| {
249            sql_err!(
250                "cannot insert into non-writeable table '{}'",
251                table_name.full_name_str()
252            )
253        })?
254        .to_vec();
255
256    for default in &mut defaults {
257        transform_ast::transform(scx, default)?;
258    }
259
260    if table.id().is_system() {
261        sql_bail!(
262            "cannot insert into system table '{}'",
263            table_name.full_name_str()
264        );
265    }
266
267    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
268
269    // Validate target column order.
270    let mut source_types = Vec::with_capacity(columns.len());
271    let mut ordering = Vec::with_capacity(columns.len());
272
273    if columns.is_empty() {
274        // Columns in source query must be in order. Let's guess the full shape and truncate to the
275        // right size later after planning the source query
276        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
277        ordering.extend(0..desc.arity());
278    } else {
279        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
280            .iter()
281            .enumerate()
282            .map(|(idx, (name, typ))| (name, (idx, typ)))
283            .collect();
284
285        for c in &columns {
286            if let Some((idx, typ)) = column_by_name.get(c) {
287                ordering.push(*idx);
288                source_types.push(&typ.scalar_type);
289            } else {
290                sql_bail!(
291                    "column {} of relation {} does not exist",
292                    c.quoted(),
293                    table_name.full_name_str().quoted()
294                );
295            }
296        }
297        if let Some(dup) = columns.iter().duplicates().next() {
298            sql_bail!("column {} specified more than once", dup.quoted());
299        }
300    };
301
302    // Plan the source.
303    let expr = match source {
304        InsertSource::Query(mut query) => {
305            transform_ast::transform(scx, &mut query)?;
306
307            match query {
308                // Special-case simple VALUES clauses as PostgreSQL does.
309                Query {
310                    body: SetExpr::Values(Values(values)),
311                    ctes,
312                    order_by,
313                    limit: None,
314                    offset: None,
315                } if ctes.is_empty() && order_by.is_empty() => {
316                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
317                    plan_values_insert(&qcx, &names, &source_types, &values)?
318                }
319                _ => {
320                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
321                    expr
322                }
323            }
324        }
325        InsertSource::DefaultValues => {
326            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
327        }
328    };
329
330    let expr_arity = expr.arity();
331
332    // Validate that the arity of the source query is at most the size of declared columns or the
333    // size of the table if none are declared
334    let max_columns = if columns.is_empty() {
335        desc.arity()
336    } else {
337        columns.len()
338    };
339    if expr_arity > max_columns {
340        sql_bail!("INSERT has more expressions than target columns");
341    }
342    // But it should never have less than the declared columns (or zero)
343    if expr_arity < columns.len() {
344        sql_bail!("INSERT has more target columns than expressions");
345    }
346
347    // Trim now that we know for sure the correct arity of the source query
348    source_types.truncate(expr_arity);
349    ordering.truncate(expr_arity);
350
351    // Ensure the types of the source query match the types of the target table,
352    // installing assignment casts where necessary and possible.
353    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
354        sql_err!(
355            "column {} is of type {} but expression is of type {}",
356            desc.get_name(ordering[e.column]).quoted(),
357            qcx.humanize_sql_scalar_type(&e.target_type, false),
358            qcx.humanize_sql_scalar_type(&e.source_type, false),
359        )
360    })?;
361
362    // Fill in any omitted columns and rearrange into correct order
363    let mut map_exprs = vec![];
364    let mut project_key = Vec::with_capacity(desc.arity());
365
366    // Maps from table column index to position in the source query
367    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
368
369    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
370    for (col_idx, (col_typ, default)) in column_details {
371        if let Some(src_idx) = col_to_source.get(&col_idx) {
372            project_key.push(*src_idx);
373        } else {
374            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
375            project_key.push(expr_arity + map_exprs.len());
376            map_exprs.push(hir);
377        }
378    }
379
380    let returning = {
381        let (scope, typ) = if let ResolvedItemName::Item {
382            full_name,
383            version: _,
384            ..
385        } = table_name
386        {
387            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
388            let typ = desc.typ().clone();
389            (scope, typ)
390        } else {
391            (Scope::empty(), SqlRelationType::empty())
392        };
393        let ecx = &ExprContext {
394            qcx: &qcx,
395            name: "RETURNING clause",
396            scope: &scope,
397            relation_type: &typ,
398            allow_aggregates: false,
399            allow_subqueries: false,
400            allow_parameters: true,
401            allow_windows: false,
402        };
403        let table_func_names = BTreeMap::new();
404        let mut output_columns = vec![];
405        let mut new_exprs = vec![];
406        let mut new_type = SqlRelationType::empty();
407        for mut si in returning {
408            transform_ast::transform(scx, &mut si)?;
409            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
410                let expr = match &select_item {
411                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
412                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
413                };
414                output_columns.push(column_name);
415                let typ = ecx.column_type(&expr);
416                new_type.column_types.push(typ);
417                new_exprs.push(expr);
418            }
419        }
420        let desc = RelationDesc::new(new_type, output_columns);
421        let desc_arity = desc.arity();
422        PlannedRootQuery {
423            expr: new_exprs,
424            desc,
425            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
426            scope,
427        }
428    };
429
430    Ok((
431        table.id(),
432        expr.map(map_exprs).project(project_key),
433        returning,
434    ))
435}
436
437/// Determines the mapping between some external data and a Materialize relation.
438///
439/// Returns the following:
440/// * [`CatalogItemId`] for the destination table.
441/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
442/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
443///   since we now return a [`MapFilterProject`].
444/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
445///   destination table.
446///
447pub fn plan_copy_item(
448    scx: &StatementContext,
449    item_name: ResolvedItemName,
450    columns: Vec<Ident>,
451) -> Result<
452    (
453        CatalogItemId,
454        RelationDesc,
455        Vec<ColumnIndex>,
456        Option<MapFilterProject>,
457    ),
458    PlanError,
459> {
460    let item = scx.get_item_by_resolved_name(&item_name)?;
461    let fullname = scx.catalog.resolve_full_name(item.name());
462    let table_desc = match item.relation_desc() {
463        Some(desc) => desc.into_owned(),
464        None => {
465            return Err(PlanError::InvalidDependency {
466                name: fullname.to_string(),
467                item_type: item.item_type().to_string(),
468            });
469        }
470    };
471    let mut ordering = Vec::with_capacity(columns.len());
472
473    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
474    // should be simplified. The reason they are currently separate code paths is so we can roll
475    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.
476
477    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
478    // FROM SOURCE ...`), then we generate an MFP.
479    //
480    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
481    // so it's not always guaranteed that our `item` is a table.
482    let mfp = if let Some(table_defaults) = item.writable_table_details() {
483        let mut table_defaults = table_defaults.to_vec();
484
485        for default in &mut table_defaults {
486            transform_ast::transform(scx, default)?;
487        }
488
489        // Fill in any omitted columns and rearrange into correct order
490        let source_column_names: Vec<_> = columns
491            .iter()
492            .cloned()
493            .map(normalize::column_name)
494            .collect();
495
496        let mut default_exprs = Vec::new();
497        let mut project_keys = Vec::with_capacity(table_desc.arity());
498
499        // For each column in the destination table, either project it from the source data, or provide
500        // an expression to fill in a default value.
501        let column_details = table_desc.iter().zip_eq(table_defaults);
502        for ((col_name, col_type), col_default) in column_details {
503            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
504            if let Some(src_idx) = maybe_src_idx {
505                project_keys.push(src_idx);
506            } else {
507                // If one a column from the table does not exist in the source data, then a default
508                // value will get appended to the end of the input Row from the source data.
509                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
510                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
511                project_keys.push(source_column_names.len() + default_exprs.len());
512                default_exprs.push(mir);
513            }
514        }
515
516        let mfp = MapFilterProject::new(source_column_names.len())
517            .map(default_exprs)
518            .project(project_keys);
519        Some(mfp)
520    } else {
521        None
522    };
523
524    // Create a mapping from input data to the table we're copying into.
525    let source_desc = if columns.is_empty() {
526        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
527        ordering.extend(indexes);
528
529        // The source data should be in the same order as the table.
530        table_desc
531    } else {
532        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
533        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
534            .iter_all()
535            .map(|(idx, name, typ)| (name, (*idx, typ)))
536            .collect();
537
538        let mut names = Vec::with_capacity(columns.len());
539        let mut source_types = Vec::with_capacity(columns.len());
540
541        for c in &columns {
542            if let Some((idx, typ)) = column_by_name.get(c) {
543                ordering.push(*idx);
544                source_types.push((*typ).clone());
545                names.push(c.clone());
546            } else {
547                sql_bail!(
548                    "column {} of relation {} does not exist",
549                    c.quoted(),
550                    item_name.full_name_str().quoted()
551                );
552            }
553        }
554        if let Some(dup) = columns.iter().duplicates().next() {
555            sql_bail!("column {} specified more than once", dup.quoted());
556        }
557
558        // The source data is a different shape than the destination table.
559        RelationDesc::new(SqlRelationType::new(source_types), names)
560    };
561
562    Ok((item.id(), source_desc, ordering, mfp))
563}
564
565/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
566///
567/// TODO(cf3): Merge this method with [`plan_copy_item`].
568pub fn plan_copy_from(
569    scx: &StatementContext,
570    table_name: ResolvedItemName,
571    columns: Vec<Ident>,
572) -> Result<
573    (
574        CatalogItemId,
575        RelationDesc,
576        Vec<ColumnIndex>,
577        Option<MapFilterProject>,
578    ),
579    PlanError,
580> {
581    let table = scx.get_item_by_resolved_name(&table_name)?;
582
583    // Validate the target of the insert.
584    if table.item_type() != CatalogItemType::Table {
585        sql_bail!(
586            "cannot insert into {} '{}'",
587            table.item_type(),
588            table_name.full_name_str()
589        );
590    }
591
592    let _ = table.writable_table_details().ok_or_else(|| {
593        sql_err!(
594            "cannot insert into non-writeable table '{}'",
595            table_name.full_name_str()
596        )
597    })?;
598
599    if table.id().is_system() {
600        sql_bail!(
601            "cannot insert into system table '{}'",
602            table_name.full_name_str()
603        );
604    }
605    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
606
607    Ok((id, desc, ordering, mfp))
608}
609
610/// Builds a plan that adds the default values for the missing columns and re-orders
611/// the datums in the given rows to match the order in the target table.
612pub fn plan_copy_from_rows(
613    pcx: &PlanContext,
614    catalog: &dyn SessionCatalog,
615    target_id: CatalogItemId,
616    target_name: String,
617    columns: Vec<ColumnIndex>,
618    rows: Vec<mz_repr::Row>,
619) -> Result<HirRelationExpr, PlanError> {
620    let scx = StatementContext::new(Some(pcx), catalog);
621
622    // Always copy at the latest version of the table.
623    let table = catalog
624        .try_get_item(&target_id)
625        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
626        .at_version(RelationVersionSelector::Latest);
627
628    let mut defaults = table
629        .writable_table_details()
630        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
631        .to_vec();
632
633    for default in &mut defaults {
634        transform_ast::transform(&scx, default)?;
635    }
636
637    let desc = table
638        .relation_desc()
639        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
640    let column_types = columns
641        .iter()
642        .map(|x| desc.get_type(x).clone())
643        .map(|mut x| {
644            // Null constraint is enforced later, when inserting the row in the table.
645            // Without this, an assert is hit during lowering.
646            x.nullable = true;
647            x
648        })
649        .collect();
650    let typ = SqlRelationType::new(column_types);
651    let expr = HirRelationExpr::Constant {
652        rows,
653        typ: typ.clone(),
654    };
655
656    // Exit early with just the raw constant if we know that all columns are present
657    // and in the correct order. This lets us bypass expensive downstream optimizations
658    // more easily, as at every stage we know this expression is nothing more than
659    // a constant (as opposed to e.g. a constant with with an identity map and identity
660    // projection).
661    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
662    if columns == default {
663        return Ok(expr);
664    }
665
666    // Fill in any omitted columns and rearrange into correct order
667    let mut map_exprs = vec![];
668    let mut project_key = Vec::with_capacity(desc.arity());
669
670    // Maps from table column index to position in the source query
671    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
672
673    let column_details = desc.iter_all().zip_eq(defaults);
674    for ((col_idx, _col_name, col_typ), default) in column_details {
675        if let Some(src_idx) = col_to_source.get(&col_idx) {
676            project_key.push(*src_idx);
677        } else {
678            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
679            project_key.push(typ.arity() + map_exprs.len());
680            map_exprs.push(hir);
681        }
682    }
683
684    Ok(expr.map(map_exprs).project(project_key))
685}
686
687/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
688pub struct ReadThenWritePlan {
689    pub id: CatalogItemId,
690    /// Read portion of query.
691    ///
692    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
693    /// retractions.
694    pub selection: HirRelationExpr,
695    /// Map from column index to SET expression. Empty for DELETE statements.
696    pub assignments: BTreeMap<usize, HirScalarExpr>,
697    pub finishing: RowSetFinishing,
698}
699
700pub fn plan_delete_query(
701    scx: &StatementContext,
702    mut delete_stmt: DeleteStatement<Aug>,
703) -> Result<ReadThenWritePlan, PlanError> {
704    transform_ast::transform(scx, &mut delete_stmt)?;
705
706    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
707    plan_mutation_query_inner(
708        qcx,
709        delete_stmt.table_name,
710        delete_stmt.alias,
711        delete_stmt.using,
712        vec![],
713        delete_stmt.selection,
714    )
715}
716
717pub fn plan_update_query(
718    scx: &StatementContext,
719    mut update_stmt: UpdateStatement<Aug>,
720) -> Result<ReadThenWritePlan, PlanError> {
721    transform_ast::transform(scx, &mut update_stmt)?;
722
723    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
724
725    plan_mutation_query_inner(
726        qcx,
727        update_stmt.table_name,
728        update_stmt.alias,
729        vec![],
730        update_stmt.assignments,
731        update_stmt.selection,
732    )
733}
734
735pub fn plan_mutation_query_inner(
736    qcx: QueryContext,
737    table_name: ResolvedItemName,
738    alias: Option<TableAlias>,
739    using: Vec<TableWithJoins<Aug>>,
740    assignments: Vec<Assignment<Aug>>,
741    selection: Option<Expr<Aug>>,
742) -> Result<ReadThenWritePlan, PlanError> {
743    // Get ID and version of the relation desc.
744    let (id, version) = match table_name {
745        ResolvedItemName::Item { id, version, .. } => (id, version),
746        _ => sql_bail!("cannot mutate non-user table"),
747    };
748
749    // Perform checks on item with given ID.
750    let item = qcx.scx.get_item(&id).at_version(version);
751    if item.item_type() != CatalogItemType::Table {
752        sql_bail!(
753            "cannot mutate {} '{}'",
754            item.item_type(),
755            table_name.full_name_str()
756        );
757    }
758    let _ = item.writable_table_details().ok_or_else(|| {
759        sql_err!(
760            "cannot mutate non-writeable table '{}'",
761            table_name.full_name_str()
762        )
763    })?;
764    if id.is_system() {
765        sql_bail!(
766            "cannot mutate system table '{}'",
767            table_name.full_name_str()
768        );
769    }
770
771    // Derive structs for operation from validated table
772    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
773    let scope = plan_table_alias(scope, alias.as_ref())?;
774    let desc = item.relation_desc().expect("table has desc");
775    let relation_type = qcx.relation_type(&get);
776
777    if using.is_empty() {
778        if let Some(expr) = selection {
779            let ecx = &ExprContext {
780                qcx: &qcx,
781                name: "WHERE clause",
782                scope: &scope,
783                relation_type: &relation_type,
784                allow_aggregates: false,
785                allow_subqueries: true,
786                allow_parameters: true,
787                allow_windows: false,
788            };
789            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
790            get = get.filter(vec![expr]);
791        }
792    } else {
793        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
794    }
795
796    let mut sets = BTreeMap::new();
797    for Assignment { id, value } in assignments {
798        // Get the index and type of the column.
799        let name = normalize::column_name(id);
800        match desc.get_by_name(&name) {
801            Some((idx, typ)) => {
802                let ecx = &ExprContext {
803                    qcx: &qcx,
804                    name: "SET clause",
805                    scope: &scope,
806                    relation_type: &relation_type,
807                    allow_aggregates: false,
808                    allow_subqueries: false,
809                    allow_parameters: true,
810                    allow_windows: false,
811                };
812                let expr = plan_expr(ecx, &value)?.cast_to(
813                    ecx,
814                    CastContext::Assignment,
815                    &typ.scalar_type,
816                )?;
817
818                if sets.insert(idx, expr).is_some() {
819                    sql_bail!("column {} set twice", name)
820                }
821            }
822            None => sql_bail!("unknown column {}", name),
823        };
824    }
825
826    let finishing = RowSetFinishing {
827        order_by: vec![],
828        limit: None,
829        offset: 0,
830        project: (0..desc.arity()).collect(),
831    };
832
833    Ok(ReadThenWritePlan {
834        id,
835        selection: get,
836        finishing,
837        assignments: sets,
838    })
839}
840
841// Adjust `get` to perform an existential subquery on `using` accounting for
842// `selection`.
843//
844// If `USING`, we essentially want to rewrite the query as a correlated
845// existential subquery, i.e.
846// ```
847// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
848// ```
849// However, we can't do that directly because of esoteric rules w/r/t `lateral`
850// subqueries.
851// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
852fn handle_mutation_using_clause(
853    qcx: &QueryContext,
854    selection: Option<Expr<Aug>>,
855    using: Vec<TableWithJoins<Aug>>,
856    get: HirRelationExpr,
857    outer_scope: Scope,
858) -> Result<HirRelationExpr, PlanError> {
859    // Plan `USING` as a cross-joined `FROM` without knowledge of the
860    // statement's `FROM` target. This prevents `lateral` subqueries from
861    // "seeing" the `FROM` target.
862    let (mut using_rel_expr, using_scope) =
863        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
864            let (left, left_scope) = l;
865            plan_join(
866                qcx,
867                left,
868                left_scope,
869                &Join {
870                    relation: TableFactor::NestedJoin {
871                        join: Box::new(twj),
872                        alias: None,
873                    },
874                    join_operator: JoinOperator::CrossJoin,
875                },
876            )
877        })?;
878
879    if let Some(expr) = selection {
880        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
881        // PG-like semantics e.g. expressing ambiguous column references. We put
882        // `USING...` first for no real reason, but making a different decision
883        // would require adjusting the column references on this relation
884        // differently.
885        let on = HirScalarExpr::literal_true();
886        let joined = using_rel_expr
887            .clone()
888            .join(get.clone(), on, JoinKind::Inner);
889        let joined_scope = using_scope.product(outer_scope)?;
890        let joined_relation_type = qcx.relation_type(&joined);
891
892        let ecx = &ExprContext {
893            qcx,
894            name: "WHERE clause",
895            scope: &joined_scope,
896            relation_type: &joined_relation_type,
897            allow_aggregates: false,
898            allow_subqueries: true,
899            allow_parameters: true,
900            allow_windows: false,
901        };
902
903        // Plan the filter expression on `FROM, USING...`.
904        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
905
906        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
907        // those to the right of `using_rel_expr`) to instead be correlated to
908        // the outer relation, i.e. `get`.
909        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
910        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
911        use mz_expr::visit::Visit;
912        expr.visit_mut_post(&mut |e| {
913            if let HirScalarExpr::Column(c, _name) = e {
914                if c.column >= using_rel_arity {
915                    c.level += 1;
916                    c.column -= using_rel_arity;
917                };
918            }
919        });
920
921        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
922        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
923        // relation.
924        using_rel_expr = using_rel_expr.filter(vec![expr]);
925    } else {
926        // Check that scopes are at compatible (i.e. do not double-reference
927        // same table), despite lack of selection
928        let _joined_scope = using_scope.product(outer_scope)?;
929    }
930    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
931    // whether any rows are returned, and not on the contents of those rows,
932    // the output list of the subquery is normally unimportant.
933    //
934    // This means we don't need to worry about projecting/mapping any
935    // additional expressions here.
936    //
937    // https://www.postgresql.org/docs/14/functions-subquery.html
938
939    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
940    Ok(get.filter(vec![using_rel_expr.exists()]))
941}
942
943#[derive(Debug)]
944pub(crate) struct CastRelationError {
945    pub(crate) column: usize,
946    pub(crate) source_type: SqlScalarType,
947    pub(crate) target_type: SqlScalarType,
948}
949
950/// Cast a relation from one type to another using the specified type of cast.
951///
952/// The length of `target_types` must match the arity of `expr`.
953pub(crate) fn cast_relation<'a, I>(
954    qcx: &QueryContext,
955    ccx: CastContext,
956    expr: HirRelationExpr,
957    target_types: I,
958) -> Result<HirRelationExpr, CastRelationError>
959where
960    I: IntoIterator<Item = &'a SqlScalarType>,
961{
962    let ecx = &ExprContext {
963        qcx,
964        name: "values",
965        scope: &Scope::empty(),
966        relation_type: &qcx.relation_type(&expr),
967        allow_aggregates: false,
968        allow_subqueries: true,
969        allow_parameters: true,
970        allow_windows: false,
971    };
972    let mut map_exprs = vec![];
973    let mut project_key = vec![];
974    for (i, target_typ) in target_types.into_iter().enumerate() {
975        let expr = HirScalarExpr::column(i);
976        // We plan every cast and check the evaluated expressions rather than
977        // checking the types directly because of some complex casting rules
978        // between types not expressed in `SqlScalarType` equality.
979        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
980            Ok(cast_expr) => {
981                if expr == cast_expr {
982                    // Cast between types was unnecessary
983                    project_key.push(i);
984                } else {
985                    // Cast between types required
986                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
987                    map_exprs.push(cast_expr);
988                }
989            }
990            Err(_) => {
991                return Err(CastRelationError {
992                    column: i,
993                    source_type: ecx.scalar_type(&expr),
994                    target_type: target_typ.clone(),
995                });
996            }
997        }
998    }
999    Ok(expr.map(map_exprs).project(project_key))
1000}
1001
1002/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1003/// VIEW` statement.
1004pub fn plan_as_of(
1005    scx: &StatementContext,
1006    as_of: Option<AsOf<Aug>>,
1007) -> Result<QueryWhen, PlanError> {
1008    match as_of {
1009        None => Ok(QueryWhen::Immediately),
1010        Some(as_of) => match as_of {
1011            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1012            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1013        },
1014    }
1015}
1016
1017/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1018///
1019/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1020/// - not a constant,
1021/// - not castable to MzTimestamp,
1022/// - is null,
1023/// - contains an unmaterializable function,
1024/// - some other evaluation error occurs, e.g., a division by 0,
1025/// - contains aggregates, subqueries, parameters, or window function calls.
1026pub fn plan_as_of_or_up_to(
1027    scx: &StatementContext,
1028    mut expr: Expr<Aug>,
1029) -> Result<mz_repr::Timestamp, PlanError> {
1030    let scope = Scope::empty();
1031    let desc = RelationDesc::empty();
1032    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1033    // evaluated only once.)
1034    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1035    transform_ast::transform(scx, &mut expr)?;
1036    let ecx = &ExprContext {
1037        qcx: &qcx,
1038        name: "AS OF or UP TO",
1039        scope: &scope,
1040        relation_type: desc.typ(),
1041        allow_aggregates: false,
1042        allow_subqueries: false,
1043        allow_parameters: false,
1044        allow_windows: false,
1045    };
1046    let hir = plan_expr(ecx, &expr)?.cast_to(
1047        ecx,
1048        CastContext::Assignment,
1049        &SqlScalarType::MzTimestamp,
1050    )?;
1051    if hir.contains_unmaterializable() {
1052        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1053    }
1054    // At this point, we definitely have a constant expression:
1055    // - it can't contain any unmaterializable functions;
1056    // - it can't refer to any columns.
1057    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1058    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1059    // division by 0.
1060    let timestamp = hir
1061        .into_literal_mz_timestamp()
1062        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1063    Ok(timestamp)
1064}
1065
1066/// Plans an expression in the AS position of a `CREATE SECRET`.
1067pub fn plan_secret_as(
1068    scx: &StatementContext,
1069    mut expr: Expr<Aug>,
1070) -> Result<MirScalarExpr, PlanError> {
1071    let scope = Scope::empty();
1072    let desc = RelationDesc::empty();
1073    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1074
1075    transform_ast::transform(scx, &mut expr)?;
1076
1077    let ecx = &ExprContext {
1078        qcx: &qcx,
1079        name: "AS",
1080        scope: &scope,
1081        relation_type: desc.typ(),
1082        allow_aggregates: false,
1083        allow_subqueries: false,
1084        allow_parameters: false,
1085        allow_windows: false,
1086    };
1087    let expr = plan_expr(ecx, &expr)?
1088        .type_as(ecx, &SqlScalarType::Bytes)?
1089        .lower_uncorrelated(scx.catalog.system_vars())?;
1090    Ok(expr)
1091}
1092
1093/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1094pub fn plan_webhook_validate_using(
1095    scx: &StatementContext,
1096    validate_using: CreateWebhookSourceCheck<Aug>,
1097) -> Result<WebhookValidation, PlanError> {
1098    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1099
1100    let CreateWebhookSourceCheck {
1101        options,
1102        using: mut expr,
1103    } = validate_using;
1104
1105    let mut column_typs = vec![];
1106    let mut column_names = vec![];
1107
1108    let (bodies, headers, secrets) = options
1109        .map(|o| (o.bodies, o.headers, o.secrets))
1110        .unwrap_or_default();
1111
1112    // Append all of the bodies so they can be used in the expression.
1113    let mut body_tuples = vec![];
1114    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1115        let scalar_type = use_bytes
1116            .then_some(SqlScalarType::Bytes)
1117            .unwrap_or(SqlScalarType::String);
1118        let name = alias
1119            .map(|a| a.into_string())
1120            .unwrap_or_else(|| "body".to_string());
1121
1122        column_typs.push(SqlColumnType {
1123            scalar_type,
1124            nullable: false,
1125        });
1126        column_names.push(name);
1127
1128        // Store the column index so we can be sure to provide this body correctly.
1129        let column_idx = column_typs.len() - 1;
1130        // Double check we're consistent with column names.
1131        assert_eq!(
1132            column_idx,
1133            column_names.len() - 1,
1134            "body column names and types don't match"
1135        );
1136        body_tuples.push((column_idx, use_bytes));
1137    }
1138
1139    // Append all of the headers so they can be used in the expression.
1140    let mut header_tuples = vec![];
1141
1142    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1143        let value_type = use_bytes
1144            .then_some(SqlScalarType::Bytes)
1145            .unwrap_or(SqlScalarType::String);
1146        let name = alias
1147            .map(|a| a.into_string())
1148            .unwrap_or_else(|| "headers".to_string());
1149
1150        column_typs.push(SqlColumnType {
1151            scalar_type: SqlScalarType::Map {
1152                value_type: Box::new(value_type),
1153                custom_id: None,
1154            },
1155            nullable: false,
1156        });
1157        column_names.push(name);
1158
1159        // Store the column index so we can be sure to provide this body correctly.
1160        let column_idx = column_typs.len() - 1;
1161        // Double check we're consistent with column names.
1162        assert_eq!(
1163            column_idx,
1164            column_names.len() - 1,
1165            "header column names and types don't match"
1166        );
1167        header_tuples.push((column_idx, use_bytes));
1168    }
1169
1170    // Append all secrets so they can be used in the expression.
1171    let mut validation_secrets = vec![];
1172
1173    for CreateWebhookSourceSecret {
1174        secret,
1175        alias,
1176        use_bytes,
1177    } in secrets
1178    {
1179        // Either provide the secret to the validation expression as Bytes or a String.
1180        let scalar_type = use_bytes
1181            .then_some(SqlScalarType::Bytes)
1182            .unwrap_or(SqlScalarType::String);
1183
1184        column_typs.push(SqlColumnType {
1185            scalar_type,
1186            nullable: false,
1187        });
1188        let ResolvedItemName::Item {
1189            id,
1190            full_name: FullItemName { item, .. },
1191            ..
1192        } = secret
1193        else {
1194            return Err(PlanError::InvalidSecret(Box::new(secret)));
1195        };
1196
1197        // Plan the expression using the secret's alias, if one is provided.
1198        let name = if let Some(alias) = alias {
1199            alias.into_string()
1200        } else {
1201            item
1202        };
1203        column_names.push(name);
1204
1205        // Get the column index that corresponds for this secret, so we can make sure to provide the
1206        // secrets in the correct order during evaluation.
1207        let column_idx = column_typs.len() - 1;
1208        // Double check that our column names and types match.
1209        assert_eq!(
1210            column_idx,
1211            column_names.len() - 1,
1212            "column names and types don't match"
1213        );
1214
1215        validation_secrets.push(WebhookValidationSecret {
1216            id,
1217            column_idx,
1218            use_bytes,
1219        });
1220    }
1221
1222    let relation_typ = SqlRelationType::new(column_typs);
1223    let desc = RelationDesc::new(relation_typ, column_names.clone());
1224    let scope = Scope::from_source(None, column_names);
1225
1226    transform_ast::transform(scx, &mut expr)?;
1227
1228    let ecx = &ExprContext {
1229        qcx: &qcx,
1230        name: "CHECK",
1231        scope: &scope,
1232        relation_type: desc.typ(),
1233        allow_aggregates: false,
1234        allow_subqueries: false,
1235        allow_parameters: false,
1236        allow_windows: false,
1237    };
1238    let expr = plan_expr(ecx, &expr)?
1239        .type_as(ecx, &SqlScalarType::Bool)?
1240        .lower_uncorrelated(scx.catalog.system_vars())?;
1241    let validation = WebhookValidation {
1242        expression: expr,
1243        relation_desc: desc,
1244        bodies: body_tuples,
1245        headers: header_tuples,
1246        secrets: validation_secrets,
1247    };
1248    Ok(validation)
1249}
1250
1251pub fn plan_default_expr(
1252    scx: &StatementContext,
1253    expr: &Expr<Aug>,
1254    target_ty: &SqlScalarType,
1255) -> Result<HirScalarExpr, PlanError> {
1256    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1257    let ecx = &ExprContext {
1258        qcx: &qcx,
1259        name: "DEFAULT expression",
1260        scope: &Scope::empty(),
1261        relation_type: &SqlRelationType::empty(),
1262        allow_aggregates: false,
1263        allow_subqueries: false,
1264        allow_parameters: false,
1265        allow_windows: false,
1266    };
1267    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1268    Ok(hir)
1269}
1270
1271pub fn plan_params<'a>(
1272    scx: &'a StatementContext,
1273    params: Vec<Expr<Aug>>,
1274    desc: &StatementDesc,
1275) -> Result<Params, PlanError> {
1276    if params.len() != desc.param_types.len() {
1277        sql_bail!(
1278            "expected {} params, got {}",
1279            desc.param_types.len(),
1280            params.len()
1281        );
1282    }
1283
1284    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1285
1286    let mut datums = Row::default();
1287    let mut packer = datums.packer();
1288    let mut actual_types = Vec::new();
1289    let temp_storage = &RowArena::new();
1290    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1291        transform_ast::transform(scx, &mut expr)?;
1292
1293        let ecx = execute_expr_context(&qcx);
1294        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1295        let actual_ty = ecx.scalar_type(&ex);
1296        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1297            return Err(PlanError::WrongParameterType(
1298                i + 1,
1299                ecx.humanize_sql_scalar_type(expected_ty, false),
1300                ecx.humanize_sql_scalar_type(&actual_ty, false),
1301            ));
1302        }
1303        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1304        let evaled = ex.eval(&[], temp_storage)?;
1305        packer.push(evaled);
1306        actual_types.push(actual_ty);
1307    }
1308    Ok(Params {
1309        datums,
1310        execute_types: actual_types,
1311        expected_types: desc.param_types.clone(),
1312    })
1313}
1314
1315static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1316static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1317
1318/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
1319pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1320    ExprContext {
1321        qcx,
1322        name: "EXECUTE",
1323        scope: &EXECUTE_CONTEXT_SCOPE,
1324        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1325        allow_aggregates: false,
1326        allow_subqueries: false,
1327        allow_parameters: false,
1328        allow_windows: false,
1329    }
1330}
1331
1332/// The CastContext used when matching up the types of parameters passed to EXECUTE.
1333///
1334/// This is an assignment cast also in Postgres, see
1335/// <https://github.com/MaterializeInc/database-issues/issues/9266>
1336pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1337    LazyLock::new(|| CastContext::Assignment);
1338
1339pub fn plan_index_exprs<'a>(
1340    scx: &'a StatementContext,
1341    on_desc: &RelationDesc,
1342    exprs: Vec<Expr<Aug>>,
1343) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1344    let scope = Scope::from_source(None, on_desc.iter_names());
1345    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1346
1347    let ecx = &ExprContext {
1348        qcx: &qcx,
1349        name: "CREATE INDEX",
1350        scope: &scope,
1351        relation_type: on_desc.typ(),
1352        allow_aggregates: false,
1353        allow_subqueries: false,
1354        allow_parameters: false,
1355        allow_windows: false,
1356    };
1357    let repr_col_types: Vec<ReprColumnType> = on_desc
1358        .typ()
1359        .column_types
1360        .iter()
1361        .map(ReprColumnType::from)
1362        .collect();
1363    let mut out = vec![];
1364    for mut expr in exprs {
1365        transform_ast::transform(scx, &mut expr)?;
1366        let expr = plan_expr_or_col_index(ecx, &expr)?;
1367        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1368        expr.reduce(&repr_col_types);
1369        out.push(expr);
1370    }
1371    Ok(out)
1372}
1373
1374fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1375    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1376        Some(column) => Ok(HirScalarExpr::column(column)),
1377        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1378    }
1379}
1380
1381fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1382    match e {
1383        Expr::Value(Value::Number(n)) => {
1384            let n = n.parse::<usize>().map_err(|e| {
1385                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1386            })?;
1387            if n < 1 || n > max {
1388                sql_bail!(
1389                    "column reference {} in {} is out of range (1 - {})",
1390                    n,
1391                    name,
1392                    max
1393                );
1394            }
1395            Ok(Some(n - 1))
1396        }
1397        _ => Ok(None),
1398    }
1399}
1400
1401struct PlannedQuery {
1402    expr: HirRelationExpr,
1403    scope: Scope,
1404    order_by: Vec<ColumnOrder>,
1405    limit: Option<HirScalarExpr>,
1406    /// `offset` is either
1407    /// - an Int64 literal
1408    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
1409    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
1410    ///   later.)
1411    offset: HirScalarExpr,
1412    project: Vec<usize>,
1413    group_size_hints: GroupSizeHints,
1414}
1415
1416fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1417    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1418}
1419
1420fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1421    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1422    // for the identifiers, so that they can be re-installed before returning.
1423    let cte_bindings = plan_ctes(qcx, q)?;
1424
1425    let limit = match &q.limit {
1426        None => None,
1427        Some(Limit {
1428            quantity,
1429            with_ties: false,
1430        }) => {
1431            let ecx = &ExprContext {
1432                qcx,
1433                name: "LIMIT",
1434                scope: &Scope::empty(),
1435                relation_type: &SqlRelationType::empty(),
1436                allow_aggregates: false,
1437                allow_subqueries: true,
1438                allow_parameters: true,
1439                allow_windows: false,
1440            };
1441            let limit = plan_expr(ecx, quantity)?;
1442            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1443
1444            let limit = if limit.is_constant() {
1445                let arena = RowArena::new();
1446                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1447
1448                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1449                // that the error happened in a LIMIT clause, so that we have better error msg for
1450                // something like `SELECT 5 LIMIT 'aaa'`.
1451                match limit.eval(&[], &arena)? {
1452                    d @ Datum::Int64(v) if v >= 0 => {
1453                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1454                    }
1455                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1456                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1457                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1458                }
1459            } else {
1460                // Gate non-constant LIMIT expressions behind a feature flag
1461                qcx.scx
1462                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1463                limit
1464            };
1465
1466            Some(limit)
1467        }
1468        Some(Limit {
1469            quantity: _,
1470            with_ties: true,
1471        }) => bail_unsupported!("FETCH ... WITH TIES"),
1472    };
1473
1474    let offset = match &q.offset {
1475        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1476        Some(offset) => {
1477            let ecx = &ExprContext {
1478                qcx,
1479                name: "OFFSET",
1480                scope: &Scope::empty(),
1481                relation_type: &SqlRelationType::empty(),
1482                allow_aggregates: false,
1483                allow_subqueries: false,
1484                allow_parameters: true,
1485                allow_windows: false,
1486            };
1487            let offset = plan_expr(ecx, offset)?;
1488            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1489
1490            let offset = if offset.is_constant() {
1491                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1492                let offset_value = offset_into_value(offset)?;
1493                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1494            } else {
1495                // The only case when this is allowed to not be a constant is if it contains
1496                // parameters. (In which case, we'll later check that it's a constant after
1497                // parameter binding.)
1498                if !offset.contains_parameters() {
1499                    return Err(PlanError::InvalidOffset(format!(
1500                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1501                        offset
1502                    )));
1503                }
1504                offset
1505            };
1506            offset
1507        }
1508    };
1509
1510    let mut planned_query = match &q.body {
1511        SetExpr::Select(s) => {
1512            // Extract query options.
1513            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1514            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1515
1516            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1517            PlannedQuery {
1518                expr: plan.expr,
1519                scope: plan.scope,
1520                order_by: plan.order_by,
1521                project: plan.project,
1522                limit,
1523                offset,
1524                group_size_hints,
1525            }
1526        }
1527        _ => {
1528            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1529            let ecx = &ExprContext {
1530                qcx,
1531                name: "ORDER BY clause of a set expression",
1532                scope: &scope,
1533                relation_type: &qcx.relation_type(&expr),
1534                allow_aggregates: false,
1535                allow_subqueries: true,
1536                allow_parameters: true,
1537                allow_windows: false,
1538            };
1539            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1540            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1541            let project = (0..ecx.relation_type.arity()).collect();
1542            PlannedQuery {
1543                expr: expr.map(map_exprs),
1544                scope,
1545                order_by,
1546                limit,
1547                project,
1548                offset,
1549                group_size_hints: GroupSizeHints::default(),
1550            }
1551        }
1552    };
1553
1554    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1555    match &q.ctes {
1556        CteBlock::Simple(_) => {
1557            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1558                if let Some(cte) = qcx.ctes.remove(&id) {
1559                    planned_query.expr = HirRelationExpr::Let {
1560                        name: cte.name,
1561                        id: id.clone(),
1562                        value: Box::new(value),
1563                        body: Box::new(planned_query.expr),
1564                    };
1565                }
1566                if let Some(shadowed_val) = shadowed_val {
1567                    qcx.ctes.insert(id, shadowed_val);
1568                }
1569            }
1570        }
1571        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1572            let MutRecBlockOptionExtracted {
1573                recursion_limit,
1574                return_at_recursion_limit,
1575                error_at_recursion_limit,
1576                seen: _,
1577            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1578            let limit = match (
1579                recursion_limit,
1580                return_at_recursion_limit,
1581                error_at_recursion_limit,
1582            ) {
1583                (None, None, None) => None,
1584                (Some(max_iters), None, None) => {
1585                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1586                }
1587                (None, Some(max_iters), None) => Some((max_iters, true)),
1588                (None, None, Some(max_iters)) => Some((max_iters, false)),
1589                _ => {
1590                    return Err(InvalidWmrRecursionLimit(
1591                        "More than one recursion limit given. \
1592                         Please give at most one of RECURSION LIMIT, \
1593                         ERROR AT RECURSION LIMIT, \
1594                         RETURN AT RECURSION LIMIT."
1595                            .to_owned(),
1596                    ));
1597                }
1598            }
1599            .try_map(|(max_iters, return_at_limit)| {
1600                Ok::<LetRecLimit, PlanError>(LetRecLimit {
1601                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1602                        "Recursion limit has to be greater than 0.".to_owned(),
1603                    ))?,
1604                    return_at_limit: *return_at_limit,
1605                })
1606            })?;
1607
1608            let mut bindings = Vec::new();
1609            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1610                if let Some(cte) = qcx.ctes.remove(&id) {
1611                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1612                }
1613                if let Some(shadowed_val) = shadowed_val {
1614                    qcx.ctes.insert(id, shadowed_val);
1615                }
1616            }
1617            if !bindings.is_empty() {
1618                planned_query.expr = HirRelationExpr::LetRec {
1619                    limit,
1620                    bindings,
1621                    body: Box::new(planned_query.expr),
1622                }
1623            }
1624        }
1625    }
1626
1627    Ok(planned_query)
1628}
1629
1630/// Converts an OFFSET expression into a value.
1631pub(crate) fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1632    let offset = offset
1633        .try_into_literal_int64()
1634        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1635    if offset < 0 {
1636        return Err(negative_offset_error(offset));
1637    }
1638    Ok(offset)
1639}
1640
1641pub(crate) fn negative_offset_error(offset: i64) -> PlanError {
1642    PlanError::InvalidOffset(format!("must not be negative, got {}", offset))
1643}
1644
1645generate_extracted_config!(
1646    MutRecBlockOption,
1647    (RecursionLimit, u64),
1648    (ReturnAtRecursionLimit, u64),
1649    (ErrorAtRecursionLimit, u64)
1650);
1651
1652/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1653///
1654/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1655/// shadowed value that can be reinstalled once the planning has completed.
1656pub fn plan_ctes(
1657    qcx: &mut QueryContext,
1658    q: &Query<Aug>,
1659) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1660    // Accumulate planned expressions and shadowed descriptions.
1661    let mut result = Vec::new();
1662    // Retain the old descriptions of CTE bindings so that we can restore them
1663    // after we're done planning this SELECT.
1664    let mut shadowed_descs = BTreeMap::new();
1665
1666    // A reused identifier indicates a reused name.
1667    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1668        sql_bail!(
1669            "WITH query name {} specified more than once",
1670            normalize::ident_ref(ident).quoted()
1671        )
1672    }
1673
1674    match &q.ctes {
1675        CteBlock::Simple(ctes) => {
1676            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1677            for cte in ctes.iter() {
1678                let cte_name = normalize::ident(cte.alias.name.clone());
1679                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1680                let typ = qcx.relation_type(&val);
1681                let mut desc = RelationDesc::new(typ, scope.column_names());
1682                plan_utils::maybe_rename_columns(
1683                    format!("CTE {}", cte.alias.name),
1684                    &mut desc,
1685                    &cte.alias.columns,
1686                )?;
1687                // Capture the prior value if it exists, so that it can be re-installed.
1688                let shadowed = qcx.ctes.insert(
1689                    cte.id,
1690                    CteDesc {
1691                        name: cte_name,
1692                        desc,
1693                    },
1694                );
1695
1696                result.push((cte.id, val, shadowed));
1697            }
1698        }
1699        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1700            // Insert column types into `qcx.ctes` first for recursive bindings.
1701            for cte in ctes.iter() {
1702                let cte_name = normalize::ident(cte.name.clone());
1703                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1704                for column in cte.columns.iter() {
1705                    desc_columns.push((
1706                        normalize::column_name(column.name.clone()),
1707                        SqlColumnType {
1708                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1709                            nullable: true,
1710                        },
1711                    ));
1712                }
1713                let desc = RelationDesc::from_names_and_types(desc_columns);
1714                let shadowed = qcx.ctes.insert(
1715                    cte.id,
1716                    CteDesc {
1717                        name: cte_name,
1718                        desc,
1719                    },
1720                );
1721                // Capture the prior value if it exists, so that it can be re-installed.
1722                if let Some(shadowed) = shadowed {
1723                    shadowed_descs.insert(cte.id, shadowed);
1724                }
1725            }
1726
1727            // Plan all CTEs and validate the proposed types.
1728            for cte in ctes.iter() {
1729                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1730
1731                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1732
1733                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1734                    // Once WMR CTEs support NOT NULL constraints, check that
1735                    // nullability of derived column types are compatible.
1736                    sql_bail!(
1737                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1738                    );
1739                }
1740
1741                if !proposed_typ.keys.is_empty() {
1742                    // Once WMR CTEs support keys, check that keys exactly
1743                    // overlap.
1744                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1745                }
1746
1747                // Validate that the derived and proposed types are the same.
1748                let derived_typ = qcx.relation_type(&val);
1749
1750                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1751                    let cte_name = normalize::ident(cte.name.clone());
1752                    let proposed_typ = proposed_typ
1753                        .column_types
1754                        .iter()
1755                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1756                        .collect::<Vec<_>>();
1757                    let inferred_typ = derived_typ
1758                        .column_types
1759                        .iter()
1760                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1761                        .collect::<Vec<_>>();
1762                    Err(PlanError::RecursiveTypeMismatch(
1763                        cte_name,
1764                        proposed_typ,
1765                        inferred_typ,
1766                    ))
1767                };
1768
1769                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1770                    return type_err(proposed_typ, derived_typ);
1771                }
1772
1773                // Cast derived types to proposed types or error.
1774                let val = match cast_relation(
1775                    qcx,
1776                    // Choose `CastContext::Assignment`` because the user has
1777                    // been explicit about the types they expect. Choosing
1778                    // `CastContext::Implicit` is not "strong" enough to impose
1779                    // typmods from proposed types onto values.
1780                    CastContext::Assignment,
1781                    val,
1782                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1783                ) {
1784                    Ok(val) => val,
1785                    Err(_) => return type_err(proposed_typ, derived_typ),
1786                };
1787
1788                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1789            }
1790        }
1791    }
1792
1793    Ok(result)
1794}
1795
1796pub fn plan_nested_query(
1797    qcx: &mut QueryContext,
1798    q: &Query<Aug>,
1799) -> Result<(HirRelationExpr, Scope), PlanError> {
1800    let PlannedQuery {
1801        mut expr,
1802        scope,
1803        order_by,
1804        limit,
1805        offset,
1806        project,
1807        group_size_hints,
1808    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1809    if limit.is_some()
1810        || !offset
1811            .clone()
1812            .try_into_literal_int64()
1813            .is_ok_and(|offset| offset == 0)
1814    {
1815        expr = HirRelationExpr::top_k(
1816            expr,
1817            vec![],
1818            order_by,
1819            limit,
1820            offset,
1821            group_size_hints.limit_input_group_size,
1822        );
1823    }
1824    Ok((expr.project(project), scope))
1825}
1826
1827fn plan_set_expr(
1828    qcx: &mut QueryContext,
1829    q: &SetExpr<Aug>,
1830) -> Result<(HirRelationExpr, Scope), PlanError> {
1831    match q {
1832        SetExpr::Select(select) => {
1833            let order_by_exprs = Vec::new();
1834            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1835            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
1836            // should not have planned any ordering.
1837            assert!(plan.order_by.is_empty());
1838            Ok((plan.expr.project(plan.project), plan.scope))
1839        }
1840        SetExpr::SetOperation {
1841            op,
1842            all,
1843            left,
1844            right,
1845        } => {
1846            // Plan the LHS and RHS.
1847            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1848            let (right_expr, right_scope) =
1849                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1850
1851            // Validate that the LHS and RHS are the same width.
1852            let left_type = qcx.relation_type(&left_expr);
1853            let right_type = qcx.relation_type(&right_expr);
1854            if left_type.arity() != right_type.arity() {
1855                sql_bail!(
1856                    "each {} query must have the same number of columns: {} vs {}",
1857                    op,
1858                    left_type.arity(),
1859                    right_type.arity(),
1860                );
1861            }
1862
1863            // Match the types of the corresponding columns on the LHS and RHS
1864            // using the normal type coercion rules. This is equivalent to
1865            // `coerce_homogeneous_exprs`, but implemented in terms of
1866            // `HirRelationExpr` rather than `HirScalarExpr`.
1867            let left_ecx = &ExprContext {
1868                qcx,
1869                name: &op.to_string(),
1870                scope: &left_scope,
1871                relation_type: &left_type,
1872                allow_aggregates: false,
1873                allow_subqueries: false,
1874                allow_parameters: false,
1875                allow_windows: false,
1876            };
1877            let right_ecx = &ExprContext {
1878                qcx,
1879                name: &op.to_string(),
1880                scope: &right_scope,
1881                relation_type: &right_type,
1882                allow_aggregates: false,
1883                allow_subqueries: false,
1884                allow_parameters: false,
1885                allow_windows: false,
1886            };
1887            let mut left_casts = vec![];
1888            let mut right_casts = vec![];
1889            for (i, (left_type, right_type)) in left_type
1890                .column_types
1891                .iter()
1892                .zip_eq(right_type.column_types.iter())
1893                .enumerate()
1894            {
1895                let types = &[
1896                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1897                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1898                ];
1899                let target =
1900                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1901                match typeconv::plan_cast(
1902                    left_ecx,
1903                    CastContext::Implicit,
1904                    HirScalarExpr::column(i),
1905                    &target,
1906                ) {
1907                    Ok(expr) => left_casts.push(expr),
1908                    Err(_) => sql_bail!(
1909                        "{} types {} and {} cannot be matched",
1910                        op,
1911                        qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
1912                        qcx.humanize_sql_scalar_type(&target, false),
1913                    ),
1914                }
1915                match typeconv::plan_cast(
1916                    right_ecx,
1917                    CastContext::Implicit,
1918                    HirScalarExpr::column(i),
1919                    &target,
1920                ) {
1921                    Ok(expr) => right_casts.push(expr),
1922                    Err(_) => sql_bail!(
1923                        "{} types {} and {} cannot be matched",
1924                        op,
1925                        qcx.humanize_sql_scalar_type(&target, false),
1926                        qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
1927                    ),
1928                }
1929            }
1930            let lhs = if left_casts
1931                .iter()
1932                .enumerate()
1933                .any(|(i, e)| e != &HirScalarExpr::column(i))
1934            {
1935                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1936                left_expr.map(left_casts).project(project_key)
1937            } else {
1938                left_expr
1939            };
1940            let rhs = if right_casts
1941                .iter()
1942                .enumerate()
1943                .any(|(i, e)| e != &HirScalarExpr::column(i))
1944            {
1945                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1946                right_expr.map(right_casts).project(project_key)
1947            } else {
1948                right_expr
1949            };
1950
1951            let relation_expr = match op {
1952                SetOperator::Union => {
1953                    if *all {
1954                        lhs.union(rhs)
1955                    } else {
1956                        lhs.union(rhs).distinct()
1957                    }
1958                }
1959                SetOperator::Except => Hir::except(all, lhs, rhs),
1960                SetOperator::Intersect => {
1961                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
1962                    // Though we believe that render() does The Right Thing (TM)
1963                    // Also note that we do *not* need another threshold() at the end of the method chain
1964                    // because the right-hand side of the outer union only produces existing records,
1965                    // i.e., the record counts for differential data flow definitely remain non-negative.
1966                    let left_clone = lhs.clone();
1967                    if *all {
1968                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1969                    } else {
1970                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1971                            .distinct()
1972                    }
1973                }
1974            };
1975            let scope = Scope::from_source(
1976                None,
1977                // Column names are taken from the left, as in Postgres.
1978                left_scope.column_names(),
1979            );
1980
1981            Ok((relation_expr, scope))
1982        }
1983        SetExpr::Values(Values(values)) => plan_values(qcx, values),
1984        SetExpr::Table(name) => {
1985            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1986            Ok((expr, scope))
1987        }
1988        SetExpr::Query(query) => {
1989            let (expr, scope) = plan_nested_query(qcx, query)?;
1990            Ok((expr, scope))
1991        }
1992        SetExpr::Show(stmt) => {
1993            // The create SQL definition of involving this query, will have the explicit `SHOW`
1994            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
1995            // current schema of the executing user. When Materialize restarts and tries to re-plan
1996            // these queries, it will only have access to the raw `SHOW` command and have no idea
1997            // what schema to use. As a result Materialize will fail to boot.
1998            //
1999            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
2000            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
2001            // However, banning show commands in views gives us more flexibility to change their
2002            // output.
2003            //
2004            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
2005            // with all show commands expanded into their equivalent SELECT statements.
2006            if !qcx.lifetime.allow_show() {
2007                return Err(PlanError::ShowCommandInView);
2008            }
2009
2010            // Some SHOW statements are a SELECT query. Others produces Rows
2011            // directly. Convert both of these to the needed Hir and Scope.
2012            fn to_hirscope(
2013                plan: ShowCreatePlan,
2014                desc: StatementDesc,
2015            ) -> Result<(HirRelationExpr, Scope), PlanError> {
2016                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2017                let desc = desc.relation_desc.ok_or_else(|| {
2018                    internal_err!("statement description missing relation descriptor")
2019                })?;
2020                let scope = Scope::from_source(None, desc.iter_names());
2021                let expr = HirRelationExpr::constant(rows, desc.into_typ());
2022                Ok((expr, scope))
2023            }
2024
2025            match stmt.clone() {
2026                ShowStatement::ShowColumns(stmt) => {
2027                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2028                }
2029                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2030                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2031                    show::describe_show_create_connection(qcx.scx, stmt)?,
2032                ),
2033                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2034                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2035                    show::describe_show_create_cluster(qcx.scx, stmt)?,
2036                ),
2037                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2038                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
2039                    show::describe_show_create_index(qcx.scx, stmt)?,
2040                ),
2041                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2042                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2043                    show::describe_show_create_sink(qcx.scx, stmt)?,
2044                ),
2045                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2046                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
2047                    show::describe_show_create_source(qcx.scx, stmt)?,
2048                ),
2049                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2050                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
2051                    show::describe_show_create_table(qcx.scx, stmt)?,
2052                ),
2053                ShowStatement::ShowCreateView(stmt) => to_hirscope(
2054                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
2055                    show::describe_show_create_view(qcx.scx, stmt)?,
2056                ),
2057                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2058                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2059                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2060                ),
2061                ShowStatement::ShowCreateType(stmt) => to_hirscope(
2062                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
2063                    show::describe_show_create_type(qcx.scx, stmt)?,
2064                ),
2065                ShowStatement::ShowObjects(stmt) => {
2066                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2067                }
2068                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2069                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2070            }
2071        }
2072    }
2073}
2074
2075/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2076fn plan_values(
2077    qcx: &QueryContext,
2078    values: &[Vec<Expr<Aug>>],
2079) -> Result<(HirRelationExpr, Scope), PlanError> {
2080    assert!(!values.is_empty());
2081
2082    let ecx = &ExprContext {
2083        qcx,
2084        name: "VALUES",
2085        scope: &Scope::empty(),
2086        relation_type: &SqlRelationType::empty(),
2087        allow_aggregates: false,
2088        allow_subqueries: true,
2089        allow_parameters: true,
2090        allow_windows: false,
2091    };
2092
2093    let ncols = values[0].len();
2094    let nrows = values.len();
2095
2096    // Arrange input expressions by columns, not rows, so that we can
2097    // call `coerce_homogeneous_exprs` on each column.
2098    let mut cols = vec![vec![]; ncols];
2099    for row in values {
2100        if row.len() != ncols {
2101            sql_bail!(
2102                "VALUES expression has varying number of columns: {} vs {}",
2103                row.len(),
2104                ncols
2105            );
2106        }
2107        for (i, v) in row.iter().enumerate() {
2108            cols[i].push(v);
2109        }
2110    }
2111
2112    // Plan each column.
2113    let mut col_iters = Vec::with_capacity(ncols);
2114    let mut col_types = Vec::with_capacity(ncols);
2115    for col in &cols {
2116        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2117        let mut col_type = ecx.column_type(&col[0]);
2118        for val in &col[1..] {
2119            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2120        }
2121        col_types.push(col_type);
2122        col_iters.push(col.into_iter());
2123    }
2124
2125    // Build constant relation.
2126    let mut exprs = vec![];
2127    for _ in 0..nrows {
2128        for i in 0..ncols {
2129            exprs.push(col_iters[i].next().unwrap());
2130        }
2131    }
2132    let out = HirRelationExpr::CallTable {
2133        func: TableFunc::Wrap {
2134            width: ncols,
2135            types: col_types,
2136        },
2137        exprs,
2138    };
2139
2140    // Build column names.
2141    let mut scope = Scope::empty();
2142    for i in 0..ncols {
2143        let name = format!("column{}", i + 1);
2144        scope.items.push(ScopeItem::from_column_name(name));
2145    }
2146
2147    Ok((out, scope))
2148}
2149
2150/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2151/// statement.
2152///
2153/// This is special-cased in PostgreSQL and different enough from `plan_values`
2154/// that it is easier to use a separate function entirely. Unlike a normal
2155/// `VALUES` clause, each value is coerced to the type of the target table
2156/// via an assignment cast.
2157///
2158/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2159fn plan_values_insert(
2160    qcx: &QueryContext,
2161    target_names: &[&ColumnName],
2162    target_types: &[&SqlScalarType],
2163    values: &[Vec<Expr<Aug>>],
2164) -> Result<HirRelationExpr, PlanError> {
2165    assert!(!values.is_empty());
2166
2167    if !values.iter().map(|row| row.len()).all_equal() {
2168        sql_bail!("VALUES lists must all be the same length");
2169    }
2170
2171    let ecx = &ExprContext {
2172        qcx,
2173        name: "VALUES",
2174        scope: &Scope::empty(),
2175        relation_type: &SqlRelationType::empty(),
2176        allow_aggregates: false,
2177        allow_subqueries: true,
2178        allow_parameters: true,
2179        allow_windows: false,
2180    };
2181
2182    let mut exprs = vec![];
2183    let mut types = vec![];
2184    for row in values {
2185        if row.len() > target_names.len() {
2186            sql_bail!("INSERT has more expressions than target columns");
2187        }
2188        for (column, val) in row.into_iter().enumerate() {
2189            let target_type = &target_types[column];
2190            let val = plan_expr(ecx, val)?;
2191            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2192            let source_type = &ecx.scalar_type(&val);
2193            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2194                Ok(val) => val,
2195                Err(_) => sql_bail!(
2196                    "column {} is of type {} but expression is of type {}",
2197                    target_names[column].quoted(),
2198                    qcx.humanize_sql_scalar_type(target_type, false),
2199                    qcx.humanize_sql_scalar_type(source_type, false),
2200                ),
2201            };
2202            if column >= types.len() {
2203                types.push(ecx.column_type(&val));
2204            } else {
2205                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2206            }
2207            exprs.push(val);
2208        }
2209    }
2210
2211    Ok(HirRelationExpr::CallTable {
2212        func: TableFunc::Wrap {
2213            width: values[0].len(),
2214            types,
2215        },
2216        exprs,
2217    })
2218}
2219
2220fn plan_join_identity() -> (HirRelationExpr, Scope) {
2221    let typ = SqlRelationType::new(vec![]);
2222    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2223    let scope = Scope::empty();
2224    (expr, scope)
2225}
2226
2227/// Describes how to execute a SELECT query.
2228///
2229/// `order_by` describes how to order the rows in `expr` *before* applying the
2230/// projection. The `scope` describes the columns in `expr` *after* the
2231/// projection has been applied.
2232#[derive(Debug)]
2233struct SelectPlan {
2234    expr: HirRelationExpr,
2235    scope: Scope,
2236    order_by: Vec<ColumnOrder>,
2237    project: Vec<usize>,
2238}
2239
2240generate_extracted_config!(
2241    SelectOption,
2242    (ExpectedGroupSize, u64),
2243    (AggregateInputGroupSize, u64),
2244    (DistinctOnInputGroupSize, u64),
2245    (LimitInputGroupSize, u64)
2246);
2247
2248/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2249///
2250/// Normally, the ORDER BY clause occurs after the columns specified in the
2251/// SELECT list have been projected. In a query like
2252///
2253///   CREATE TABLE (a int, b int)
2254///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2255///
2256/// it is valid to refer to `a`, because it is explicitly selected, but it would
2257/// not be valid to refer to unselected column `b`.
2258///
2259/// But PostgreSQL extends the standard to permit queries like
2260///
2261///   SELECT a FROM t ORDER BY b
2262///
2263/// where expressions in the ORDER BY clause can refer to *both* input columns
2264/// and output columns.
2265fn plan_select_from_where(
2266    qcx: &QueryContext,
2267    mut s: Select<Aug>,
2268    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2269) -> Result<SelectPlan, PlanError> {
2270    // TODO: Both `s` and `order_by_exprs` are not references because the
2271    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2272    // table function support (the UUID mapping). Attempt to change this so callers
2273    // don't need to clone the Select.
2274
2275    // Extract query options.
2276    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2277    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2278
2279    // Step 1. Handle FROM clause, including joins.
2280    let (mut relation_expr, mut from_scope) =
2281        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2282            let (left, left_scope) = l;
2283            plan_join(
2284                qcx,
2285                left,
2286                left_scope,
2287                &Join {
2288                    relation: TableFactor::NestedJoin {
2289                        join: Box::new(twj.clone()),
2290                        alias: None,
2291                    },
2292                    join_operator: JoinOperator::CrossJoin,
2293                },
2294            )
2295        })?;
2296
2297    // Step 2. Handle WHERE clause.
2298    if let Some(selection) = &s.selection {
2299        let ecx = &ExprContext {
2300            qcx,
2301            name: "WHERE clause",
2302            scope: &from_scope,
2303            relation_type: &qcx.relation_type(&relation_expr),
2304            allow_aggregates: false,
2305            allow_subqueries: true,
2306            allow_parameters: true,
2307            allow_windows: false,
2308        };
2309        let expr = plan_expr(ecx, selection)
2310            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2311            .type_as(ecx, &SqlScalarType::Bool)?;
2312        relation_expr = relation_expr.filter(vec![expr]);
2313    }
2314
2315    // Step 3. Gather aggregates and table functions.
2316    // (But skip window aggregates.)
2317    let (aggregates, table_funcs) = {
2318        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2319        visitor.visit_select_mut(&mut s);
2320        for o in order_by_exprs.iter_mut() {
2321            visitor.visit_order_by_expr_mut(o);
2322        }
2323        visitor.into_result()?
2324    };
2325    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2326    if !table_funcs.is_empty() {
2327        let (expr, scope) = plan_scalar_table_funcs(
2328            qcx,
2329            table_funcs,
2330            &mut table_func_names,
2331            &relation_expr,
2332            &from_scope,
2333        )?;
2334        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2335        from_scope = from_scope.product(scope)?;
2336    }
2337
2338    // Step 4. Expand SELECT clause.
2339    let projection = {
2340        let ecx = &ExprContext {
2341            qcx,
2342            name: "SELECT clause",
2343            scope: &from_scope,
2344            relation_type: &qcx.relation_type(&relation_expr),
2345            allow_aggregates: true,
2346            allow_subqueries: true,
2347            allow_parameters: true,
2348            allow_windows: true,
2349        };
2350        let mut out = vec![];
2351        for si in &s.projection {
2352            if *si == SelectItem::Wildcard && s.from.is_empty() {
2353                sql_bail!("SELECT * with no tables specified is not valid");
2354            }
2355            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2356        }
2357        out
2358    };
2359
2360    // Step 5. Handle GROUP BY clause.
2361    // This will also plan the aggregates gathered in Step 3.
2362    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2363    let (mut group_scope, select_all_mapping) = {
2364        // Compute GROUP BY expressions.
2365        let ecx = &ExprContext {
2366            qcx,
2367            name: "GROUP BY clause",
2368            scope: &from_scope,
2369            relation_type: &qcx.relation_type(&relation_expr),
2370            allow_aggregates: false,
2371            allow_subqueries: true,
2372            allow_parameters: true,
2373            allow_windows: false,
2374        };
2375        let mut group_key = vec![];
2376        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2377        let mut group_hir_exprs = vec![];
2378        let mut group_scope = Scope::empty();
2379        let mut select_all_mapping = BTreeMap::new();
2380
2381        for group_expr in &s.group_by {
2382            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2383            let new_column = group_key.len();
2384
2385            // Multiple AST expressions can map to the same HIR expression, e.g.
2386            // `GROUP BY 1, 1` or `GROUP BY a, 1` where the positional reference
2387            // `1` resolves to the same column as `a`. When we already have a
2388            // ScopeItem for this HIR expression we must deduplicate: skip adding
2389            // a second group key for it. We must not gate this on `group_expr`
2390            // being `Some`, because a positional reference to an input column
2391            // (e.g. `GROUP BY 1`) has no AST expression to record yet still
2392            // needs to dedup against the existing key; gating on `Some` here is
2393            // what made `GROUP BY 1, 1` panic on the `group_hir_exprs.len() ==
2394            // group_exprs.len()` assertion below.
2395            if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2396                // If this AST expression is a named expression (not a bare
2397                // positional reference), record it on the existing ScopeItem so
2398                // name resolution can find it.
2399                if let Some(group_expr) = group_expr {
2400                    existing_scope_item.exprs.insert(group_expr.clone());
2401                }
2402                continue;
2403            }
2404
2405            let mut scope_item = if let HirScalarExpr::Column(
2406                ColumnRef {
2407                    level: 0,
2408                    column: old_column,
2409                },
2410                _name,
2411            ) = &expr
2412            {
2413                // If we later have `SELECT foo.*` then we have to find all
2414                // the `foo` items in `from_scope` and figure out where they
2415                // ended up in `group_scope`. This is really hard to do
2416                // right using SQL name resolution, so instead we just track
2417                // the movement here.
2418                select_all_mapping.insert(*old_column, new_column);
2419                let scope_item = ecx.scope.items[*old_column].clone();
2420                scope_item
2421            } else {
2422                ScopeItem::empty()
2423            };
2424
2425            if let Some(group_expr) = group_expr.cloned() {
2426                scope_item.exprs.insert(group_expr);
2427            }
2428
2429            group_key.push(from_scope.len() + group_exprs.len());
2430            group_hir_exprs.push(expr.clone());
2431            group_exprs.insert(expr, scope_item);
2432        }
2433
2434        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2435        for expr in &group_hir_exprs {
2436            if let Some(scope_item) = group_exprs.remove(expr) {
2437                group_scope.items.push(scope_item);
2438            }
2439        }
2440
2441        // Plan aggregates.
2442        let ecx = &ExprContext {
2443            qcx,
2444            name: "aggregate function",
2445            scope: &from_scope,
2446            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2447            allow_aggregates: false,
2448            allow_subqueries: true,
2449            allow_parameters: true,
2450            allow_windows: false,
2451        };
2452        let mut agg_exprs = vec![];
2453        for sql_function in aggregates {
2454            if sql_function.over.is_some() {
2455                unreachable!(
2456                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2457                );
2458            }
2459            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2460            group_scope
2461                .items
2462                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2463        }
2464        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2465            // apply GROUP BY / aggregates
2466            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2467                group_key,
2468                agg_exprs,
2469                group_size_hints.aggregate_input_group_size,
2470            );
2471
2472            // For every old column that wasn't a group key, add a scope item
2473            // that errors when referenced. We can't simply drop these items
2474            // from scope. These items need to *exist* because they might shadow
2475            // variables in outer scopes that would otherwise be valid to
2476            // reference, but accessing them needs to produce an error.
2477            for i in 0..from_scope.len() {
2478                if !select_all_mapping.contains_key(&i) {
2479                    let scope_item = &ecx.scope.items[i];
2480                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2481                        table_name: scope_item.table_name.clone(),
2482                        column_name: scope_item.column_name.clone(),
2483                        allow_unqualified_references: scope_item.allow_unqualified_references,
2484                    });
2485                }
2486            }
2487
2488            (group_scope, select_all_mapping)
2489        } else {
2490            // if no GROUP BY, aggregates or having then all columns remain in scope
2491            (
2492                from_scope.clone(),
2493                (0..from_scope.len()).map(|i| (i, i)).collect(),
2494            )
2495        }
2496    };
2497
2498    // Step 6. Handle HAVING clause.
2499    if let Some(ref having) = s.having {
2500        let ecx = &ExprContext {
2501            qcx,
2502            name: "HAVING clause",
2503            scope: &group_scope,
2504            relation_type: &qcx.relation_type(&relation_expr),
2505            allow_aggregates: true,
2506            allow_subqueries: true,
2507            allow_parameters: true,
2508            allow_windows: false,
2509        };
2510        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2511        relation_expr = relation_expr.filter(vec![expr]);
2512    }
2513
2514    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2515    // (This includes window aggregations.)
2516    //
2517    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2518    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2519    //
2520    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2521    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2522    // and can't be part of a bigger expression.
2523    // See https://www.postgresql.org/docs/current/queries-order.html:
2524    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2525    // expression"
2526    let window_funcs = {
2527        let mut visitor = WindowFuncCollector::default();
2528        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2529        // window functions are excluded from other things by `allow_windows` being false when
2530        // planning those before this code).
2531        visitor.visit_select(&s);
2532        for o in order_by_exprs.iter() {
2533            visitor.visit_order_by_expr(o);
2534        }
2535        visitor.into_result()
2536    };
2537    for window_func in window_funcs {
2538        let ecx = &ExprContext {
2539            qcx,
2540            name: "window function",
2541            scope: &group_scope,
2542            relation_type: &qcx.relation_type(&relation_expr),
2543            allow_aggregates: true,
2544            allow_subqueries: true,
2545            allow_parameters: true,
2546            allow_windows: true,
2547        };
2548        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2549        group_scope.items.push(ScopeItem::from_expr(window_func));
2550    }
2551    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2552    // been already planned now. However, we should still set `allow_windows: true` for the
2553    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2554    // msg if an OVER clause is missing from a window function.
2555
2556    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2557    if let Some(ref qualify) = s.qualify {
2558        let ecx = &ExprContext {
2559            qcx,
2560            name: "QUALIFY clause",
2561            scope: &group_scope,
2562            relation_type: &qcx.relation_type(&relation_expr),
2563            allow_aggregates: true,
2564            allow_subqueries: true,
2565            allow_parameters: true,
2566            allow_windows: true,
2567        };
2568        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2569        relation_expr = relation_expr.filter(vec![expr]);
2570    }
2571
2572    // Step 9. Handle SELECT clause.
2573    let output_columns = {
2574        let mut new_exprs = vec![];
2575        let mut new_type = qcx.relation_type(&relation_expr);
2576        let mut output_columns = vec![];
2577        for (select_item, column_name) in &projection {
2578            let ecx = &ExprContext {
2579                qcx,
2580                name: "SELECT clause",
2581                scope: &group_scope,
2582                relation_type: &new_type,
2583                allow_aggregates: true,
2584                allow_subqueries: true,
2585                allow_parameters: true,
2586                allow_windows: true,
2587            };
2588            let expr = match select_item {
2589                ExpandedSelectItem::InputOrdinal(i) => {
2590                    if let Some(column) = select_all_mapping.get(i).copied() {
2591                        HirScalarExpr::column(column)
2592                    } else {
2593                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2594                    }
2595                }
2596                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2597            };
2598            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2599                // Simple column reference; no need to map on a new expression.
2600                output_columns.push((column, column_name));
2601            } else {
2602                // Complicated expression that requires a map expression. We
2603                // update `group_scope` as we go so that future expressions that
2604                // are textually identical to this one can reuse it. This
2605                // duplicate detection is required for proper determination of
2606                // ambiguous column references with SQL92-style `ORDER BY`
2607                // items. See `plan_order_by_or_distinct_expr` for more.
2608                let typ = ecx.column_type(&expr);
2609                new_type.column_types.push(typ);
2610                new_exprs.push(expr);
2611                output_columns.push((group_scope.len(), column_name));
2612                group_scope
2613                    .items
2614                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2615            }
2616        }
2617        relation_expr = relation_expr.map(new_exprs);
2618        output_columns
2619    };
2620    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2621
2622    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2623    let order_by = {
2624        let relation_type = qcx.relation_type(&relation_expr);
2625        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2626            &ExprContext {
2627                qcx,
2628                name: "ORDER BY clause",
2629                scope: &group_scope,
2630                relation_type: &relation_type,
2631                allow_aggregates: true,
2632                allow_subqueries: true,
2633                allow_parameters: true,
2634                allow_windows: true,
2635            },
2636            &order_by_exprs,
2637            &output_columns,
2638        )?;
2639
2640        match s.distinct {
2641            None => relation_expr = relation_expr.map(map_exprs),
2642            Some(Distinct::EntireRow) => {
2643                if relation_type.arity() == 0 {
2644                    sql_bail!("SELECT DISTINCT must have at least one column");
2645                }
2646                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2647                // list, so we can't proceed if `ORDER BY` has introduced any
2648                // columns for arbitrary expressions. This matches PostgreSQL.
2649                if !try_push_projection_order_by(
2650                    &mut relation_expr,
2651                    &mut project_key,
2652                    &mut order_by,
2653                ) {
2654                    sql_bail!(
2655                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2656                    );
2657                }
2658                assert!(map_exprs.is_empty());
2659                relation_expr = relation_expr.distinct();
2660            }
2661            Some(Distinct::On(exprs)) => {
2662                let ecx = &ExprContext {
2663                    qcx,
2664                    name: "DISTINCT ON clause",
2665                    scope: &group_scope,
2666                    relation_type: &qcx.relation_type(&relation_expr),
2667                    allow_aggregates: true,
2668                    allow_subqueries: true,
2669                    allow_parameters: true,
2670                    allow_windows: true,
2671                };
2672
2673                let mut distinct_exprs = vec![];
2674                for expr in &exprs {
2675                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2676                    distinct_exprs.push(expr);
2677                }
2678
2679                let mut distinct_key = vec![];
2680
2681                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2682                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2683                // expressions, though the order of `DISTINCT ON` expressions
2684                // does not matter. This matches PostgreSQL and leaves the door
2685                // open to a future optimization where the `DISTINCT ON` and
2686                // `ORDER BY` operations happen in one pass.
2687                //
2688                // On the bright side, any columns that have already been
2689                // computed by `ORDER BY` can be reused in the distinct key.
2690                let arity = relation_type.arity();
2691                for ord in order_by.iter().take(distinct_exprs.len()) {
2692                    // The unusual construction of `expr` here is to ensure the
2693                    // temporary column expression lives long enough.
2694                    let mut expr = &HirScalarExpr::column(ord.column);
2695                    if ord.column >= arity {
2696                        expr = &map_exprs[ord.column - arity];
2697                    };
2698                    match distinct_exprs.iter().position(move |e| e == expr) {
2699                        None => sql_bail!(
2700                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2701                        ),
2702                        Some(pos) => {
2703                            distinct_exprs.remove(pos);
2704                        }
2705                    }
2706                    distinct_key.push(ord.column);
2707                }
2708
2709                // Add any remaining `DISTINCT ON` expressions to the key.
2710                for expr in distinct_exprs {
2711                    // If the expression is a reference to an existing column,
2712                    // do not introduce a new column to support it.
2713                    let column = match expr {
2714                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2715                        _ => {
2716                            map_exprs.push(expr);
2717                            arity + map_exprs.len() - 1
2718                        }
2719                    };
2720                    distinct_key.push(column);
2721                }
2722
2723                // `DISTINCT ON` is semantically a TopK with limit 1. The
2724                // columns in `ORDER BY` that are not part of the distinct key,
2725                // if there are any, determine the ordering within each group,
2726                // per PostgreSQL semantics.
2727                let distinct_len = distinct_key.len();
2728                relation_expr = HirRelationExpr::top_k(
2729                    relation_expr.map(map_exprs),
2730                    distinct_key,
2731                    order_by.iter().skip(distinct_len).cloned().collect(),
2732                    Some(HirScalarExpr::literal(
2733                        Datum::Int64(1),
2734                        SqlScalarType::Int64,
2735                    )),
2736                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2737                    group_size_hints.distinct_on_input_group_size,
2738                );
2739            }
2740        }
2741
2742        order_by
2743    };
2744
2745    // Construct a clean scope to expose outwards, where all of the state that
2746    // accumulated in the scope during planning of this SELECT is erased. The
2747    // clean scope has at most one name for each column, and the names are not
2748    // associated with any table.
2749    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2750
2751    Ok(SelectPlan {
2752        expr: relation_expr,
2753        scope,
2754        order_by,
2755        project: project_key,
2756    })
2757}
2758
2759fn plan_scalar_table_funcs(
2760    qcx: &QueryContext,
2761    table_funcs: BTreeMap<Function<Aug>, String>,
2762    table_func_names: &mut BTreeMap<String, Ident>,
2763    relation_expr: &HirRelationExpr,
2764    from_scope: &Scope,
2765) -> Result<(HirRelationExpr, Scope), PlanError> {
2766    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2767
2768    for (table_func, id) in table_funcs.iter() {
2769        table_func_names.insert(
2770            id.clone(),
2771            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
2772            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2773        );
2774    }
2775    // If there's only a single table function, we can skip generating
2776    // ordinality columns.
2777    if table_funcs.len() == 1 {
2778        let (table_func, id) = table_funcs.iter().next().unwrap();
2779        let (expr, mut scope) =
2780            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2781
2782        // A single table-function might return several columns as a record
2783        let num_cols = scope.len();
2784        for i in 0..scope.len() {
2785            scope.items[i].table_name = Some(PartialItemName {
2786                database: None,
2787                schema: None,
2788                item: id.clone(),
2789            });
2790            scope.items[i].from_single_column_function = num_cols == 1;
2791            scope.items[i].allow_unqualified_references = false;
2792        }
2793        return Ok((expr, scope));
2794    }
2795    if table_funcs.keys().any(is_repeat_row) {
2796        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
2797        // error message would be misleading, because it would refer to WITH ORDINALITY.
2798        bail_unsupported!(format!(
2799            "{} in a SELECT clause with multiple table functions",
2800            REPEAT_ROW_NAME
2801        ));
2802    }
2803    // Otherwise, plan as usual, emulating the ROWS FROM behavior
2804    let (expr, mut scope, num_cols) =
2805        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2806
2807    // Munge the scope so table names match with the generated ids.
2808    let mut i = 0;
2809    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2810        for _ in 0..num_cols {
2811            scope.items[i].table_name = Some(PartialItemName {
2812                database: None,
2813                schema: None,
2814                item: id.clone(),
2815            });
2816            scope.items[i].from_single_column_function = num_cols == 1;
2817            scope.items[i].allow_unqualified_references = false;
2818            i += 1;
2819        }
2820        // Ordinality column. This doubles as the
2821        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
2822        // because it only needs to be NULL or not.
2823        scope.items[i].table_name = Some(PartialItemName {
2824            database: None,
2825            schema: None,
2826            item: id.clone(),
2827        });
2828        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2829        scope.items[i].allow_unqualified_references = false;
2830        i += 1;
2831    }
2832    // Coalesced ordinality column.
2833    scope.items[i].allow_unqualified_references = false;
2834    Ok((expr, scope))
2835}
2836
2837/// Plans an expression in a `GROUP BY` clause.
2838///
2839/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2840/// names/expressions defined in the `SELECT` clause. These special cases are
2841/// handled by this function; see comments within the implementation for
2842/// details.
2843fn plan_group_by_expr<'a>(
2844    ecx: &ExprContext,
2845    group_expr: &'a Expr<Aug>,
2846    projection: &'a [(ExpandedSelectItem, ColumnName)],
2847) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2848    let plan_projection = |column: usize| match &projection[column].0 {
2849        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2850        ExpandedSelectItem::Expr(expr) => {
2851            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2852        }
2853    };
2854
2855    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2856    // a special case that means to use the ith item in the SELECT clause.
2857    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2858        return plan_projection(column);
2859    }
2860
2861    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2862    // The `foo` can refer to *either* an input column or an output column. If
2863    // both exist, the input column is preferred.
2864    match group_expr {
2865        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2866            Err(PlanError::UnknownColumn {
2867                table: None,
2868                column,
2869                similar,
2870            }) => {
2871                // The expression was a simple identifier that did not match an
2872                // input column. See if it matches an output column.
2873                let mut iter = projection.iter().map(|(_expr, name)| name);
2874                if let Some(i) = iter.position(|n| *n == column) {
2875                    if iter.any(|n| *n == column) {
2876                        Err(PlanError::AmbiguousColumn(column))
2877                    } else {
2878                        plan_projection(i)
2879                    }
2880                } else {
2881                    // The name didn't match an output column either. Return the
2882                    // "unknown column" error.
2883                    Err(PlanError::UnknownColumn {
2884                        table: None,
2885                        column,
2886                        similar,
2887                    })
2888                }
2889            }
2890            res => Ok((Some(group_expr), res?)),
2891        },
2892        _ => Ok((
2893            Some(group_expr),
2894            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2895        )),
2896    }
2897}
2898
2899/// Plans a slice of `ORDER BY` expressions.
2900///
2901/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2902/// parameter.
2903///
2904/// Returns the determined column orderings and a list of scalar expressions
2905/// that must be mapped onto the underlying relation expression.
2906pub(crate) fn plan_order_by_exprs(
2907    ecx: &ExprContext,
2908    order_by_exprs: &[OrderByExpr<Aug>],
2909    output_columns: &[(usize, &ColumnName)],
2910) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2911    let mut order_by = vec![];
2912    let mut map_exprs = vec![];
2913    for obe in order_by_exprs {
2914        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2915        // If the expression is a reference to an existing column,
2916        // do not introduce a new column to support it.
2917        let column = match expr {
2918            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2919            _ => {
2920                map_exprs.push(expr);
2921                ecx.relation_type.arity() + map_exprs.len() - 1
2922            }
2923        };
2924        order_by.push(resolve_desc_and_nulls_last(obe, column));
2925    }
2926    Ok((order_by, map_exprs))
2927}
2928
2929/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2930///
2931/// The `output_columns` parameter describes, in order, the physical index and
2932/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2933/// corresponds to a `SELECT` list with a single entry named "a" that can be
2934/// found at index 3 in the underlying relation expression.
2935///
2936/// There are three cases to handle.
2937///
2938///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2939///       reference to the specified output column.
2940///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2941///       an output column, if it exists; otherwise it is a reference to an
2942///       input column.
2943///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2944///       arbitrary expressions exclusively refer to input columns, never output
2945///       columns.
2946fn plan_order_by_or_distinct_expr(
2947    ecx: &ExprContext,
2948    expr: &Expr<Aug>,
2949    output_columns: &[(usize, &ColumnName)],
2950) -> Result<HirScalarExpr, PlanError> {
2951    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2952        return Ok(HirScalarExpr::column(output_columns[i].0));
2953    }
2954
2955    if let Expr::Identifier(names) = expr {
2956        if let [name] = &names[..] {
2957            let name = normalize::column_name(name.clone());
2958            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2959            if let Some((i, _)) = iter.next() {
2960                match iter.next() {
2961                    // Per SQL92, names are not considered ambiguous if they
2962                    // refer to identical target list expressions, as in
2963                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2964                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2965                    _ => return Ok(HirScalarExpr::column(*i)),
2966                }
2967            }
2968        }
2969    }
2970
2971    plan_expr(ecx, expr)?.type_as_any(ecx)
2972}
2973
2974fn plan_table_with_joins(
2975    qcx: &QueryContext,
2976    table_with_joins: &TableWithJoins<Aug>,
2977) -> Result<(HirRelationExpr, Scope), PlanError> {
2978    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2979    for join in &table_with_joins.joins {
2980        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2981        expr = new_expr;
2982        scope = new_scope;
2983    }
2984    Ok((expr, scope))
2985}
2986
2987fn plan_table_factor(
2988    qcx: &QueryContext,
2989    table_factor: &TableFactor<Aug>,
2990) -> Result<(HirRelationExpr, Scope), PlanError> {
2991    match table_factor {
2992        TableFactor::Table { name, alias } => {
2993            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2994            let scope = plan_table_alias(scope, alias.as_ref())?;
2995            Ok((expr, scope))
2996        }
2997
2998        TableFactor::Function {
2999            function,
3000            alias,
3001            with_ordinality,
3002        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3003
3004        TableFactor::RowsFrom {
3005            functions,
3006            alias,
3007            with_ordinality,
3008        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3009
3010        TableFactor::Derived {
3011            lateral,
3012            subquery,
3013            alias,
3014        } => {
3015            let mut qcx = (*qcx).clone();
3016            if !lateral {
3017                // Since this derived table was not marked as `LATERAL`,
3018                // make elements in outer scopes invisible until we reach the
3019                // next lateral barrier.
3020                for scope in &mut qcx.outer_scopes {
3021                    if scope.lateral_barrier {
3022                        break;
3023                    }
3024                    scope.items.clear();
3025                }
3026            }
3027            qcx.outer_scopes[0].lateral_barrier = true;
3028            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3029            let scope = plan_table_alias(scope, alias.as_ref())?;
3030            Ok((expr, scope))
3031        }
3032
3033        TableFactor::NestedJoin { join, alias } => {
3034            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3035            let scope = plan_table_alias(scope, alias.as_ref())?;
3036            Ok((expr, scope))
3037        }
3038    }
3039}
3040
3041/// Plans a `ROWS FROM` expression.
3042///
3043/// `ROWS FROM` concatenates table functions into a single table, filling in
3044/// `NULL`s in places where one table function has fewer rows than another. We
3045/// can achieve this by augmenting each table function with a row number, doing
3046/// a `FULL JOIN` between each table function on the row number and eventually
3047/// projecting away the row number columns. Concretely, the following query
3048/// using `ROWS FROM`
3049///
3050/// ```sql
3051/// SELECT
3052///     *
3053/// FROM
3054///     ROWS FROM (
3055///         generate_series(1, 2),
3056///         information_schema._pg_expandarray(ARRAY[9]),
3057///         generate_series(3, 6)
3058///     );
3059/// ```
3060///
3061/// is equivalent to the following query that does not use `ROWS FROM`:
3062///
3063/// ```sql
3064/// SELECT
3065///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3066/// FROM
3067///     generate_series(1, 2) WITH ORDINALITY AS gs1
3068///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3069///         ON gs1.ordinality = expand.ordinality
3070///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3071///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3072/// ```
3073///
3074/// Note the call to `coalesce` in the last join condition, which ensures that
3075/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3076///
3077/// This function creates a HirRelationExpr that follows the structure of the
3078/// latter query.
3079///
3080/// `with_ordinality` can be used to have the output expression contain a
3081/// single coalesced ordinality column at the end of the entire expression.
3082fn plan_rows_from(
3083    qcx: &QueryContext,
3084    functions: &[Function<Aug>],
3085    alias: Option<&TableAlias>,
3086    with_ordinality: bool,
3087) -> Result<(HirRelationExpr, Scope), PlanError> {
3088    // The `repeat_row` function is not supported in ROWS FROM.
3089    if functions.iter().any(is_repeat_row) {
3090        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
3091        // error message would be misleading, because it would refer to WITH ORDINALITY instead of
3092        // ROWS FROM.
3093        bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
3094    }
3095
3096    // If there's only a single table function, planning proceeds as if `ROWS
3097    // FROM` hadn't been written at all.
3098    if let [function] = functions {
3099        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3100    }
3101
3102    // Per PostgreSQL, all scope items take the name of the first function
3103    // (unless aliased).
3104    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3105    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3106        qcx,
3107        functions,
3108        Some(functions[0].name.full_item_name().clone()),
3109    )?;
3110
3111    // Columns tracks the set of columns we will keep in the projection.
3112    let mut columns = Vec::new();
3113    let mut offset = 0;
3114    // Retain table function's non-ordinality columns.
3115    for (idx, cols) in num_cols.into_iter().enumerate() {
3116        for i in 0..cols {
3117            columns.push(offset + i);
3118        }
3119        offset += cols + 1;
3120
3121        // Remove the ordinality column from the scope, accounting for previous scope
3122        // changes from this loop.
3123        scope.items.remove(offset - idx - 1);
3124    }
3125
3126    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3127    // column. Otherwise remove it from the scope.
3128    if with_ordinality {
3129        columns.push(offset);
3130    } else {
3131        scope.items.pop();
3132    }
3133
3134    let expr = expr.project(columns);
3135
3136    let scope = plan_table_alias(scope, alias)?;
3137    Ok((expr, scope))
3138}
3139
3140fn is_repeat_row(f: &Function<Aug>) -> bool {
3141    f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3142}
3143
3144/// Plans an expression coalescing multiple table functions. Each table
3145/// function is followed by its row ordinality. The entire expression is
3146/// followed by the coalesced row ordinality.
3147///
3148/// The returned Scope will set all item's table_name's to the `table_name`
3149/// parameter if it is `Some`. If `None`, they will be the name of each table
3150/// function.
3151///
3152/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3153/// each table function.
3154///
3155/// For example, with table functions tf1 returning 1 column (a) and tf2
3156/// returning 2 columns (b, c), this function will return an expr 6 columns:
3157///
3158/// - tf1.a
3159/// - tf1.ordinality
3160/// - tf2.b
3161/// - tf2.c
3162/// - tf2.ordinality
3163/// - coalesced_ordinality
3164///
3165/// And a `Vec<usize>` of `[1, 2]`.
3166fn plan_rows_from_internal<'a>(
3167    qcx: &QueryContext,
3168    functions: impl IntoIterator<Item = &'a Function<Aug>>,
3169    table_name: Option<FullItemName>,
3170) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3171    let mut functions = functions.into_iter();
3172    let mut num_cols = Vec::new();
3173
3174    // Join together each of the table functions in turn. The last column is
3175    // always the column to join against and is maintained to be the coalescence
3176    // of the row number column for all prior functions.
3177    let (mut left_expr, mut left_scope) =
3178        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3179    num_cols.push(left_scope.len() - 1);
3180    // Create the coalesced ordinality column.
3181    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3182    left_scope
3183        .items
3184        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3185
3186    for function in functions {
3187        // The right hand side of a join must be planned in a new scope.
3188        let qcx = qcx.empty_derived_context();
3189        let (right_expr, mut right_scope) =
3190            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3191        num_cols.push(right_scope.len() - 1);
3192        let left_col = left_scope.len() - 1;
3193        let right_col = left_scope.len() + right_scope.len() - 1;
3194        let on = HirScalarExpr::call_binary(
3195            HirScalarExpr::column(left_col),
3196            HirScalarExpr::column(right_col),
3197            expr_func::Eq,
3198        );
3199        left_expr = left_expr
3200            .join(right_expr, on, JoinKind::FullOuter)
3201            .map(vec![HirScalarExpr::call_variadic(
3202                Coalesce,
3203                vec![
3204                    HirScalarExpr::column(left_col),
3205                    HirScalarExpr::column(right_col),
3206                ],
3207            )]);
3208
3209        // Project off the previous iteration's coalesced column, but keep both of this
3210        // iteration's ordinality columns.
3211        left_expr = left_expr.project(
3212            (0..left_col) // non-coalesced ordinality columns from left function
3213                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
3214                .collect(),
3215        );
3216        // Move the coalesced ordinality column.
3217        right_scope.items.push(left_scope.items.pop().unwrap());
3218
3219        left_scope.items.extend(right_scope.items);
3220    }
3221
3222    Ok((left_expr, left_scope, num_cols))
3223}
3224
3225/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3226/// FROM` clause that contains other table functions. Special aliasing rules
3227/// apply.
3228fn plan_solitary_table_function(
3229    qcx: &QueryContext,
3230    function: &Function<Aug>,
3231    alias: Option<&TableAlias>,
3232    with_ordinality: bool,
3233) -> Result<(HirRelationExpr, Scope), PlanError> {
3234    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3235
3236    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3237    if single_column_function {
3238        let item = &mut scope.items[0];
3239
3240        // Mark that the function only produced a single column. This impacts
3241        // whole-row references.
3242        item.from_single_column_function = true;
3243
3244        // Strange special case for solitary table functions that output one
3245        // column whose name matches the name of the table function. If a table
3246        // alias is provided, the column name is changed to the table alias's
3247        // name. Concretely, the following query returns a column named `x`
3248        // rather than a column named `generate_series`:
3249        //
3250        //     SELECT * FROM generate_series(1, 5) AS x
3251        //
3252        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3253        // since its output column is explicitly named `value`, not
3254        // `jsonb_array_elements`.
3255        //
3256        // Note also that we may (correctly) change the column name again when
3257        // we plan the table alias below if the `alias.columns` is non-empty.
3258        if let Some(alias) = alias {
3259            if let ScopeItem {
3260                table_name: Some(table_name),
3261                column_name,
3262                ..
3263            } = item
3264            {
3265                if table_name.item.as_str() == column_name.as_str() {
3266                    *column_name = normalize::column_name(alias.name.clone());
3267                }
3268            }
3269        }
3270    }
3271
3272    let scope = plan_table_alias(scope, alias)?;
3273    Ok((expr, scope))
3274}
3275
3276/// Plans a table function.
3277///
3278/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3279/// instead to get the appropriate aliasing behavior.
3280fn plan_table_function_internal(
3281    qcx: &QueryContext,
3282    Function {
3283        name,
3284        args,
3285        filter,
3286        over,
3287        distinct,
3288    }: &Function<Aug>,
3289    with_ordinality: bool,
3290    table_name: Option<FullItemName>,
3291) -> Result<(HirRelationExpr, Scope), PlanError> {
3292    // The parser rejects FILTER, OVER, and DISTINCT in every table function
3293    // position (`FROM f(...)`, `ROWS FROM (...)`), and table functions in
3294    // scalar position are only lifted into a `FROM` clause when all three are
3295    // absent, so these are defensive.
3296    if filter.is_some() {
3297        sql_bail!("FILTER is not allowed for table functions in FROM");
3298    }
3299    if over.is_some() {
3300        sql_bail!("OVER is not allowed for table functions in FROM");
3301    }
3302    if *distinct {
3303        sql_bail!("DISTINCT is not allowed for table functions in FROM");
3304    }
3305
3306    let ecx = &ExprContext {
3307        qcx,
3308        name: "table function arguments",
3309        scope: &Scope::empty(),
3310        relation_type: &SqlRelationType::empty(),
3311        allow_aggregates: false,
3312        allow_subqueries: true,
3313        allow_parameters: true,
3314        allow_windows: false,
3315    };
3316
3317    let scalar_args = match args {
3318        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3319        FunctionArgs::Args { args, order_by } => {
3320            if !order_by.is_empty() {
3321                sql_bail!(
3322                    "ORDER BY specified, but {} is not an aggregate function",
3323                    name
3324                );
3325            }
3326            plan_exprs(ecx, args)?
3327        }
3328    };
3329
3330    let table_name = match table_name {
3331        Some(table_name) => table_name.item,
3332        None => name.full_item_name().item.clone(),
3333    };
3334
3335    let scope_name = Some(PartialItemName {
3336        database: None,
3337        schema: None,
3338        item: table_name,
3339    });
3340
3341    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3342        Func::Table(impls) => {
3343            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3344            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3345            let expr = match tf.imp {
3346                TableFuncImpl::CallTable { mut func, exprs } => {
3347                    if with_ordinality {
3348                        func = TableFunc::with_ordinality(func.clone()).ok_or(
3349                            PlanError::Unsupported {
3350                                feature: format!("WITH ORDINALITY on {}", func),
3351                                discussion_no: None,
3352                            },
3353                        )?;
3354                    }
3355                    HirRelationExpr::CallTable { func, exprs }
3356                }
3357                TableFuncImpl::Expr(expr) => {
3358                    if !with_ordinality {
3359                        expr
3360                    } else {
3361                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
3362                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
3363                        // back to the legacy implementation or error out the query.
3364                        if qcx
3365                            .scx
3366                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3367                        {
3368                            // Note that this can give an incorrect ordering, and also has an extreme
3369                            // performance problem in some cases. See the doc comment of
3370                            // `TableFuncImpl`.
3371                            tracing::error!(
3372                                %name,
3373                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3374                            );
3375                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3376                                func: WindowExprType::Scalar(ScalarWindowExpr {
3377                                    func: ScalarWindowFunc::RowNumber,
3378                                    order_by: vec![],
3379                                }),
3380                                partition_by: vec![],
3381                                order_by: vec![],
3382                            })])
3383                        } else {
3384                            bail_unsupported!(format!(
3385                                "WITH ORDINALITY or ROWS FROM with {}",
3386                                name
3387                            ));
3388                        }
3389                    }
3390                }
3391            };
3392            (expr, scope)
3393        }
3394        Func::Scalar(impls) => {
3395            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3396            let output = expr.typ(
3397                &qcx.outer_relation_types,
3398                &SqlRelationType::new(vec![]),
3399                &qcx.scx.param_types.borrow(),
3400            );
3401
3402            let relation = SqlRelationType::new(vec![output]);
3403
3404            let function_ident = Ident::new(name.full_item_name().item.clone())?;
3405            let column_name = normalize::column_name(function_ident);
3406            let name = column_name.to_string();
3407
3408            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3409
3410            let mut func = TableFunc::TabletizedScalar { relation, name };
3411            if with_ordinality {
3412                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3413                    feature: format!("WITH ORDINALITY on {}", func),
3414                    discussion_no: None,
3415                })?;
3416            }
3417            (
3418                HirRelationExpr::CallTable {
3419                    func,
3420                    exprs: vec![expr],
3421                },
3422                scope,
3423            )
3424        }
3425        o => sql_bail!(
3426            "{} functions are not supported in functions in FROM",
3427            o.class()
3428        ),
3429    };
3430
3431    if with_ordinality {
3432        scope
3433            .items
3434            .push(ScopeItem::from_name(scope_name, "ordinality"));
3435    }
3436
3437    Ok((expr, scope))
3438}
3439
3440fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3441    if let Some(TableAlias {
3442        name,
3443        columns,
3444        strict,
3445    }) = alias
3446    {
3447        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3448            sql_bail!(
3449                "{} has {} columns available but {} columns specified",
3450                name,
3451                scope.items.len(),
3452                columns.len()
3453            );
3454        }
3455
3456        let table_name = normalize::ident(name.to_owned());
3457        for (i, item) in scope.items.iter_mut().enumerate() {
3458            item.table_name = if item.allow_unqualified_references {
3459                Some(PartialItemName {
3460                    database: None,
3461                    schema: None,
3462                    item: table_name.clone(),
3463                })
3464            } else {
3465                // Columns that prohibit unqualified references are special
3466                // columns from the output of a NATURAL or USING join that can
3467                // only be referenced by their full, pre-join name. Applying an
3468                // alias to the output of that join renders those columns
3469                // inaccessible, which we accomplish here by setting the
3470                // table name to `None`.
3471                //
3472                // Concretely, consider:
3473                //
3474                //      CREATE TABLE t1 (a int);
3475                //      CREATE TABLE t2 (a int);
3476                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3477                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3478                //
3479                // In (1), the join has no alias. The underlying columns from
3480                // either side of the join can be referenced as `t1.a` and
3481                // `t2.a`, respectively, and the unqualified name `a` refers to
3482                // a column whose value is `coalesce(t1.a, t2.a)`.
3483                //
3484                // In (2), the join is aliased as `t`. The columns from either
3485                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3486                // the coalesced column can be named as either `a` or `t.a`.
3487                //
3488                // We previously had a bug [0] that mishandled this subtle
3489                // logic.
3490                //
3491                // NOTE(benesch): We could in theory choose to project away
3492                // those inaccessible columns and drop them from the scope
3493                // entirely, but that would require that this function also
3494                // take and return the `HirRelationExpr` that is being aliased,
3495                // which is a rather large refactor.
3496                //
3497                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3498                None
3499            };
3500            item.column_name = columns
3501                .get(i)
3502                .map(|a| normalize::column_name(a.clone()))
3503                .unwrap_or_else(|| item.column_name.clone());
3504        }
3505    }
3506    Ok(scope)
3507}
3508
3509// `table_func_names` is a mapping from a UUID to the original function
3510// name. The UUIDs are identifiers that have been rewritten from some table
3511// function expression, and this mapping restores the original names.
3512fn invent_column_name(
3513    ecx: &ExprContext,
3514    expr: &Expr<Aug>,
3515    table_func_names: &BTreeMap<String, Ident>,
3516) -> Result<Option<ColumnName>, PlanError> {
3517    // We follow PostgreSQL exactly here, which has some complicated rules
3518    // around "high" and "low" quality names. Low quality names override other
3519    // low quality names but not high quality names.
3520    //
3521    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3522
3523    #[derive(Debug)]
3524    enum NameQuality {
3525        Low,
3526        High,
3527    }
3528
3529    fn invent(
3530        ecx: &ExprContext,
3531        expr: &Expr<Aug>,
3532        table_func_names: &BTreeMap<String, Ident>,
3533    ) -> Result<Option<(ColumnName, NameQuality)>, PlanError> {
3534        Ok(match expr {
3535            Expr::Identifier(names) => {
3536                if let [name] = names.as_slice() {
3537                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3538                        return Ok(Some((
3539                            normalize::column_name(table_func_name.clone()),
3540                            NameQuality::High,
3541                        )));
3542                    }
3543                }
3544                names
3545                    .last()
3546                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3547            }
3548            Expr::Value(v) => match v {
3549                // Per PostgreSQL, `bool` and `interval` literals take on the name
3550                // of their type, but not other literal types.
3551                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3552                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3553                _ => None,
3554            },
3555            Expr::Function(func) => {
3556                let (schema, item) = match &func.name {
3557                    ResolvedItemName::Item {
3558                        qualifiers,
3559                        full_name,
3560                        ..
3561                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3562                    // Name resolution should have rejected anything other than
3563                    // `Item` for a function call.
3564                    _ => {
3565                        bail_internal!("function name did not resolve to an item: {:?}", func.name)
3566                    }
3567                };
3568
3569                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3570                    || schema
3571                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3572                {
3573                    None
3574                } else {
3575                    Some((item.into(), NameQuality::High))
3576                }
3577            }
3578            Expr::HomogenizingFunction { function, .. } => Some((
3579                function.to_string().to_lowercase().into(),
3580                NameQuality::High,
3581            )),
3582            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3583            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3584            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3585            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3586            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names)? {
3587                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3588                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3589            },
3590            Expr::Case { else_result, .. } => {
3591                let inner = match else_result.as_ref() {
3592                    Some(else_result) => invent(ecx, else_result, table_func_names)?,
3593                    None => None,
3594                };
3595                match inner {
3596                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3597                    _ => Some(("case".into(), NameQuality::Low)),
3598                }
3599            }
3600            Expr::FieldAccess { field, .. } => {
3601                Some((normalize::column_name(field.clone()), NameQuality::High))
3602            }
3603            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3604            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names)?,
3605            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3606                // A bit silly to have to plan the query here just to get its column
3607                // name, since we throw away the planned expression, but fixing this
3608                // requires a separate semantic analysis phase.
3609                //
3610                // We deliberately swallow planning errors here: if the subquery
3611                // doesn't plan, we just don't invent a name for it; the real
3612                // planning attempt elsewhere will surface the error.
3613                let Ok((_expr, scope)) = plan_nested_query(&mut ecx.derived_query_context(), query)
3614                else {
3615                    return Ok(None);
3616                };
3617                scope
3618                    .items
3619                    .first()
3620                    .map(|name| (name.column_name.clone(), NameQuality::High))
3621            }
3622            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3623            _ => None,
3624        })
3625    }
3626
3627    Ok(invent(ecx, expr, table_func_names)?.map(|(name, _quality)| name))
3628}
3629
3630#[derive(Debug)]
3631enum ExpandedSelectItem<'a> {
3632    InputOrdinal(usize),
3633    Expr(Cow<'a, Expr<Aug>>),
3634}
3635
3636impl ExpandedSelectItem<'_> {
3637    fn as_expr(&self) -> Option<&Expr<Aug>> {
3638        match self {
3639            ExpandedSelectItem::InputOrdinal(_) => None,
3640            ExpandedSelectItem::Expr(expr) => Some(expr),
3641        }
3642    }
3643}
3644
3645fn expand_select_item<'a>(
3646    ecx: &ExprContext,
3647    s: &'a SelectItem<Aug>,
3648    table_func_names: &BTreeMap<String, Ident>,
3649) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3650    match s {
3651        SelectItem::Expr {
3652            expr: Expr::QualifiedWildcard(table_name),
3653            alias: _,
3654        } => {
3655            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3656            let table_name =
3657                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3658            let out: Vec<_> = ecx
3659                .scope
3660                .items
3661                .iter()
3662                .enumerate()
3663                .filter(|(_i, item)| item.is_from_table(&table_name))
3664                .map(|(i, item)| {
3665                    let name = item.column_name.clone();
3666                    (ExpandedSelectItem::InputOrdinal(i), name)
3667                })
3668                .collect();
3669            if out.is_empty() {
3670                sql_bail!("no table named '{}' in scope", table_name);
3671            }
3672            Ok(out)
3673        }
3674        SelectItem::Expr {
3675            expr: Expr::WildcardAccess(sql_expr),
3676            alias: _,
3677        } => {
3678            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3679            // A bit silly to have to plan the expression here just to get its
3680            // type, since we throw away the planned expression, but fixing this
3681            // requires a separate semantic analysis phase. Luckily this is an
3682            // uncommon operation and the PostgreSQL docs have a warning that
3683            // this operation is slow in Postgres too.
3684            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3685            let fields = match ecx.scalar_type(&expr) {
3686                SqlScalarType::Record { fields, .. } => fields,
3687                ty => sql_bail!(
3688                    "type {} is not composite",
3689                    ecx.humanize_sql_scalar_type(&ty, false)
3690                ),
3691            };
3692            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3693            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3694                if let [name] = ident.as_slice() {
3695                    if let Ok(items) = ecx.scope.items_from_table(
3696                        &[],
3697                        &PartialItemName {
3698                            database: None,
3699                            schema: None,
3700                            item: name.as_str().to_string(),
3701                        },
3702                    ) {
3703                        for (_, item) in items {
3704                            if item
3705                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3706                            {
3707                                skip_cols.insert(item.column_name.clone());
3708                            }
3709                        }
3710                    }
3711                }
3712            }
3713            let items = fields
3714                .iter()
3715                .filter_map(|(name, _ty)| {
3716                    if skip_cols.contains(name) {
3717                        None
3718                    } else {
3719                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3720                            expr: sql_expr.clone(),
3721                            field: name.clone().into(),
3722                        }));
3723                        Some((item, name.clone()))
3724                    }
3725                })
3726                .collect();
3727            Ok(items)
3728        }
3729        SelectItem::Wildcard => {
3730            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3731            let items: Vec<_> = ecx
3732                .scope
3733                .items
3734                .iter()
3735                .enumerate()
3736                .filter(|(_i, item)| item.allow_unqualified_references)
3737                .map(|(i, item)| {
3738                    let name = item.column_name.clone();
3739                    (ExpandedSelectItem::InputOrdinal(i), name)
3740                })
3741                .collect();
3742
3743            Ok(items)
3744        }
3745        SelectItem::Expr { expr, alias } => {
3746            let name = match alias.clone().map(normalize::column_name) {
3747                Some(name) => name,
3748                None => invent_column_name(ecx, expr, table_func_names)?
3749                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into()),
3750            };
3751            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3752        }
3753    }
3754}
3755
3756fn plan_join(
3757    left_qcx: &QueryContext,
3758    left: HirRelationExpr,
3759    left_scope: Scope,
3760    join: &Join<Aug>,
3761) -> Result<(HirRelationExpr, Scope), PlanError> {
3762    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3763    let (kind, constraint) = match &join.join_operator {
3764        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3765        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3766        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3767        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3768        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3769    };
3770
3771    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3772    if !kind.can_be_correlated() {
3773        for item in &mut right_qcx.outer_scopes[0].items {
3774            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3775            // these items from scope. These items need to *exist* because they
3776            // might shadow variables in outer scopes that would otherwise be
3777            // valid to reference, but accessing them needs to produce an error.
3778            item.error_if_referenced =
3779                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3780                    table: table.cloned(),
3781                    column: column.clone(),
3782                });
3783        }
3784    }
3785    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3786
3787    let (expr, scope) = match constraint {
3788        JoinConstraint::On(expr) => {
3789            let product_scope = left_scope.product(right_scope)?;
3790            let ecx = &ExprContext {
3791                qcx: left_qcx,
3792                name: "ON clause",
3793                scope: &product_scope,
3794                relation_type: &SqlRelationType::new(
3795                    left_qcx
3796                        .relation_type(&left)
3797                        .column_types
3798                        .into_iter()
3799                        .chain(right_qcx.relation_type(&right).column_types)
3800                        .collect(),
3801                ),
3802                allow_aggregates: false,
3803                allow_subqueries: true,
3804                allow_parameters: true,
3805                allow_windows: false,
3806            };
3807            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3808            let joined = left.join(right, on, kind);
3809            (joined, product_scope)
3810        }
3811        JoinConstraint::Using { columns, alias } => {
3812            let column_names = columns
3813                .iter()
3814                .map(|ident| normalize::column_name(ident.clone()))
3815                .collect::<Vec<_>>();
3816
3817            plan_using_constraint(
3818                &column_names,
3819                left_qcx,
3820                left,
3821                left_scope,
3822                &right_qcx,
3823                right,
3824                right_scope,
3825                kind,
3826                alias.as_ref(),
3827            )?
3828        }
3829        JoinConstraint::Natural => {
3830            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3831            // have the same scx. However, it doesn't hurt to be safe.
3832            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3833            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3834            let left_column_names = left_scope.column_names();
3835            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3836            let column_names: Vec<_> = left_column_names
3837                .filter(|col| right_column_names.contains(col))
3838                .cloned()
3839                .collect();
3840            plan_using_constraint(
3841                &column_names,
3842                left_qcx,
3843                left,
3844                left_scope,
3845                &right_qcx,
3846                right,
3847                right_scope,
3848                kind,
3849                None,
3850            )?
3851        }
3852    };
3853    Ok((expr, scope))
3854}
3855
3856// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3857#[allow(clippy::too_many_arguments)]
3858fn plan_using_constraint(
3859    column_names: &[ColumnName],
3860    left_qcx: &QueryContext,
3861    left: HirRelationExpr,
3862    left_scope: Scope,
3863    right_qcx: &QueryContext,
3864    right: HirRelationExpr,
3865    right_scope: Scope,
3866    kind: JoinKind,
3867    alias: Option<&Ident>,
3868) -> Result<(HirRelationExpr, Scope), PlanError> {
3869    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3870
3871    // Cargo culting PG here; no discernable reason this must fail, but PG does
3872    // so we do, as well.
3873    let mut unique_column_names = BTreeSet::new();
3874    for c in column_names {
3875        if !unique_column_names.insert(c) {
3876            return Err(PlanError::Unsupported {
3877                feature: format!(
3878                    "column name {} appears more than once in USING clause",
3879                    c.quoted()
3880                ),
3881                discussion_no: None,
3882            });
3883        }
3884    }
3885
3886    let alias_item_name = alias.map(|alias| PartialItemName {
3887        database: None,
3888        schema: None,
3889        item: alias.clone().to_string(),
3890    });
3891
3892    if let Some(alias_item_name) = &alias_item_name {
3893        for partial_item_name in both_scope.table_names() {
3894            if partial_item_name.matches(alias_item_name) {
3895                sql_bail!(
3896                    "table name \"{}\" specified more than once",
3897                    alias_item_name
3898                )
3899            }
3900        }
3901    }
3902
3903    let ecx = &ExprContext {
3904        qcx: right_qcx,
3905        name: "USING clause",
3906        scope: &both_scope,
3907        relation_type: &SqlRelationType::new(
3908            left_qcx
3909                .relation_type(&left)
3910                .column_types
3911                .into_iter()
3912                .chain(right_qcx.relation_type(&right).column_types)
3913                .collect(),
3914        ),
3915        allow_aggregates: false,
3916        allow_subqueries: false,
3917        allow_parameters: false,
3918        allow_windows: false,
3919    };
3920
3921    let mut join_exprs = vec![];
3922    let mut map_exprs = vec![];
3923    let mut new_items = vec![];
3924    let mut join_cols = vec![];
3925    let mut hidden_cols = vec![];
3926
3927    for column_name in column_names {
3928        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3929        let (lhs, lhs_name) = left_scope.resolve_using_column(
3930            column_name,
3931            JoinSide::Left,
3932            &mut left_qcx.name_manager.borrow_mut(),
3933        )?;
3934        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3935            column_name,
3936            JoinSide::Right,
3937            &mut right_qcx.name_manager.borrow_mut(),
3938        )?;
3939
3940        // Adjust the RHS reference to its post-join location.
3941        rhs.column += left_scope.len();
3942
3943        // Join keys must be resolved to same type.
3944        let mut exprs = coerce_homogeneous_exprs(
3945            &ecx.with_name(&format!(
3946                "NATURAL/USING join column {}",
3947                column_name.quoted()
3948            )),
3949            vec![
3950                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3951                    lhs,
3952                    Arc::clone(&lhs_name),
3953                )),
3954                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3955                    rhs,
3956                    Arc::clone(&rhs_name),
3957                )),
3958            ],
3959            None,
3960        )?;
3961        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3962
3963        match kind {
3964            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3965                join_cols.push(lhs.column);
3966                hidden_cols.push(rhs.column);
3967            }
3968            JoinKind::RightOuter => {
3969                join_cols.push(rhs.column);
3970                hidden_cols.push(lhs.column);
3971            }
3972            JoinKind::FullOuter => {
3973                // Create a new column that will be the coalesced value of left
3974                // and right.
3975                join_cols.push(both_scope.items.len() + map_exprs.len());
3976                hidden_cols.push(lhs.column);
3977                hidden_cols.push(rhs.column);
3978                map_exprs.push(HirScalarExpr::call_variadic(
3979                    Coalesce,
3980                    vec![expr1.clone(), expr2.clone()],
3981                ));
3982                new_items.push(ScopeItem::from_column_name(column_name));
3983            }
3984        }
3985
3986        // If a `join_using_alias` is present, add a new scope item that accepts
3987        // only table-qualified references for each specified join column.
3988        // Unlike regular table aliases, a `join_using_alias` should not hide the
3989        // names of the joined relations.
3990        if alias_item_name.is_some() {
3991            let new_item_col = both_scope.items.len() + new_items.len();
3992            join_cols.push(new_item_col);
3993            hidden_cols.push(new_item_col);
3994
3995            new_items.push(ScopeItem::from_name(
3996                alias_item_name.clone(),
3997                column_name.clone().to_string(),
3998            ));
3999
4000            // The aliased column `alias.col` must take the same value as the
4001            // unqualified join output column `col`. For INNER and LEFT joins
4002            // that's the LHS value, for RIGHT joins it's the RHS value, and
4003            // for FULL OUTER joins it's COALESCE(lhs, rhs). Using `lhs`
4004            // unconditionally produces wrong results for RIGHT/FULL joins on
4005            // rows where the LHS side is NULL.
4006            let alias_expr = match kind {
4007                JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
4008                    HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name))
4009                }
4010                JoinKind::RightOuter => HirScalarExpr::named_column(rhs, Arc::clone(&rhs_name)),
4011                JoinKind::FullOuter => {
4012                    HirScalarExpr::call_variadic(Coalesce, vec![expr1.clone(), expr2.clone()])
4013                }
4014            };
4015            map_exprs.push(alias_expr);
4016        }
4017
4018        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4019    }
4020    both_scope.items.extend(new_items);
4021
4022    // The columns from the secondary side of the join remain accessible by
4023    // their table-qualified name, but not by their column name alone. They are
4024    // also excluded from `SELECT *`.
4025    for c in hidden_cols {
4026        both_scope.items[c].allow_unqualified_references = false;
4027    }
4028
4029    // Reproject all returned elements to the front of the list.
4030    let project_key = join_cols
4031        .into_iter()
4032        .chain(0..both_scope.items.len())
4033        .unique()
4034        .collect::<Vec<_>>();
4035
4036    both_scope = both_scope.project(&project_key);
4037
4038    let on = HirScalarExpr::variadic_and(join_exprs);
4039
4040    let both = left
4041        .join(right, on, kind)
4042        .map(map_exprs)
4043        .project(project_key);
4044    Ok((both, both_scope))
4045}
4046
4047pub fn plan_expr<'a>(
4048    ecx: &'a ExprContext,
4049    e: &Expr<Aug>,
4050) -> Result<CoercibleScalarExpr, PlanError> {
4051    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4052}
4053
4054fn plan_expr_inner<'a>(
4055    ecx: &'a ExprContext,
4056    e: &Expr<Aug>,
4057) -> Result<CoercibleScalarExpr, PlanError> {
4058    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4059        // We've already calculated this expression.
4060        return Ok(HirScalarExpr::named_column(
4061            i,
4062            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4063        )
4064        .into());
4065    }
4066
4067    match e {
4068        // Names.
4069        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4070            Ok(plan_identifier(ecx, names)?.into())
4071        }
4072
4073        // Literals.
4074        Expr::Value(val) => plan_literal(val),
4075        Expr::Parameter(n) => plan_parameter(ecx, *n),
4076        Expr::Array(exprs) => plan_array(ecx, exprs, None),
4077        Expr::List(exprs) => plan_list(ecx, exprs, None),
4078        Expr::Map(exprs) => plan_map(ecx, exprs, None),
4079        Expr::Row { exprs } => plan_row(ecx, exprs),
4080
4081        // Generalized functions, operators, and casts.
4082        Expr::Op { op, expr1, expr2 } => {
4083            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4084        }
4085        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4086        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4087
4088        // Special functions and operators.
4089        Expr::Not { expr } => plan_not(ecx, expr),
4090        Expr::And { left, right } => plan_and(ecx, left, right),
4091        Expr::Or { left, right } => plan_or(ecx, left, right),
4092        Expr::IsExpr {
4093            expr,
4094            construct,
4095            negated,
4096        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4097        Expr::Case {
4098            operand,
4099            conditions,
4100            results,
4101            else_result,
4102        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4103        Expr::HomogenizingFunction { function, exprs } => {
4104            plan_homogenizing_function(ecx, function, exprs)
4105        }
4106        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4107            ecx,
4108            &None,
4109            &[l_expr.clone().equals(*r_expr.clone())],
4110            &[Expr::null()],
4111            &Some(Box::new(*l_expr.clone())),
4112        )?
4113        .into()),
4114        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4115        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4116        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4117        Expr::Like {
4118            expr,
4119            pattern,
4120            escape,
4121            case_insensitive,
4122            negated,
4123        } => Ok(plan_like(
4124            ecx,
4125            expr,
4126            pattern,
4127            escape.as_deref(),
4128            *case_insensitive,
4129            *negated,
4130        )?
4131        .into()),
4132
4133        Expr::InList {
4134            expr,
4135            list,
4136            negated,
4137        } => plan_in_list(ecx, expr, list, negated),
4138
4139        // Subqueries.
4140        Expr::Exists(query) => plan_exists(ecx, query),
4141        Expr::Subquery(query) => plan_subquery(ecx, query),
4142        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4143        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4144        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4145        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4146        Expr::Nested(_) => bail_internal!("Expr::Nested should have been desugared"),
4147        Expr::InSubquery { .. } => {
4148            bail_internal!("Expr::InSubquery should have been desugared")
4149        }
4150        Expr::AnyExpr { .. } => {
4151            bail_internal!("Expr::AnyExpr should have been desugared")
4152        }
4153        Expr::AllExpr { .. } => {
4154            bail_internal!("Expr::AllExpr should have been desugared")
4155        }
4156        Expr::AnySubquery { .. } => {
4157            bail_internal!("Expr::AnySubquery should have been desugared")
4158        }
4159        Expr::AllSubquery { .. } => {
4160            bail_internal!("Expr::AllSubquery should have been desugared")
4161        }
4162        Expr::Between { .. } => {
4163            bail_internal!("Expr::Between should have been desugared")
4164        }
4165    }
4166}
4167
4168fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4169    if !ecx.allow_parameters {
4170        // It might be clearer to return an error like "cannot use parameter
4171        // here", but this is how PostgreSQL does it, and so for now we follow
4172        // PostgreSQL.
4173        return Err(PlanError::UnknownParameter(n));
4174    }
4175    if n == 0 || n > 65536 {
4176        return Err(PlanError::UnknownParameter(n));
4177    }
4178    if ecx.param_types().borrow().contains_key(&n) {
4179        Ok(HirScalarExpr::parameter(n).into())
4180    } else {
4181        Ok(CoercibleScalarExpr::Parameter(n))
4182    }
4183}
4184
4185fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4186    let mut out = vec![];
4187    for e in exprs {
4188        out.push(plan_expr(ecx, e)?);
4189    }
4190    Ok(CoercibleScalarExpr::LiteralRecord(out))
4191}
4192
4193fn plan_cast(
4194    ecx: &ExprContext,
4195    expr: &Expr<Aug>,
4196    data_type: &ResolvedDataType,
4197) -> Result<CoercibleScalarExpr, PlanError> {
4198    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4199    let expr = match expr {
4200        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4201        // we can pass in the target type as a type hint. This is
4202        // a limited form of the coercion that we do for string literals
4203        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4204        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4205        // PostgreSQL compatibility trouble.
4206        //
4207        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4208        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4209        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4210        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4211        _ => plan_expr(ecx, expr)?,
4212    };
4213    let ecx = &ecx.with_name("CAST");
4214    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4215    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4216    Ok(expr.into())
4217}
4218
4219fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4220    let ecx = ecx.with_name("NOT argument");
4221    Ok(plan_expr(&ecx, expr)?
4222        .type_as(&ecx, &SqlScalarType::Bool)?
4223        .call_unary(UnaryFunc::Not(expr_func::Not))
4224        .into())
4225}
4226
4227fn plan_and(
4228    ecx: &ExprContext,
4229    left: &Expr<Aug>,
4230    right: &Expr<Aug>,
4231) -> Result<CoercibleScalarExpr, PlanError> {
4232    let ecx = ecx.with_name("AND argument");
4233    Ok(HirScalarExpr::variadic_and(vec![
4234        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4235        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4236    ])
4237    .into())
4238}
4239
4240fn plan_or(
4241    ecx: &ExprContext,
4242    left: &Expr<Aug>,
4243    right: &Expr<Aug>,
4244) -> Result<CoercibleScalarExpr, PlanError> {
4245    let ecx = ecx.with_name("OR argument");
4246    Ok(HirScalarExpr::variadic_or(vec![
4247        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4248        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4249    ])
4250    .into())
4251}
4252
4253fn plan_in_list(
4254    ecx: &ExprContext,
4255    lhs: &Expr<Aug>,
4256    list: &Vec<Expr<Aug>>,
4257    negated: &bool,
4258) -> Result<CoercibleScalarExpr, PlanError> {
4259    let ecx = ecx.with_name("IN list");
4260    let or = HirScalarExpr::variadic_or(
4261        list.into_iter()
4262            .map(|e| {
4263                let eq = lhs.clone().equals(e.clone());
4264                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4265            })
4266            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4267    );
4268    Ok(if *negated {
4269        or.call_unary(UnaryFunc::Not(expr_func::Not))
4270    } else {
4271        or
4272    }
4273    .into())
4274}
4275
4276fn plan_homogenizing_function(
4277    ecx: &ExprContext,
4278    function: &HomogenizingFunction,
4279    exprs: &[Expr<Aug>],
4280) -> Result<CoercibleScalarExpr, PlanError> {
4281    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4282    let expr = HirScalarExpr::call_variadic(
4283        match function {
4284            HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4285            HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4286            HomogenizingFunction::Least => VariadicFunc::from(Least),
4287        },
4288        coerce_homogeneous_exprs(
4289            &ecx.with_name(&function.to_string().to_lowercase()),
4290            plan_exprs(ecx, exprs)?,
4291            None,
4292        )?,
4293    );
4294    Ok(expr.into())
4295}
4296
4297fn plan_field_access(
4298    ecx: &ExprContext,
4299    expr: &Expr<Aug>,
4300    field: &Ident,
4301) -> Result<CoercibleScalarExpr, PlanError> {
4302    let field = normalize::column_name(field.clone());
4303    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4304    let ty = ecx.scalar_type(&expr);
4305    let i = match &ty {
4306        SqlScalarType::Record { fields, .. } => {
4307            fields.iter().position(|(name, _ty)| *name == field)
4308        }
4309        ty => sql_bail!(
4310            "column notation applied to type {}, which is not a composite type",
4311            ecx.humanize_sql_scalar_type(ty, false)
4312        ),
4313    };
4314    match i {
4315        None => sql_bail!(
4316            "field {} not found in data type {}",
4317            field,
4318            ecx.humanize_sql_scalar_type(&ty, false)
4319        ),
4320        Some(i) => Ok(expr
4321            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4322            .into()),
4323    }
4324}
4325
4326fn plan_subscript(
4327    ecx: &ExprContext,
4328    expr: &Expr<Aug>,
4329    positions: &[SubscriptPosition<Aug>],
4330) -> Result<CoercibleScalarExpr, PlanError> {
4331    assert!(
4332        !positions.is_empty(),
4333        "subscript expression must contain at least one position"
4334    );
4335
4336    let ecx = &ecx.with_name("subscripting");
4337    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4338    let ty = ecx.scalar_type(&expr);
4339    match &ty {
4340        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4341            ecx,
4342            expr,
4343            positions,
4344            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4345            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4346            // track in its backing data).
4347            if ty == SqlScalarType::Int2Vector {
4348                1
4349            } else {
4350                0
4351            },
4352        ),
4353        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4354        SqlScalarType::List { element_type, .. } => {
4355            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4356            let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4357            let n_layers = ty.unwrap_list_n_layers();
4358            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4359        }
4360        ty => sql_bail!(
4361            "cannot subscript type {}",
4362            ecx.humanize_sql_scalar_type(ty, false)
4363        ),
4364    }
4365}
4366
4367// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4368// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4369// any were slices (i.e. included colon).
4370fn extract_scalar_subscript_from_positions<'a>(
4371    positions: &'a [SubscriptPosition<Aug>],
4372    expr_type_name: &str,
4373) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4374    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4375    for p in positions {
4376        if p.explicit_slice {
4377            sql_bail!("{} subscript does not support slices", expr_type_name);
4378        }
4379        assert!(
4380            p.end.is_none(),
4381            "index-appearing subscripts cannot have end value"
4382        );
4383        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4384    }
4385    Ok(scalar_subscripts)
4386}
4387
4388fn plan_subscript_array(
4389    ecx: &ExprContext,
4390    expr: HirScalarExpr,
4391    positions: &[SubscriptPosition<Aug>],
4392    offset: i64,
4393) -> Result<CoercibleScalarExpr, PlanError> {
4394    let mut exprs = Vec::with_capacity(positions.len() + 1);
4395    exprs.push(expr);
4396
4397    // Subscripting arrays doesn't yet support slicing, so we always want to
4398    // extract scalars or error.
4399    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4400
4401    for i in indexes {
4402        exprs.push(plan_expr(ecx, i)?.cast_to(
4403            ecx,
4404            CastContext::Explicit,
4405            &SqlScalarType::Int64,
4406        )?);
4407    }
4408
4409    Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4410}
4411
4412fn plan_subscript_list(
4413    ecx: &ExprContext,
4414    mut expr: HirScalarExpr,
4415    positions: &[SubscriptPosition<Aug>],
4416    mut remaining_layers: usize,
4417    elem_type_name: &str,
4418) -> Result<CoercibleScalarExpr, PlanError> {
4419    let mut i = 0;
4420
4421    while i < positions.len() {
4422        // Take all contiguous index operations, i.e. find next slice operation.
4423        let j = positions[i..]
4424            .iter()
4425            .position(|p| p.explicit_slice)
4426            .unwrap_or(positions.len() - i);
4427        if j != 0 {
4428            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4429            let (n, e) = plan_index_list(
4430                ecx,
4431                expr,
4432                indexes.as_slice(),
4433                remaining_layers,
4434                elem_type_name,
4435            )?;
4436            remaining_layers = n;
4437            expr = e;
4438            i += j;
4439        }
4440
4441        // Take all contiguous slice operations, i.e. find next index operation.
4442        let j = positions[i..]
4443            .iter()
4444            .position(|p| !p.explicit_slice)
4445            .unwrap_or(positions.len() - i);
4446        if j != 0 {
4447            expr = plan_slice_list(
4448                ecx,
4449                expr,
4450                &positions[i..i + j],
4451                remaining_layers,
4452                elem_type_name,
4453            )?;
4454            i += j;
4455        }
4456    }
4457
4458    Ok(expr.into())
4459}
4460
4461fn plan_index_list(
4462    ecx: &ExprContext,
4463    expr: HirScalarExpr,
4464    indexes: &[&Expr<Aug>],
4465    n_layers: usize,
4466    elem_type_name: &str,
4467) -> Result<(usize, HirScalarExpr), PlanError> {
4468    let depth = indexes.len();
4469
4470    if depth > n_layers {
4471        if n_layers == 0 {
4472            sql_bail!("cannot subscript type {}", elem_type_name)
4473        } else {
4474            sql_bail!(
4475                "cannot index into {} layers; list only has {} layer{}",
4476                depth,
4477                n_layers,
4478                if n_layers == 1 { "" } else { "s" }
4479            )
4480        }
4481    }
4482
4483    let mut exprs = Vec::with_capacity(depth + 1);
4484    exprs.push(expr);
4485
4486    for i in indexes {
4487        exprs.push(plan_expr(ecx, i)?.cast_to(
4488            ecx,
4489            CastContext::Explicit,
4490            &SqlScalarType::Int64,
4491        )?);
4492    }
4493
4494    Ok((
4495        n_layers - depth,
4496        HirScalarExpr::call_variadic(ListIndex, exprs),
4497    ))
4498}
4499
4500fn plan_slice_list(
4501    ecx: &ExprContext,
4502    expr: HirScalarExpr,
4503    slices: &[SubscriptPosition<Aug>],
4504    n_layers: usize,
4505    elem_type_name: &str,
4506) -> Result<HirScalarExpr, PlanError> {
4507    if n_layers == 0 {
4508        sql_bail!("cannot subscript type {}", elem_type_name)
4509    }
4510
4511    // first arg will be list
4512    let mut exprs = Vec::with_capacity(slices.len() + 1);
4513    exprs.push(expr);
4514    // extract (start, end) parts from collected slices
4515    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4516        Ok(match position {
4517            Some(p) => {
4518                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4519            }
4520            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4521        })
4522    };
4523    for p in slices {
4524        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4525        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4526        exprs.push(start);
4527        exprs.push(end);
4528    }
4529
4530    Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4531}
4532
4533fn plan_like(
4534    ecx: &ExprContext,
4535    expr: &Expr<Aug>,
4536    pattern: &Expr<Aug>,
4537    escape: Option<&Expr<Aug>>,
4538    case_insensitive: bool,
4539    not: bool,
4540) -> Result<HirScalarExpr, PlanError> {
4541    use CastContext::Implicit;
4542    let ecx = ecx.with_name("LIKE argument");
4543    let expr = plan_expr(&ecx, expr)?;
4544    let haystack = match ecx.scalar_type(&expr) {
4545        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4546            .type_as(&ecx, ty)?
4547            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4548        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4549    };
4550    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4551    if let Some(escape) = escape {
4552        pattern = pattern.call_binary(
4553            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4554            expr_func::LikeEscape,
4555        );
4556    }
4557    let func: BinaryFunc = if case_insensitive {
4558        expr_func::IsLikeMatchCaseInsensitive.into()
4559    } else {
4560        expr_func::IsLikeMatchCaseSensitive.into()
4561    };
4562    let like = haystack.call_binary(pattern, func);
4563    if not {
4564        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4565    } else {
4566        Ok(like)
4567    }
4568}
4569
4570fn plan_subscript_jsonb(
4571    ecx: &ExprContext,
4572    expr: HirScalarExpr,
4573    positions: &[SubscriptPosition<Aug>],
4574) -> Result<CoercibleScalarExpr, PlanError> {
4575    use CastContext::Implicit;
4576    use SqlScalarType::{Int64, String};
4577
4578    // JSONB doesn't support the slicing syntax, so simply error if you
4579    // encounter any explicit slices.
4580    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4581
4582    let mut exprs = Vec::with_capacity(subscripts.len());
4583    for s in subscripts {
4584        let subscript = plan_expr(ecx, s)?;
4585        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4586            subscript
4587        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4588            // Integers are converted to a string here and then re-parsed as an
4589            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4590            // do it.
4591            typeconv::to_string(ecx, subscript)?
4592        } else {
4593            sql_bail!("jsonb subscript type must be coercible to integer or text");
4594        };
4595        exprs.push(subscript);
4596    }
4597
4598    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4599    // `expr->subscript` as you might expect.
4600    let expr = expr.call_binary(
4601        HirScalarExpr::call_variadic(
4602            ArrayCreate {
4603                elem_type: SqlScalarType::String,
4604            },
4605            exprs,
4606        ),
4607        expr_func::JsonbGetPath,
4608    );
4609    Ok(expr.into())
4610}
4611
4612fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4613    if !ecx.allow_subqueries {
4614        sql_bail!("{} does not allow subqueries", ecx.name)
4615    }
4616    let mut qcx = ecx.derived_query_context();
4617    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4618    Ok(expr.exists().into())
4619}
4620
4621fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4622    if !ecx.allow_subqueries {
4623        sql_bail!("{} does not allow subqueries", ecx.name)
4624    }
4625    let mut qcx = ecx.derived_query_context();
4626    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4627    let column_types = qcx.relation_type(&expr).column_types;
4628    if column_types.len() != 1 {
4629        sql_bail!(
4630            "Expected subselect to return 1 column, got {} columns",
4631            column_types.len()
4632        );
4633    }
4634    Ok(expr.select().into())
4635}
4636
4637fn plan_list_subquery(
4638    ecx: &ExprContext,
4639    query: &Query<Aug>,
4640) -> Result<CoercibleScalarExpr, PlanError> {
4641    plan_vector_like_subquery(
4642        ecx,
4643        query,
4644        |_| false,
4645        |elem_type| ListCreate { elem_type }.into(),
4646        |order_by| AggregateFunc::ListConcat { order_by },
4647        expr_func::ListListConcat.into(),
4648        |elem_type| {
4649            HirScalarExpr::literal(
4650                Datum::empty_list(),
4651                SqlScalarType::List {
4652                    element_type: Box::new(elem_type),
4653                    custom_id: None,
4654                },
4655            )
4656        },
4657        "list",
4658    )
4659}
4660
4661fn plan_array_subquery(
4662    ecx: &ExprContext,
4663    query: &Query<Aug>,
4664) -> Result<CoercibleScalarExpr, PlanError> {
4665    plan_vector_like_subquery(
4666        ecx,
4667        query,
4668        |elem_type| {
4669            matches!(
4670                elem_type,
4671                SqlScalarType::Char { .. }
4672                    | SqlScalarType::Array { .. }
4673                    | SqlScalarType::List { .. }
4674                    | SqlScalarType::Map { .. }
4675            )
4676        },
4677        |elem_type| ArrayCreate { elem_type }.into(),
4678        |order_by| AggregateFunc::ArrayConcat { order_by },
4679        expr_func::ArrayArrayConcat.into(),
4680        |elem_type| {
4681            HirScalarExpr::literal(
4682                Datum::empty_array(),
4683                SqlScalarType::Array(Box::new(elem_type)),
4684            )
4685        },
4686        "[]",
4687    )
4688}
4689
4690/// Generic function used to plan both array subqueries and list subqueries
4691fn plan_vector_like_subquery<F1, F2, F3, F4>(
4692    ecx: &ExprContext,
4693    query: &Query<Aug>,
4694    is_unsupported_type: F1,
4695    vector_create: F2,
4696    aggregate_concat: F3,
4697    binary_concat: BinaryFunc,
4698    empty_literal: F4,
4699    vector_type_string: &str,
4700) -> Result<CoercibleScalarExpr, PlanError>
4701where
4702    F1: Fn(&SqlScalarType) -> bool,
4703    F2: Fn(SqlScalarType) -> VariadicFunc,
4704    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4705    F4: Fn(SqlScalarType) -> HirScalarExpr,
4706{
4707    if !ecx.allow_subqueries {
4708        sql_bail!("{} does not allow subqueries", ecx.name)
4709    }
4710
4711    let mut qcx = ecx.derived_query_context();
4712    let mut planned_query = plan_query(&mut qcx, query)?;
4713    if planned_query.limit.is_some()
4714        || !planned_query
4715            .offset
4716            .clone()
4717            .try_into_literal_int64()
4718            .is_ok_and(|offset| offset == 0)
4719    {
4720        planned_query.expr = HirRelationExpr::top_k(
4721            planned_query.expr,
4722            vec![],
4723            planned_query.order_by.clone(),
4724            planned_query.limit,
4725            planned_query.offset,
4726            planned_query.group_size_hints.limit_input_group_size,
4727        );
4728    }
4729
4730    if planned_query.project.len() != 1 {
4731        sql_bail!(
4732            "Expected subselect to return 1 column, got {} columns",
4733            planned_query.project.len()
4734        );
4735    }
4736
4737    let project_column = *planned_query.project.get(0).unwrap();
4738    let elem_type = qcx
4739        .relation_type(&planned_query.expr)
4740        .column_types
4741        .get(project_column)
4742        .cloned()
4743        .unwrap()
4744        .scalar_type();
4745
4746    if is_unsupported_type(&elem_type) {
4747        bail_unsupported!(format!(
4748            "cannot build array from subquery because return type {}{}",
4749            ecx.humanize_sql_scalar_type(&elem_type, false),
4750            vector_type_string
4751        ));
4752    }
4753
4754    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4755    // subquery above.
4756    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4757        vector_create(elem_type.clone()),
4758        vec![HirScalarExpr::column(project_column)],
4759    ))
4760    .chain(
4761        planned_query
4762            .order_by
4763            .iter()
4764            .map(|co| HirScalarExpr::column(co.column)),
4765    )
4766    .collect();
4767
4768    // However, column references for `aggregation_projection` and `aggregation_order_by`
4769    // are with reference to the `exprs` of the aggregation expression.  Here that is
4770    // `aggregation_exprs`.
4771    let aggregation_projection = vec![0];
4772    let aggregation_order_by = planned_query
4773        .order_by
4774        .into_iter()
4775        .enumerate()
4776        .map(|(i, order)| ColumnOrder { column: i, ..order })
4777        .collect();
4778
4779    let reduced_expr = planned_query
4780        .expr
4781        .reduce(
4782            vec![],
4783            vec![AggregateExpr {
4784                func: aggregate_concat(aggregation_order_by),
4785                expr: Box::new(HirScalarExpr::call_variadic(
4786                    RecordCreate {
4787                        field_names: iter::repeat(ColumnName::from(""))
4788                            .take(aggregation_exprs.len())
4789                            .collect(),
4790                    },
4791                    aggregation_exprs,
4792                )),
4793                distinct: false,
4794            }],
4795            None,
4796        )
4797        .project(aggregation_projection);
4798
4799    // If `expr` has no rows, return an empty array/list rather than NULL.
4800    Ok(reduced_expr
4801        .select()
4802        .call_binary(empty_literal(elem_type), binary_concat)
4803        .into())
4804}
4805
4806fn plan_map_subquery(
4807    ecx: &ExprContext,
4808    query: &Query<Aug>,
4809) -> Result<CoercibleScalarExpr, PlanError> {
4810    if !ecx.allow_subqueries {
4811        sql_bail!("{} does not allow subqueries", ecx.name)
4812    }
4813
4814    let mut qcx = ecx.derived_query_context();
4815    let mut query = plan_query(&mut qcx, query)?;
4816    if query.limit.is_some()
4817        || !query
4818            .offset
4819            .clone()
4820            .try_into_literal_int64()
4821            .is_ok_and(|offset| offset == 0)
4822    {
4823        query.expr = HirRelationExpr::top_k(
4824            query.expr,
4825            vec![],
4826            query.order_by.clone(),
4827            query.limit,
4828            query.offset,
4829            query.group_size_hints.limit_input_group_size,
4830        );
4831    }
4832    if query.project.len() != 2 {
4833        sql_bail!(
4834            "expected map subquery to return 2 columns, got {} columns",
4835            query.project.len()
4836        );
4837    }
4838
4839    let query_types = qcx.relation_type(&query.expr).column_types;
4840    let key_column = query.project[0];
4841    let key_type = query_types[key_column].clone().scalar_type();
4842    let value_column = query.project[1];
4843    let value_type = query_types[value_column].clone().scalar_type();
4844
4845    if key_type != SqlScalarType::String {
4846        sql_bail!("cannot build map from subquery because first column is not of type text");
4847    }
4848
4849    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4850        RecordCreate {
4851            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4852        },
4853        vec![
4854            HirScalarExpr::column(key_column),
4855            HirScalarExpr::column(value_column),
4856        ],
4857    ))
4858    .chain(
4859        query
4860            .order_by
4861            .iter()
4862            .map(|co| HirScalarExpr::column(co.column)),
4863    )
4864    .collect();
4865
4866    let expr = query
4867        .expr
4868        .reduce(
4869            vec![],
4870            vec![AggregateExpr {
4871                func: AggregateFunc::MapAgg {
4872                    order_by: query
4873                        .order_by
4874                        .into_iter()
4875                        .enumerate()
4876                        .map(|(i, order)| ColumnOrder { column: i, ..order })
4877                        .collect(),
4878                    value_type: value_type.clone(),
4879                },
4880                expr: Box::new(HirScalarExpr::call_variadic(
4881                    RecordCreate {
4882                        field_names: iter::repeat(ColumnName::from(""))
4883                            .take(aggregation_exprs.len())
4884                            .collect(),
4885                    },
4886                    aggregation_exprs,
4887                )),
4888                distinct: false,
4889            }],
4890            None,
4891        )
4892        .project(vec![0]);
4893
4894    // If `expr` has no rows, return an empty map rather than NULL.
4895    let expr = HirScalarExpr::call_variadic(
4896        Coalesce,
4897        vec![
4898            expr.select(),
4899            HirScalarExpr::literal(
4900                Datum::empty_map(),
4901                SqlScalarType::Map {
4902                    value_type: Box::new(value_type),
4903                    custom_id: None,
4904                },
4905            ),
4906        ],
4907    );
4908
4909    Ok(expr.into())
4910}
4911
4912fn plan_collate(
4913    ecx: &ExprContext,
4914    expr: &Expr<Aug>,
4915    collation: &UnresolvedItemName,
4916) -> Result<CoercibleScalarExpr, PlanError> {
4917    if collation.0.len() == 2
4918        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4919        && collation.0[1] == ident!("default")
4920    {
4921        plan_expr(ecx, expr)
4922    } else {
4923        bail_unsupported!("COLLATE");
4924    }
4925}
4926
4927/// Plans a slice of expressions.
4928///
4929/// This function is a simple convenience function for mapping [`plan_expr`]
4930/// over a slice of expressions. The planned expressions are returned in the
4931/// same order as the input. If any of the expressions fail to plan, returns an
4932/// error instead.
4933fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4934where
4935    E: std::borrow::Borrow<Expr<Aug>>,
4936{
4937    let mut out = vec![];
4938    for expr in exprs {
4939        out.push(plan_expr(ecx, expr.borrow())?);
4940    }
4941    Ok(out)
4942}
4943
4944/// Plans an `ARRAY` expression.
4945fn plan_array(
4946    ecx: &ExprContext,
4947    exprs: &[Expr<Aug>],
4948    type_hint: Option<&SqlScalarType>,
4949) -> Result<CoercibleScalarExpr, PlanError> {
4950    // Plan each element expression.
4951    let mut out = vec![];
4952    for expr in exprs {
4953        out.push(match expr {
4954            // Special case nested ARRAY expressions so we can plumb
4955            // the type hint through.
4956            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4957            _ => plan_expr(ecx, expr)?,
4958        });
4959    }
4960
4961    // Attempt to make use of the type hint.
4962    let type_hint = match type_hint {
4963        // The user has provided an explicit cast to an array type. We know the
4964        // element type to coerce to. Need to be careful, though: if there's
4965        // evidence that any of the array elements are themselves arrays, we
4966        // want to coerce to the array type, not the element type.
4967        Some(SqlScalarType::Array(elem_type)) => {
4968            let multidimensional = out.iter().any(|e| {
4969                matches!(
4970                    ecx.scalar_type(e),
4971                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4972                )
4973            });
4974            if multidimensional {
4975                type_hint
4976            } else {
4977                Some(&**elem_type)
4978            }
4979        }
4980        // The user provided an explicit cast to a non-array type. We'll have to
4981        // guess what the correct type for the array. Our caller will then
4982        // handle converting that array type to the desired non-array type.
4983        Some(_) => None,
4984        // No type hint. We'll have to guess the correct type for the array.
4985        None => None,
4986    };
4987
4988    // Coerce all elements to the same type.
4989    let (elem_type, exprs) = if exprs.is_empty() {
4990        if let Some(elem_type) = type_hint {
4991            (elem_type.clone(), vec![])
4992        } else {
4993            sql_bail!("cannot determine type of empty array");
4994        }
4995    } else {
4996        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4997        (ecx.scalar_type(&out[0]), out)
4998    };
4999
5000    // Arrays of `char` type are disallowed due to a known limitation:
5001    // https://github.com/MaterializeInc/database-issues/issues/2360.
5002    //
5003    // Arrays of `list` and `map` types are disallowed due to mind-bending
5004    // semantics.
5005    if matches!(
5006        elem_type,
5007        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
5008    ) {
5009        bail_unsupported!(format!(
5010            "{}[]",
5011            ecx.humanize_sql_scalar_type(&elem_type, false)
5012        ));
5013    }
5014
5015    Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
5016}
5017
5018fn plan_list(
5019    ecx: &ExprContext,
5020    exprs: &[Expr<Aug>],
5021    type_hint: Option<&SqlScalarType>,
5022) -> Result<CoercibleScalarExpr, PlanError> {
5023    let (elem_type, exprs) = if exprs.is_empty() {
5024        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
5025            (element_type.without_modifiers(), vec![])
5026        } else {
5027            sql_bail!("cannot determine type of empty list");
5028        }
5029    } else {
5030        let type_hint = match type_hint {
5031            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5032            _ => None,
5033        };
5034
5035        let mut out = vec![];
5036        for expr in exprs {
5037            out.push(match expr {
5038                // Special case nested LIST expressions so we can plumb
5039                // the type hint through.
5040                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5041                _ => plan_expr(ecx, expr)?,
5042            });
5043        }
5044        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5045        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5046    };
5047
5048    if matches!(elem_type, SqlScalarType::Char { .. }) {
5049        bail_unsupported!("char list");
5050    }
5051
5052    Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5053}
5054
5055fn plan_map(
5056    ecx: &ExprContext,
5057    entries: &[MapEntry<Aug>],
5058    type_hint: Option<&SqlScalarType>,
5059) -> Result<CoercibleScalarExpr, PlanError> {
5060    let (value_type, exprs) = if entries.is_empty() {
5061        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5062            (value_type.without_modifiers(), vec![])
5063        } else {
5064            sql_bail!("cannot determine type of empty map");
5065        }
5066    } else {
5067        let type_hint = match type_hint {
5068            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5069            _ => None,
5070        };
5071
5072        let mut keys = vec![];
5073        let mut values = vec![];
5074        for MapEntry { key, value } in entries {
5075            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5076            let value = match value {
5077                // Special case nested MAP expressions so we can plumb
5078                // the type hint through.
5079                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5080                _ => plan_expr(ecx, value)?,
5081            };
5082            keys.push(key);
5083            values.push(value);
5084        }
5085        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5086        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5087        let out = itertools::interleave(keys, values).collect();
5088        (value_type, out)
5089    };
5090
5091    if matches!(value_type, SqlScalarType::Char { .. }) {
5092        bail_unsupported!("char map");
5093    }
5094
5095    let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5096    Ok(expr.into())
5097}
5098
5099/// Coerces a list of expressions such that all input expressions will be cast
5100/// to the same type. If successful, returns a new list of expressions in the
5101/// same order as the input, where each expression has the appropriate casts to
5102/// make them all of a uniform type.
5103///
5104/// If `force_type` is `Some`, the expressions are forced to the specified type
5105/// via an explicit cast. Otherwise the best common type is guessed via
5106/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5107/// implicit casts
5108///
5109/// Note that this is our implementation of Postgres' type conversion for
5110/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5111/// isn't yet used in all of those cases.
5112///
5113/// [union-type-conv]:
5114/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5115pub fn coerce_homogeneous_exprs(
5116    ecx: &ExprContext,
5117    exprs: Vec<CoercibleScalarExpr>,
5118    force_type: Option<&SqlScalarType>,
5119) -> Result<Vec<HirScalarExpr>, PlanError> {
5120    assert!(!exprs.is_empty());
5121
5122    let target_holder;
5123    let target = match force_type {
5124        Some(t) => t,
5125        None => {
5126            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5127            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5128            &target_holder
5129        }
5130    };
5131
5132    // Try to cast all expressions to `target`.
5133    let mut out = Vec::new();
5134    for expr in exprs {
5135        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5136        let ccx = match force_type {
5137            None => CastContext::Implicit,
5138            Some(_) => CastContext::Explicit,
5139        };
5140        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5141            Ok(expr) => out.push(expr),
5142            Err(_) => sql_bail!(
5143                "{} could not convert type {} to {}",
5144                ecx.name,
5145                ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5146                ecx.humanize_sql_scalar_type(target, false),
5147            ),
5148        }
5149    }
5150    Ok(out)
5151}
5152
5153/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5154/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5155pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5156    obe: &OrderByExpr<T>,
5157    column: usize,
5158) -> ColumnOrder {
5159    let desc = !obe.asc.unwrap_or(true);
5160    ColumnOrder {
5161        column,
5162        desc,
5163        // https://www.postgresql.org/docs/14/queries-order.html
5164        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5165        nulls_last: obe.nulls_last.unwrap_or(!desc),
5166    }
5167}
5168
5169/// Plans the ORDER BY clause of a window function.
5170///
5171/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5172/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5173/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5174/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5175/// `Vec<HirScalarExpr>` that we return.
5176fn plan_function_order_by(
5177    ecx: &ExprContext,
5178    order_by: &[OrderByExpr<Aug>],
5179) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5180    let mut order_by_exprs = vec![];
5181    let mut col_orders = vec![];
5182    {
5183        for (i, obe) in order_by.iter().enumerate() {
5184            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5185            // do not support ordinal references in PostgreSQL. So we use
5186            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5187            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5188            order_by_exprs.push(expr);
5189            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5190        }
5191    }
5192    Ok((order_by_exprs, col_orders))
5193}
5194
5195/// Returns a human-readable rendering of `name`, falling back to a debug
5196/// dump of the `ResolvedItemName` if humanization fails. Used to construct
5197/// user-facing error messages on already-failing paths, where we'd rather
5198/// surface the raw resolved name than emit a useless `<unknown>` placeholder.
5199fn humanize_or_debug(scx: &StatementContext, name: &ResolvedItemName) -> String {
5200    scx.humanize_resolved_name(name)
5201        .map(|n| n.to_string())
5202        .unwrap_or_else(|_| format!("<error when trying to humanize `{name:?}`>"))
5203}
5204
5205/// Common part of the planning of windowed and non-windowed aggregation functions.
5206fn plan_aggregate_common(
5207    ecx: &ExprContext,
5208    Function::<Aug> {
5209        name,
5210        args,
5211        filter,
5212        over: _,
5213        distinct,
5214    }: &Function<Aug>,
5215) -> Result<AggregateExpr, PlanError> {
5216    // Normal aggregate functions, like `sum`, expect as input a single expression
5217    // which yields the datum to aggregate. Order sensitive aggregate functions,
5218    // like `jsonb_agg`, are special, and instead expect a Record whose first
5219    // element yields the datum to aggregate and whose successive elements yield
5220    // keys to order by. This expectation is hard coded within the implementation
5221    // of each of the order-sensitive aggregates. The specification of how many
5222    // order by keys to consider, and in what order, is passed via the `order_by`
5223    // field on the `AggregateFunc` variant.
5224
5225    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5226    // most, so explicitly drop it if the function doesn't care about order. This
5227    // prevents the projection into Record below from triggering on unsupported
5228    // functions.
5229
5230    let impls = match resolve_func(ecx, name, args)? {
5231        Func::Aggregate(impls) => impls,
5232        _ => bail_internal!("plan_aggregate_common called on non-aggregate function"),
5233    };
5234
5235    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5236    // generalized function selection framework. The rule is simple: the user
5237    // must type `count(*)`, but the function selection framework sees an empty
5238    // parameter list, as if the user had typed `count()`. But if the user types
5239    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5240    // rules to all aggregates, not just `count`, since we may one day support
5241    // user-defined aggregates, including user-defined aggregates that take no
5242    // parameters.
5243    let (args, order_by) = match &args {
5244        FunctionArgs::Star => (vec![], vec![]),
5245        FunctionArgs::Args { args, order_by } => {
5246            if args.is_empty() {
5247                sql_bail!(
5248                    "{}(*) must be used to call a parameterless aggregate function",
5249                    humanize_or_debug(ecx.qcx.scx, name)
5250                );
5251            }
5252            let args = plan_exprs(ecx, args)?;
5253            (args, order_by.clone())
5254        }
5255    };
5256
5257    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5258
5259    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5260    if let Some(filter) = &filter {
5261        // If a filter is present, as in
5262        //
5263        //     <agg>(<expr>) FILTER (WHERE <cond>)
5264        //
5265        // we plan it by essentially rewriting the expression to
5266        //
5267        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5268        //
5269        // where <identity> is the identity input for <agg>.
5270        let cond =
5271            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5272        let expr_typ = ecx.scalar_type(&expr);
5273        expr = HirScalarExpr::if_then_else(
5274            cond,
5275            expr,
5276            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5277        );
5278    }
5279
5280    let mut seen_outer = false;
5281    let mut seen_inner = false;
5282    #[allow(deprecated)]
5283    expr.visit_columns(0, &mut |depth, col| {
5284        if depth == 0 && col.level == 0 {
5285            seen_inner = true;
5286        } else if col.level > depth {
5287            seen_outer = true;
5288        }
5289    });
5290    if seen_outer && !seen_inner {
5291        bail_unsupported!(
5292            3720,
5293            "aggregate functions that refer exclusively to outer columns"
5294        );
5295    }
5296
5297    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5298    // map the needed expressions into the aggregate datum.
5299    if func.is_order_sensitive() {
5300        let field_names = iter::repeat(ColumnName::from(""))
5301            .take(1 + order_by_exprs.len())
5302            .collect();
5303        let mut exprs = vec![expr];
5304        exprs.extend(order_by_exprs);
5305        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5306    }
5307
5308    Ok(AggregateExpr {
5309        func,
5310        expr: Box::new(expr),
5311        distinct: *distinct,
5312    })
5313}
5314
5315fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5316    let mut names = names.to_vec();
5317    // The parser guarantees that an `Expr::Identifier` is constructed with a
5318    // non-empty list of name parts, so an empty list here is an internal bug.
5319    let Some(last) = names.pop() else {
5320        bail_internal!("empty identifier");
5321    };
5322    let col_name = normalize::column_name(last);
5323
5324    // If the name is qualified, it must refer to a column in a table.
5325    if !names.is_empty() {
5326        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5327        let (i, i_name) = ecx.scope.resolve_table_column(
5328            &ecx.qcx.outer_scopes,
5329            &table_name,
5330            &col_name,
5331            &mut ecx.qcx.name_manager.borrow_mut(),
5332        )?;
5333        return Ok(HirScalarExpr::named_column(i, i_name));
5334    }
5335
5336    // If the name is unqualified, first check if it refers to a column. Track any similar names
5337    // that might exist for a better error message.
5338    let similar_names = match ecx.scope.resolve_column(
5339        &ecx.qcx.outer_scopes,
5340        &col_name,
5341        &mut ecx.qcx.name_manager.borrow_mut(),
5342    ) {
5343        Ok((i, i_name)) => {
5344            return Ok(HirScalarExpr::named_column(i, i_name));
5345        }
5346        Err(PlanError::UnknownColumn { similar, .. }) => similar,
5347        Err(e) => return Err(e),
5348    };
5349
5350    // The name doesn't refer to a column. Check if it is a whole-row reference
5351    // to a table.
5352    let items = ecx.scope.items_from_table(
5353        &ecx.qcx.outer_scopes,
5354        &PartialItemName {
5355            database: None,
5356            schema: None,
5357            item: col_name.as_str().to_owned(),
5358        },
5359    )?;
5360    match items.as_slice() {
5361        // The name doesn't refer to a table either. Return an error.
5362        [] => Err(PlanError::UnknownColumn {
5363            table: None,
5364            column: col_name,
5365            similar: similar_names,
5366        }),
5367        // The name refers to a table that is the result of a function that
5368        // returned a single column. Per PostgreSQL, this is a special case
5369        // that returns the value directly.
5370        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
5371        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5372            *column,
5373            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5374        )),
5375        // The name refers to a normal table. Return a record containing all the
5376        // columns of the table.
5377        _ => {
5378            let mut has_exists_column = None;
5379            let (exprs, field_names): (Vec<_>, Vec<_>) = items
5380                .into_iter()
5381                .filter_map(|(column, item)| {
5382                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5383                        has_exists_column = Some(column);
5384                        None
5385                    } else {
5386                        let expr = HirScalarExpr::named_column(
5387                            column,
5388                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5389                        );
5390                        let name = item.column_name.clone();
5391                        Some((expr, name))
5392                    }
5393                })
5394                .unzip();
5395            // For the special case of a table function with a single column, the single column is instead not wrapped.
5396            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5397                exprs.into_element()
5398            } else {
5399                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
5400            };
5401            if let Some(has_exists_column) = has_exists_column {
5402                Ok(HirScalarExpr::if_then_else(
5403                    HirScalarExpr::unnamed_column(has_exists_column)
5404                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5405                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5406                    expr,
5407                ))
5408            } else {
5409                Ok(expr)
5410            }
5411        }
5412    }
5413}
5414
5415fn plan_op(
5416    ecx: &ExprContext,
5417    op: &str,
5418    expr1: &Expr<Aug>,
5419    expr2: Option<&Expr<Aug>>,
5420) -> Result<HirScalarExpr, PlanError> {
5421    let impls = func::resolve_op(op)?;
5422    let args = match expr2 {
5423        None => plan_exprs(ecx, &[expr1])?,
5424        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5425    };
5426    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5427}
5428
5429fn plan_function<'a>(
5430    ecx: &ExprContext,
5431    f @ Function {
5432        name,
5433        args,
5434        filter,
5435        over,
5436        distinct,
5437    }: &'a Function<Aug>,
5438) -> Result<HirScalarExpr, PlanError> {
5439    let impls = match resolve_func(ecx, name, args)? {
5440        Func::Table(_) => {
5441            sql_bail!(
5442                "table functions are not allowed in {} (function {})",
5443                ecx.name,
5444                name
5445            );
5446        }
5447        Func::Scalar(impls) => {
5448            if over.is_some() {
5449                sql_bail!(
5450                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5451                );
5452            }
5453            impls
5454        }
5455        Func::ScalarWindow(impls) => {
5456            let (
5457                ignore_nulls,
5458                order_by_exprs,
5459                col_orders,
5460                _window_frame,
5461                partition_by,
5462                scalar_args,
5463            ) = plan_window_function_non_aggr(ecx, f)?;
5464
5465            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5466            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5467            // msg there is less informative.)
5468            if !scalar_args.is_empty() {
5469                if let ResolvedItemName::Item {
5470                    full_name: FullItemName { item, .. },
5471                    ..
5472                } = name
5473                {
5474                    sql_bail!(
5475                        "function {} has 0 parameters, but was called with {}",
5476                        item,
5477                        scalar_args.len()
5478                    );
5479                }
5480            }
5481
5482            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5483            // accept a window frame here without an error msg. (Postgres also does this.)
5484            // TODO: maybe we should give a notice
5485
5486            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5487
5488            if ignore_nulls {
5489                // If we ever add a scalar window function that supports ignore, then don't forget
5490                // to also update HIR EXPLAIN.
5491                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5492            }
5493
5494            return Ok(HirScalarExpr::windowing(WindowExpr {
5495                func: WindowExprType::Scalar(ScalarWindowExpr {
5496                    func,
5497                    order_by: col_orders,
5498                }),
5499                partition_by,
5500                order_by: order_by_exprs,
5501            }));
5502        }
5503        Func::ValueWindow(impls) => {
5504            let window_plan = plan_window_function_non_aggr(ecx, f)?;
5505            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5506                window_plan;
5507
5508            let (args_encoded, func) =
5509                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5510
5511            if ignore_nulls {
5512                match func {
5513                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5514                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5515                }
5516            }
5517
5518            return Ok(HirScalarExpr::windowing(WindowExpr {
5519                func: WindowExprType::Value(ValueWindowExpr {
5520                    func,
5521                    args: Box::new(args_encoded),
5522                    order_by: col_orders,
5523                    window_frame,
5524                    ignore_nulls, // (RESPECT NULLS is the default)
5525                }),
5526                partition_by,
5527                order_by: order_by_exprs,
5528            }));
5529        }
5530        Func::Aggregate(_) => {
5531            if f.over.is_none() {
5532                // Not a window aggregate. Something is wrong.
5533                if ecx.allow_aggregates {
5534                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5535                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5536                    sql_bail!(
5537                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5538                        name,
5539                    );
5540                } else {
5541                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5542                    // because it was in an unsupported context.
5543                    sql_bail!(
5544                        "aggregate functions are not allowed in {} (function {})",
5545                        ecx.name,
5546                        name
5547                    );
5548                }
5549            } else {
5550                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5551                    plan_window_function_common(ecx, &f.name, &f.over)?;
5552
5553                // https://github.com/MaterializeInc/database-issues/issues/6720
5554                match (&window_frame.start_bound, &window_frame.end_bound) {
5555                    (
5556                        mz_expr::WindowFrameBound::UnboundedPreceding,
5557                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5558                    )
5559                    | (
5560                        mz_expr::WindowFrameBound::UnboundedPreceding,
5561                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5562                    )
5563                    | (
5564                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5565                        mz_expr::WindowFrameBound::UnboundedFollowing,
5566                    )
5567                    | (
5568                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5569                        mz_expr::WindowFrameBound::UnboundedFollowing,
5570                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5571                    (_, _) => {} // other cases are ok
5572                }
5573
5574                if ignore_nulls {
5575                    // https://github.com/MaterializeInc/database-issues/issues/6722
5576                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5577                    // forget to also update HIR EXPLAIN.
5578                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5579                }
5580
5581                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5582
5583                if aggregate_expr.distinct {
5584                    // https://github.com/MaterializeInc/database-issues/issues/6626
5585                    bail_unsupported!("DISTINCT in window aggregates");
5586                }
5587
5588                return Ok(HirScalarExpr::windowing(WindowExpr {
5589                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5590                        aggregate_expr,
5591                        order_by: col_orders,
5592                        window_frame,
5593                    }),
5594                    partition_by,
5595                    order_by: order_by_exprs,
5596                }));
5597            }
5598        }
5599    };
5600
5601    if over.is_some() {
5602        bail_internal!("OVER clause should have been handled by the window function path above");
5603    }
5604
5605    if *distinct {
5606        sql_bail!(
5607            "DISTINCT specified, but {} is not an aggregate function",
5608            humanize_or_debug(ecx.qcx.scx, name)
5609        );
5610    }
5611    if filter.is_some() {
5612        sql_bail!(
5613            "FILTER specified, but {} is not an aggregate function",
5614            humanize_or_debug(ecx.qcx.scx, name)
5615        );
5616    }
5617
5618    let scalar_args = match &args {
5619        FunctionArgs::Star => {
5620            sql_bail!(
5621                "* argument is invalid with non-aggregate function {}",
5622                humanize_or_debug(ecx.qcx.scx, name)
5623            )
5624        }
5625        FunctionArgs::Args { args, order_by } => {
5626            if !order_by.is_empty() {
5627                sql_bail!(
5628                    "ORDER BY specified, but {} is not an aggregate function",
5629                    humanize_or_debug(ecx.qcx.scx, name)
5630                );
5631            }
5632            plan_exprs(ecx, args)?
5633        }
5634    };
5635
5636    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5637}
5638
5639pub const IGNORE_NULLS_ERROR_MSG: &str =
5640    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5641
5642/// Resolves the name to a set of function implementations.
5643///
5644/// If the name does not specify a known built-in function, returns an error.
5645pub fn resolve_func(
5646    ecx: &ExprContext,
5647    name: &ResolvedItemName,
5648    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5649) -> Result<&'static Func, PlanError> {
5650    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5651        if let Ok(f) = i.func() {
5652            return Ok(f);
5653        }
5654    }
5655
5656    // Couldn't resolve function with this name, so generate verbose error
5657    // message.
5658    let cexprs = match args {
5659        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5660        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5661            if !order_by.is_empty() {
5662                sql_bail!(
5663                    "ORDER BY specified, but {} is not an aggregate function",
5664                    name
5665                );
5666            }
5667            plan_exprs(ecx, args)?
5668        }
5669    };
5670
5671    let arg_types: Vec<_> = cexprs
5672        .into_iter()
5673        .map(|ty| match ecx.scalar_type(&ty) {
5674            CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5675            CoercibleScalarType::Record(_) => "record".to_string(),
5676            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5677        })
5678        .collect();
5679
5680    Err(PlanError::UnknownFunction {
5681        name: name.to_string(),
5682        arg_types,
5683    })
5684}
5685
5686fn plan_is_expr<'a>(
5687    ecx: &ExprContext,
5688    expr: &'a Expr<Aug>,
5689    construct: &IsExprConstruct<Aug>,
5690    not: bool,
5691) -> Result<HirScalarExpr, PlanError> {
5692    let expr_hir = plan_expr(ecx, expr)?;
5693
5694    let mut result = match construct {
5695        IsExprConstruct::Null => {
5696            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5697            // at odds with our type coercion rules, which treat `NULL` literals
5698            // and unconstrained parameters identically. Providing a type hint
5699            // of string means we wind up supporting both.
5700            expr_hir.type_as_any(ecx)?.call_is_null()
5701        }
5702        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5703        IsExprConstruct::True => expr_hir
5704            .type_as(ecx, &SqlScalarType::Bool)?
5705            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5706        IsExprConstruct::False => expr_hir
5707            .type_as(ecx, &SqlScalarType::Bool)?
5708            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5709        IsExprConstruct::DistinctFrom(expr2) => {
5710            // There are three cases:
5711            // 1. Both terms are non-null, in which case the result should be `a != b`.
5712            // 2. Exactly one term is null, in which case the result should be true.
5713            // 3. Both terms are null, in which case the result should be false.
5714            //
5715            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5716
5717            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
5718            // construct an AST expression for `expr != expr2` and plan it to get proper type
5719            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
5720            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5721            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5722
5723            let expr1_hir = expr_hir.type_as_any(ecx)?;
5724            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5725
5726            let term1 = HirScalarExpr::variadic_or(vec![
5727                ne_hir,
5728                expr1_hir.clone().call_is_null(),
5729                expr2_hir.clone().call_is_null(),
5730            ]);
5731            let term2 = HirScalarExpr::variadic_or(vec![
5732                expr1_hir.call_is_null().not(),
5733                expr2_hir.call_is_null().not(),
5734            ]);
5735            term1.and(term2)
5736        }
5737    };
5738    if not {
5739        result = result.not();
5740    }
5741    Ok(result)
5742}
5743
5744fn plan_case<'a>(
5745    ecx: &ExprContext,
5746    operand: &'a Option<Box<Expr<Aug>>>,
5747    conditions: &'a [Expr<Aug>],
5748    results: &'a [Expr<Aug>],
5749    else_result: &'a Option<Box<Expr<Aug>>>,
5750) -> Result<HirScalarExpr, PlanError> {
5751    let mut cond_exprs = Vec::new();
5752    let mut result_exprs = Vec::new();
5753    for (c, r) in conditions.iter().zip_eq(results) {
5754        let c = match operand {
5755            Some(operand) => operand.clone().equals(c.clone()),
5756            None => c.clone(),
5757        };
5758        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5759        cond_exprs.push(cexpr);
5760        result_exprs.push(r);
5761    }
5762    result_exprs.push(match else_result {
5763        Some(else_result) => else_result,
5764        None => &Expr::Value(Value::Null),
5765    });
5766    let mut result_exprs = coerce_homogeneous_exprs(
5767        &ecx.with_name("CASE"),
5768        plan_exprs(ecx, &result_exprs)?,
5769        None,
5770    )?;
5771    let mut expr = result_exprs.pop().unwrap();
5772    assert_eq!(cond_exprs.len(), result_exprs.len());
5773    for (cexpr, rexpr) in cond_exprs
5774        .into_iter()
5775        .rev()
5776        .zip_eq(result_exprs.into_iter().rev())
5777    {
5778        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5779    }
5780    Ok(expr)
5781}
5782
5783fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5784    let (datum, scalar_type) = match l {
5785        Value::Number(s) => {
5786            let d = strconv::parse_numeric(s.as_str())?;
5787            if !s.contains(&['E', '.'][..]) {
5788                // Maybe representable as an int?
5789                if let Ok(n) = d.0.try_into() {
5790                    (Datum::Int32(n), SqlScalarType::Int32)
5791                } else if let Ok(n) = d.0.try_into() {
5792                    (Datum::Int64(n), SqlScalarType::Int64)
5793                } else {
5794                    (
5795                        Datum::Numeric(d),
5796                        SqlScalarType::Numeric { max_scale: None },
5797                    )
5798                }
5799            } else {
5800                (
5801                    Datum::Numeric(d),
5802                    SqlScalarType::Numeric { max_scale: None },
5803                )
5804            }
5805        }
5806        Value::HexString(_) => bail_unsupported!("hex string literals"),
5807        Value::Boolean(b) => match b {
5808            false => (Datum::False, SqlScalarType::Bool),
5809            true => (Datum::True, SqlScalarType::Bool),
5810        },
5811        Value::Interval(i) => {
5812            let i = literal::plan_interval(i)?;
5813            (Datum::Interval(i), SqlScalarType::Interval)
5814        }
5815        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5816        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5817    };
5818    let expr = HirScalarExpr::literal(datum, scalar_type);
5819    Ok(expr.into())
5820}
5821
5822/// The common part of the planning of non-aggregate window functions, i.e.,
5823/// scalar window functions and value window functions.
5824fn plan_window_function_non_aggr<'a>(
5825    ecx: &ExprContext,
5826    Function {
5827        name,
5828        args,
5829        filter,
5830        over,
5831        distinct,
5832    }: &'a Function<Aug>,
5833) -> Result<
5834    (
5835        bool,
5836        Vec<HirScalarExpr>,
5837        Vec<ColumnOrder>,
5838        mz_expr::WindowFrame,
5839        Vec<HirScalarExpr>,
5840        Vec<CoercibleScalarExpr>,
5841    ),
5842    PlanError,
5843> {
5844    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5845        plan_window_function_common(ecx, name, over)?;
5846
5847    if *distinct {
5848        sql_bail!(
5849            "DISTINCT specified, but {} is not an aggregate function",
5850            name
5851        );
5852    }
5853
5854    if filter.is_some() {
5855        bail_unsupported!("FILTER in non-aggregate window functions");
5856    }
5857
5858    let scalar_args = match &args {
5859        FunctionArgs::Star => {
5860            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5861        }
5862        FunctionArgs::Args { args, order_by } => {
5863            if !order_by.is_empty() {
5864                sql_bail!(
5865                    "ORDER BY specified, but {} is not an aggregate function",
5866                    name
5867                );
5868            }
5869            plan_exprs(ecx, args)?
5870        }
5871    };
5872
5873    Ok((
5874        ignore_nulls,
5875        order_by_exprs,
5876        col_orders,
5877        window_frame,
5878        partition,
5879        scalar_args,
5880    ))
5881}
5882
5883/// The common part of the planning of all window functions.
5884fn plan_window_function_common(
5885    ecx: &ExprContext,
5886    name: &<Aug as AstInfo>::ItemName,
5887    over: &Option<WindowSpec<Aug>>,
5888) -> Result<
5889    (
5890        bool,
5891        Vec<HirScalarExpr>,
5892        Vec<ColumnOrder>,
5893        mz_expr::WindowFrame,
5894        Vec<HirScalarExpr>,
5895    ),
5896    PlanError,
5897> {
5898    if !ecx.allow_windows {
5899        sql_bail!(
5900            "window functions are not allowed in {} (function {})",
5901            ecx.name,
5902            name
5903        );
5904    }
5905
5906    let window_spec = match over.as_ref() {
5907        Some(over) => over,
5908        None => sql_bail!("window function {} requires an OVER clause", name),
5909    };
5910    if window_spec.ignore_nulls && window_spec.respect_nulls {
5911        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5912    }
5913    let window_frame = match window_spec.window_frame.as_ref() {
5914        Some(frame) => plan_window_frame(frame)?,
5915        None => mz_expr::WindowFrame::default(),
5916    };
5917    let mut partition = Vec::new();
5918    for expr in &window_spec.partition_by {
5919        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5920    }
5921
5922    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5923
5924    Ok((
5925        window_spec.ignore_nulls,
5926        order_by_exprs,
5927        col_orders,
5928        window_frame,
5929        partition,
5930    ))
5931}
5932
5933fn plan_window_frame(
5934    WindowFrame {
5935        units,
5936        start_bound,
5937        end_bound,
5938    }: &WindowFrame,
5939) -> Result<mz_expr::WindowFrame, PlanError> {
5940    use mz_expr::WindowFrameBound::*;
5941    let units = window_frame_unit_ast_to_expr(units)?;
5942    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5943    let end_bound = end_bound
5944        .as_ref()
5945        .map(window_frame_bound_ast_to_expr)
5946        .unwrap_or(CurrentRow);
5947
5948    // Validate bounds according to Postgres rules
5949    match (&start_bound, &end_bound) {
5950        // Start bound can't be UNBOUNDED FOLLOWING
5951        (UnboundedFollowing, _) => {
5952            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5953        }
5954        // End bound can't be UNBOUNDED PRECEDING
5955        (_, UnboundedPreceding) => {
5956            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5957        }
5958        // Start bound should come before end bound in the list of bound definitions
5959        (CurrentRow, OffsetPreceding(_)) => {
5960            sql_bail!("frame starting from current row cannot have preceding rows")
5961        }
5962        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5963            sql_bail!("frame starting from following row cannot have preceding rows")
5964        }
5965        // The above rules are adopted from Postgres.
5966        // The following rules are Materialize-specific.
5967        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5968            // Note that the only hard limit is that partition size + offset should fit in i64, so
5969            // in theory, we could support much larger offsets than this. But for our current
5970            // performance, even 1000000 is quite big.
5971            if *o1 > 1000000 || *o2 > 1000000 {
5972                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5973            }
5974        }
5975        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5976            if *o1 > 1000000 || *o2 > 1000000 {
5977                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5978            }
5979        }
5980        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5981            if *o1 > 1000000 || *o2 > 1000000 {
5982                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5983            }
5984        }
5985        (OffsetPreceding(o), CurrentRow) => {
5986            if *o > 1000000 {
5987                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5988            }
5989        }
5990        (CurrentRow, OffsetFollowing(o)) => {
5991            if *o > 1000000 {
5992                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5993            }
5994        }
5995        // Other bounds are valid
5996        (_, _) => (),
5997    }
5998
5999    // RANGE is only supported in the default frame
6000    // https://github.com/MaterializeInc/database-issues/issues/6585
6001    if units == mz_expr::WindowFrameUnits::Range
6002        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
6003    {
6004        bail_unsupported!("RANGE in non-default window frames")
6005    }
6006
6007    let frame = mz_expr::WindowFrame {
6008        units,
6009        start_bound,
6010        end_bound,
6011    };
6012    Ok(frame)
6013}
6014
6015fn window_frame_unit_ast_to_expr(
6016    unit: &WindowFrameUnits,
6017) -> Result<mz_expr::WindowFrameUnits, PlanError> {
6018    match unit {
6019        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
6020        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
6021        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
6022    }
6023}
6024
6025fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
6026    match bound {
6027        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
6028        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
6029        WindowFrameBound::Preceding(Some(offset)) => {
6030            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6031        }
6032        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6033        WindowFrameBound::Following(Some(offset)) => {
6034            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6035        }
6036    }
6037}
6038
6039pub fn scalar_type_from_sql(
6040    scx: &StatementContext,
6041    data_type: &ResolvedDataType,
6042) -> Result<SqlScalarType, PlanError> {
6043    match data_type {
6044        ResolvedDataType::AnonymousList(elem_type) => {
6045            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6046            if matches!(elem_type, SqlScalarType::Char { .. }) {
6047                bail_unsupported!("char list");
6048            }
6049            Ok(SqlScalarType::List {
6050                element_type: Box::new(elem_type),
6051                custom_id: None,
6052            })
6053        }
6054        ResolvedDataType::AnonymousMap {
6055            key_type,
6056            value_type,
6057        } => {
6058            match scalar_type_from_sql(scx, key_type)? {
6059                SqlScalarType::String => {}
6060                other => sql_bail!(
6061                    "map key type must be {}, got {}",
6062                    scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6063                    scx.humanize_sql_scalar_type(&other, false)
6064                ),
6065            }
6066            Ok(SqlScalarType::Map {
6067                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6068                custom_id: None,
6069            })
6070        }
6071        ResolvedDataType::Named { id, modifiers, .. } => {
6072            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6073        }
6074        ResolvedDataType::Error => bail_internal!("should have been caught in name resolution"),
6075    }
6076}
6077
6078pub fn scalar_type_from_catalog(
6079    catalog: &dyn SessionCatalog,
6080    id: CatalogItemId,
6081    modifiers: &[i64],
6082) -> Result<SqlScalarType, PlanError> {
6083    let entry = catalog.get_item(&id);
6084    let type_details = match entry.type_details() {
6085        Some(type_details) => type_details,
6086        None => {
6087            // Resolution should never produce a `ResolvedDataType::Named` with
6088            // an ID of a non-type, but we error gracefully just in case.
6089            sql_bail!(
6090                "internal error: {} does not refer to a type",
6091                catalog.resolve_full_name(entry.name()).to_string().quoted()
6092            );
6093        }
6094    };
6095    match &type_details.typ {
6096        CatalogType::Numeric => {
6097            let mut modifiers = modifiers.iter().fuse();
6098            let precision = match modifiers.next() {
6099                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6100                    sql_bail!(
6101                        "precision for type numeric must be between 1 and {}",
6102                        NUMERIC_DATUM_MAX_PRECISION,
6103                    );
6104                }
6105                Some(p) => Some(*p),
6106                None => None,
6107            };
6108            let scale = match modifiers.next() {
6109                Some(scale) => {
6110                    if let Some(precision) = precision {
6111                        if *scale > precision {
6112                            sql_bail!(
6113                                "scale for type numeric must be between 0 and precision {}",
6114                                precision
6115                            );
6116                        }
6117                    }
6118                    Some(NumericMaxScale::try_from(*scale)?)
6119                }
6120                None => None,
6121            };
6122            if modifiers.next().is_some() {
6123                sql_bail!("type numeric supports at most two type modifiers");
6124            }
6125            Ok(SqlScalarType::Numeric { max_scale: scale })
6126        }
6127        CatalogType::Char => {
6128            let mut modifiers = modifiers.iter().fuse();
6129            let length = match modifiers.next() {
6130                Some(l) => Some(CharLength::try_from(*l)?),
6131                None => Some(CharLength::ONE),
6132            };
6133            if modifiers.next().is_some() {
6134                sql_bail!("type character supports at most one type modifier");
6135            }
6136            Ok(SqlScalarType::Char { length })
6137        }
6138        CatalogType::VarChar => {
6139            let mut modifiers = modifiers.iter().fuse();
6140            let length = match modifiers.next() {
6141                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6142                None => None,
6143            };
6144            if modifiers.next().is_some() {
6145                sql_bail!("type character varying supports at most one type modifier");
6146            }
6147            Ok(SqlScalarType::VarChar { max_length: length })
6148        }
6149        CatalogType::Timestamp => {
6150            let mut modifiers = modifiers.iter().fuse();
6151            let precision = match modifiers.next() {
6152                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6153                None => None,
6154            };
6155            if modifiers.next().is_some() {
6156                sql_bail!("type timestamp supports at most one type modifier");
6157            }
6158            Ok(SqlScalarType::Timestamp { precision })
6159        }
6160        CatalogType::TimestampTz => {
6161            let mut modifiers = modifiers.iter().fuse();
6162            let precision = match modifiers.next() {
6163                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6164                None => None,
6165            };
6166            if modifiers.next().is_some() {
6167                sql_bail!("type timestamp with time zone supports at most one type modifier");
6168            }
6169            Ok(SqlScalarType::TimestampTz { precision })
6170        }
6171        t => {
6172            if !modifiers.is_empty() {
6173                sql_bail!(
6174                    "{} does not support type modifiers",
6175                    catalog.resolve_full_name(entry.name()).to_string()
6176                );
6177            }
6178            match t {
6179                CatalogType::Array {
6180                    element_reference: element_id,
6181                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6182                    catalog,
6183                    *element_id,
6184                    modifiers,
6185                )?))),
6186                CatalogType::List {
6187                    element_reference: element_id,
6188                    element_modifiers,
6189                } => Ok(SqlScalarType::List {
6190                    element_type: Box::new(scalar_type_from_catalog(
6191                        catalog,
6192                        *element_id,
6193                        element_modifiers,
6194                    )?),
6195                    custom_id: Some(id),
6196                }),
6197                CatalogType::Map {
6198                    key_reference: _,
6199                    key_modifiers: _,
6200                    value_reference: value_id,
6201                    value_modifiers,
6202                } => Ok(SqlScalarType::Map {
6203                    value_type: Box::new(scalar_type_from_catalog(
6204                        catalog,
6205                        *value_id,
6206                        value_modifiers,
6207                    )?),
6208                    custom_id: Some(id),
6209                }),
6210                CatalogType::Range {
6211                    element_reference: element_id,
6212                } => Ok(SqlScalarType::Range {
6213                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6214                }),
6215                CatalogType::Record { fields } => {
6216                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6217                        .iter()
6218                        .map(|f| {
6219                            let scalar_type = scalar_type_from_catalog(
6220                                catalog,
6221                                f.type_reference,
6222                                &f.type_modifiers,
6223                            )?;
6224                            Ok((
6225                                f.name.clone(),
6226                                SqlColumnType {
6227                                    scalar_type,
6228                                    nullable: true,
6229                                },
6230                            ))
6231                        })
6232                        .collect::<Result<Box<_>, PlanError>>()?;
6233                    Ok(SqlScalarType::Record {
6234                        fields: scalars,
6235                        custom_id: Some(id),
6236                    })
6237                }
6238                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6239                CatalogType::Bool => Ok(SqlScalarType::Bool),
6240                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6241                CatalogType::Date => Ok(SqlScalarType::Date),
6242                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6243                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6244                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6245                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6246                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6247                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6248                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6249                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6250                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6251                CatalogType::Interval => Ok(SqlScalarType::Interval),
6252                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6253                CatalogType::Oid => Ok(SqlScalarType::Oid),
6254                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6255                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6256                CatalogType::Pseudo => {
6257                    sql_bail!(
6258                        "cannot reference pseudo type {}",
6259                        catalog.resolve_full_name(entry.name()).to_string()
6260                    )
6261                }
6262                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6263                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6264                CatalogType::RegType => Ok(SqlScalarType::RegType),
6265                CatalogType::String => Ok(SqlScalarType::String),
6266                CatalogType::Time => Ok(SqlScalarType::Time),
6267                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6268                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6269                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6270                CatalogType::Numeric => unreachable!("handled above"),
6271                CatalogType::Char => unreachable!("handled above"),
6272                CatalogType::VarChar => unreachable!("handled above"),
6273                CatalogType::Timestamp => unreachable!("handled above"),
6274                CatalogType::TimestampTz => unreachable!("handled above"),
6275            }
6276        }
6277    }
6278}
6279
6280/// This is used to collect aggregates and table functions from within an `Expr`.
6281/// See the explanation of aggregate handling at the top of the file for more details.
6282struct AggregateTableFuncVisitor<'a> {
6283    scx: &'a StatementContext<'a>,
6284    aggs: Vec<Function<Aug>>,
6285    within_aggregate: bool,
6286    tables: BTreeMap<Function<Aug>, String>,
6287    table_disallowed_context: Vec<&'static str>,
6288    in_select_item: bool,
6289    id_gen: IdGen,
6290    err: Option<PlanError>,
6291}
6292
6293impl<'a> AggregateTableFuncVisitor<'a> {
6294    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6295        AggregateTableFuncVisitor {
6296            scx,
6297            aggs: Vec::new(),
6298            within_aggregate: false,
6299            tables: BTreeMap::new(),
6300            table_disallowed_context: Vec::new(),
6301            in_select_item: false,
6302            id_gen: Default::default(),
6303            err: None,
6304        }
6305    }
6306
6307    fn into_result(
6308        self,
6309    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6310        match self.err {
6311            Some(err) => Err(err),
6312            None => {
6313                // Dedup while preserving the order. We don't care what the order is, but it
6314                // has to be reproducible so that EXPLAIN PLAN tests work.
6315                let mut seen = BTreeSet::new();
6316                let aggs = self
6317                    .aggs
6318                    .into_iter()
6319                    .filter(move |agg| seen.insert(agg.clone()))
6320                    .collect();
6321                Ok((aggs, self.tables))
6322            }
6323        }
6324    }
6325}
6326
6327impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6328    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6329        let item = match self.scx.get_item_by_resolved_name(&func.name) {
6330            Ok(i) => i,
6331            // Catching missing functions later in planning improves error messages.
6332            Err(_) => return,
6333        };
6334
6335        match item.func() {
6336            // We don't want to collect window aggregations, because these will be handled not by
6337            // plan_aggregate, but by plan_function.
6338            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6339                if self.within_aggregate {
6340                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6341                    return;
6342                }
6343                self.aggs.push(func.clone());
6344                let Function {
6345                    name: _,
6346                    args,
6347                    filter,
6348                    over: _,
6349                    distinct: _,
6350                } = func;
6351                if let Some(filter) = filter {
6352                    self.visit_expr_mut(filter);
6353                }
6354                let old_within_aggregate = self.within_aggregate;
6355                self.within_aggregate = true;
6356                self.table_disallowed_context
6357                    .push("aggregate function calls");
6358
6359                self.visit_function_args_mut(args);
6360
6361                self.within_aggregate = old_within_aggregate;
6362                self.table_disallowed_context.pop();
6363            }
6364            Ok(Func::Table { .. }) => {
6365                self.table_disallowed_context.push("other table functions");
6366                visit_mut::visit_function_mut(self, func);
6367                self.table_disallowed_context.pop();
6368            }
6369            _ => visit_mut::visit_function_mut(self, func),
6370        }
6371    }
6372
6373    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6374        // Don't go into subqueries.
6375    }
6376
6377    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6378        let (disallowed_context, func) = match expr {
6379            Expr::Case { .. } => (Some("CASE"), None),
6380            Expr::HomogenizingFunction {
6381                function: HomogenizingFunction::Coalesce,
6382                ..
6383            } => (Some("COALESCE"), None),
6384            Expr::Function(func) if self.in_select_item => {
6385                // If we're in a SELECT list, replace table functions with a uuid identifier
6386                // and save the table func so it can be planned elsewhere.
6387                let mut table_func = None;
6388                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6389                    if let Ok(Func::Table { .. }) = item.func() {
6390                        if let Some(context) = self.table_disallowed_context.last() {
6391                            self.err = Some(sql_err!(
6392                                "table functions are not allowed in {} (function {})",
6393                                context,
6394                                func.name
6395                            ));
6396                            return;
6397                        }
6398                        table_func = Some(func.clone());
6399                    }
6400                }
6401                // Since we will descend into the table func below, don't add its own disallow
6402                // context here, instead use visit_function to set that.
6403                (None, table_func)
6404            }
6405            _ => (None, None),
6406        };
6407        if let Some(func) = func {
6408            // Since we are trading out expr, we need to visit the table func here.
6409            visit_mut::visit_expr_mut(self, expr);
6410            // Don't attempt to replace table functions with unsupported syntax.
6411            if let Function {
6412                name: _,
6413                args: _,
6414                filter: None,
6415                over: None,
6416                distinct: false,
6417            } = &func
6418            {
6419                // Identical table functions can be de-duplicated.
6420                let unique_id = self.id_gen.allocate_id();
6421                let id = self
6422                    .tables
6423                    .entry(func)
6424                    .or_insert_with(|| format!("table_func_{unique_id}"));
6425                // We know this is okay because id is is 11 characters + <=20 characters, which is
6426                // less than our max length.
6427                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6428            }
6429        }
6430        if let Some(context) = disallowed_context {
6431            self.table_disallowed_context.push(context);
6432        }
6433
6434        visit_mut::visit_expr_mut(self, expr);
6435
6436        if disallowed_context.is_some() {
6437            self.table_disallowed_context.pop();
6438        }
6439    }
6440
6441    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6442        let old = self.in_select_item;
6443        self.in_select_item = true;
6444        visit_mut::visit_select_item_mut(self, si);
6445        self.in_select_item = old;
6446    }
6447}
6448
6449#[derive(Default)]
6450struct WindowFuncCollector {
6451    window_funcs: Vec<Expr<Aug>>,
6452}
6453
6454impl WindowFuncCollector {
6455    fn into_result(self) -> Vec<Expr<Aug>> {
6456        // Dedup while preserving the order.
6457        let mut seen = BTreeSet::new();
6458        let window_funcs_dedupped = self
6459            .window_funcs
6460            .into_iter()
6461            .filter(move |expr| seen.insert(expr.clone()))
6462            // Reverse the order, so that in case of a nested window function call, the
6463            // inner one is evaluated first.
6464            .rev()
6465            .collect();
6466        window_funcs_dedupped
6467    }
6468}
6469
6470impl Visit<'_, Aug> for WindowFuncCollector {
6471    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6472        match expr {
6473            Expr::Function(func) => {
6474                if func.over.is_some() {
6475                    self.window_funcs.push(expr.clone());
6476                }
6477            }
6478            _ => (),
6479        }
6480        visit::visit_expr(self, expr);
6481    }
6482
6483    fn visit_query(&mut self, _query: &Query<Aug>) {
6484        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6485    }
6486}
6487
6488/// Specifies how long a query will live.
6489#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6490pub enum QueryLifetime {
6491    /// The query's (or the expression's) result will be computed at one point in time.
6492    OneShot,
6493    /// The query (or expression) is used in a dataflow that maintains an index.
6494    Index,
6495    /// The query (or expression) is used in a dataflow that maintains a materialized view.
6496    MaterializedView,
6497    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
6498    Subscribe,
6499    /// The query (or expression) is part of a (non-materialized) view.
6500    View,
6501    /// The expression is part of a source definition.
6502    Source,
6503}
6504
6505impl QueryLifetime {
6506    /// (This used to impact whether the query is allowed to reason about the time at which it is
6507    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
6508    /// mechanism, see `ExprPrepStyle`.)
6509    pub fn is_one_shot(&self) -> bool {
6510        let result = match self {
6511            QueryLifetime::OneShot => true,
6512            QueryLifetime::Index => false,
6513            QueryLifetime::MaterializedView => false,
6514            QueryLifetime::Subscribe => false,
6515            QueryLifetime::View => false,
6516            QueryLifetime::Source => false,
6517        };
6518        assert_eq!(!result, self.is_maintained());
6519        result
6520    }
6521
6522    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
6523    /// turned into a `TopK`.
6524    pub fn is_maintained(&self) -> bool {
6525        match self {
6526            QueryLifetime::OneShot => false,
6527            QueryLifetime::Index => true,
6528            QueryLifetime::MaterializedView => true,
6529            QueryLifetime::Subscribe => true,
6530            QueryLifetime::View => true,
6531            QueryLifetime::Source => true,
6532        }
6533    }
6534
6535    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
6536    pub fn allow_show(&self) -> bool {
6537        match self {
6538            QueryLifetime::OneShot => true,
6539            QueryLifetime::Index => false,
6540            QueryLifetime::MaterializedView => false,
6541            QueryLifetime::Subscribe => true, // SUBSCRIBE allows SHOW commands!
6542            QueryLifetime::View => false,
6543            QueryLifetime::Source => false,
6544        }
6545    }
6546}
6547
6548/// Description of a CTE sufficient for query planning.
6549#[derive(Debug, Clone)]
6550pub struct CteDesc {
6551    pub name: String,
6552    pub desc: RelationDesc,
6553}
6554
6555/// The state required when planning a `Query`.
6556#[derive(Debug, Clone)]
6557pub struct QueryContext<'a> {
6558    /// The context for the containing `Statement`.
6559    pub scx: &'a StatementContext<'a>,
6560    /// The lifetime that the planned query will have.
6561    pub lifetime: QueryLifetime,
6562    /// The scopes of the outer relation expression.
6563    pub outer_scopes: Vec<Scope>,
6564    /// The type of the outer relation expressions.
6565    pub outer_relation_types: Vec<SqlRelationType>,
6566    /// CTEs for this query, mapping their assigned LocalIds to their definition.
6567    pub ctes: BTreeMap<LocalId, CteDesc>,
6568    /// A name manager, for interning column names that will be stored in HIR and MIR.
6569    pub name_manager: Rc<RefCell<NameManager>>,
6570    pub recursion_guard: RecursionGuard,
6571}
6572
6573impl CheckedRecursion for QueryContext<'_> {
6574    fn recursion_guard(&self) -> &RecursionGuard {
6575        &self.recursion_guard
6576    }
6577}
6578
6579impl<'a> QueryContext<'a> {
6580    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6581        QueryContext {
6582            scx,
6583            lifetime,
6584            outer_scopes: vec![],
6585            outer_relation_types: vec![],
6586            ctes: BTreeMap::new(),
6587            name_manager: Rc::new(RefCell::new(NameManager::new())),
6588            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6589        }
6590    }
6591
6592    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6593        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6594    }
6595
6596    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6597    /// `self`.
6598    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6599        let ctes = self.ctes.clone();
6600        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6601        let outer_relation_types = iter::once(relation_type)
6602            .chain(self.outer_relation_types.clone())
6603            .collect();
6604        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6605        let name_manager = Rc::clone(&self.name_manager);
6606
6607        QueryContext {
6608            scx: self.scx,
6609            lifetime: self.lifetime,
6610            outer_scopes,
6611            outer_relation_types,
6612            ctes,
6613            name_manager,
6614            recursion_guard: self.recursion_guard.clone(),
6615        }
6616    }
6617
6618    /// Derives a `QueryContext` for a scope that contains no columns.
6619    fn empty_derived_context(&self) -> QueryContext<'a> {
6620        let scope = Scope::empty();
6621        let ty = SqlRelationType::empty();
6622        self.derived_context(scope, ty)
6623    }
6624
6625    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6626    /// CTE.
6627    pub fn resolve_table_name(
6628        &self,
6629        object: ResolvedItemName,
6630    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6631        match object {
6632            ResolvedItemName::Item {
6633                id,
6634                full_name,
6635                version,
6636                ..
6637            } => {
6638                let item = self.scx.get_item(&id).at_version(version);
6639                let desc = match item.relation_desc() {
6640                    Some(desc) => desc.clone(),
6641                    None => {
6642                        return Err(PlanError::InvalidDependency {
6643                            name: full_name.to_string(),
6644                            item_type: item.item_type().to_string(),
6645                        });
6646                    }
6647                };
6648                let expr = HirRelationExpr::Get {
6649                    id: Id::Global(item.global_id()),
6650                    typ: desc.typ().clone(),
6651                };
6652
6653                let name = full_name.into();
6654                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6655
6656                Ok((expr, scope))
6657            }
6658            ResolvedItemName::Cte { id, name } => {
6659                let name = name.into();
6660                let cte = self.ctes.get(&id).unwrap();
6661                let expr = HirRelationExpr::Get {
6662                    id: Id::Local(id),
6663                    typ: cte.desc.typ().clone(),
6664                };
6665
6666                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6667
6668                Ok((expr, scope))
6669            }
6670            ResolvedItemName::Error => bail_internal!("should have been caught in name resolution"),
6671        }
6672    }
6673
6674    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6675    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6676    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6677        self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6678    }
6679}
6680
6681/// A bundle of unrelated things that we need for planning `Expr`s.
6682#[derive(Debug, Clone)]
6683pub struct ExprContext<'a> {
6684    pub qcx: &'a QueryContext<'a>,
6685    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
6686    pub name: &'a str,
6687    /// The context for the `Query` that contains this `Expr`.
6688    /// The current scope.
6689    pub scope: &'a Scope,
6690    /// The type of the current relation expression upon which this scalar
6691    /// expression will be evaluated.
6692    pub relation_type: &'a SqlRelationType,
6693    /// Are aggregate functions allowed in this context
6694    pub allow_aggregates: bool,
6695    /// Are subqueries allowed in this context
6696    pub allow_subqueries: bool,
6697    /// Are parameters allowed in this context.
6698    pub allow_parameters: bool,
6699    /// Are window functions allowed in this context
6700    pub allow_windows: bool,
6701}
6702
6703impl CheckedRecursion for ExprContext<'_> {
6704    fn recursion_guard(&self) -> &RecursionGuard {
6705        &self.qcx.recursion_guard
6706    }
6707}
6708
6709impl<'a> ExprContext<'a> {
6710    pub fn catalog(&self) -> &dyn SessionCatalog {
6711        self.qcx.scx.catalog
6712    }
6713
6714    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6715        let mut ecx = self.clone();
6716        ecx.name = name;
6717        ecx
6718    }
6719
6720    pub fn column_type<E>(&self, expr: &E) -> E::Type
6721    where
6722        E: AbstractExpr,
6723    {
6724        expr.typ(
6725            &self.qcx.outer_relation_types,
6726            self.relation_type,
6727            &self.qcx.scx.param_types.borrow(),
6728        )
6729    }
6730
6731    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6732    where
6733        E: AbstractExpr,
6734    {
6735        self.column_type(expr).scalar_type()
6736    }
6737
6738    fn derived_query_context(&self) -> QueryContext<'_> {
6739        let mut scope = self.scope.clone();
6740        scope.lateral_barrier = true;
6741        self.qcx.derived_context(scope, self.relation_type.clone())
6742    }
6743
6744    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6745        self.qcx.scx.require_feature_flag(flag)
6746    }
6747
6748    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6749        &self.qcx.scx.param_types
6750    }
6751
6752    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6753    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6754    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6755        self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6756    }
6757
6758    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6759        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6760    }
6761}
6762
6763/// Manages column names, doing lightweight string internment.
6764///
6765/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
6766/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
6767/// to ensure maximal sharing.
6768#[derive(Debug, Clone)]
6769pub struct NameManager(BTreeSet<Arc<str>>);
6770
6771impl NameManager {
6772    /// Creates a new `NameManager`, with no interned names
6773    pub fn new() -> Self {
6774        Self(BTreeSet::new())
6775    }
6776
6777    /// Interns a string, returning a reference-counted pointer to the interned
6778    /// string.
6779    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6780        let s = s.as_ref();
6781        if let Some(interned) = self.0.get(s) {
6782            Arc::clone(interned)
6783        } else {
6784            let interned: Arc<str> = Arc::from(s);
6785            self.0.insert(Arc::clone(&interned));
6786            interned
6787        }
6788    }
6789
6790    /// Interns a string representing a reference to a `ScopeItem`, returning a
6791    /// reference-counted pointer to the interned string.
6792    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6793        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6794        //
6795        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6796        // name information. Note that as of 2025-04-09, we don't support column
6797        // renames.
6798        //
6799        // A few bad alternatives:
6800        //
6801        // (1) Store it but don't write it down. This fails because the expression
6802        //     cache will erase our names on restart.
6803        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6804        //     get the right names. But the world now and the world when we made
6805        //     those objects may be different.
6806        // (3) Just don't write down the table name. Nothing fails... for now.
6807
6808        self.intern(item.column_name.as_str())
6809    }
6810}
6811
6812#[cfg(test)]
6813mod test {
6814    use super::*;
6815
6816    /// Ensure that `NameManager`'s string interning works as expected.
6817    ///
6818    /// In particular, structurally but not referentially identical strings should
6819    /// be interned to the same `Arc`ed pointer.
6820    #[mz_ore::test]
6821    pub fn test_name_manager_string_interning() {
6822        let mut nm = NameManager::new();
6823
6824        let orig_hi = "hi";
6825        let hi = nm.intern(orig_hi);
6826        let hello = nm.intern("hello");
6827
6828        assert_ne!(hi.as_ptr(), hello.as_ptr());
6829
6830        // this static string is _likely_ the same as `orig_hi``
6831        let hi2 = nm.intern("hi");
6832        assert_eq!(hi.as_ptr(), hi2.as_ptr());
6833
6834        // generate a "hi" string that doesn't get optimized to the same static string
6835        let s = format!(
6836            "{}{}",
6837            hi.chars().nth(0).unwrap(),
6838            hi2.chars().nth(1).unwrap()
6839        );
6840        // make sure that we're testing with a fresh string!
6841        assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6842
6843        let hi3 = nm.intern(s);
6844        assert_eq!(hi.as_ptr(), hi3.as_ptr());
6845    }
6846}