// mz_sql/plan/query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50    ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51    MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
56    func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::{
70    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
71    ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
72    UNKNOWN_COLUMN_NAME, strconv,
73};
74use mz_sql_parser::ast::display::AstDisplay;
75use mz_sql_parser::ast::visit::Visit;
76use mz_sql_parser::ast::visit_mut::{self, VisitMut};
77use mz_sql_parser::ast::{
78    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
79    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
80    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
81    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
82    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
83    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
84    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
85    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
86};
87use mz_sql_parser::ident;
88
89use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
90use crate::func::{self, Func, FuncSpec, TableFuncImpl};
91use crate::names::{
92    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
93};
94use crate::plan::PlanError::InvalidWmrRecursionLimit;
95use crate::plan::error::PlanError;
96use crate::plan::hir::{
97    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
98    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
99    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
100    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
101};
102use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
103use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
104use crate::plan::statement::{StatementContext, StatementDesc, show};
105use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
106use crate::plan::{
107    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
108    literal, transform_ast,
109};
110use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
111use crate::session::vars::{self, FeatureFlag};
112use crate::{ORDINALITY_COL_NAME, normalize};
113
/// The result of planning a "root" (top-level) query: the planned expression
/// plus everything a caller needs to interpret and post-process its results.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression (e.g. an `HirRelationExpr` for a full query, or
    /// a `Vec<HirScalarExpr>` for a RETURNING clause).
    pub expr: E,
    /// The shape of the result set *after* `finishing` has been applied.
    pub desc: RelationDesc,
    /// Post-processing (ordering, limit/offset, projection) that must occur
    /// before results are sent to the client.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// The scope describing the output columns of the planned expression.
    pub scope: Scope,
}
121
/// Plans a top-level query, returning the `HirRelationExpr` describing the query
/// plan, the `RelationDesc` describing the shape of the result set, a
/// `RowSetFinishing` describing post-processing that must occur before results
/// are sent to the client, and the types of the parameters in the query, if any
/// were present.
///
/// Note that the returned `RelationDesc` describes the expression after
/// applying the returned `RowSetFinishing`.
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_root_query(
    scx: &StatementContext,
    mut query: Query<Aug>,
    lifetime: QueryLifetime,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    // Apply AST-level rewrites before planning proper.
    transform_ast::transform(scx, &mut query)?;
    let mut qcx = QueryContext::root(scx, lifetime);
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(&mut qcx, &query)?;

    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Attempt to push the finishing's ordering past its projection. This allows
    // data to be projected down on the workers rather than the coordinator. It
    // also improves the optimizer's demand analysis, as the optimizer can only
    // reason about demand information in `expr` (i.e., it can't see
    // `finishing.project`).
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    // For maintained (long-lived) dataflows, fold the finishing into the
    // expression itself, passing along any group-size query hints.
    if lifetime.is_maintained() {
        expr.finish_maintained(&mut finishing, group_size_hints);
    }

    // Compute the client-visible result type: the expression's type restricted
    // and reordered by the (possibly updated) finishing projection.
    let typ = qcx.relation_type(&expr);
    let typ = SqlRelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
183
/// Like [plan_root_query], but the caller supplies the `QueryContext`, and the
/// finishing is unconditionally folded into the expression (rather than only
/// for maintained lifetimes).
///
/// TODO(ct2): Dedup this with [plan_root_query].
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_ct_query(
    qcx: &mut QueryContext,
    mut query: Query<Aug>,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    // Apply AST-level rewrites before planning proper.
    transform_ast::transform(qcx.scx, &mut query)?;
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(qcx, &query)?;

    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Attempt to push the finishing's ordering past its projection. This allows
    // data to be projected down on the workers rather than the coordinator. It
    // also improves the optimizer's demand analysis, as the optimizer can only
    // reason about demand information in `expr` (i.e., it can't see
    // `finishing.project`).
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    // Always fold the finishing into the expression itself.
    expr.finish_maintained(&mut finishing, group_size_hints);

    // Compute the client-visible result type: the expression's type restricted
    // and reordered by the (possibly updated) finishing projection.
    let typ = qcx.relation_type(&expr);
    let typ = SqlRelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
234
235/// Attempts to push a projection through an order by.
236///
237/// The returned bool indicates whether the pushdown was successful or not.
238/// Successful pushdown requires that all the columns referenced in `order_by`
239/// are included in `project`.
240///
241/// When successful, `expr` is wrapped in a projection node, `order_by` is
242/// rewritten to account for the pushed-down projection, and `project` is
243/// replaced with the trivial projection. When unsuccessful, no changes are made
244/// to any of the inputs.
245fn try_push_projection_order_by(
246    expr: &mut HirRelationExpr,
247    project: &mut Vec<usize>,
248    order_by: &mut Vec<ColumnOrder>,
249) -> bool {
250    let mut unproject = vec![None; expr.arity()];
251    for (out_i, in_i) in project.iter().copied().enumerate() {
252        unproject[in_i] = Some(out_i);
253    }
254    if order_by
255        .iter()
256        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
257    {
258        let trivial_project = (0..project.len()).collect();
259        *expr = expr.take().project(mem::replace(project, trivial_project));
260        for ob in order_by {
261            ob.column = unproject[ob.column].unwrap();
262        }
263        true
264    } else {
265        false
266    }
267}
268
/// Plans an `INSERT INTO` statement.
///
/// Returns the target table's ID, the `HirRelationExpr` producing the rows to
/// insert (mapped and projected so its columns match the table's column
/// order, with defaults filled in for omitted columns), and the planned
/// `RETURNING` clause.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.relation_desc().expect("table has desc");
    // Default-value expressions for every table column, used below to fill in
    // columns omitted from the insert.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order.
    // `source_types[i]` is the expected type of source column `i`;
    // `ordering[i]` is the table column that source column `i` targets.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        InsertSource::DefaultValues => {
            // `INSERT ... DEFAULT VALUES`: a single zero-column row; every
            // column is filled in from `defaults` below.
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Column supplied by the source query: project it through.
            project_key.push(*src_idx);
        } else {
            // Column omitted: append its default expression and project the
            // appended position.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause, if any, against the target table's columns.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // Expand wildcards (e.g. `RETURNING *`) into individual items.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
485
/// Determines the mapping between some external data and a Materialize relation.
///
/// Returns the following:
/// * [`CatalogItemId`] for the destination table.
/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
///   since we now return a [`MapFilterProject`].
/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
///   destination table.
///
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    // The item must have a relation description; otherwise it cannot be the
    // source or target of a COPY.
    let table_desc = match item.relation_desc() {
        Some(desc) => desc.into_owned(),
        None => {
            return Err(PlanError::InvalidDependency {
                name: fullname.to_string(),
                item_type: item.item_type().to_string(),
            });
        }
    };
    let mut ordering = Vec::with_capacity(columns.len());

    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
    // should be simplified. The reason they are currently separate code paths is so we can roll
    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.

    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
    // FROM SOURCE ...`), then we generate an MFP.
    //
    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
    // so it's not always guaranteed that our `item` is a table.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Fill in any omitted columns and rearrange into correct order
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each column in the destination table, either project it from the source data, or provide
        // an expression to fill in a default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // If a column from the table does not exist in the source data, then a default
                // value will get appended to the end of the input Row from the source data.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    // Create a mapping from input data to the table we're copying into.
    let source_desc = if columns.is_empty() {
        // No explicit column list: the source data covers every table column.
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        // The source data should be in the same order as the table.
        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }

        // The source data is a different shape than the destination table.
        RelationDesc::new(SqlRelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
613
614/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
615///
616/// TODO(cf3): Merge this method with [`plan_copy_item`].
617pub fn plan_copy_from(
618    scx: &StatementContext,
619    table_name: ResolvedItemName,
620    columns: Vec<Ident>,
621) -> Result<
622    (
623        CatalogItemId,
624        RelationDesc,
625        Vec<ColumnIndex>,
626        Option<MapFilterProject>,
627    ),
628    PlanError,
629> {
630    let table = scx.get_item_by_resolved_name(&table_name)?;
631
632    // Validate the target of the insert.
633    if table.item_type() != CatalogItemType::Table {
634        sql_bail!(
635            "cannot insert into {} '{}'",
636            table.item_type(),
637            table_name.full_name_str()
638        );
639    }
640
641    let _ = table.writable_table_details().ok_or_else(|| {
642        sql_err!(
643            "cannot insert into non-writeable table '{}'",
644            table_name.full_name_str()
645        )
646    })?;
647
648    if table.id().is_system() {
649        sql_bail!(
650            "cannot insert into system table '{}'",
651            table_name.full_name_str()
652        );
653    }
654    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
655
656    Ok((id, desc, ordering, mfp))
657}
658
/// Builds a plan that adds the default values for the missing columns and re-orders
/// the datums in the given rows to match the order in the target table.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .try_get_item(&target_id)
        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
        .at_version(RelationVersionSelector::Latest);

    // Default-value expressions for every table column, used below for any
    // column not present in `columns`.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    let desc = table.relation_desc().expect("table has desc");
    let column_types = columns
        .iter()
        .map(|x| desc.get_type(x).clone())
        .map(|mut x| {
            // Null constraint is enforced later, when inserting the row in the table.
            // Without this, an assert is hit during lowering.
            x.nullable = true;
            x
        })
        .collect();
    let typ = SqlRelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Exit early with just the raw constant if we know that all columns are present
    // and in the correct order. This lets us bypass expensive downstream optimizations
    // more easily, as at every stage we know this expression is nothing more than
    // a constant (as opposed to e.g. a constant with an identity map and identity
    // projection).
    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_all().zip_eq(defaults);
    for ((col_idx, _col_name, col_typ), default) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Column supplied by the input rows: project it through.
            project_key.push(*src_idx);
        } else {
            // Column omitted: append its default value and project the
            // appended position.
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
733
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    /// ID of the table being mutated.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Post-processing to apply to the rows produced by `selection`.
    pub finishing: RowSetFinishing,
}
746
747pub fn plan_delete_query(
748    scx: &StatementContext,
749    mut delete_stmt: DeleteStatement<Aug>,
750) -> Result<ReadThenWritePlan, PlanError> {
751    transform_ast::transform(scx, &mut delete_stmt)?;
752
753    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
754    plan_mutation_query_inner(
755        qcx,
756        delete_stmt.table_name,
757        delete_stmt.alias,
758        delete_stmt.using,
759        vec![],
760        delete_stmt.selection,
761    )
762}
763
764pub fn plan_update_query(
765    scx: &StatementContext,
766    mut update_stmt: UpdateStatement<Aug>,
767) -> Result<ReadThenWritePlan, PlanError> {
768    transform_ast::transform(scx, &mut update_stmt)?;
769
770    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
771
772    plan_mutation_query_inner(
773        qcx,
774        update_stmt.table_name,
775        update_stmt.alias,
776        vec![],
777        update_stmt.assignments,
778        update_stmt.selection,
779    )
780}
781
/// Plans the shared core of `DELETE` and `UPDATE`: both are read-then-write
/// operations against a single table.
///
/// `using` is non-empty only for `DELETE ... USING`; `assignments` is
/// non-empty only for `UPDATE ... SET`; `selection` is the optional `WHERE`
/// clause of either statement.
///
/// Errors if `table_name` does not resolve to a writeable, non-system user
/// table, if any clause fails to plan, or if a `SET` names an unknown column
/// or assigns the same column twice.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get ID and version of the relation desc.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Perform checks on item with given ID: it must be a writeable,
    // non-system table.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Derive structs for operation from validated table
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.relation_desc().expect("table has desc");
    let relation_type = qcx.relation_type(&get);

    // Without USING, the WHERE clause (if any) becomes a plain filter on the
    // table. With USING, the whole selection is rewritten as an existential
    // subquery by `handle_mutation_using_clause`.
    if using.is_empty() {
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        get = handle_mutation_query_using(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each SET assignment, keyed by column index.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        // Get the index and type of the column.
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                // SET uses an assignment cast to the column's declared type.
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // Read back every column of the table in declaration order, with no
    // ordering or limit.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
887
// Adjust `get` to perform an existential subquery on `using` accounting for
// `selection`.
//
// If `USING`, we essentially want to rewrite the query as a correlated
// existential subquery, i.e.
// ```
// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
// ```
// However, we can't do that directly because of esoteric rules w/r/t `lateral`
// subqueries.
// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan `USING` as a cross-joined `FROM` without knowledge of the
    // statement's `FROM` target. This prevents `lateral` subqueries from
    // "seeing" the `FROM` target.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
        // PG-like semantics e.g. expressing ambiguous column references. We put
        // `USING...` first for no real reason, but making a different decision
        // would require adjusting the column references on this relation
        // differently.
        //
        // The join condition is simply `true`: the real filtering happens via
        // the planned `WHERE` expression below.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression on `FROM, USING...`.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
        // those to the right of `using_rel_expr`) to instead be correlated to
        // the outer relation, i.e. `get`.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    // Bumping the level makes the reference correlated to the
                    // next scope out (`get`); the column offset is re-based to
                    // that relation.
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
        // relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Check that scopes are at compatible (i.e. do not double-reference
        // same table), despite lack of selection
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
    // whether any rows are returned, and not on the contents of those rows,
    // the output list of the subquery is normally unimportant.
    //
    // This means we don't need to worry about projecting/mapping any
    // additional expressions here.
    //
    // https://www.postgresql.org/docs/14/functions-subquery.html

    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
989
/// Error returned by [`cast_relation`] when one of the relation's columns
/// cannot be cast to its requested target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Zero-based index of the column whose cast could not be planned.
    pub(crate) column: usize,
    /// The type the column actually had.
    pub(crate) source_type: SqlScalarType,
    /// The type the cast attempted to produce.
    pub(crate) target_type: SqlScalarType,
}
996
997/// Cast a relation from one type to another using the specified type of cast.
998///
999/// The length of `target_types` must match the arity of `expr`.
1000pub(crate) fn cast_relation<'a, I>(
1001    qcx: &QueryContext,
1002    ccx: CastContext,
1003    expr: HirRelationExpr,
1004    target_types: I,
1005) -> Result<HirRelationExpr, CastRelationError>
1006where
1007    I: IntoIterator<Item = &'a SqlScalarType>,
1008{
1009    let ecx = &ExprContext {
1010        qcx,
1011        name: "values",
1012        scope: &Scope::empty(),
1013        relation_type: &qcx.relation_type(&expr),
1014        allow_aggregates: false,
1015        allow_subqueries: true,
1016        allow_parameters: true,
1017        allow_windows: false,
1018    };
1019    let mut map_exprs = vec![];
1020    let mut project_key = vec![];
1021    for (i, target_typ) in target_types.into_iter().enumerate() {
1022        let expr = HirScalarExpr::column(i);
1023        // We plan every cast and check the evaluated expressions rather than
1024        // checking the types directly because of some complex casting rules
1025        // between types not expressed in `SqlScalarType` equality.
1026        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1027            Ok(cast_expr) => {
1028                if expr == cast_expr {
1029                    // Cast between types was unnecessary
1030                    project_key.push(i);
1031                } else {
1032                    // Cast between types required
1033                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
1034                    map_exprs.push(cast_expr);
1035                }
1036            }
1037            Err(_) => {
1038                return Err(CastRelationError {
1039                    column: i,
1040                    source_type: ecx.scalar_type(&expr),
1041                    target_type: target_typ.clone(),
1042                });
1043            }
1044        }
1045    }
1046    Ok(expr.map(map_exprs).project(project_key))
1047}
1048
1049/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1050/// VIEW` statement.
1051pub fn plan_as_of(
1052    scx: &StatementContext,
1053    as_of: Option<AsOf<Aug>>,
1054) -> Result<QueryWhen, PlanError> {
1055    match as_of {
1056        None => Ok(QueryWhen::Immediately),
1057        Some(as_of) => match as_of {
1058            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1059            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1060        },
1061    }
1062}
1063
1064/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1065///
1066/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1067/// - not a constant,
1068/// - not castable to MzTimestamp,
1069/// - is null,
1070/// - contains an unmaterializable function,
1071/// - some other evaluation error occurs, e.g., a division by 0,
1072/// - contains aggregates, subqueries, parameters, or window function calls.
1073pub fn plan_as_of_or_up_to(
1074    scx: &StatementContext,
1075    mut expr: Expr<Aug>,
1076) -> Result<mz_repr::Timestamp, PlanError> {
1077    let scope = Scope::empty();
1078    let desc = RelationDesc::empty();
1079    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1080    // evaluated only once.)
1081    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1082    transform_ast::transform(scx, &mut expr)?;
1083    let ecx = &ExprContext {
1084        qcx: &qcx,
1085        name: "AS OF or UP TO",
1086        scope: &scope,
1087        relation_type: desc.typ(),
1088        allow_aggregates: false,
1089        allow_subqueries: false,
1090        allow_parameters: false,
1091        allow_windows: false,
1092    };
1093    let hir = plan_expr(ecx, &expr)?.cast_to(
1094        ecx,
1095        CastContext::Assignment,
1096        &SqlScalarType::MzTimestamp,
1097    )?;
1098    if hir.contains_unmaterializable() {
1099        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1100    }
1101    // At this point, we definitely have a constant expression:
1102    // - it can't contain any unmaterializable functions;
1103    // - it can't refer to any columns.
1104    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1105    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1106    // division by 0.
1107    let timestamp = hir
1108        .into_literal_mz_timestamp()
1109        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1110    Ok(timestamp)
1111}
1112
1113/// Plans an expression in the AS position of a `CREATE SECRET`.
1114pub fn plan_secret_as(
1115    scx: &StatementContext,
1116    mut expr: Expr<Aug>,
1117) -> Result<MirScalarExpr, PlanError> {
1118    let scope = Scope::empty();
1119    let desc = RelationDesc::empty();
1120    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1121
1122    transform_ast::transform(scx, &mut expr)?;
1123
1124    let ecx = &ExprContext {
1125        qcx: &qcx,
1126        name: "AS",
1127        scope: &scope,
1128        relation_type: desc.typ(),
1129        allow_aggregates: false,
1130        allow_subqueries: false,
1131        allow_parameters: false,
1132        allow_windows: false,
1133    };
1134    let expr = plan_expr(ecx, &expr)?
1135        .type_as(ecx, &SqlScalarType::Bytes)?
1136        .lower_uncorrelated(scx.catalog.system_vars())?;
1137    Ok(expr)
1138}
1139
/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
///
/// Builds a relation whose columns are, in order: one per requested body
/// alias, one per requested header alias, and one per exposed secret; plans
/// the CHECK expression against that relation; and returns the lowered
/// expression together with the bookkeeping (column indices and
/// bytes-vs-string flags) needed to supply those columns at evaluation time.
pub fn plan_webhook_validate_using(
    scx: &StatementContext,
    validate_using: CreateWebhookSourceCheck<Aug>,
) -> Result<WebhookValidation, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::Source);

    let CreateWebhookSourceCheck {
        options,
        using: mut expr,
    } = validate_using;

    // Column types and names are pushed in lockstep; the asserts below verify
    // that invariant after each push.
    let mut column_typs = vec![];
    let mut column_names = vec![];

    let (bodies, headers, secrets) = options
        .map(|o| (o.bodies, o.headers, o.secrets))
        .unwrap_or_default();

    // Append all of the bodies so they can be used in the expression.
    let mut body_tuples = vec![];
    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
        // The body is exposed either as bytes or as text, per `use_bytes`.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "body".to_string());

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this body correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "body column names and types don't match"
        );
        body_tuples.push((column_idx, use_bytes));
    }

    // Append all of the headers so they can be used in the expression.
    let mut header_tuples = vec![];

    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
        // Headers are exposed as a map whose values are bytes or text.
        let value_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "headers".to_string());

        column_typs.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(value_type),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this header correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        header_tuples.push((column_idx, use_bytes));
    }

    // Append all secrets so they can be used in the expression.
    let mut validation_secrets = vec![];

    for CreateWebhookSourceSecret {
        secret,
        alias,
        use_bytes,
    } in secrets
    {
        // Either provide the secret to the validation expression as Bytes or a String.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        // Only resolved catalog items can be used as secrets.
        let ResolvedItemName::Item {
            id,
            full_name: FullItemName { item, .. },
            ..
        } = secret
        else {
            return Err(PlanError::InvalidSecret(Box::new(secret)));
        };

        // Plan the expression using the secret's alias, if one is provided.
        let name = if let Some(alias) = alias {
            alias.into_string()
        } else {
            item
        };
        column_names.push(name);

        // Get the column index that corresponds for this secret, so we can make sure to provide the
        // secrets in the correct order during evaluation.
        let column_idx = column_typs.len() - 1;
        // Double check that our column names and types match.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "column names and types don't match"
        );

        validation_secrets.push(WebhookValidationSecret {
            id,
            column_idx,
            use_bytes,
        });
    }

    let relation_typ = SqlRelationType::new(column_typs);
    let desc = RelationDesc::new(relation_typ, column_names.clone());
    let scope = Scope::from_source(None, column_names);

    transform_ast::transform(scx, &mut expr)?;

    // CHECK must be a boolean expression over the columns assembled above; no
    // aggregates, subqueries, parameters, or window functions are allowed.
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "CHECK",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    let expr = plan_expr(ecx, &expr)?
        .type_as(ecx, &SqlScalarType::Bool)?
        .lower_uncorrelated(scx.catalog.system_vars())?;
    let validation = WebhookValidation {
        expression: expr,
        relation_desc: desc,
        bodies: body_tuples,
        headers: header_tuples,
        secrets: validation_secrets,
    };
    Ok(validation)
}
1297
1298pub fn plan_default_expr(
1299    scx: &StatementContext,
1300    expr: &Expr<Aug>,
1301    target_ty: &SqlScalarType,
1302) -> Result<HirScalarExpr, PlanError> {
1303    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1304    let ecx = &ExprContext {
1305        qcx: &qcx,
1306        name: "DEFAULT expression",
1307        scope: &Scope::empty(),
1308        relation_type: &SqlRelationType::empty(),
1309        allow_aggregates: false,
1310        allow_subqueries: false,
1311        allow_parameters: false,
1312        allow_windows: false,
1313    };
1314    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1315    Ok(hir)
1316}
1317
/// Plans and evaluates the parameter values supplied to an `EXECUTE`
/// statement against the prepared statement's expected parameter types.
///
/// Each value is planned, checked for castability to its expected type, then
/// lowered and eagerly evaluated to a datum. Errors if the parameter count is
/// wrong, if a value's type cannot be cast to the expected type, or if
/// evaluation fails.
pub fn plan_params<'a>(
    scx: &'a StatementContext,
    params: Vec<Expr<Aug>>,
    desc: &StatementDesc,
) -> Result<Params, PlanError> {
    if params.len() != desc.param_types.len() {
        sql_bail!(
            "expected {} params, got {}",
            desc.param_types.len(),
            params.len()
        );
    }

    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);

    // Datums are packed into the row in parameter order.
    let mut datums = Row::default();
    let mut packer = datums.packer();
    let mut actual_types = Vec::new();
    let temp_storage = &RowArena::new();
    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
        transform_ast::transform(scx, &mut expr)?;

        let ecx = execute_expr_context(&qcx);
        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
        let actual_ty = ecx.scalar_type(&ex);
        // Check only that a cast to the expected type *could* be planned; the
        // value is recorded with its actual type. (The cast is presumably
        // applied later — see callers of the returned `Params`.)
        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
            return Err(PlanError::WrongParameterType(
                i + 1,
                ecx.humanize_scalar_type(expected_ty, false),
                ecx.humanize_scalar_type(&actual_ty, false),
            ));
        }
        // Parameter values must lower to uncorrelated expressions, which are
        // evaluated right here with no input row.
        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
        let evaled = ex.eval(&[], temp_storage)?;
        packer.push(evaled);
        actual_types.push(actual_ty);
    }
    Ok(Params {
        datums,
        execute_types: actual_types,
        expected_types: desc.param_types.clone(),
    })
}
1361
/// Empty scope borrowed by every context returned from [`execute_expr_context`].
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Empty relation type borrowed by every context returned from [`execute_expr_context`].
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1364
/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
///
/// The context uses an empty scope and relation type (so no columns can be
/// referenced) and disallows aggregates, subqueries, parameters, and window
/// functions.
pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
    ExprContext {
        qcx,
        name: "EXECUTE",
        // Borrow the `'static` empties so no per-call allocation is needed.
        scope: &EXECUTE_CONTEXT_SCOPE,
        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    }
}
1378
/// The CastContext used when matching up the types of parameters passed to EXECUTE.
///
/// This is an assignment cast also in Postgres, see
/// <https://github.com/MaterializeInc/database-issues/issues/9266>
// NOTE(review): `CastContext::Assignment` looks trivially constructible — a
// plain `const` might suffice instead of a `LazyLock`; confirm before changing.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1385
1386pub fn plan_index_exprs<'a>(
1387    scx: &'a StatementContext,
1388    on_desc: &RelationDesc,
1389    exprs: Vec<Expr<Aug>>,
1390) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1391    let scope = Scope::from_source(None, on_desc.iter_names());
1392    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1393
1394    let ecx = &ExprContext {
1395        qcx: &qcx,
1396        name: "CREATE INDEX",
1397        scope: &scope,
1398        relation_type: on_desc.typ(),
1399        allow_aggregates: false,
1400        allow_subqueries: false,
1401        allow_parameters: false,
1402        allow_windows: false,
1403    };
1404    let mut out = vec![];
1405    for mut expr in exprs {
1406        transform_ast::transform(scx, &mut expr)?;
1407        let expr = plan_expr_or_col_index(ecx, &expr)?;
1408        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1409        let repr_col_types: Vec<ReprColumnType> = on_desc
1410            .typ()
1411            .column_types
1412            .iter()
1413            .map(ReprColumnType::from)
1414            .collect();
1415        expr.reduce(&repr_col_types);
1416        out.push(expr);
1417    }
1418    Ok(out)
1419}
1420
1421fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1422    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1423        Some(column) => Ok(HirScalarExpr::column(column)),
1424        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1425    }
1426}
1427
1428fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1429    match e {
1430        Expr::Value(Value::Number(n)) => {
1431            let n = n.parse::<usize>().map_err(|e| {
1432                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1433            })?;
1434            if n < 1 || n > max {
1435                sql_bail!(
1436                    "column reference {} in {} is out of range (1 - {})",
1437                    n,
1438                    name,
1439                    max
1440                );
1441            }
1442            Ok(Some(n - 1))
1443        }
1444        _ => Ok(None),
1445    }
1446}
1447
/// The result of planning a `Query`: a relation expression together with the
/// ORDER BY / LIMIT / OFFSET / projection "finishing" data that applies on
/// top of it.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// Scope describing the output columns of `expr`.
    scope: Scope,
    /// ORDER BY keys, as column orderings over `expr`.
    order_by: Vec<ColumnOrder>,
    /// Optional LIMIT; `None` means no limit was given.
    limit: Option<HirScalarExpr>,
    /// `offset` is either
    /// - an Int64 literal
    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
    ///   later.)
    offset: HirScalarExpr,
    /// Columns of `expr` that form the query's output, in output order.
    project: Vec<usize>,
    /// Query hints controlling expected group sizes.
    group_size_hints: GroupSizeHints,
}
1462
/// Plans a `Query` via [`plan_query_inner`], wrapped in `checked_recur_mut`
/// to guard the recursive descent over nested queries (presumably bounding
/// recursion depth — see `checked_recur_mut`'s definition).
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1466
1467fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1468    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1469    // for the identifiers, so that they can be re-installed before returning.
1470    let cte_bindings = plan_ctes(qcx, q)?;
1471
1472    let limit = match &q.limit {
1473        None => None,
1474        Some(Limit {
1475            quantity,
1476            with_ties: false,
1477        }) => {
1478            let ecx = &ExprContext {
1479                qcx,
1480                name: "LIMIT",
1481                scope: &Scope::empty(),
1482                relation_type: &SqlRelationType::empty(),
1483                allow_aggregates: false,
1484                allow_subqueries: true,
1485                allow_parameters: true,
1486                allow_windows: false,
1487            };
1488            let limit = plan_expr(ecx, quantity)?;
1489            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1490
1491            let limit = if limit.is_constant() {
1492                let arena = RowArena::new();
1493                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1494
1495                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1496                // that the error happened in a LIMIT clause, so that we have better error msg for
1497                // something like `SELECT 5 LIMIT 'aaa'`.
1498                match limit.eval(&[], &arena)? {
1499                    d @ Datum::Int64(v) if v >= 0 => {
1500                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1501                    }
1502                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1503                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1504                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1505                }
1506            } else {
1507                // Gate non-constant LIMIT expressions behind a feature flag
1508                qcx.scx
1509                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1510                limit
1511            };
1512
1513            Some(limit)
1514        }
1515        Some(Limit {
1516            quantity: _,
1517            with_ties: true,
1518        }) => bail_unsupported!("FETCH ... WITH TIES"),
1519    };
1520
1521    let offset = match &q.offset {
1522        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1523        Some(offset) => {
1524            let ecx = &ExprContext {
1525                qcx,
1526                name: "OFFSET",
1527                scope: &Scope::empty(),
1528                relation_type: &SqlRelationType::empty(),
1529                allow_aggregates: false,
1530                allow_subqueries: false,
1531                allow_parameters: true,
1532                allow_windows: false,
1533            };
1534            let offset = plan_expr(ecx, offset)?;
1535            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1536
1537            let offset = if offset.is_constant() {
1538                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1539                let offset_value = offset_into_value(offset)?;
1540                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1541            } else {
1542                // The only case when this is allowed to not be a constant is if it contains
1543                // parameters. (In which case, we'll later check that it's a constant after
1544                // parameter binding.)
1545                if !offset.contains_parameters() {
1546                    return Err(PlanError::InvalidOffset(format!(
1547                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1548                        offset
1549                    )));
1550                }
1551                offset
1552            };
1553            offset
1554        }
1555    };
1556
1557    let mut planned_query = match &q.body {
1558        SetExpr::Select(s) => {
1559            // Extract query options.
1560            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1561            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1562
1563            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1564            PlannedQuery {
1565                expr: plan.expr,
1566                scope: plan.scope,
1567                order_by: plan.order_by,
1568                project: plan.project,
1569                limit,
1570                offset,
1571                group_size_hints,
1572            }
1573        }
1574        _ => {
1575            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1576            let ecx = &ExprContext {
1577                qcx,
1578                name: "ORDER BY clause of a set expression",
1579                scope: &scope,
1580                relation_type: &qcx.relation_type(&expr),
1581                allow_aggregates: false,
1582                allow_subqueries: true,
1583                allow_parameters: true,
1584                allow_windows: false,
1585            };
1586            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1587            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1588            let project = (0..ecx.relation_type.arity()).collect();
1589            PlannedQuery {
1590                expr: expr.map(map_exprs),
1591                scope,
1592                order_by,
1593                limit,
1594                project,
1595                offset,
1596                group_size_hints: GroupSizeHints::default(),
1597            }
1598        }
1599    };
1600
1601    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1602    match &q.ctes {
1603        CteBlock::Simple(_) => {
1604            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1605                if let Some(cte) = qcx.ctes.remove(&id) {
1606                    planned_query.expr = HirRelationExpr::Let {
1607                        name: cte.name,
1608                        id: id.clone(),
1609                        value: Box::new(value),
1610                        body: Box::new(planned_query.expr),
1611                    };
1612                }
1613                if let Some(shadowed_val) = shadowed_val {
1614                    qcx.ctes.insert(id, shadowed_val);
1615                }
1616            }
1617        }
1618        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1619            let MutRecBlockOptionExtracted {
1620                recursion_limit,
1621                return_at_recursion_limit,
1622                error_at_recursion_limit,
1623                seen: _,
1624            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1625            let limit = match (
1626                recursion_limit,
1627                return_at_recursion_limit,
1628                error_at_recursion_limit,
1629            ) {
1630                (None, None, None) => None,
1631                (Some(max_iters), None, None) => {
1632                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1633                }
1634                (None, Some(max_iters), None) => Some((max_iters, true)),
1635                (None, None, Some(max_iters)) => Some((max_iters, false)),
1636                _ => {
1637                    return Err(InvalidWmrRecursionLimit(
1638                        "More than one recursion limit given. \
1639                         Please give at most one of RECURSION LIMIT, \
1640                         ERROR AT RECURSION LIMIT, \
1641                         RETURN AT RECURSION LIMIT."
1642                            .to_owned(),
1643                    ));
1644                }
1645            }
1646            .try_map(|(max_iters, return_at_limit)| {
1647                Ok::<LetRecLimit, PlanError>(LetRecLimit {
1648                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1649                        "Recursion limit has to be greater than 0.".to_owned(),
1650                    ))?,
1651                    return_at_limit: *return_at_limit,
1652                })
1653            })?;
1654
1655            let mut bindings = Vec::new();
1656            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1657                if let Some(cte) = qcx.ctes.remove(&id) {
1658                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1659                }
1660                if let Some(shadowed_val) = shadowed_val {
1661                    qcx.ctes.insert(id, shadowed_val);
1662                }
1663            }
1664            if !bindings.is_empty() {
1665                planned_query.expr = HirRelationExpr::LetRec {
1666                    limit,
1667                    bindings,
1668                    body: Box::new(planned_query.expr),
1669                }
1670            }
1671        }
1672    }
1673
1674    Ok(planned_query)
1675}
1676
1677/// Converts an OFFSET expression into a value.
1678pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1679    let offset = offset
1680        .try_into_literal_int64()
1681        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1682    if offset < 0 {
1683        return Err(PlanError::InvalidOffset(format!(
1684            "must not be negative, got {}",
1685            offset
1686        )));
1687    }
1688    Ok(offset)
1689}
1690
// Generates `MutRecBlockOptionExtracted`, a typed view of the options of a
// `WITH MUTUALLY RECURSIVE` block (the three mutually exclusive recursion
// limit settings consumed when planning `CteBlock::MutuallyRecursive`).
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1697
1698/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1699///
1700/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1701/// shadowed value that can be reinstalled once the planning has completed.
1702pub fn plan_ctes(
1703    qcx: &mut QueryContext,
1704    q: &Query<Aug>,
1705) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1706    // Accumulate planned expressions and shadowed descriptions.
1707    let mut result = Vec::new();
1708    // Retain the old descriptions of CTE bindings so that we can restore them
1709    // after we're done planning this SELECT.
1710    let mut shadowed_descs = BTreeMap::new();
1711
1712    // A reused identifier indicates a reused name.
1713    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1714        sql_bail!(
1715            "WITH query name {} specified more than once",
1716            normalize::ident_ref(ident).quoted()
1717        )
1718    }
1719
1720    match &q.ctes {
1721        CteBlock::Simple(ctes) => {
1722            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1723            for cte in ctes.iter() {
1724                let cte_name = normalize::ident(cte.alias.name.clone());
1725                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1726                let typ = qcx.relation_type(&val);
1727                let mut desc = RelationDesc::new(typ, scope.column_names());
1728                plan_utils::maybe_rename_columns(
1729                    format!("CTE {}", cte.alias.name),
1730                    &mut desc,
1731                    &cte.alias.columns,
1732                )?;
1733                // Capture the prior value if it exists, so that it can be re-installed.
1734                let shadowed = qcx.ctes.insert(
1735                    cte.id,
1736                    CteDesc {
1737                        name: cte_name,
1738                        desc,
1739                    },
1740                );
1741
1742                result.push((cte.id, val, shadowed));
1743            }
1744        }
1745        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1746            // Insert column types into `qcx.ctes` first for recursive bindings.
1747            for cte in ctes.iter() {
1748                let cte_name = normalize::ident(cte.name.clone());
1749                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1750                for column in cte.columns.iter() {
1751                    desc_columns.push((
1752                        normalize::column_name(column.name.clone()),
1753                        SqlColumnType {
1754                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1755                            nullable: true,
1756                        },
1757                    ));
1758                }
1759                let desc = RelationDesc::from_names_and_types(desc_columns);
1760                let shadowed = qcx.ctes.insert(
1761                    cte.id,
1762                    CteDesc {
1763                        name: cte_name,
1764                        desc,
1765                    },
1766                );
1767                // Capture the prior value if it exists, so that it can be re-installed.
1768                if let Some(shadowed) = shadowed {
1769                    shadowed_descs.insert(cte.id, shadowed);
1770                }
1771            }
1772
1773            // Plan all CTEs and validate the proposed types.
1774            for cte in ctes.iter() {
1775                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1776
1777                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1778
1779                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1780                    // Once WMR CTEs support NOT NULL constraints, check that
1781                    // nullability of derived column types are compatible.
1782                    sql_bail!(
1783                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1784                    );
1785                }
1786
1787                if !proposed_typ.keys.is_empty() {
1788                    // Once WMR CTEs support keys, check that keys exactly
1789                    // overlap.
1790                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1791                }
1792
1793                // Validate that the derived and proposed types are the same.
1794                let derived_typ = qcx.relation_type(&val);
1795
1796                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1797                    let cte_name = normalize::ident(cte.name.clone());
1798                    let proposed_typ = proposed_typ
1799                        .column_types
1800                        .iter()
1801                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1802                        .collect::<Vec<_>>();
1803                    let inferred_typ = derived_typ
1804                        .column_types
1805                        .iter()
1806                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1807                        .collect::<Vec<_>>();
1808                    Err(PlanError::RecursiveTypeMismatch(
1809                        cte_name,
1810                        proposed_typ,
1811                        inferred_typ,
1812                    ))
1813                };
1814
1815                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1816                    return type_err(proposed_typ, derived_typ);
1817                }
1818
1819                // Cast derived types to proposed types or error.
1820                let val = match cast_relation(
1821                    qcx,
1822                    // Choose `CastContext::Assignment`` because the user has
1823                    // been explicit about the types they expect. Choosing
1824                    // `CastContext::Implicit` is not "strong" enough to impose
1825                    // typmods from proposed types onto values.
1826                    CastContext::Assignment,
1827                    val,
1828                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1829                ) {
1830                    Ok(val) => val,
1831                    Err(_) => return type_err(proposed_typ, derived_typ),
1832                };
1833
1834                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1835            }
1836        }
1837    }
1838
1839    Ok(result)
1840}
1841
1842pub fn plan_nested_query(
1843    qcx: &mut QueryContext,
1844    q: &Query<Aug>,
1845) -> Result<(HirRelationExpr, Scope), PlanError> {
1846    let PlannedQuery {
1847        mut expr,
1848        scope,
1849        order_by,
1850        limit,
1851        offset,
1852        project,
1853        group_size_hints,
1854    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1855    if limit.is_some()
1856        || !offset
1857            .clone()
1858            .try_into_literal_int64()
1859            .is_ok_and(|offset| offset == 0)
1860    {
1861        expr = HirRelationExpr::top_k(
1862            expr,
1863            vec![],
1864            order_by,
1865            limit,
1866            offset,
1867            group_size_hints.limit_input_group_size,
1868        );
1869    }
1870    Ok((expr.project(project), scope))
1871}
1872
/// Plans one arm of a set expression: a `SELECT`, a set operation
/// (`UNION`/`EXCEPT`/`INTERSECT`), a `VALUES` list, a `TABLE` reference, a
/// parenthesized query, or a `SHOW` statement.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
            // should not have planned any ordering.
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan the LHS and RHS.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Validate that the LHS and RHS are the same width.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Match the types of the corresponding columns on the LHS and RHS
            // using the normal type coercion rules. This is equivalent to
            // `coerce_homogeneous_exprs`, but implemented in terms of
            // `HirRelationExpr` rather than `HirScalarExpr`.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column pair, pick a common target type and plan implicit
            // casts from both sides to it, erroring if either cast is impossible.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&target, false),
                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Install the casts on each side, but only if at least one cast is
            // not the identity. The `map` appends the cast columns after the
            // originals, so the projection selects the second half of the
            // columns (indices `arity..arity * 2`).
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            // Combine the two sides with the requested set operator.
            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
                    // Though we believe that render() does The Right Thing (TM)
                    // Also note that we do *not* need another threshold() at the end of the method chain
                    // because the right-hand side of the outer union only produces existing records,
                    // i.e., the record counts for differential data flow definitely remain non-negative.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            let scope = Scope::from_source(
                None,
                // Column names are taken from the left, as in Postgres.
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            // Resolve the named table directly.
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            // Plan the nested (parenthesized) query.
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // The create SQL definition of involving this query, will have the explicit `SHOW`
            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
            // current schema of the executing user. When Materialize restarts and tries to re-plan
            // these queries, it will only have access to the raw `SHOW` command and have no idea
            // what schema to use. As a result Materialize will fail to boot.
            //
            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
            // However, banning show commands in views gives us more flexibility to change their
            // output.
            //
            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
            // with all show commands expanded into their equivalent SELECT statements.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Some SHOW statements are a SELECT query. Others produces Rows
            // directly. Convert both of these to the needed Hir and Scope.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let scope = Scope::from_source(None, desc.iter_names());
                let expr = HirRelationExpr::constant(rows, desc.into_typ());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateType(stmt) => to_hirscope(
                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
                    show::describe_show_create_type(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2118
2119/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2120fn plan_values(
2121    qcx: &QueryContext,
2122    values: &[Vec<Expr<Aug>>],
2123) -> Result<(HirRelationExpr, Scope), PlanError> {
2124    assert!(!values.is_empty());
2125
2126    let ecx = &ExprContext {
2127        qcx,
2128        name: "VALUES",
2129        scope: &Scope::empty(),
2130        relation_type: &SqlRelationType::empty(),
2131        allow_aggregates: false,
2132        allow_subqueries: true,
2133        allow_parameters: true,
2134        allow_windows: false,
2135    };
2136
2137    let ncols = values[0].len();
2138    let nrows = values.len();
2139
2140    // Arrange input expressions by columns, not rows, so that we can
2141    // call `coerce_homogeneous_exprs` on each column.
2142    let mut cols = vec![vec![]; ncols];
2143    for row in values {
2144        if row.len() != ncols {
2145            sql_bail!(
2146                "VALUES expression has varying number of columns: {} vs {}",
2147                row.len(),
2148                ncols
2149            );
2150        }
2151        for (i, v) in row.iter().enumerate() {
2152            cols[i].push(v);
2153        }
2154    }
2155
2156    // Plan each column.
2157    let mut col_iters = Vec::with_capacity(ncols);
2158    let mut col_types = Vec::with_capacity(ncols);
2159    for col in &cols {
2160        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2161        let mut col_type = ecx.column_type(&col[0]);
2162        for val in &col[1..] {
2163            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2164        }
2165        col_types.push(col_type);
2166        col_iters.push(col.into_iter());
2167    }
2168
2169    // Build constant relation.
2170    let mut exprs = vec![];
2171    for _ in 0..nrows {
2172        for i in 0..ncols {
2173            exprs.push(col_iters[i].next().unwrap());
2174        }
2175    }
2176    let out = HirRelationExpr::CallTable {
2177        func: TableFunc::Wrap {
2178            width: ncols,
2179            types: col_types,
2180        },
2181        exprs,
2182    };
2183
2184    // Build column names.
2185    let mut scope = Scope::empty();
2186    for i in 0..ncols {
2187        let name = format!("column{}", i + 1);
2188        scope.items.push(ScopeItem::from_column_name(name));
2189    }
2190
2191    Ok((out, scope))
2192}
2193
2194/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2195/// statement.
2196///
2197/// This is special-cased in PostgreSQL and different enough from `plan_values`
2198/// that it is easier to use a separate function entirely. Unlike a normal
2199/// `VALUES` clause, each value is coerced to the type of the target table
2200/// via an assignment cast.
2201///
2202/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2203fn plan_values_insert(
2204    qcx: &QueryContext,
2205    target_names: &[&ColumnName],
2206    target_types: &[&SqlScalarType],
2207    values: &[Vec<Expr<Aug>>],
2208) -> Result<HirRelationExpr, PlanError> {
2209    assert!(!values.is_empty());
2210
2211    if !values.iter().map(|row| row.len()).all_equal() {
2212        sql_bail!("VALUES lists must all be the same length");
2213    }
2214
2215    let ecx = &ExprContext {
2216        qcx,
2217        name: "VALUES",
2218        scope: &Scope::empty(),
2219        relation_type: &SqlRelationType::empty(),
2220        allow_aggregates: false,
2221        allow_subqueries: true,
2222        allow_parameters: true,
2223        allow_windows: false,
2224    };
2225
2226    let mut exprs = vec![];
2227    let mut types = vec![];
2228    for row in values {
2229        if row.len() > target_names.len() {
2230            sql_bail!("INSERT has more expressions than target columns");
2231        }
2232        for (column, val) in row.into_iter().enumerate() {
2233            let target_type = &target_types[column];
2234            let val = plan_expr(ecx, val)?;
2235            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2236            let source_type = &ecx.scalar_type(&val);
2237            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2238                Ok(val) => val,
2239                Err(_) => sql_bail!(
2240                    "column {} is of type {} but expression is of type {}",
2241                    target_names[column].quoted(),
2242                    qcx.humanize_scalar_type(target_type, false),
2243                    qcx.humanize_scalar_type(source_type, false),
2244                ),
2245            };
2246            if column >= types.len() {
2247                types.push(ecx.column_type(&val));
2248            } else {
2249                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2250            }
2251            exprs.push(val);
2252        }
2253    }
2254
2255    Ok(HirRelationExpr::CallTable {
2256        func: TableFunc::Wrap {
2257            width: values[0].len(),
2258            types,
2259        },
2260        exprs,
2261    })
2262}
2263
2264fn plan_join_identity() -> (HirRelationExpr, Scope) {
2265    let typ = SqlRelationType::new(vec![]);
2266    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2267    let scope = Scope::empty();
2268    (expr, scope)
2269}
2270
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The relation expression producing the full, pre-projection set of columns.
    expr: HirRelationExpr,
    /// Names of the columns that remain after `project` is applied.
    scope: Scope,
    /// Ordering over the columns of `expr`, applied before projection.
    order_by: Vec<ColumnOrder>,
    /// Indices of the columns of `expr` to retain, in output order.
    project: Vec<usize>,
}
2283
// Generates `SelectOptionExtracted`, a typed view of SELECT-level query hint
// options (group size hints), consumed via
// `GroupSizeHints::try_from(SelectOptionExtracted)`.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2291
2292/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2293///
2294/// Normally, the ORDER BY clause occurs after the columns specified in the
2295/// SELECT list have been projected. In a query like
2296///
2297///   CREATE TABLE (a int, b int)
2298///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2299///
2300/// it is valid to refer to `a`, because it is explicitly selected, but it would
2301/// not be valid to refer to unselected column `b`.
2302///
2303/// But PostgreSQL extends the standard to permit queries like
2304///
2305///   SELECT a FROM t ORDER BY b
2306///
2307/// where expressions in the ORDER BY clause can refer to *both* input columns
2308/// and output columns.
2309fn plan_select_from_where(
2310    qcx: &QueryContext,
2311    mut s: Select<Aug>,
2312    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2313) -> Result<SelectPlan, PlanError> {
2314    // TODO: Both `s` and `order_by_exprs` are not references because the
2315    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2316    // table function support (the UUID mapping). Attempt to change this so callers
2317    // don't need to clone the Select.
2318
2319    // Extract query options.
2320    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2321    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2322
2323    // Step 1. Handle FROM clause, including joins.
2324    let (mut relation_expr, mut from_scope) =
2325        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2326            let (left, left_scope) = l;
2327            plan_join(
2328                qcx,
2329                left,
2330                left_scope,
2331                &Join {
2332                    relation: TableFactor::NestedJoin {
2333                        join: Box::new(twj.clone()),
2334                        alias: None,
2335                    },
2336                    join_operator: JoinOperator::CrossJoin,
2337                },
2338            )
2339        })?;
2340
2341    // Step 2. Handle WHERE clause.
2342    if let Some(selection) = &s.selection {
2343        let ecx = &ExprContext {
2344            qcx,
2345            name: "WHERE clause",
2346            scope: &from_scope,
2347            relation_type: &qcx.relation_type(&relation_expr),
2348            allow_aggregates: false,
2349            allow_subqueries: true,
2350            allow_parameters: true,
2351            allow_windows: false,
2352        };
2353        let expr = plan_expr(ecx, selection)
2354            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2355            .type_as(ecx, &SqlScalarType::Bool)?;
2356        relation_expr = relation_expr.filter(vec![expr]);
2357    }
2358
2359    // Step 3. Gather aggregates and table functions.
2360    // (But skip window aggregates.)
2361    let (aggregates, table_funcs) = {
2362        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2363        visitor.visit_select_mut(&mut s);
2364        for o in order_by_exprs.iter_mut() {
2365            visitor.visit_order_by_expr_mut(o);
2366        }
2367        visitor.into_result()?
2368    };
2369    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2370    if !table_funcs.is_empty() {
2371        let (expr, scope) = plan_scalar_table_funcs(
2372            qcx,
2373            table_funcs,
2374            &mut table_func_names,
2375            &relation_expr,
2376            &from_scope,
2377        )?;
2378        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2379        from_scope = from_scope.product(scope)?;
2380    }
2381
2382    // Step 4. Expand SELECT clause.
2383    let projection = {
2384        let ecx = &ExprContext {
2385            qcx,
2386            name: "SELECT clause",
2387            scope: &from_scope,
2388            relation_type: &qcx.relation_type(&relation_expr),
2389            allow_aggregates: true,
2390            allow_subqueries: true,
2391            allow_parameters: true,
2392            allow_windows: true,
2393        };
2394        let mut out = vec![];
2395        for si in &s.projection {
2396            if *si == SelectItem::Wildcard && s.from.is_empty() {
2397                sql_bail!("SELECT * with no tables specified is not valid");
2398            }
2399            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2400        }
2401        out
2402    };
2403
2404    // Step 5. Handle GROUP BY clause.
2405    // This will also plan the aggregates gathered in Step 3.
2406    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2407    let (mut group_scope, select_all_mapping) = {
2408        // Compute GROUP BY expressions.
2409        let ecx = &ExprContext {
2410            qcx,
2411            name: "GROUP BY clause",
2412            scope: &from_scope,
2413            relation_type: &qcx.relation_type(&relation_expr),
2414            allow_aggregates: false,
2415            allow_subqueries: true,
2416            allow_parameters: true,
2417            allow_windows: false,
2418        };
2419        let mut group_key = vec![];
2420        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2421        let mut group_hir_exprs = vec![];
2422        let mut group_scope = Scope::empty();
2423        let mut select_all_mapping = BTreeMap::new();
2424
2425        for group_expr in &s.group_by {
2426            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2427            let new_column = group_key.len();
2428
2429            if let Some(group_expr) = group_expr {
2430                // Multiple AST expressions can map to the same HIR expression.
2431                // If we already have a ScopeItem for this HIR, we can add this
2432                // next AST expression to its set
2433                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2434                    existing_scope_item.exprs.insert(group_expr.clone());
2435                    continue;
2436                }
2437            }
2438
2439            let mut scope_item = if let HirScalarExpr::Column(
2440                ColumnRef {
2441                    level: 0,
2442                    column: old_column,
2443                },
2444                _name,
2445            ) = &expr
2446            {
2447                // If we later have `SELECT foo.*` then we have to find all
2448                // the `foo` items in `from_scope` and figure out where they
2449                // ended up in `group_scope`. This is really hard to do
2450                // right using SQL name resolution, so instead we just track
2451                // the movement here.
2452                select_all_mapping.insert(*old_column, new_column);
2453                let scope_item = ecx.scope.items[*old_column].clone();
2454                scope_item
2455            } else {
2456                ScopeItem::empty()
2457            };
2458
2459            if let Some(group_expr) = group_expr.cloned() {
2460                scope_item.exprs.insert(group_expr);
2461            }
2462
2463            group_key.push(from_scope.len() + group_exprs.len());
2464            group_hir_exprs.push(expr.clone());
2465            group_exprs.insert(expr, scope_item);
2466        }
2467
2468        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2469        for expr in &group_hir_exprs {
2470            if let Some(scope_item) = group_exprs.remove(expr) {
2471                group_scope.items.push(scope_item);
2472            }
2473        }
2474
2475        // Plan aggregates.
2476        let ecx = &ExprContext {
2477            qcx,
2478            name: "aggregate function",
2479            scope: &from_scope,
2480            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2481            allow_aggregates: false,
2482            allow_subqueries: true,
2483            allow_parameters: true,
2484            allow_windows: false,
2485        };
2486        let mut agg_exprs = vec![];
2487        for sql_function in aggregates {
2488            if sql_function.over.is_some() {
2489                unreachable!(
2490                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2491                );
2492            }
2493            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2494            group_scope
2495                .items
2496                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2497        }
2498        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2499            // apply GROUP BY / aggregates
2500            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2501                group_key,
2502                agg_exprs,
2503                group_size_hints.aggregate_input_group_size,
2504            );
2505
2506            // For every old column that wasn't a group key, add a scope item
2507            // that errors when referenced. We can't simply drop these items
2508            // from scope. These items need to *exist* because they might shadow
2509            // variables in outer scopes that would otherwise be valid to
2510            // reference, but accessing them needs to produce an error.
2511            for i in 0..from_scope.len() {
2512                if !select_all_mapping.contains_key(&i) {
2513                    let scope_item = &ecx.scope.items[i];
2514                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2515                        table_name: scope_item.table_name.clone(),
2516                        column_name: scope_item.column_name.clone(),
2517                        allow_unqualified_references: scope_item.allow_unqualified_references,
2518                    });
2519                }
2520            }
2521
2522            (group_scope, select_all_mapping)
2523        } else {
2524            // if no GROUP BY, aggregates or having then all columns remain in scope
2525            (
2526                from_scope.clone(),
2527                (0..from_scope.len()).map(|i| (i, i)).collect(),
2528            )
2529        }
2530    };
2531
2532    // Step 6. Handle HAVING clause.
2533    if let Some(ref having) = s.having {
2534        let ecx = &ExprContext {
2535            qcx,
2536            name: "HAVING clause",
2537            scope: &group_scope,
2538            relation_type: &qcx.relation_type(&relation_expr),
2539            allow_aggregates: true,
2540            allow_subqueries: true,
2541            allow_parameters: true,
2542            allow_windows: false,
2543        };
2544        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2545        relation_expr = relation_expr.filter(vec![expr]);
2546    }
2547
2548    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2549    // (This includes window aggregations.)
2550    //
2551    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2552    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2553    //
2554    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2555    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2556    // and can't be part of a bigger expression.
2557    // See https://www.postgresql.org/docs/current/queries-order.html:
2558    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2559    // expression"
2560    let window_funcs = {
2561        let mut visitor = WindowFuncCollector::default();
2562        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2563        // window functions are excluded from other things by `allow_windows` being false when
2564        // planning those before this code).
2565        visitor.visit_select(&s);
2566        for o in order_by_exprs.iter() {
2567            visitor.visit_order_by_expr(o);
2568        }
2569        visitor.into_result()
2570    };
2571    for window_func in window_funcs {
2572        let ecx = &ExprContext {
2573            qcx,
2574            name: "window function",
2575            scope: &group_scope,
2576            relation_type: &qcx.relation_type(&relation_expr),
2577            allow_aggregates: true,
2578            allow_subqueries: true,
2579            allow_parameters: true,
2580            allow_windows: true,
2581        };
2582        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2583        group_scope.items.push(ScopeItem::from_expr(window_func));
2584    }
2585    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2586    // been already planned now. However, we should still set `allow_windows: true` for the
2587    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2588    // msg if an OVER clause is missing from a window function.
2589
2590    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2591    if let Some(ref qualify) = s.qualify {
2592        let ecx = &ExprContext {
2593            qcx,
2594            name: "QUALIFY clause",
2595            scope: &group_scope,
2596            relation_type: &qcx.relation_type(&relation_expr),
2597            allow_aggregates: true,
2598            allow_subqueries: true,
2599            allow_parameters: true,
2600            allow_windows: true,
2601        };
2602        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2603        relation_expr = relation_expr.filter(vec![expr]);
2604    }
2605
2606    // Step 9. Handle SELECT clause.
2607    let output_columns = {
2608        let mut new_exprs = vec![];
2609        let mut new_type = qcx.relation_type(&relation_expr);
2610        let mut output_columns = vec![];
2611        for (select_item, column_name) in &projection {
2612            let ecx = &ExprContext {
2613                qcx,
2614                name: "SELECT clause",
2615                scope: &group_scope,
2616                relation_type: &new_type,
2617                allow_aggregates: true,
2618                allow_subqueries: true,
2619                allow_parameters: true,
2620                allow_windows: true,
2621            };
2622            let expr = match select_item {
2623                ExpandedSelectItem::InputOrdinal(i) => {
2624                    if let Some(column) = select_all_mapping.get(i).copied() {
2625                        HirScalarExpr::column(column)
2626                    } else {
2627                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2628                    }
2629                }
2630                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2631            };
2632            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2633                // Simple column reference; no need to map on a new expression.
2634                output_columns.push((column, column_name));
2635            } else {
2636                // Complicated expression that requires a map expression. We
2637                // update `group_scope` as we go so that future expressions that
2638                // are textually identical to this one can reuse it. This
2639                // duplicate detection is required for proper determination of
2640                // ambiguous column references with SQL92-style `ORDER BY`
2641                // items. See `plan_order_by_or_distinct_expr` for more.
2642                let typ = ecx.column_type(&expr);
2643                new_type.column_types.push(typ);
2644                new_exprs.push(expr);
2645                output_columns.push((group_scope.len(), column_name));
2646                group_scope
2647                    .items
2648                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2649            }
2650        }
2651        relation_expr = relation_expr.map(new_exprs);
2652        output_columns
2653    };
2654    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2655
2656    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2657    let order_by = {
2658        let relation_type = qcx.relation_type(&relation_expr);
2659        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2660            &ExprContext {
2661                qcx,
2662                name: "ORDER BY clause",
2663                scope: &group_scope,
2664                relation_type: &relation_type,
2665                allow_aggregates: true,
2666                allow_subqueries: true,
2667                allow_parameters: true,
2668                allow_windows: true,
2669            },
2670            &order_by_exprs,
2671            &output_columns,
2672        )?;
2673
2674        match s.distinct {
2675            None => relation_expr = relation_expr.map(map_exprs),
2676            Some(Distinct::EntireRow) => {
2677                if relation_type.arity() == 0 {
2678                    sql_bail!("SELECT DISTINCT must have at least one column");
2679                }
2680                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2681                // list, so we can't proceed if `ORDER BY` has introduced any
2682                // columns for arbitrary expressions. This matches PostgreSQL.
2683                if !try_push_projection_order_by(
2684                    &mut relation_expr,
2685                    &mut project_key,
2686                    &mut order_by,
2687                ) {
2688                    sql_bail!(
2689                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2690                    );
2691                }
2692                assert!(map_exprs.is_empty());
2693                relation_expr = relation_expr.distinct();
2694            }
2695            Some(Distinct::On(exprs)) => {
2696                let ecx = &ExprContext {
2697                    qcx,
2698                    name: "DISTINCT ON clause",
2699                    scope: &group_scope,
2700                    relation_type: &qcx.relation_type(&relation_expr),
2701                    allow_aggregates: true,
2702                    allow_subqueries: true,
2703                    allow_parameters: true,
2704                    allow_windows: true,
2705                };
2706
2707                let mut distinct_exprs = vec![];
2708                for expr in &exprs {
2709                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2710                    distinct_exprs.push(expr);
2711                }
2712
2713                let mut distinct_key = vec![];
2714
2715                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2716                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2717                // expressions, though the order of `DISTINCT ON` expressions
2718                // does not matter. This matches PostgreSQL and leaves the door
2719                // open to a future optimization where the `DISTINCT ON` and
2720                // `ORDER BY` operations happen in one pass.
2721                //
2722                // On the bright side, any columns that have already been
2723                // computed by `ORDER BY` can be reused in the distinct key.
2724                let arity = relation_type.arity();
2725                for ord in order_by.iter().take(distinct_exprs.len()) {
2726                    // The unusual construction of `expr` here is to ensure the
2727                    // temporary column expression lives long enough.
2728                    let mut expr = &HirScalarExpr::column(ord.column);
2729                    if ord.column >= arity {
2730                        expr = &map_exprs[ord.column - arity];
2731                    };
2732                    match distinct_exprs.iter().position(move |e| e == expr) {
2733                        None => sql_bail!(
2734                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2735                        ),
2736                        Some(pos) => {
2737                            distinct_exprs.remove(pos);
2738                        }
2739                    }
2740                    distinct_key.push(ord.column);
2741                }
2742
2743                // Add any remaining `DISTINCT ON` expressions to the key.
2744                for expr in distinct_exprs {
2745                    // If the expression is a reference to an existing column,
2746                    // do not introduce a new column to support it.
2747                    let column = match expr {
2748                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2749                        _ => {
2750                            map_exprs.push(expr);
2751                            arity + map_exprs.len() - 1
2752                        }
2753                    };
2754                    distinct_key.push(column);
2755                }
2756
2757                // `DISTINCT ON` is semantically a TopK with limit 1. The
2758                // columns in `ORDER BY` that are not part of the distinct key,
2759                // if there are any, determine the ordering within each group,
2760                // per PostgreSQL semantics.
2761                let distinct_len = distinct_key.len();
2762                relation_expr = HirRelationExpr::top_k(
2763                    relation_expr.map(map_exprs),
2764                    distinct_key,
2765                    order_by.iter().skip(distinct_len).cloned().collect(),
2766                    Some(HirScalarExpr::literal(
2767                        Datum::Int64(1),
2768                        SqlScalarType::Int64,
2769                    )),
2770                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2771                    group_size_hints.distinct_on_input_group_size,
2772                );
2773            }
2774        }
2775
2776        order_by
2777    };
2778
2779    // Construct a clean scope to expose outwards, where all of the state that
2780    // accumulated in the scope during planning of this SELECT is erased. The
2781    // clean scope has at most one name for each column, and the names are not
2782    // associated with any table.
2783    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2784
2785    Ok(SelectPlan {
2786        expr: relation_expr,
2787        scope,
2788        order_by,
2789        project: project_key,
2790    })
2791}
2792
/// Plans the scalar table function calls that were gathered from a `SELECT`,
/// joining them into a single relation.
///
/// `table_funcs` maps each table function call to the unique identifier that
/// was generated for it. `table_func_names` is populated here with the reverse
/// mapping, from generated identifier back to the function's original name.
/// `relation_expr` and `from_scope` describe the `FROM` clause planned so far;
/// they are used only to derive the query context in which the functions are
/// planned.
fn plan_scalar_table_funcs(
    qcx: &QueryContext,
    table_funcs: BTreeMap<Function<Aug>, String>,
    table_func_names: &mut BTreeMap<String, Ident>,
    relation_expr: &HirRelationExpr,
    from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Plan the functions in a context that can see the FROM clause's columns.
    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));

    // Record, for each generated identifier, the original function name.
    for (table_func, id) in table_funcs.iter() {
        table_func_names.insert(
            id.clone(),
            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
        );
    }
    // If there's only a single table function, we can skip generating
    // ordinality columns.
    if table_funcs.len() == 1 {
        let (table_func, id) = table_funcs.iter().next().unwrap();
        let (expr, mut scope) =
            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;

        // A single table-function might return several columns as a record
        let num_cols = scope.len();
        for i in 0..scope.len() {
            // Attribute every output column to the generated identifier and
            // require references to it to be qualified.
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
        }
        return Ok((expr, scope));
    }
    // Otherwise, plan as usual, emulating the ROWS FROM behavior
    let (expr, mut scope, num_cols) =
        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;

    // Munge the scope so table names match with the generated ids.
    // `i` walks the scope items: for each function, first its `num_cols`
    // output columns, then one ordinality column.
    let mut i = 0;
    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
        for _ in 0..num_cols {
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
            i += 1;
        }
        // Ordinality column. This doubles as the
        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
        // because it only needs to be NULL or not.
        scope.items[i].table_name = Some(PartialItemName {
            database: None,
            schema: None,
            item: id.clone(),
        });
        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
        scope.items[i].allow_unqualified_references = false;
        i += 1;
    }
    // Coalesced ordinality column.
    scope.items[i].allow_unqualified_references = false;
    Ok((expr, scope))
}
2862
2863/// Plans an expression in a `GROUP BY` clause.
2864///
2865/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2866/// names/expressions defined in the `SELECT` clause. These special cases are
2867/// handled by this function; see comments within the implementation for
2868/// details.
2869fn plan_group_by_expr<'a>(
2870    ecx: &ExprContext,
2871    group_expr: &'a Expr<Aug>,
2872    projection: &'a [(ExpandedSelectItem, ColumnName)],
2873) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2874    let plan_projection = |column: usize| match &projection[column].0 {
2875        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2876        ExpandedSelectItem::Expr(expr) => {
2877            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2878        }
2879    };
2880
2881    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2882    // a special case that means to use the ith item in the SELECT clause.
2883    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2884        return plan_projection(column);
2885    }
2886
2887    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2888    // The `foo` can refer to *either* an input column or an output column. If
2889    // both exist, the input column is preferred.
2890    match group_expr {
2891        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2892            Err(PlanError::UnknownColumn {
2893                table: None,
2894                column,
2895                similar,
2896            }) => {
2897                // The expression was a simple identifier that did not match an
2898                // input column. See if it matches an output column.
2899                let mut iter = projection.iter().map(|(_expr, name)| name);
2900                if let Some(i) = iter.position(|n| *n == column) {
2901                    if iter.any(|n| *n == column) {
2902                        Err(PlanError::AmbiguousColumn(column))
2903                    } else {
2904                        plan_projection(i)
2905                    }
2906                } else {
2907                    // The name didn't match an output column either. Return the
2908                    // "unknown column" error.
2909                    Err(PlanError::UnknownColumn {
2910                        table: None,
2911                        column,
2912                        similar,
2913                    })
2914                }
2915            }
2916            res => Ok((Some(group_expr), res?)),
2917        },
2918        _ => Ok((
2919            Some(group_expr),
2920            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2921        )),
2922    }
2923}
2924
2925/// Plans a slice of `ORDER BY` expressions.
2926///
2927/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2928/// parameter.
2929///
2930/// Returns the determined column orderings and a list of scalar expressions
2931/// that must be mapped onto the underlying relation expression.
2932pub(crate) fn plan_order_by_exprs(
2933    ecx: &ExprContext,
2934    order_by_exprs: &[OrderByExpr<Aug>],
2935    output_columns: &[(usize, &ColumnName)],
2936) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2937    let mut order_by = vec![];
2938    let mut map_exprs = vec![];
2939    for obe in order_by_exprs {
2940        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2941        // If the expression is a reference to an existing column,
2942        // do not introduce a new column to support it.
2943        let column = match expr {
2944            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2945            _ => {
2946                map_exprs.push(expr);
2947                ecx.relation_type.arity() + map_exprs.len() - 1
2948            }
2949        };
2950        order_by.push(resolve_desc_and_nulls_last(obe, column));
2951    }
2952    Ok((order_by, map_exprs))
2953}
2954
2955/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2956///
2957/// The `output_columns` parameter describes, in order, the physical index and
2958/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2959/// corresponds to a `SELECT` list with a single entry named "a" that can be
2960/// found at index 3 in the underlying relation expression.
2961///
2962/// There are three cases to handle.
2963///
2964///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2965///       reference to the specified output column.
2966///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2967///       an output column, if it exists; otherwise it is a reference to an
2968///       input column.
2969///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2970///       arbitrary expressions exclusively refer to input columns, never output
2971///       columns.
2972fn plan_order_by_or_distinct_expr(
2973    ecx: &ExprContext,
2974    expr: &Expr<Aug>,
2975    output_columns: &[(usize, &ColumnName)],
2976) -> Result<HirScalarExpr, PlanError> {
2977    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2978        return Ok(HirScalarExpr::column(output_columns[i].0));
2979    }
2980
2981    if let Expr::Identifier(names) = expr {
2982        if let [name] = &names[..] {
2983            let name = normalize::column_name(name.clone());
2984            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2985            if let Some((i, _)) = iter.next() {
2986                match iter.next() {
2987                    // Per SQL92, names are not considered ambiguous if they
2988                    // refer to identical target list expressions, as in
2989                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2990                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2991                    _ => return Ok(HirScalarExpr::column(*i)),
2992                }
2993            }
2994        }
2995    }
2996
2997    plan_expr(ecx, expr)?.type_as_any(ecx)
2998}
2999
3000fn plan_table_with_joins(
3001    qcx: &QueryContext,
3002    table_with_joins: &TableWithJoins<Aug>,
3003) -> Result<(HirRelationExpr, Scope), PlanError> {
3004    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
3005    for join in &table_with_joins.joins {
3006        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
3007        expr = new_expr;
3008        scope = new_scope;
3009    }
3010    Ok((expr, scope))
3011}
3012
/// Plans a single `FROM` item: a table reference, a table function call, a
/// `ROWS FROM` expression, a derived (sub-query) table, or a parenthesized
/// join.
///
/// Returns the planned relation expression together with the scope describing
/// its output columns.
fn plan_table_factor(
    qcx: &QueryContext,
    table_factor: &TableFactor<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match table_factor {
        // `FROM t [AS alias]`: resolve the name and apply the alias, if any.
        TableFactor::Table { name, alias } => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            let scope = plan_table_alias(scope, alias.as_ref())?;
            Ok((expr, scope))
        }

        // `FROM f(...) [WITH ORDINALITY] [AS alias]`: a solitary table
        // function call (special aliasing rules apply; see
        // `plan_solitary_table_function`).
        TableFactor::Function {
            function,
            alias,
            with_ordinality,
        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),

        // `FROM ROWS FROM (f(...), g(...), ...) [WITH ORDINALITY] [AS alias]`.
        TableFactor::RowsFrom {
            functions,
            alias,
            with_ordinality,
        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),

        // `FROM [LATERAL] (SELECT ...) [AS alias]`.
        TableFactor::Derived {
            lateral,
            subquery,
            alias,
        } => {
            let mut qcx = (*qcx).clone();
            if !lateral {
                // Since this derived table was not marked as `LATERAL`,
                // make elements in outer scopes invisible until we reach the
                // next lateral barrier.
                for scope in &mut qcx.outer_scopes {
                    if scope.lateral_barrier {
                        break;
                    }
                    scope.items.clear();
                }
            }
            // Anything planned inside the subquery must not see outer scopes
            // beyond this point.
            qcx.outer_scopes[0].lateral_barrier = true;
            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
            let scope = plan_table_alias(scope, alias.as_ref())?;
            Ok((expr, scope))
        }

        // `FROM (a JOIN b ...) [AS alias]`.
        TableFactor::NestedJoin { join, alias } => {
            let (expr, scope) = plan_table_with_joins(qcx, join)?;
            let scope = plan_table_alias(scope, alias.as_ref())?;
            Ok((expr, scope))
        }
    }
}
3066
3067/// Plans a `ROWS FROM` expression.
3068///
3069/// `ROWS FROM` concatenates table functions into a single table, filling in
3070/// `NULL`s in places where one table function has fewer rows than another. We
3071/// can achieve this by augmenting each table function with a row number, doing
3072/// a `FULL JOIN` between each table function on the row number and eventually
3073/// projecting away the row number columns. Concretely, the following query
3074/// using `ROWS FROM`
3075///
3076/// ```sql
3077/// SELECT
3078///     *
3079/// FROM
3080///     ROWS FROM (
3081///         generate_series(1, 2),
3082///         information_schema._pg_expandarray(ARRAY[9]),
3083///         generate_series(3, 6)
3084///     );
3085/// ```
3086///
3087/// is equivalent to the following query that does not use `ROWS FROM`:
3088///
3089/// ```sql
3090/// SELECT
///     gs1.generate_series, expand.x, expand.n, gs3.generate_series
3092/// FROM
3093///     generate_series(1, 2) WITH ORDINALITY AS gs1
3094///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3095///         ON gs1.ordinality = expand.ordinality
3096///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3097///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3098/// ```
3099///
3100/// Note the call to `coalesce` in the last join condition, which ensures that
3101/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3102///
3103/// This function creates a HirRelationExpr that follows the structure of the
3104/// latter query.
3105///
3106/// `with_ordinality` can be used to have the output expression contain a
3107/// single coalesced ordinality column at the end of the entire expression.
fn plan_rows_from(
    qcx: &QueryContext,
    functions: &[Function<Aug>],
    alias: Option<&TableAlias>,
    with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // If there's only a single table function, planning proceeds as if `ROWS
    // FROM` hadn't been written at all.
    if let [function] = functions {
        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
    }

    // Per PostgreSQL, all scope items take the name of the first function
    // (unless aliased).
    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
    let (expr, mut scope, num_cols) = plan_rows_from_internal(
        qcx,
        functions,
        Some(functions[0].name.full_item_name().clone()),
    )?;

    // `expr` lays out, for each function, its data columns followed by that
    // function's ordinality column, and then one final coalesced ordinality
    // column for the entire expression (see `plan_rows_from_internal`).
    //
    // Columns tracks the set of columns we will keep in the projection.
    let mut columns = Vec::new();
    let mut offset = 0;
    // Retain table function's non-ordinality columns.
    for (idx, cols) in num_cols.into_iter().enumerate() {
        for i in 0..cols {
            columns.push(offset + i);
        }
        // The `+ 1` skips over this function's ordinality column, which is
        // projected away.
        offset += cols + 1;

        // Remove the ordinality column from the scope, accounting for previous scope
        // changes from this loop. (The column sits at `offset - 1` in the
        // original layout; `idx` earlier removals shift it down by `idx`.)
        scope.items.remove(offset - idx - 1);
    }

    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
    // column. Otherwise remove it from the scope.
    if with_ordinality {
        columns.push(offset);
    } else {
        scope.items.pop();
    }

    let expr = expr.project(columns);

    let scope = plan_table_alias(scope, alias)?;
    Ok((expr, scope))
}
3157
3158/// Plans an expression coalescing multiple table functions. Each table
3159/// function is followed by its row ordinality. The entire expression is
3160/// followed by the coalesced row ordinality.
3161///
3162/// The returned Scope will set all item's table_name's to the `table_name`
3163/// parameter if it is `Some`. If `None`, they will be the name of each table
3164/// function.
3165///
3166/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3167/// each table function.
3168///
3169/// For example, with table functions tf1 returning 1 column (a) and tf2
3170/// returning 2 columns (b, c), this function will return an expr 6 columns:
3171///
3172/// - tf1.a
3173/// - tf1.ordinality
3174/// - tf2.b
3175/// - tf2.c
3176/// - tf2.ordinality
3177/// - coalesced_ordinality
3178///
3179/// And a `Vec<usize>` of `[1, 2]`.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Join together each of the table functions in turn. The last column is
    // always the column to join against and is maintained to be the coalescence
    // of the row number column for all prior functions.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    num_cols.push(left_scope.len() - 1);
    // Create the coalesced ordinality column. For the first function this is
    // simply a copy of its own ordinality column (the current last column).
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));

    for function in functions {
        // The right hand side of a join must be planned in a new scope.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // Index of the running coalesced ordinality column (the left
        // expression's last column)...
        let left_col = left_scope.len() - 1;
        // ...and of the right function's ordinality column after the join.
        let right_col = left_scope.len() + right_scope.len() - 1;
        // Full outer join on the ordinality columns, so rows survive from
        // whichever side has more of them.
        let on = HirScalarExpr::call_binary(
            HirScalarExpr::column(left_col),
            HirScalarExpr::column(right_col),
            expr_func::Eq,
        );
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::call_variadic(
                Coalesce,
                vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            )]);

        // Project off the previous iteration's coalesced column, but keep both of this
        // iteration's ordinality columns.
        left_expr = left_expr.project(
            (0..left_col) // all left columns except the old coalesced column
                .chain(left_col + 1..right_col + 2) // right columns plus the new coalesced column
                .collect(),
        );
        // Move the coalesced ordinality column's scope item so it stays the
        // last item after extending with the right-hand scope.
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3238
3239/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3240/// FROM` clause that contains other table functions. Special aliasing rules
3241/// apply.
3242fn plan_solitary_table_function(
3243    qcx: &QueryContext,
3244    function: &Function<Aug>,
3245    alias: Option<&TableAlias>,
3246    with_ordinality: bool,
3247) -> Result<(HirRelationExpr, Scope), PlanError> {
3248    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3249
3250    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3251    if single_column_function {
3252        let item = &mut scope.items[0];
3253
3254        // Mark that the function only produced a single column. This impacts
3255        // whole-row references.
3256        item.from_single_column_function = true;
3257
3258        // Strange special case for solitary table functions that output one
3259        // column whose name matches the name of the table function. If a table
3260        // alias is provided, the column name is changed to the table alias's
3261        // name. Concretely, the following query returns a column named `x`
3262        // rather than a column named `generate_series`:
3263        //
3264        //     SELECT * FROM generate_series(1, 5) AS x
3265        //
3266        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3267        // since its output column is explicitly named `value`, not
3268        // `jsonb_array_elements`.
3269        //
3270        // Note also that we may (correctly) change the column name again when
3271        // we plan the table alias below if the `alias.columns` is non-empty.
3272        if let Some(alias) = alias {
3273            if let ScopeItem {
3274                table_name: Some(table_name),
3275                column_name,
3276                ..
3277            } = item
3278            {
3279                if table_name.item.as_str() == column_name.as_str() {
3280                    *column_name = normalize::column_name(alias.name.clone());
3281                }
3282            }
3283        }
3284    }
3285
3286    let scope = plan_table_alias(scope, alias)?;
3287    Ok((expr, scope))
3288}
3289
3290/// Plans a table function.
3291///
3292/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3293/// instead to get the appropriate aliasing behavior.
fn plan_table_function_internal(
    qcx: &QueryContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &Function<Aug>,
    with_ordinality: bool,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // These modifiers should never reach this point on a table function
    // call, so treat them as internal invariant violations, not user errors.
    assert_none!(filter, "cannot parse table function with FILTER");
    assert_none!(over, "cannot parse table function with OVER");
    assert!(!*distinct, "cannot parse table function with DISTINCT");

    // Table function arguments are planned in an empty scope: they may not
    // reference columns, aggregates, or window functions.
    let ecx = &ExprContext {
        qcx,
        name: "table function arguments",
        scope: &Scope::empty(),
        relation_type: &SqlRelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let scalar_args = match args {
        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    // The scope's table name defaults to the function's own name unless the
    // caller (e.g. `ROWS FROM`) supplied one explicitly.
    let table_name = match table_name {
        Some(table_name) => table_name.item,
        None => name.full_item_name().item.clone(),
    };

    let scope_name = Some(PartialItemName {
        database: None,
        schema: None,
        item: table_name,
    });

    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
        Func::Table(impls) => {
            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
            let expr = match tf.imp {
                // Natively implemented table function: if requested, have it
                // emit the ordinality column itself.
                TableFuncImpl::CallTable { mut func, exprs } => {
                    if with_ordinality {
                        func = TableFunc::with_ordinality(func.clone()).ok_or(
                            PlanError::Unsupported {
                                feature: format!("WITH ORDINALITY on {}", func),
                                discussion_no: None,
                            },
                        )?;
                    }
                    HirRelationExpr::CallTable { func, exprs }
                }
                TableFuncImpl::Expr(expr) => {
                    if !with_ordinality {
                        expr
                    } else {
                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
                        // back to the legacy implementation or error out the query.
                        if qcx
                            .scx
                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
                        {
                            // Note that this can give an incorrect ordering, and also has an extreme
                            // performance problem in some cases. See the doc comment of
                            // `TableFuncImpl`.
                            tracing::error!(
                                %name,
                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
                            );
                            // Synthesize the ordinality column with a
                            // `row_number` scalar window function.
                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
                                func: WindowExprType::Scalar(ScalarWindowExpr {
                                    func: ScalarWindowFunc::RowNumber,
                                    order_by: vec![],
                                }),
                                partition_by: vec![],
                                order_by: vec![],
                            })])
                        } else {
                            bail_unsupported!(format!(
                                "WITH ORDINALITY or ROWS FROM with {}",
                                name
                            ));
                        }
                    }
                }
            };
            (expr, scope)
        }
        // A scalar function used in `FROM` is wrapped into a one-column
        // table whose single column is named after the function.
        Func::Scalar(impls) => {
            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let output = expr.typ(
                &qcx.outer_relation_types,
                &SqlRelationType::new(vec![]),
                &qcx.scx.param_types.borrow(),
            );

            let relation = SqlRelationType::new(vec![output]);

            let function_ident = Ident::new(name.full_item_name().item.clone())?;
            let column_name = normalize::column_name(function_ident);
            let name = column_name.to_string();

            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);

            let mut func = TableFunc::TabletizedScalar { relation, name };
            if with_ordinality {
                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
                    feature: format!("WITH ORDINALITY on {}", func),
                    discussion_no: None,
                })?;
            }
            (
                HirRelationExpr::CallTable {
                    func,
                    exprs: vec![expr],
                },
                scope,
            )
        }
        o => sql_bail!(
            "{} functions are not supported in functions in FROM",
            o.class()
        ),
    };

    // Surface the ordinality column in the scope; the expression above
    // already produces it as its last column.
    if with_ordinality {
        scope
            .items
            .push(ScopeItem::from_name(scope_name, "ordinality"));
    }

    Ok((expr, scope))
}
3443
3444fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3445    if let Some(TableAlias {
3446        name,
3447        columns,
3448        strict,
3449    }) = alias
3450    {
3451        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3452            sql_bail!(
3453                "{} has {} columns available but {} columns specified",
3454                name,
3455                scope.items.len(),
3456                columns.len()
3457            );
3458        }
3459
3460        let table_name = normalize::ident(name.to_owned());
3461        for (i, item) in scope.items.iter_mut().enumerate() {
3462            item.table_name = if item.allow_unqualified_references {
3463                Some(PartialItemName {
3464                    database: None,
3465                    schema: None,
3466                    item: table_name.clone(),
3467                })
3468            } else {
3469                // Columns that prohibit unqualified references are special
3470                // columns from the output of a NATURAL or USING join that can
3471                // only be referenced by their full, pre-join name. Applying an
3472                // alias to the output of that join renders those columns
3473                // inaccessible, which we accomplish here by setting the
3474                // table name to `None`.
3475                //
3476                // Concretely, consider:
3477                //
3478                //      CREATE TABLE t1 (a int);
3479                //      CREATE TABLE t2 (a int);
3480                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3481                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3482                //
3483                // In (1), the join has no alias. The underlying columns from
3484                // either side of the join can be referenced as `t1.a` and
3485                // `t2.a`, respectively, and the unqualified name `a` refers to
3486                // a column whose value is `coalesce(t1.a, t2.a)`.
3487                //
3488                // In (2), the join is aliased as `t`. The columns from either
3489                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3490                // the coalesced column can be named as either `a` or `t.a`.
3491                //
3492                // We previously had a bug [0] that mishandled this subtle
3493                // logic.
3494                //
3495                // NOTE(benesch): We could in theory choose to project away
3496                // those inaccessible columns and drop them from the scope
3497                // entirely, but that would require that this function also
3498                // take and return the `HirRelationExpr` that is being aliased,
3499                // which is a rather large refactor.
3500                //
3501                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3502                None
3503            };
3504            item.column_name = columns
3505                .get(i)
3506                .map(|a| normalize::column_name(a.clone()))
3507                .unwrap_or_else(|| item.column_name.clone());
3508        }
3509    }
3510    Ok(scope)
3511}
3512
/// Invents a column name for `expr`, used when a `SELECT` list item carries
/// no explicit alias. Returns `None` if no reasonable name can be invented.
///
/// `table_func_names` is a mapping from a UUID to the original function
/// name. The UUIDs are identifiers that have been rewritten from some table
/// function expression, and this mapping restores the original names.
fn invent_column_name(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Option<ColumnName> {
    // We follow PostgreSQL exactly here, which has some complicated rules
    // around "high" and "low" quality names. Low quality names override other
    // low quality names but not high quality names.
    //
    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728

    #[derive(Debug)]
    enum NameQuality {
        Low,
        High,
    }

    // Recursive worker: returns the invented name along with its quality so
    // that enclosing expressions (casts, CASE) can decide whether their own
    // fallback name should override it.
    fn invent(
        ecx: &ExprContext,
        expr: &Expr<Aug>,
        table_func_names: &BTreeMap<String, Ident>,
    ) -> Option<(ColumnName, NameQuality)> {
        match expr {
            Expr::Identifier(names) => {
                // A lone identifier may be a rewritten table function
                // reference; if so, restore the original function name.
                if let [name] = names.as_slice() {
                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
                        return Some((
                            normalize::column_name(table_func_name.clone()),
                            NameQuality::High,
                        ));
                    }
                }
                // Otherwise use the last path component, e.g. `t.a` -> `a`.
                names
                    .last()
                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
            }
            Expr::Value(v) => match v {
                // Per PostgreSQL, `bool` and `interval` literals take on the name
                // of their type, but not other literal types.
                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
                _ => None,
            },
            Expr::Function(func) => {
                let (schema, item) = match &func.name {
                    ResolvedItemName::Item {
                        qualifiers,
                        full_name,
                        ..
                    } => (&qualifiers.schema_spec, full_name.item.clone()),
                    _ => unreachable!(),
                };

                // Don't invent names from functions living in the internal
                // or unsafe schemas; those names are implementation details.
                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
                    || schema
                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
                {
                    None
                } else {
                    Some((item.into(), NameQuality::High))
                }
            }
            Expr::HomogenizingFunction { function, .. } => Some((
                function.to_string().to_lowercase().into(),
                NameQuality::High,
            )),
            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
            // A cast prefers a high-quality name from its operand, falling
            // back to the target type's name at low quality.
            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
            },
            // A CASE prefers a high-quality name from its ELSE branch,
            // falling back to "case" at low quality.
            Expr::Case { else_result, .. } => {
                match else_result
                    .as_ref()
                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
                {
                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                    _ => Some(("case".into(), NameQuality::Low)),
                }
            }
            Expr::FieldAccess { field, .. } => {
                Some((normalize::column_name(field.clone()), NameQuality::High))
            }
            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
                // A bit silly to have to plan the query here just to get its column
                // name, since we throw away the planned expression, but fixing this
                // requires a separate semantic analysis phase.
                let (_expr, scope) =
                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
                scope
                    .items
                    .first()
                    .map(|name| (name.column_name.clone(), NameQuality::High))
            }
            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
            _ => None,
        }
    }

    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
}
3622
/// A `SELECT` list item after wildcard expansion.
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// A direct reference to the input column at the given ordinal position.
    InputOrdinal(usize),
    /// An expression still to be planned (possibly synthesized, e.g. the
    /// field accesses produced by expanding `(expr).*`).
    Expr(Cow<'a, Expr<Aug>>),
}
3628
3629impl ExpandedSelectItem<'_> {
3630    fn as_expr(&self) -> Option<&Expr<Aug>> {
3631        match self {
3632            ExpandedSelectItem::InputOrdinal(_) => None,
3633            ExpandedSelectItem::Expr(expr) => Some(expr),
3634        }
3635    }
3636}
3637
/// Expands a single `SELECT` list item into the output columns it produces,
/// pairing each with its column name.
///
/// Wildcard items (`*`, `t.*`, `(expr).*`) may expand to many columns;
/// ordinary expressions expand to exactly one.
fn expand_select_item<'a>(
    ecx: &ExprContext,
    s: &'a SelectItem<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
    match s {
        // `SELECT t.*`: every in-scope column belonging to table `t`.
        SelectItem::Expr {
            expr: Expr::QualifiedWildcard(table_name),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let table_name =
                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
            let out: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.is_from_table(&table_name))
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            if out.is_empty() {
                sql_bail!("no table named '{}' in scope", table_name);
            }
            Ok(out)
        }
        // `SELECT (expr).*`: expand a composite-typed expression into one
        // field access per record field.
        SelectItem::Expr {
            expr: Expr::WildcardAccess(sql_expr),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            // A bit silly to have to plan the expression here just to get its
            // type, since we throw away the planned expression, but fixing this
            // requires a separate semantic analysis phase. Luckily this is an
            // uncommon operation and the PostgreSQL docs have a warning that
            // this operation is slow in Postgres too.
            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
            let fields = match ecx.scalar_type(&expr) {
                SqlScalarType::Record { fields, .. } => fields,
                ty => sql_bail!(
                    "type {} is not composite",
                    ecx.humanize_scalar_type(&ty, false)
                ),
            };
            // Collect "exists" marker columns (synthesized for table
            // functions that appeared in the target list) so they can be
            // excluded from the expansion below.
            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
            if let Expr::Identifier(ident) = sql_expr.as_ref() {
                if let [name] = ident.as_slice() {
                    if let Ok(items) = ecx.scope.items_from_table(
                        &[],
                        &PartialItemName {
                            database: None,
                            schema: None,
                            item: name.as_str().to_string(),
                        },
                    ) {
                        for (_, item) in items {
                            if item
                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
                            {
                                skip_cols.insert(item.column_name.clone());
                            }
                        }
                    }
                }
            }
            let items = fields
                .iter()
                .filter_map(|(name, _ty)| {
                    if skip_cols.contains(name) {
                        None
                    } else {
                        // Synthesize a field-access expression for this
                        // record field.
                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
                            expr: sql_expr.clone(),
                            field: name.clone().into(),
                        }));
                        Some((item, name.clone()))
                    }
                })
                .collect();
            Ok(items)
        }
        // `SELECT *`: every in-scope column that permits unqualified
        // references.
        SelectItem::Wildcard => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let items: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.allow_unqualified_references)
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();

            Ok(items)
        }
        // An ordinary expression: named by its alias, an invented name, or
        // the fallback unknown-column name.
        SelectItem::Expr { expr, alias } => {
            let name = alias
                .clone()
                .map(normalize::column_name)
                .or_else(|| invent_column_name(ecx, expr, table_func_names))
                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
        }
    }
}
3748
3749fn plan_join(
3750    left_qcx: &QueryContext,
3751    left: HirRelationExpr,
3752    left_scope: Scope,
3753    join: &Join<Aug>,
3754) -> Result<(HirRelationExpr, Scope), PlanError> {
3755    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3756    let (kind, constraint) = match &join.join_operator {
3757        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3758        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3759        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3760        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3761        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3762    };
3763
3764    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3765    if !kind.can_be_correlated() {
3766        for item in &mut right_qcx.outer_scopes[0].items {
3767            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3768            // these items from scope. These items need to *exist* because they
3769            // might shadow variables in outer scopes that would otherwise be
3770            // valid to reference, but accessing them needs to produce an error.
3771            item.error_if_referenced =
3772                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3773                    table: table.cloned(),
3774                    column: column.clone(),
3775                });
3776        }
3777    }
3778    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3779
3780    let (expr, scope) = match constraint {
3781        JoinConstraint::On(expr) => {
3782            let product_scope = left_scope.product(right_scope)?;
3783            let ecx = &ExprContext {
3784                qcx: left_qcx,
3785                name: "ON clause",
3786                scope: &product_scope,
3787                relation_type: &SqlRelationType::new(
3788                    left_qcx
3789                        .relation_type(&left)
3790                        .column_types
3791                        .into_iter()
3792                        .chain(right_qcx.relation_type(&right).column_types)
3793                        .collect(),
3794                ),
3795                allow_aggregates: false,
3796                allow_subqueries: true,
3797                allow_parameters: true,
3798                allow_windows: false,
3799            };
3800            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3801            let joined = left.join(right, on, kind);
3802            (joined, product_scope)
3803        }
3804        JoinConstraint::Using { columns, alias } => {
3805            let column_names = columns
3806                .iter()
3807                .map(|ident| normalize::column_name(ident.clone()))
3808                .collect::<Vec<_>>();
3809
3810            plan_using_constraint(
3811                &column_names,
3812                left_qcx,
3813                left,
3814                left_scope,
3815                &right_qcx,
3816                right,
3817                right_scope,
3818                kind,
3819                alias.as_ref(),
3820            )?
3821        }
3822        JoinConstraint::Natural => {
3823            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3824            // have the same scx. However, it doesn't hurt to be safe.
3825            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3826            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3827            let left_column_names = left_scope.column_names();
3828            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3829            let column_names: Vec<_> = left_column_names
3830                .filter(|col| right_column_names.contains(col))
3831                .cloned()
3832                .collect();
3833            plan_using_constraint(
3834                &column_names,
3835                left_qcx,
3836                left,
3837                left_scope,
3838                &right_qcx,
3839                right,
3840                right_scope,
3841                kind,
3842                None,
3843            )?
3844        }
3845    };
3846    Ok((expr, scope))
3847}
3848
3849// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3850#[allow(clippy::too_many_arguments)]
3851fn plan_using_constraint(
3852    column_names: &[ColumnName],
3853    left_qcx: &QueryContext,
3854    left: HirRelationExpr,
3855    left_scope: Scope,
3856    right_qcx: &QueryContext,
3857    right: HirRelationExpr,
3858    right_scope: Scope,
3859    kind: JoinKind,
3860    alias: Option<&Ident>,
3861) -> Result<(HirRelationExpr, Scope), PlanError> {
3862    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3863
3864    // Cargo culting PG here; no discernable reason this must fail, but PG does
3865    // so we do, as well.
3866    let mut unique_column_names = BTreeSet::new();
3867    for c in column_names {
3868        if !unique_column_names.insert(c) {
3869            return Err(PlanError::Unsupported {
3870                feature: format!(
3871                    "column name {} appears more than once in USING clause",
3872                    c.quoted()
3873                ),
3874                discussion_no: None,
3875            });
3876        }
3877    }
3878
3879    let alias_item_name = alias.map(|alias| PartialItemName {
3880        database: None,
3881        schema: None,
3882        item: alias.clone().to_string(),
3883    });
3884
3885    if let Some(alias_item_name) = &alias_item_name {
3886        for partial_item_name in both_scope.table_names() {
3887            if partial_item_name.matches(alias_item_name) {
3888                sql_bail!(
3889                    "table name \"{}\" specified more than once",
3890                    alias_item_name
3891                )
3892            }
3893        }
3894    }
3895
3896    let ecx = &ExprContext {
3897        qcx: right_qcx,
3898        name: "USING clause",
3899        scope: &both_scope,
3900        relation_type: &SqlRelationType::new(
3901            left_qcx
3902                .relation_type(&left)
3903                .column_types
3904                .into_iter()
3905                .chain(right_qcx.relation_type(&right).column_types)
3906                .collect(),
3907        ),
3908        allow_aggregates: false,
3909        allow_subqueries: false,
3910        allow_parameters: false,
3911        allow_windows: false,
3912    };
3913
3914    let mut join_exprs = vec![];
3915    let mut map_exprs = vec![];
3916    let mut new_items = vec![];
3917    let mut join_cols = vec![];
3918    let mut hidden_cols = vec![];
3919
3920    for column_name in column_names {
3921        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3922        let (lhs, lhs_name) = left_scope.resolve_using_column(
3923            column_name,
3924            JoinSide::Left,
3925            &mut left_qcx.name_manager.borrow_mut(),
3926        )?;
3927        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3928            column_name,
3929            JoinSide::Right,
3930            &mut right_qcx.name_manager.borrow_mut(),
3931        )?;
3932
3933        // Adjust the RHS reference to its post-join location.
3934        rhs.column += left_scope.len();
3935
3936        // Join keys must be resolved to same type.
3937        let mut exprs = coerce_homogeneous_exprs(
3938            &ecx.with_name(&format!(
3939                "NATURAL/USING join column {}",
3940                column_name.quoted()
3941            )),
3942            vec![
3943                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3944                    lhs,
3945                    Arc::clone(&lhs_name),
3946                )),
3947                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3948                    rhs,
3949                    Arc::clone(&rhs_name),
3950                )),
3951            ],
3952            None,
3953        )?;
3954        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3955
3956        match kind {
3957            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3958                join_cols.push(lhs.column);
3959                hidden_cols.push(rhs.column);
3960            }
3961            JoinKind::RightOuter => {
3962                join_cols.push(rhs.column);
3963                hidden_cols.push(lhs.column);
3964            }
3965            JoinKind::FullOuter => {
3966                // Create a new column that will be the coalesced value of left
3967                // and right.
3968                join_cols.push(both_scope.items.len() + map_exprs.len());
3969                hidden_cols.push(lhs.column);
3970                hidden_cols.push(rhs.column);
3971                map_exprs.push(HirScalarExpr::call_variadic(
3972                    Coalesce,
3973                    vec![expr1.clone(), expr2.clone()],
3974                ));
3975                new_items.push(ScopeItem::from_column_name(column_name));
3976            }
3977        }
3978
3979        // If a `join_using_alias` is present, add a new scope item that accepts
3980        // only table-qualified references for each specified join column.
3981        // Unlike regular table aliases, a `join_using_alias` should not hide the
3982        // names of the joined relations.
3983        if alias_item_name.is_some() {
3984            let new_item_col = both_scope.items.len() + new_items.len();
3985            join_cols.push(new_item_col);
3986            hidden_cols.push(new_item_col);
3987
3988            new_items.push(ScopeItem::from_name(
3989                alias_item_name.clone(),
3990                column_name.clone().to_string(),
3991            ));
3992
3993            // Should be safe to use either `lhs` or `rhs` here since the column
3994            // is available in both scopes and must have the same type of the new item.
3995            // We (arbitrarily) choose the left name.
3996            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3997        }
3998
3999        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4000    }
4001    both_scope.items.extend(new_items);
4002
4003    // The columns from the secondary side of the join remain accessible by
4004    // their table-qualified name, but not by their column name alone. They are
4005    // also excluded from `SELECT *`.
4006    for c in hidden_cols {
4007        both_scope.items[c].allow_unqualified_references = false;
4008    }
4009
4010    // Reproject all returned elements to the front of the list.
4011    let project_key = join_cols
4012        .into_iter()
4013        .chain(0..both_scope.items.len())
4014        .unique()
4015        .collect::<Vec<_>>();
4016
4017    both_scope = both_scope.project(&project_key);
4018
4019    let on = HirScalarExpr::variadic_and(join_exprs);
4020
4021    let both = left
4022        .join(right, on, kind)
4023        .map(map_exprs)
4024        .project(project_key);
4025    Ok((both, both_scope))
4026}
4027
4028pub fn plan_expr<'a>(
4029    ecx: &'a ExprContext,
4030    e: &Expr<Aug>,
4031) -> Result<CoercibleScalarExpr, PlanError> {
4032    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4033}
4034
/// Plans one `Expr` node by dispatching to the `plan_*` helper for the matched
/// variant.
///
/// Before dispatching, checks whether the scope has already planned this exact
/// expression (see the module docs on GROUP BY/aggregate handling) and, if so,
/// reuses the existing column rather than recomputing it.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
        // We've already calculated this expression.
        return Ok(HirScalarExpr::named_column(
            i,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )
        .into());
    }

    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Generalized functions, operators, and casts.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Special functions and operators.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // `NULLIF(l, r)` is planned as `CASE WHEN l = r THEN NULL ELSE l END`.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),

        // These variants are eliminated by desugaring before planning, so
        // reaching any of them here indicates a bug upstream.
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
4136
4137fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4138    if !ecx.allow_parameters {
4139        // It might be clearer to return an error like "cannot use parameter
4140        // here", but this is how PostgreSQL does it, and so for now we follow
4141        // PostgreSQL.
4142        return Err(PlanError::UnknownParameter(n));
4143    }
4144    if n == 0 || n > 65536 {
4145        return Err(PlanError::UnknownParameter(n));
4146    }
4147    if ecx.param_types().borrow().contains_key(&n) {
4148        Ok(HirScalarExpr::parameter(n).into())
4149    } else {
4150        Ok(CoercibleScalarExpr::Parameter(n))
4151    }
4152}
4153
4154fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4155    let mut out = vec![];
4156    for e in exprs {
4157        out.push(plan_expr(ecx, e)?);
4158    }
4159    Ok(CoercibleScalarExpr::LiteralRecord(out))
4160}
4161
4162fn plan_cast(
4163    ecx: &ExprContext,
4164    expr: &Expr<Aug>,
4165    data_type: &ResolvedDataType,
4166) -> Result<CoercibleScalarExpr, PlanError> {
4167    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4168    let expr = match expr {
4169        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4170        // we can pass in the target type as a type hint. This is
4171        // a limited form of the coercion that we do for string literals
4172        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4173        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4174        // PostgreSQL compatibility trouble.
4175        //
4176        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4177        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4178        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4179        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4180        _ => plan_expr(ecx, expr)?,
4181    };
4182    let ecx = &ecx.with_name("CAST");
4183    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4184    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4185    Ok(expr.into())
4186}
4187
4188fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4189    let ecx = ecx.with_name("NOT argument");
4190    Ok(plan_expr(&ecx, expr)?
4191        .type_as(&ecx, &SqlScalarType::Bool)?
4192        .call_unary(UnaryFunc::Not(expr_func::Not))
4193        .into())
4194}
4195
4196fn plan_and(
4197    ecx: &ExprContext,
4198    left: &Expr<Aug>,
4199    right: &Expr<Aug>,
4200) -> Result<CoercibleScalarExpr, PlanError> {
4201    let ecx = ecx.with_name("AND argument");
4202    Ok(HirScalarExpr::variadic_and(vec![
4203        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4204        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4205    ])
4206    .into())
4207}
4208
4209fn plan_or(
4210    ecx: &ExprContext,
4211    left: &Expr<Aug>,
4212    right: &Expr<Aug>,
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214    let ecx = ecx.with_name("OR argument");
4215    Ok(HirScalarExpr::variadic_or(vec![
4216        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4218    ])
4219    .into())
4220}
4221
4222fn plan_in_list(
4223    ecx: &ExprContext,
4224    lhs: &Expr<Aug>,
4225    list: &Vec<Expr<Aug>>,
4226    negated: &bool,
4227) -> Result<CoercibleScalarExpr, PlanError> {
4228    let ecx = ecx.with_name("IN list");
4229    let or = HirScalarExpr::variadic_or(
4230        list.into_iter()
4231            .map(|e| {
4232                let eq = lhs.clone().equals(e.clone());
4233                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4234            })
4235            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4236    );
4237    Ok(if *negated {
4238        or.call_unary(UnaryFunc::Not(expr_func::Not))
4239    } else {
4240        or
4241    }
4242    .into())
4243}
4244
4245fn plan_homogenizing_function(
4246    ecx: &ExprContext,
4247    function: &HomogenizingFunction,
4248    exprs: &[Expr<Aug>],
4249) -> Result<CoercibleScalarExpr, PlanError> {
4250    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4251    let expr = HirScalarExpr::call_variadic(
4252        match function {
4253            HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4254            HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4255            HomogenizingFunction::Least => VariadicFunc::from(Least),
4256        },
4257        coerce_homogeneous_exprs(
4258            &ecx.with_name(&function.to_string().to_lowercase()),
4259            plan_exprs(ecx, exprs)?,
4260            None,
4261        )?,
4262    );
4263    Ok(expr.into())
4264}
4265
4266fn plan_field_access(
4267    ecx: &ExprContext,
4268    expr: &Expr<Aug>,
4269    field: &Ident,
4270) -> Result<CoercibleScalarExpr, PlanError> {
4271    let field = normalize::column_name(field.clone());
4272    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4273    let ty = ecx.scalar_type(&expr);
4274    let i = match &ty {
4275        SqlScalarType::Record { fields, .. } => {
4276            fields.iter().position(|(name, _ty)| *name == field)
4277        }
4278        ty => sql_bail!(
4279            "column notation applied to type {}, which is not a composite type",
4280            ecx.humanize_scalar_type(ty, false)
4281        ),
4282    };
4283    match i {
4284        None => sql_bail!(
4285            "field {} not found in data type {}",
4286            field,
4287            ecx.humanize_scalar_type(&ty, false)
4288        ),
4289        Some(i) => Ok(expr
4290            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4291            .into()),
4292    }
4293}
4294
4295fn plan_subscript(
4296    ecx: &ExprContext,
4297    expr: &Expr<Aug>,
4298    positions: &[SubscriptPosition<Aug>],
4299) -> Result<CoercibleScalarExpr, PlanError> {
4300    assert!(
4301        !positions.is_empty(),
4302        "subscript expression must contain at least one position"
4303    );
4304
4305    let ecx = &ecx.with_name("subscripting");
4306    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4307    let ty = ecx.scalar_type(&expr);
4308    match &ty {
4309        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4310            ecx,
4311            expr,
4312            positions,
4313            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4314            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4315            // track in its backing data).
4316            if ty == SqlScalarType::Int2Vector {
4317                1
4318            } else {
4319                0
4320            },
4321        ),
4322        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4323        SqlScalarType::List { element_type, .. } => {
4324            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4325            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4326            let n_layers = ty.unwrap_list_n_layers();
4327            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4328        }
4329        ty => sql_bail!(
4330            "cannot subscript type {}",
4331            ecx.humanize_scalar_type(ty, false)
4332        ),
4333    }
4334}
4335
4336// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4337// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4338// any were slices (i.e. included colon).
4339fn extract_scalar_subscript_from_positions<'a>(
4340    positions: &'a [SubscriptPosition<Aug>],
4341    expr_type_name: &str,
4342) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4343    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4344    for p in positions {
4345        if p.explicit_slice {
4346            sql_bail!("{} subscript does not support slices", expr_type_name);
4347        }
4348        assert!(
4349            p.end.is_none(),
4350            "index-appearing subscripts cannot have end value"
4351        );
4352        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4353    }
4354    Ok(scalar_subscripts)
4355}
4356
4357fn plan_subscript_array(
4358    ecx: &ExprContext,
4359    expr: HirScalarExpr,
4360    positions: &[SubscriptPosition<Aug>],
4361    offset: i64,
4362) -> Result<CoercibleScalarExpr, PlanError> {
4363    let mut exprs = Vec::with_capacity(positions.len() + 1);
4364    exprs.push(expr);
4365
4366    // Subscripting arrays doesn't yet support slicing, so we always want to
4367    // extract scalars or error.
4368    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4369
4370    for i in indexes {
4371        exprs.push(plan_expr(ecx, i)?.cast_to(
4372            ecx,
4373            CastContext::Explicit,
4374            &SqlScalarType::Int64,
4375        )?);
4376    }
4377
4378    Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4379}
4380
/// Plans subscripting applied to a list-typed expression.
///
/// List subscripts may freely interleave index operations (`[i]`) and slice
/// operations (`[a:b]`). This walks `positions` left to right, handing each
/// maximal run of index operations to `plan_index_list` (which reduces the
/// layer count by the number of indexes) and each maximal run of slices to
/// `plan_slice_list` (which leaves the layer count unchanged).
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    // Cursor into `positions`; advanced by the length of each run consumed.
    let mut i = 0;

    while i < positions.len() {
        // Take all contiguous index operations, i.e. find next slice operation.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            // The run `positions[i..i + j]` contains no slices by construction,
            // so the slice-error branch (and its type name) is never hit here.
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Take all contiguous slice operations, i.e. find next index operation.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4429
4430fn plan_index_list(
4431    ecx: &ExprContext,
4432    expr: HirScalarExpr,
4433    indexes: &[&Expr<Aug>],
4434    n_layers: usize,
4435    elem_type_name: &str,
4436) -> Result<(usize, HirScalarExpr), PlanError> {
4437    let depth = indexes.len();
4438
4439    if depth > n_layers {
4440        if n_layers == 0 {
4441            sql_bail!("cannot subscript type {}", elem_type_name)
4442        } else {
4443            sql_bail!(
4444                "cannot index into {} layers; list only has {} layer{}",
4445                depth,
4446                n_layers,
4447                if n_layers == 1 { "" } else { "s" }
4448            )
4449        }
4450    }
4451
4452    let mut exprs = Vec::with_capacity(depth + 1);
4453    exprs.push(expr);
4454
4455    for i in indexes {
4456        exprs.push(plan_expr(ecx, i)?.cast_to(
4457            ecx,
4458            CastContext::Explicit,
4459            &SqlScalarType::Int64,
4460        )?);
4461    }
4462
4463    Ok((
4464        n_layers - depth,
4465        HirScalarExpr::call_variadic(ListIndex, exprs),
4466    ))
4467}
4468
4469fn plan_slice_list(
4470    ecx: &ExprContext,
4471    expr: HirScalarExpr,
4472    slices: &[SubscriptPosition<Aug>],
4473    n_layers: usize,
4474    elem_type_name: &str,
4475) -> Result<HirScalarExpr, PlanError> {
4476    if n_layers == 0 {
4477        sql_bail!("cannot subscript type {}", elem_type_name)
4478    }
4479
4480    // first arg will be list
4481    let mut exprs = Vec::with_capacity(slices.len() + 1);
4482    exprs.push(expr);
4483    // extract (start, end) parts from collected slices
4484    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4485        Ok(match position {
4486            Some(p) => {
4487                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4488            }
4489            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4490        })
4491    };
4492    for p in slices {
4493        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4494        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4495        exprs.push(start);
4496        exprs.push(end);
4497    }
4498
4499    Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4500}
4501
4502fn plan_like(
4503    ecx: &ExprContext,
4504    expr: &Expr<Aug>,
4505    pattern: &Expr<Aug>,
4506    escape: Option<&Expr<Aug>>,
4507    case_insensitive: bool,
4508    not: bool,
4509) -> Result<HirScalarExpr, PlanError> {
4510    use CastContext::Implicit;
4511    let ecx = ecx.with_name("LIKE argument");
4512    let expr = plan_expr(&ecx, expr)?;
4513    let haystack = match ecx.scalar_type(&expr) {
4514        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4515            .type_as(&ecx, ty)?
4516            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4517        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4518    };
4519    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4520    if let Some(escape) = escape {
4521        pattern = pattern.call_binary(
4522            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4523            expr_func::LikeEscape,
4524        );
4525    }
4526    let func: BinaryFunc = if case_insensitive {
4527        expr_func::IsLikeMatchCaseInsensitive.into()
4528    } else {
4529        expr_func::IsLikeMatchCaseSensitive.into()
4530    };
4531    let like = haystack.call_binary(pattern, func);
4532    if not {
4533        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4534    } else {
4535        Ok(like)
4536    }
4537}
4538
4539fn plan_subscript_jsonb(
4540    ecx: &ExprContext,
4541    expr: HirScalarExpr,
4542    positions: &[SubscriptPosition<Aug>],
4543) -> Result<CoercibleScalarExpr, PlanError> {
4544    use CastContext::Implicit;
4545    use SqlScalarType::{Int64, String};
4546
4547    // JSONB doesn't support the slicing syntax, so simply error if you
4548    // encounter any explicit slices.
4549    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4550
4551    let mut exprs = Vec::with_capacity(subscripts.len());
4552    for s in subscripts {
4553        let subscript = plan_expr(ecx, s)?;
4554        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4555            subscript
4556        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4557            // Integers are converted to a string here and then re-parsed as an
4558            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4559            // do it.
4560            typeconv::to_string(ecx, subscript)
4561        } else {
4562            sql_bail!("jsonb subscript type must be coercible to integer or text");
4563        };
4564        exprs.push(subscript);
4565    }
4566
4567    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4568    // `expr->subscript` as you might expect.
4569    let expr = expr.call_binary(
4570        HirScalarExpr::call_variadic(
4571            ArrayCreate {
4572                elem_type: SqlScalarType::String,
4573            },
4574            exprs,
4575        ),
4576        expr_func::JsonbGetPath,
4577    );
4578    Ok(expr.into())
4579}
4580
4581fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4582    if !ecx.allow_subqueries {
4583        sql_bail!("{} does not allow subqueries", ecx.name)
4584    }
4585    let mut qcx = ecx.derived_query_context();
4586    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4587    Ok(expr.exists().into())
4588}
4589
4590fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4591    if !ecx.allow_subqueries {
4592        sql_bail!("{} does not allow subqueries", ecx.name)
4593    }
4594    let mut qcx = ecx.derived_query_context();
4595    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4596    let column_types = qcx.relation_type(&expr).column_types;
4597    if column_types.len() != 1 {
4598        sql_bail!(
4599            "Expected subselect to return 1 column, got {} columns",
4600            column_types.len()
4601        );
4602    }
4603    Ok(expr.select().into())
4604}
4605
4606fn plan_list_subquery(
4607    ecx: &ExprContext,
4608    query: &Query<Aug>,
4609) -> Result<CoercibleScalarExpr, PlanError> {
4610    plan_vector_like_subquery(
4611        ecx,
4612        query,
4613        |_| false,
4614        |elem_type| ListCreate { elem_type }.into(),
4615        |order_by| AggregateFunc::ListConcat { order_by },
4616        expr_func::ListListConcat.into(),
4617        |elem_type| {
4618            HirScalarExpr::literal(
4619                Datum::empty_list(),
4620                SqlScalarType::List {
4621                    element_type: Box::new(elem_type),
4622                    custom_id: None,
4623                },
4624            )
4625        },
4626        "list",
4627    )
4628}
4629
4630fn plan_array_subquery(
4631    ecx: &ExprContext,
4632    query: &Query<Aug>,
4633) -> Result<CoercibleScalarExpr, PlanError> {
4634    plan_vector_like_subquery(
4635        ecx,
4636        query,
4637        |elem_type| {
4638            matches!(
4639                elem_type,
4640                SqlScalarType::Char { .. }
4641                    | SqlScalarType::Array { .. }
4642                    | SqlScalarType::List { .. }
4643                    | SqlScalarType::Map { .. }
4644            )
4645        },
4646        |elem_type| ArrayCreate { elem_type }.into(),
4647        |order_by| AggregateFunc::ArrayConcat { order_by },
4648        expr_func::ArrayArrayConcat.into(),
4649        |elem_type| {
4650            HirScalarExpr::literal(
4651                Datum::empty_array(),
4652                SqlScalarType::Array(Box::new(elem_type)),
4653            )
4654        },
4655        "[]",
4656    )
4657}
4658
4659/// Generic function used to plan both array subqueries and list subqueries
4660fn plan_vector_like_subquery<F1, F2, F3, F4>(
4661    ecx: &ExprContext,
4662    query: &Query<Aug>,
4663    is_unsupported_type: F1,
4664    vector_create: F2,
4665    aggregate_concat: F3,
4666    binary_concat: BinaryFunc,
4667    empty_literal: F4,
4668    vector_type_string: &str,
4669) -> Result<CoercibleScalarExpr, PlanError>
4670where
4671    F1: Fn(&SqlScalarType) -> bool,
4672    F2: Fn(SqlScalarType) -> VariadicFunc,
4673    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4674    F4: Fn(SqlScalarType) -> HirScalarExpr,
4675{
4676    if !ecx.allow_subqueries {
4677        sql_bail!("{} does not allow subqueries", ecx.name)
4678    }
4679
4680    let mut qcx = ecx.derived_query_context();
4681    let mut planned_query = plan_query(&mut qcx, query)?;
4682    if planned_query.limit.is_some()
4683        || !planned_query
4684            .offset
4685            .clone()
4686            .try_into_literal_int64()
4687            .is_ok_and(|offset| offset == 0)
4688    {
4689        planned_query.expr = HirRelationExpr::top_k(
4690            planned_query.expr,
4691            vec![],
4692            planned_query.order_by.clone(),
4693            planned_query.limit,
4694            planned_query.offset,
4695            planned_query.group_size_hints.limit_input_group_size,
4696        );
4697    }
4698
4699    if planned_query.project.len() != 1 {
4700        sql_bail!(
4701            "Expected subselect to return 1 column, got {} columns",
4702            planned_query.project.len()
4703        );
4704    }
4705
4706    let project_column = *planned_query.project.get(0).unwrap();
4707    let elem_type = qcx
4708        .relation_type(&planned_query.expr)
4709        .column_types
4710        .get(project_column)
4711        .cloned()
4712        .unwrap()
4713        .scalar_type();
4714
4715    if is_unsupported_type(&elem_type) {
4716        bail_unsupported!(format!(
4717            "cannot build array from subquery because return type {}{}",
4718            ecx.humanize_scalar_type(&elem_type, false),
4719            vector_type_string
4720        ));
4721    }
4722
4723    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4724    // subquery above.
4725    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4726        vector_create(elem_type.clone()),
4727        vec![HirScalarExpr::column(project_column)],
4728    ))
4729    .chain(
4730        planned_query
4731            .order_by
4732            .iter()
4733            .map(|co| HirScalarExpr::column(co.column)),
4734    )
4735    .collect();
4736
4737    // However, column references for `aggregation_projection` and `aggregation_order_by`
4738    // are with reference to the `exprs` of the aggregation expression.  Here that is
4739    // `aggregation_exprs`.
4740    let aggregation_projection = vec![0];
4741    let aggregation_order_by = planned_query
4742        .order_by
4743        .into_iter()
4744        .enumerate()
4745        .map(|(i, order)| ColumnOrder { column: i, ..order })
4746        .collect();
4747
4748    let reduced_expr = planned_query
4749        .expr
4750        .reduce(
4751            vec![],
4752            vec![AggregateExpr {
4753                func: aggregate_concat(aggregation_order_by),
4754                expr: Box::new(HirScalarExpr::call_variadic(
4755                    RecordCreate {
4756                        field_names: iter::repeat(ColumnName::from(""))
4757                            .take(aggregation_exprs.len())
4758                            .collect(),
4759                    },
4760                    aggregation_exprs,
4761                )),
4762                distinct: false,
4763            }],
4764            None,
4765        )
4766        .project(aggregation_projection);
4767
4768    // If `expr` has no rows, return an empty array/list rather than NULL.
4769    Ok(reduced_expr
4770        .select()
4771        .call_binary(empty_literal(elem_type), binary_concat)
4772        .into())
4773}
4774
/// Plans a `MAP(<subquery>)` expression.
///
/// The subquery must return exactly two columns: the first (which must be of
/// type `text`) supplies the map's keys, the second its values. All rows are
/// aggregated into a single map value with `AggregateFunc::MapAgg`,
/// forwarding the subquery's `ORDER BY`. If the subquery returns no rows, the
/// result is an empty map rather than `NULL`.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // A `LIMIT` or a non-zero `OFFSET` must be applied before aggregation, so
    // wrap the subquery in a `TopK` when either is present.
    if query.limit.is_some()
        || !query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    // Determine the key and value types from the subquery's projection.
    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    // Map keys must be `text`.
    if key_type != SqlScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // The aggregate's input is a (key, value) record followed by any columns
    // needed to evaluate the subquery's ORDER BY. The `ColumnRef`s here refer
    // to the columns produced by planning the subquery above.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    ))
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // Aggregate all rows into a single map. The `order_by` column references
    // are renumbered with reference to the `exprs` of the aggregation
    // expression (mirroring `plan_vector_like_subquery`), rather than the
    // subquery's output columns.
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // If `expr` has no rows, return an empty map rather than NULL.
    let expr = HirScalarExpr::call_variadic(
        Coalesce,
        vec![
            expr.select(),
            HirScalarExpr::literal(
                Datum::empty_map(),
                SqlScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    );

    Ok(expr.into())
}
4880
4881fn plan_collate(
4882    ecx: &ExprContext,
4883    expr: &Expr<Aug>,
4884    collation: &UnresolvedItemName,
4885) -> Result<CoercibleScalarExpr, PlanError> {
4886    if collation.0.len() == 2
4887        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4888        && collation.0[1] == ident!("default")
4889    {
4890        plan_expr(ecx, expr)
4891    } else {
4892        bail_unsupported!("COLLATE");
4893    }
4894}
4895
4896/// Plans a slice of expressions.
4897///
4898/// This function is a simple convenience function for mapping [`plan_expr`]
4899/// over a slice of expressions. The planned expressions are returned in the
4900/// same order as the input. If any of the expressions fail to plan, returns an
4901/// error instead.
4902fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4903where
4904    E: std::borrow::Borrow<Expr<Aug>>,
4905{
4906    let mut out = vec![];
4907    for expr in exprs {
4908        out.push(plan_expr(ecx, expr.borrow())?);
4909    }
4910    Ok(out)
4911}
4912
4913/// Plans an `ARRAY` expression.
4914fn plan_array(
4915    ecx: &ExprContext,
4916    exprs: &[Expr<Aug>],
4917    type_hint: Option<&SqlScalarType>,
4918) -> Result<CoercibleScalarExpr, PlanError> {
4919    // Plan each element expression.
4920    let mut out = vec![];
4921    for expr in exprs {
4922        out.push(match expr {
4923            // Special case nested ARRAY expressions so we can plumb
4924            // the type hint through.
4925            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4926            _ => plan_expr(ecx, expr)?,
4927        });
4928    }
4929
4930    // Attempt to make use of the type hint.
4931    let type_hint = match type_hint {
4932        // The user has provided an explicit cast to an array type. We know the
4933        // element type to coerce to. Need to be careful, though: if there's
4934        // evidence that any of the array elements are themselves arrays, we
4935        // want to coerce to the array type, not the element type.
4936        Some(SqlScalarType::Array(elem_type)) => {
4937            let multidimensional = out.iter().any(|e| {
4938                matches!(
4939                    ecx.scalar_type(e),
4940                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4941                )
4942            });
4943            if multidimensional {
4944                type_hint
4945            } else {
4946                Some(&**elem_type)
4947            }
4948        }
4949        // The user provided an explicit cast to a non-array type. We'll have to
4950        // guess what the correct type for the array. Our caller will then
4951        // handle converting that array type to the desired non-array type.
4952        Some(_) => None,
4953        // No type hint. We'll have to guess the correct type for the array.
4954        None => None,
4955    };
4956
4957    // Coerce all elements to the same type.
4958    let (elem_type, exprs) = if exprs.is_empty() {
4959        if let Some(elem_type) = type_hint {
4960            (elem_type.clone(), vec![])
4961        } else {
4962            sql_bail!("cannot determine type of empty array");
4963        }
4964    } else {
4965        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4966        (ecx.scalar_type(&out[0]), out)
4967    };
4968
4969    // Arrays of `char` type are disallowed due to a known limitation:
4970    // https://github.com/MaterializeInc/database-issues/issues/2360.
4971    //
4972    // Arrays of `list` and `map` types are disallowed due to mind-bending
4973    // semantics.
4974    if matches!(
4975        elem_type,
4976        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4977    ) {
4978        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4979    }
4980
4981    Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4982}
4983
4984fn plan_list(
4985    ecx: &ExprContext,
4986    exprs: &[Expr<Aug>],
4987    type_hint: Option<&SqlScalarType>,
4988) -> Result<CoercibleScalarExpr, PlanError> {
4989    let (elem_type, exprs) = if exprs.is_empty() {
4990        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4991            (element_type.without_modifiers(), vec![])
4992        } else {
4993            sql_bail!("cannot determine type of empty list");
4994        }
4995    } else {
4996        let type_hint = match type_hint {
4997            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4998            _ => None,
4999        };
5000
5001        let mut out = vec![];
5002        for expr in exprs {
5003            out.push(match expr {
5004                // Special case nested LIST expressions so we can plumb
5005                // the type hint through.
5006                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5007                _ => plan_expr(ecx, expr)?,
5008            });
5009        }
5010        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5011        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5012    };
5013
5014    if matches!(elem_type, SqlScalarType::Char { .. }) {
5015        bail_unsupported!("char list");
5016    }
5017
5018    Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5019}
5020
5021fn plan_map(
5022    ecx: &ExprContext,
5023    entries: &[MapEntry<Aug>],
5024    type_hint: Option<&SqlScalarType>,
5025) -> Result<CoercibleScalarExpr, PlanError> {
5026    let (value_type, exprs) = if entries.is_empty() {
5027        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5028            (value_type.without_modifiers(), vec![])
5029        } else {
5030            sql_bail!("cannot determine type of empty map");
5031        }
5032    } else {
5033        let type_hint = match type_hint {
5034            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5035            _ => None,
5036        };
5037
5038        let mut keys = vec![];
5039        let mut values = vec![];
5040        for MapEntry { key, value } in entries {
5041            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5042            let value = match value {
5043                // Special case nested MAP expressions so we can plumb
5044                // the type hint through.
5045                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5046                _ => plan_expr(ecx, value)?,
5047            };
5048            keys.push(key);
5049            values.push(value);
5050        }
5051        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5052        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5053        let out = itertools::interleave(keys, values).collect();
5054        (value_type, out)
5055    };
5056
5057    if matches!(value_type, SqlScalarType::Char { .. }) {
5058        bail_unsupported!("char map");
5059    }
5060
5061    let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5062    Ok(expr.into())
5063}
5064
5065/// Coerces a list of expressions such that all input expressions will be cast
5066/// to the same type. If successful, returns a new list of expressions in the
5067/// same order as the input, where each expression has the appropriate casts to
5068/// make them all of a uniform type.
5069///
5070/// If `force_type` is `Some`, the expressions are forced to the specified type
5071/// via an explicit cast. Otherwise the best common type is guessed via
5072/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5073/// implicit casts
5074///
5075/// Note that this is our implementation of Postgres' type conversion for
5076/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5077/// isn't yet used in all of those cases.
5078///
5079/// [union-type-conv]:
5080/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5081pub fn coerce_homogeneous_exprs(
5082    ecx: &ExprContext,
5083    exprs: Vec<CoercibleScalarExpr>,
5084    force_type: Option<&SqlScalarType>,
5085) -> Result<Vec<HirScalarExpr>, PlanError> {
5086    assert!(!exprs.is_empty());
5087
5088    let target_holder;
5089    let target = match force_type {
5090        Some(t) => t,
5091        None => {
5092            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5093            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5094            &target_holder
5095        }
5096    };
5097
5098    // Try to cast all expressions to `target`.
5099    let mut out = Vec::new();
5100    for expr in exprs {
5101        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5102        let ccx = match force_type {
5103            None => CastContext::Implicit,
5104            Some(_) => CastContext::Explicit,
5105        };
5106        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5107            Ok(expr) => out.push(expr),
5108            Err(_) => sql_bail!(
5109                "{} could not convert type {} to {}",
5110                ecx.name,
5111                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5112                ecx.humanize_scalar_type(target, false),
5113            ),
5114        }
5115    }
5116    Ok(out)
5117}
5118
5119/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5120/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5121pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5122    obe: &OrderByExpr<T>,
5123    column: usize,
5124) -> ColumnOrder {
5125    let desc = !obe.asc.unwrap_or(true);
5126    ColumnOrder {
5127        column,
5128        desc,
5129        // https://www.postgresql.org/docs/14/queries-order.html
5130        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5131        nulls_last: obe.nulls_last.unwrap_or(!desc),
5132    }
5133}
5134
5135/// Plans the ORDER BY clause of a window function.
5136///
5137/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5138/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5139/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5140/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5141/// `Vec<HirScalarExpr>` that we return.
5142fn plan_function_order_by(
5143    ecx: &ExprContext,
5144    order_by: &[OrderByExpr<Aug>],
5145) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5146    let mut order_by_exprs = vec![];
5147    let mut col_orders = vec![];
5148    {
5149        for (i, obe) in order_by.iter().enumerate() {
5150            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5151            // do not support ordinal references in PostgreSQL. So we use
5152            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5153            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5154            order_by_exprs.push(expr);
5155            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5156        }
5157    }
5158    Ok((order_by_exprs, col_orders))
5159}
5160
5161/// Common part of the planning of windowed and non-windowed aggregation functions.
5162fn plan_aggregate_common(
5163    ecx: &ExprContext,
5164    Function::<Aug> {
5165        name,
5166        args,
5167        filter,
5168        over: _,
5169        distinct,
5170    }: &Function<Aug>,
5171) -> Result<AggregateExpr, PlanError> {
5172    // Normal aggregate functions, like `sum`, expect as input a single expression
5173    // which yields the datum to aggregate. Order sensitive aggregate functions,
5174    // like `jsonb_agg`, are special, and instead expect a Record whose first
5175    // element yields the datum to aggregate and whose successive elements yield
5176    // keys to order by. This expectation is hard coded within the implementation
5177    // of each of the order-sensitive aggregates. The specification of how many
5178    // order by keys to consider, and in what order, is passed via the `order_by`
5179    // field on the `AggregateFunc` variant.
5180
5181    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5182    // most, so explicitly drop it if the function doesn't care about order. This
5183    // prevents the projection into Record below from triggering on unsupported
5184    // functions.
5185
5186    let impls = match resolve_func(ecx, name, args)? {
5187        Func::Aggregate(impls) => impls,
5188        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5189    };
5190
5191    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5192    // generalized function selection framework. The rule is simple: the user
5193    // must type `count(*)`, but the function selection framework sees an empty
5194    // parameter list, as if the user had typed `count()`. But if the user types
5195    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5196    // rules to all aggregates, not just `count`, since we may one day support
5197    // user-defined aggregates, including user-defined aggregates that take no
5198    // parameters.
5199    let (args, order_by) = match &args {
5200        FunctionArgs::Star => (vec![], vec![]),
5201        FunctionArgs::Args { args, order_by } => {
5202            if args.is_empty() {
5203                sql_bail!(
5204                    "{}(*) must be used to call a parameterless aggregate function",
5205                    ecx.qcx
5206                        .scx
5207                        .humanize_resolved_name(name)
5208                        .expect("name actually resolved")
5209                );
5210            }
5211            let args = plan_exprs(ecx, args)?;
5212            (args, order_by.clone())
5213        }
5214    };
5215
5216    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5217
5218    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5219    if let Some(filter) = &filter {
5220        // If a filter is present, as in
5221        //
5222        //     <agg>(<expr>) FILTER (WHERE <cond>)
5223        //
5224        // we plan it by essentially rewriting the expression to
5225        //
5226        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5227        //
5228        // where <identity> is the identity input for <agg>.
5229        let cond =
5230            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5231        let expr_typ = ecx.scalar_type(&expr);
5232        expr = HirScalarExpr::if_then_else(
5233            cond,
5234            expr,
5235            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5236        );
5237    }
5238
5239    let mut seen_outer = false;
5240    let mut seen_inner = false;
5241    #[allow(deprecated)]
5242    expr.visit_columns(0, &mut |depth, col| {
5243        if depth == 0 && col.level == 0 {
5244            seen_inner = true;
5245        } else if col.level > depth {
5246            seen_outer = true;
5247        }
5248    });
5249    if seen_outer && !seen_inner {
5250        bail_unsupported!(
5251            3720,
5252            "aggregate functions that refer exclusively to outer columns"
5253        );
5254    }
5255
5256    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5257    // map the needed expressions into the aggregate datum.
5258    if func.is_order_sensitive() {
5259        let field_names = iter::repeat(ColumnName::from(""))
5260            .take(1 + order_by_exprs.len())
5261            .collect();
5262        let mut exprs = vec![expr];
5263        exprs.extend(order_by_exprs);
5264        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5265    }
5266
5267    Ok(AggregateExpr {
5268        func,
5269        expr: Box::new(expr),
5270        distinct: *distinct,
5271    })
5272}
5273
/// Plans a (possibly qualified) identifier to a scalar expression.
///
/// Resolution proceeds as follows:
/// 1. A qualified name (`table.column`) must resolve to a column of a table
///    in scope.
/// 2. An unqualified name is first resolved as a column name.
/// 3. Failing that, it is treated as a whole-row reference to a table,
///    producing either the column value directly (for tables arising from a
///    single-column function, matching PostgreSQL) or a record containing all
///    of the table's columns.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    // The final name component is the column (or, for whole-row references,
    // the table) name. Callers must pass a non-empty slice; the `unwrap`
    // panics otherwise.
    let col_name = normalize::column_name(names.pop().unwrap());

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let (i, i_name) = ecx.scope.resolve_table_column(
            &ecx.qcx.outer_scopes,
            &table_name,
            &col_name,
            &mut ecx.qcx.name_manager.borrow_mut(),
        )?;
        return Ok(HirScalarExpr::named_column(i, i_name));
    }

    // If the name is unqualified, first check if it refers to a column. Track any similar names
    // that might exist for a better error message.
    let similar_names = match ecx.scope.resolve_column(
        &ecx.qcx.outer_scopes,
        &col_name,
        &mut ecx.qcx.name_manager.borrow_mut(),
    ) {
        Ok((i, i_name)) => {
            return Ok(HirScalarExpr::named_column(i, i_name));
        }
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name doesn't refer to a column. Check if it is a whole-row reference
    // to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name doesn't refer to a table either. Return an error.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The name refers to a table that is the result of a function that
        // returned a single column. Per PostgreSQL, this is a special case
        // that returns the value directly.
        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
            *column,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )),
        // The name refers to a normal table. Return a record containing all the
        // columns of the table.
        _ => {
            // A table function planned in the target list may carry a
            // synthetic "exists" column; it is excluded from the record and
            // instead used below to decide whether to yield NULL.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::named_column(
                            column,
                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
                        );
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // For the special case of a table function with a single column, the single column is instead not wrapped.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
            };
            if let Some(has_exists_column) = has_exists_column {
                // When the exists column is NULL, yield NULL for the whole-row
                // value rather than the record/column expression.
                Ok(HirScalarExpr::if_then_else(
                    HirScalarExpr::unnamed_column(has_exists_column)
                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
                    expr,
                ))
            } else {
                Ok(expr)
            }
        }
    }
}
5368
5369fn plan_op(
5370    ecx: &ExprContext,
5371    op: &str,
5372    expr1: &Expr<Aug>,
5373    expr2: Option<&Expr<Aug>>,
5374) -> Result<HirScalarExpr, PlanError> {
5375    let impls = func::resolve_op(op)?;
5376    let args = match expr2 {
5377        None => plan_exprs(ecx, &[expr1])?,
5378        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5379    };
5380    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5381}
5382
5383fn plan_function<'a>(
5384    ecx: &ExprContext,
5385    f @ Function {
5386        name,
5387        args,
5388        filter,
5389        over,
5390        distinct,
5391    }: &'a Function<Aug>,
5392) -> Result<HirScalarExpr, PlanError> {
5393    let impls = match resolve_func(ecx, name, args)? {
5394        Func::Table(_) => {
5395            sql_bail!(
5396                "table functions are not allowed in {} (function {})",
5397                ecx.name,
5398                name
5399            );
5400        }
5401        Func::Scalar(impls) => {
5402            if over.is_some() {
5403                sql_bail!(
5404                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5405                );
5406            }
5407            impls
5408        }
5409        Func::ScalarWindow(impls) => {
5410            let (
5411                ignore_nulls,
5412                order_by_exprs,
5413                col_orders,
5414                _window_frame,
5415                partition_by,
5416                scalar_args,
5417            ) = plan_window_function_non_aggr(ecx, f)?;
5418
5419            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5420            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5421            // msg there is less informative.)
5422            if !scalar_args.is_empty() {
5423                if let ResolvedItemName::Item {
5424                    full_name: FullItemName { item, .. },
5425                    ..
5426                } = name
5427                {
5428                    sql_bail!(
5429                        "function {} has 0 parameters, but was called with {}",
5430                        item,
5431                        scalar_args.len()
5432                    );
5433                }
5434            }
5435
5436            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5437            // accept a window frame here without an error msg. (Postgres also does this.)
5438            // TODO: maybe we should give a notice
5439
5440            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5441
5442            if ignore_nulls {
5443                // If we ever add a scalar window function that supports ignore, then don't forget
5444                // to also update HIR EXPLAIN.
5445                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5446            }
5447
5448            return Ok(HirScalarExpr::windowing(WindowExpr {
5449                func: WindowExprType::Scalar(ScalarWindowExpr {
5450                    func,
5451                    order_by: col_orders,
5452                }),
5453                partition_by,
5454                order_by: order_by_exprs,
5455            }));
5456        }
5457        Func::ValueWindow(impls) => {
5458            let window_plan = plan_window_function_non_aggr(ecx, f)?;
5459            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5460                window_plan;
5461
5462            let (args_encoded, func) =
5463                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5464
5465            if ignore_nulls {
5466                match func {
5467                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5468                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5469                }
5470            }
5471
5472            return Ok(HirScalarExpr::windowing(WindowExpr {
5473                func: WindowExprType::Value(ValueWindowExpr {
5474                    func,
5475                    args: Box::new(args_encoded),
5476                    order_by: col_orders,
5477                    window_frame,
5478                    ignore_nulls, // (RESPECT NULLS is the default)
5479                }),
5480                partition_by,
5481                order_by: order_by_exprs,
5482            }));
5483        }
5484        Func::Aggregate(_) => {
5485            if f.over.is_none() {
5486                // Not a window aggregate. Something is wrong.
5487                if ecx.allow_aggregates {
5488                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5489                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5490                    sql_bail!(
5491                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5492                        name,
5493                    );
5494                } else {
5495                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5496                    // because it was in an unsupported context.
5497                    sql_bail!(
5498                        "aggregate functions are not allowed in {} (function {})",
5499                        ecx.name,
5500                        name
5501                    );
5502                }
5503            } else {
5504                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5505                    plan_window_function_common(ecx, &f.name, &f.over)?;
5506
5507                // https://github.com/MaterializeInc/database-issues/issues/6720
5508                match (&window_frame.start_bound, &window_frame.end_bound) {
5509                    (
5510                        mz_expr::WindowFrameBound::UnboundedPreceding,
5511                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5512                    )
5513                    | (
5514                        mz_expr::WindowFrameBound::UnboundedPreceding,
5515                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5516                    )
5517                    | (
5518                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5519                        mz_expr::WindowFrameBound::UnboundedFollowing,
5520                    )
5521                    | (
5522                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5523                        mz_expr::WindowFrameBound::UnboundedFollowing,
5524                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5525                    (_, _) => {} // other cases are ok
5526                }
5527
5528                if ignore_nulls {
5529                    // https://github.com/MaterializeInc/database-issues/issues/6722
5530                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5531                    // forget to also update HIR EXPLAIN.
5532                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5533                }
5534
5535                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5536
5537                if aggregate_expr.distinct {
5538                    // https://github.com/MaterializeInc/database-issues/issues/6626
5539                    bail_unsupported!("DISTINCT in window aggregates");
5540                }
5541
5542                return Ok(HirScalarExpr::windowing(WindowExpr {
5543                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5544                        aggregate_expr,
5545                        order_by: col_orders,
5546                        window_frame,
5547                    }),
5548                    partition_by,
5549                    order_by: order_by_exprs,
5550                }));
5551            }
5552        }
5553    };
5554
5555    if over.is_some() {
5556        unreachable!("If there is an OVER clause, we should have returned already above.");
5557    }
5558
5559    if *distinct {
5560        sql_bail!(
5561            "DISTINCT specified, but {} is not an aggregate function",
5562            ecx.qcx
5563                .scx
5564                .humanize_resolved_name(name)
5565                .expect("already resolved")
5566        );
5567    }
5568    if filter.is_some() {
5569        sql_bail!(
5570            "FILTER specified, but {} is not an aggregate function",
5571            ecx.qcx
5572                .scx
5573                .humanize_resolved_name(name)
5574                .expect("already resolved")
5575        );
5576    }
5577
5578    let scalar_args = match &args {
5579        FunctionArgs::Star => {
5580            sql_bail!(
5581                "* argument is invalid with non-aggregate function {}",
5582                ecx.qcx
5583                    .scx
5584                    .humanize_resolved_name(name)
5585                    .expect("already resolved")
5586            )
5587        }
5588        FunctionArgs::Args { args, order_by } => {
5589            if !order_by.is_empty() {
5590                sql_bail!(
5591                    "ORDER BY specified, but {} is not an aggregate function",
5592                    ecx.qcx
5593                        .scx
5594                        .humanize_resolved_name(name)
5595                        .expect("already resolved")
5596                );
5597            }
5598            plan_exprs(ecx, args)?
5599        }
5600    };
5601
5602    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5603}
5604
/// Error text emitted (via `bail_unsupported!`) when `IGNORE NULLS` or
/// `RESPECT NULLS` is specified for a window function that doesn't support
/// these options. LAG and LEAD are the only functions that do.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5607
5608/// Resolves the name to a set of function implementations.
5609///
5610/// If the name does not specify a known built-in function, returns an error.
5611pub fn resolve_func(
5612    ecx: &ExprContext,
5613    name: &ResolvedItemName,
5614    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5615) -> Result<&'static Func, PlanError> {
5616    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5617        if let Ok(f) = i.func() {
5618            return Ok(f);
5619        }
5620    }
5621
5622    // Couldn't resolve function with this name, so generate verbose error
5623    // message.
5624    let cexprs = match args {
5625        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5626        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5627            if !order_by.is_empty() {
5628                sql_bail!(
5629                    "ORDER BY specified, but {} is not an aggregate function",
5630                    name
5631                );
5632            }
5633            plan_exprs(ecx, args)?
5634        }
5635    };
5636
5637    let arg_types: Vec<_> = cexprs
5638        .into_iter()
5639        .map(|ty| match ecx.scalar_type(&ty) {
5640            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5641            CoercibleScalarType::Record(_) => "record".to_string(),
5642            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5643        })
5644        .collect();
5645
5646    Err(PlanError::UnknownFunction {
5647        name: name.to_string(),
5648        arg_types,
5649    })
5650}
5651
/// Plans an `IS` construct: `<expr> IS [NOT] NULL | UNKNOWN | TRUE | FALSE |
/// DISTINCT FROM <expr2>`.
///
/// `not` selects the negated (`IS NOT ...`) form; the whole result is negated
/// at the end.
fn plan_is_expr<'a>(
    ecx: &ExprContext,
    expr: &'a Expr<Aug>,
    construct: &IsExprConstruct<Aug>,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    let expr_hir = plan_expr(ecx, expr)?;

    let mut result = match construct {
        IsExprConstruct::Null => {
            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
            // at odds with our type coercion rules, which treat `NULL` literals
            // and unconstrained parameters identically. Providing a type hint
            // of string means we wind up supporting both.
            expr_hir.type_as_any(ecx)?.call_is_null()
        }
        // `IS UNKNOWN` is `IS NULL` restricted to boolean operands.
        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
        IsExprConstruct::True => expr_hir
            .type_as(ecx, &SqlScalarType::Bool)?
            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
        IsExprConstruct::False => expr_hir
            .type_as(ecx, &SqlScalarType::Bool)?
            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
        IsExprConstruct::DistinctFrom(expr2) => {
            // There are three cases:
            // 1. Both terms are non-null, in which case the result should be `a != b`.
            // 2. Exactly one term is null, in which case the result should be true.
            // 3. Both terms are null, in which case the result should be false.
            //
            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)

            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
            // construct an AST expression for `expr != expr2` and plan it to get proper type
            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;

            let expr1_hir = expr_hir.type_as_any(ecx)?;
            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;

            // term1: `a != b OR a IS NULL OR b IS NULL`
            let term1 = HirScalarExpr::variadic_or(vec![
                ne_hir,
                expr1_hir.clone().call_is_null(),
                expr2_hir.clone().call_is_null(),
            ]);
            // term2: `a IS NOT NULL OR b IS NOT NULL`
            let term2 = HirScalarExpr::variadic_or(vec![
                expr1_hir.call_is_null().not(),
                expr2_hir.call_is_null().not(),
            ]);
            term1.and(term2)
        }
    };
    // Negate for the `IS NOT ...` forms.
    if not {
        result = result.not();
    }
    Ok(result)
}
5709
5710fn plan_case<'a>(
5711    ecx: &ExprContext,
5712    operand: &'a Option<Box<Expr<Aug>>>,
5713    conditions: &'a [Expr<Aug>],
5714    results: &'a [Expr<Aug>],
5715    else_result: &'a Option<Box<Expr<Aug>>>,
5716) -> Result<HirScalarExpr, PlanError> {
5717    let mut cond_exprs = Vec::new();
5718    let mut result_exprs = Vec::new();
5719    for (c, r) in conditions.iter().zip_eq(results) {
5720        let c = match operand {
5721            Some(operand) => operand.clone().equals(c.clone()),
5722            None => c.clone(),
5723        };
5724        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5725        cond_exprs.push(cexpr);
5726        result_exprs.push(r);
5727    }
5728    result_exprs.push(match else_result {
5729        Some(else_result) => else_result,
5730        None => &Expr::Value(Value::Null),
5731    });
5732    let mut result_exprs = coerce_homogeneous_exprs(
5733        &ecx.with_name("CASE"),
5734        plan_exprs(ecx, &result_exprs)?,
5735        None,
5736    )?;
5737    let mut expr = result_exprs.pop().unwrap();
5738    assert_eq!(cond_exprs.len(), result_exprs.len());
5739    for (cexpr, rexpr) in cond_exprs
5740        .into_iter()
5741        .rev()
5742        .zip_eq(result_exprs.into_iter().rev())
5743    {
5744        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5745    }
5746    Ok(expr)
5747}
5748
5749fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5750    let (datum, scalar_type) = match l {
5751        Value::Number(s) => {
5752            let d = strconv::parse_numeric(s.as_str())?;
5753            if !s.contains(&['E', '.'][..]) {
5754                // Maybe representable as an int?
5755                if let Ok(n) = d.0.try_into() {
5756                    (Datum::Int32(n), SqlScalarType::Int32)
5757                } else if let Ok(n) = d.0.try_into() {
5758                    (Datum::Int64(n), SqlScalarType::Int64)
5759                } else {
5760                    (
5761                        Datum::Numeric(d),
5762                        SqlScalarType::Numeric { max_scale: None },
5763                    )
5764                }
5765            } else {
5766                (
5767                    Datum::Numeric(d),
5768                    SqlScalarType::Numeric { max_scale: None },
5769                )
5770            }
5771        }
5772        Value::HexString(_) => bail_unsupported!("hex string literals"),
5773        Value::Boolean(b) => match b {
5774            false => (Datum::False, SqlScalarType::Bool),
5775            true => (Datum::True, SqlScalarType::Bool),
5776        },
5777        Value::Interval(i) => {
5778            let i = literal::plan_interval(i)?;
5779            (Datum::Interval(i), SqlScalarType::Interval)
5780        }
5781        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5782        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5783    };
5784    let expr = HirScalarExpr::literal(datum, scalar_type);
5785    Ok(expr.into())
5786}
5787
5788/// The common part of the planning of non-aggregate window functions, i.e.,
5789/// scalar window functions and value window functions.
5790fn plan_window_function_non_aggr<'a>(
5791    ecx: &ExprContext,
5792    Function {
5793        name,
5794        args,
5795        filter,
5796        over,
5797        distinct,
5798    }: &'a Function<Aug>,
5799) -> Result<
5800    (
5801        bool,
5802        Vec<HirScalarExpr>,
5803        Vec<ColumnOrder>,
5804        mz_expr::WindowFrame,
5805        Vec<HirScalarExpr>,
5806        Vec<CoercibleScalarExpr>,
5807    ),
5808    PlanError,
5809> {
5810    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5811        plan_window_function_common(ecx, name, over)?;
5812
5813    if *distinct {
5814        sql_bail!(
5815            "DISTINCT specified, but {} is not an aggregate function",
5816            name
5817        );
5818    }
5819
5820    if filter.is_some() {
5821        bail_unsupported!("FILTER in non-aggregate window functions");
5822    }
5823
5824    let scalar_args = match &args {
5825        FunctionArgs::Star => {
5826            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5827        }
5828        FunctionArgs::Args { args, order_by } => {
5829            if !order_by.is_empty() {
5830                sql_bail!(
5831                    "ORDER BY specified, but {} is not an aggregate function",
5832                    name
5833                );
5834            }
5835            plan_exprs(ecx, args)?
5836        }
5837    };
5838
5839    Ok((
5840        ignore_nulls,
5841        order_by_exprs,
5842        col_orders,
5843        window_frame,
5844        partition,
5845        scalar_args,
5846    ))
5847}
5848
5849/// The common part of the planning of all window functions.
5850fn plan_window_function_common(
5851    ecx: &ExprContext,
5852    name: &<Aug as AstInfo>::ItemName,
5853    over: &Option<WindowSpec<Aug>>,
5854) -> Result<
5855    (
5856        bool,
5857        Vec<HirScalarExpr>,
5858        Vec<ColumnOrder>,
5859        mz_expr::WindowFrame,
5860        Vec<HirScalarExpr>,
5861    ),
5862    PlanError,
5863> {
5864    if !ecx.allow_windows {
5865        sql_bail!(
5866            "window functions are not allowed in {} (function {})",
5867            ecx.name,
5868            name
5869        );
5870    }
5871
5872    let window_spec = match over.as_ref() {
5873        Some(over) => over,
5874        None => sql_bail!("window function {} requires an OVER clause", name),
5875    };
5876    if window_spec.ignore_nulls && window_spec.respect_nulls {
5877        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5878    }
5879    let window_frame = match window_spec.window_frame.as_ref() {
5880        Some(frame) => plan_window_frame(frame)?,
5881        None => mz_expr::WindowFrame::default(),
5882    };
5883    let mut partition = Vec::new();
5884    for expr in &window_spec.partition_by {
5885        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5886    }
5887
5888    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5889
5890    Ok((
5891        window_spec.ignore_nulls,
5892        order_by_exprs,
5893        col_orders,
5894        window_frame,
5895        partition,
5896    ))
5897}
5898
5899fn plan_window_frame(
5900    WindowFrame {
5901        units,
5902        start_bound,
5903        end_bound,
5904    }: &WindowFrame,
5905) -> Result<mz_expr::WindowFrame, PlanError> {
5906    use mz_expr::WindowFrameBound::*;
5907    let units = window_frame_unit_ast_to_expr(units)?;
5908    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5909    let end_bound = end_bound
5910        .as_ref()
5911        .map(window_frame_bound_ast_to_expr)
5912        .unwrap_or(CurrentRow);
5913
5914    // Validate bounds according to Postgres rules
5915    match (&start_bound, &end_bound) {
5916        // Start bound can't be UNBOUNDED FOLLOWING
5917        (UnboundedFollowing, _) => {
5918            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5919        }
5920        // End bound can't be UNBOUNDED PRECEDING
5921        (_, UnboundedPreceding) => {
5922            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5923        }
5924        // Start bound should come before end bound in the list of bound definitions
5925        (CurrentRow, OffsetPreceding(_)) => {
5926            sql_bail!("frame starting from current row cannot have preceding rows")
5927        }
5928        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5929            sql_bail!("frame starting from following row cannot have preceding rows")
5930        }
5931        // The above rules are adopted from Postgres.
5932        // The following rules are Materialize-specific.
5933        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5934            // Note that the only hard limit is that partition size + offset should fit in i64, so
5935            // in theory, we could support much larger offsets than this. But for our current
5936            // performance, even 1000000 is quite big.
5937            if *o1 > 1000000 || *o2 > 1000000 {
5938                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5939            }
5940        }
5941        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5942            if *o1 > 1000000 || *o2 > 1000000 {
5943                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5944            }
5945        }
5946        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5947            if *o1 > 1000000 || *o2 > 1000000 {
5948                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5949            }
5950        }
5951        (OffsetPreceding(o), CurrentRow) => {
5952            if *o > 1000000 {
5953                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5954            }
5955        }
5956        (CurrentRow, OffsetFollowing(o)) => {
5957            if *o > 1000000 {
5958                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5959            }
5960        }
5961        // Other bounds are valid
5962        (_, _) => (),
5963    }
5964
5965    // RANGE is only supported in the default frame
5966    // https://github.com/MaterializeInc/database-issues/issues/6585
5967    if units == mz_expr::WindowFrameUnits::Range
5968        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5969    {
5970        bail_unsupported!("RANGE in non-default window frames")
5971    }
5972
5973    let frame = mz_expr::WindowFrame {
5974        units,
5975        start_bound,
5976        end_bound,
5977    };
5978    Ok(frame)
5979}
5980
5981fn window_frame_unit_ast_to_expr(
5982    unit: &WindowFrameUnits,
5983) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5984    match unit {
5985        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5986        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5987        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5988    }
5989}
5990
5991fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5992    match bound {
5993        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5994        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5995        WindowFrameBound::Preceding(Some(offset)) => {
5996            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5997        }
5998        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5999        WindowFrameBound::Following(Some(offset)) => {
6000            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6001        }
6002    }
6003}
6004
6005pub fn scalar_type_from_sql(
6006    scx: &StatementContext,
6007    data_type: &ResolvedDataType,
6008) -> Result<SqlScalarType, PlanError> {
6009    match data_type {
6010        ResolvedDataType::AnonymousList(elem_type) => {
6011            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6012            if matches!(elem_type, SqlScalarType::Char { .. }) {
6013                bail_unsupported!("char list");
6014            }
6015            Ok(SqlScalarType::List {
6016                element_type: Box::new(elem_type),
6017                custom_id: None,
6018            })
6019        }
6020        ResolvedDataType::AnonymousMap {
6021            key_type,
6022            value_type,
6023        } => {
6024            match scalar_type_from_sql(scx, key_type)? {
6025                SqlScalarType::String => {}
6026                other => sql_bail!(
6027                    "map key type must be {}, got {}",
6028                    scx.humanize_scalar_type(&SqlScalarType::String, false),
6029                    scx.humanize_scalar_type(&other, false)
6030                ),
6031            }
6032            Ok(SqlScalarType::Map {
6033                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6034                custom_id: None,
6035            })
6036        }
6037        ResolvedDataType::Named { id, modifiers, .. } => {
6038            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6039        }
6040        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6041    }
6042}
6043
/// Converts a catalog type (identified by `id`), together with any SQL type
/// modifiers (e.g. the precision/scale in `numeric(10, 2)`), into a
/// `SqlScalarType`.
///
/// Errors if `id` does not refer to a type, or if the modifiers are invalid
/// for that type. Only numeric, char, varchar, timestamp, and timestamptz
/// accept modifiers; all other types reject them.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<SqlScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            // Resolution should never produce a `ResolvedDataType::Named` with
            // an ID of a non-type, but we error gracefully just in case.
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    match &type_details.typ {
        // `numeric(precision, scale)`: both modifiers are optional. The
        // precision is validated here but only the scale is retained in the
        // resulting type.
        CatalogType::Numeric => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    // The scale may not exceed the declared precision.
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            Ok(SqlScalarType::Numeric { max_scale: scale })
        }
        // `char(n)`: defaults to length 1 when no modifier is given.
        CatalogType::Char => {
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(SqlScalarType::Char { length })
        }
        // `varchar(n)`: unlimited length when no modifier is given.
        CatalogType::VarChar => {
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(SqlScalarType::VarChar { max_length: length })
        }
        // `timestamp(p)`: optional fractional-seconds precision.
        CatalogType::Timestamp => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(SqlScalarType::Timestamp { precision })
        }
        // `timestamptz(p)`: optional fractional-seconds precision.
        CatalogType::TimestampTz => {
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(SqlScalarType::TimestampTz { precision })
        }
        // All remaining types accept no modifiers.
        t => {
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // Composite types recurse on their element/value types, and
                // (except for arrays and ranges) record this catalog id as
                // their `custom_id`.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(SqlScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(SqlScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    // Record fields are always planned as nullable here.
                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                SqlColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(SqlScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                // Simple types map one-to-one.
                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
                CatalogType::Bool => Ok(SqlScalarType::Bool),
                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
                CatalogType::Date => Ok(SqlScalarType::Date),
                CatalogType::Float32 => Ok(SqlScalarType::Float32),
                CatalogType::Float64 => Ok(SqlScalarType::Float64),
                CatalogType::Int16 => Ok(SqlScalarType::Int16),
                CatalogType::Int32 => Ok(SqlScalarType::Int32),
                CatalogType::Int64 => Ok(SqlScalarType::Int64),
                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
                CatalogType::Interval => Ok(SqlScalarType::Interval),
                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
                CatalogType::Oid => Ok(SqlScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
                // Pseudo types (e.g. `anyelement`) cannot be named directly.
                CatalogType::Pseudo => {
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
                CatalogType::RegType => Ok(SqlScalarType::RegType),
                CatalogType::String => Ok(SqlScalarType::String),
                CatalogType::Time => Ok(SqlScalarType::Time),
                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
6245
/// This is used to collect aggregates and table functions from within an `Expr`.
/// See the explanation of aggregate handling at the top of the file for more details.
struct AggregateTableFuncVisitor<'a> {
    /// The context for the containing `Statement`; used to resolve function names.
    scx: &'a StatementContext<'a>,
    /// Aggregate function calls collected so far, in visit order.
    /// Deduplicated (first occurrence wins) by `into_result`.
    aggs: Vec<Function<Aug>>,
    /// True while visiting the arguments of an aggregate function; used to
    /// reject nested aggregate functions.
    within_aggregate: bool,
    /// Table function calls found in `SELECT` items, keyed by the call itself
    /// so identical calls are collected only once. The value is the synthetic
    /// identifier that replaces the call in the expression.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of descriptions of the syntactic contexts (e.g. "CASE") we are
    /// currently inside of in which table functions are disallowed; the
    /// innermost context is last.
    table_disallowed_context: Vec<&'static str>,
    /// True while visiting a `SELECT` list item.
    in_select_item: bool,
    /// Produces unique suffixes for the synthetic table function identifiers.
    id_gen: IdGen,
    /// The first error encountered during the visit, if any; surfaced by
    /// `into_result`.
    err: Option<PlanError>,
}
6258
6259impl<'a> AggregateTableFuncVisitor<'a> {
6260    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6261        AggregateTableFuncVisitor {
6262            scx,
6263            aggs: Vec::new(),
6264            within_aggregate: false,
6265            tables: BTreeMap::new(),
6266            table_disallowed_context: Vec::new(),
6267            in_select_item: false,
6268            id_gen: Default::default(),
6269            err: None,
6270        }
6271    }
6272
6273    fn into_result(
6274        self,
6275    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6276        match self.err {
6277            Some(err) => Err(err),
6278            None => {
6279                // Dedup while preserving the order. We don't care what the order is, but it
6280                // has to be reproducible so that EXPLAIN PLAN tests work.
6281                let mut seen = BTreeSet::new();
6282                let aggs = self
6283                    .aggs
6284                    .into_iter()
6285                    .filter(move |agg| seen.insert(agg.clone()))
6286                    .collect();
6287                Ok((aggs, self.tables))
6288            }
6289        }
6290    }
6291}
6292
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Collects aggregate function calls and enforces the nesting rules for
    /// aggregates and table functions.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The FILTER clause is visited before `within_aggregate` is set,
                // so an aggregate inside FILTER is not treated as nested here.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                // Arguments of the aggregate: nested aggregates and table
                // functions are both disallowed inside them. Save and restore
                // `within_aggregate` around the descent.
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not be nested inside other table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    /// Tracks contexts in which table functions are disallowed (CASE,
    /// COALESCE), and, in SELECT items, replaces table function calls with
    /// synthetic identifiers so the calls themselves can be planned elsewhere.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a uuid identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                // We know this is okay because id is 11 characters + <=20 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
            // NOTE(review): when the pattern above does not match (e.g. the call
            // has FILTER/OVER/DISTINCT), `expr` was already visited once here and
            // is visited again below; duplicates in `aggs` are removed by
            // `into_result` — confirm the double visit is intentional.
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Marks that we are inside a SELECT item while visiting it, so that
    /// `visit_expr_mut` knows to extract table functions.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6414
/// Collects window function calls (function calls with an OVER clause) from an
/// `Expr`, without descending into subqueries.
#[derive(Default)]
struct WindowFuncCollector {
    /// The window function call expressions found so far, in visit order.
    window_funcs: Vec<Expr<Aug>>,
}
6419
6420impl WindowFuncCollector {
6421    fn into_result(self) -> Vec<Expr<Aug>> {
6422        // Dedup while preserving the order.
6423        let mut seen = BTreeSet::new();
6424        let window_funcs_dedupped = self
6425            .window_funcs
6426            .into_iter()
6427            .filter(move |expr| seen.insert(expr.clone()))
6428            // Reverse the order, so that in case of a nested window function call, the
6429            // inner one is evaluated first.
6430            .rev()
6431            .collect();
6432        window_funcs_dedupped
6433    }
6434}
6435
6436impl Visit<'_, Aug> for WindowFuncCollector {
6437    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6438        match expr {
6439            Expr::Function(func) => {
6440                if func.over.is_some() {
6441                    self.window_funcs.push(expr.clone());
6442                }
6443            }
6444            _ => (),
6445        }
6446        visit::visit_expr(self, expr);
6447    }
6448
6449    fn visit_query(&mut self, _query: &Query<Aug>) {
6450        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6451    }
6452}
6453
/// Specifies how long a query will live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The query's (or the expression's) result will be computed at one point in time.
    OneShot,
    /// The query (or expression) is used in a dataflow that maintains an index.
    Index,
    /// The query (or expression) is used in a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) is part of a (non-materialized) view.
    View,
    /// The expression is part of a source definition.
    Source,
}

impl QueryLifetime {
    /// Reports whether the result is computed at one point in time, i.e., is
    /// not continually maintained.
    ///
    /// (This used to impact whether the query is allowed to reason about the time at which it is
    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
    /// mechanism, see `ExprPrepStyle`.)
    pub fn is_one_shot(&self) -> bool {
        // Written as an exhaustive match so that adding a variant forces a
        // decision here.
        let result = match self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        // `is_one_shot` and `is_maintained` must remain exact complements.
        assert_eq!(!result, self.is_maintained());
        result
    }

    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
    /// turned into a `TopK`.
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
    pub fn allow_show(&self) -> bool {
        match self {
            // SUBSCRIBE allows SHOW commands!
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6513
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The CTE's name.
    pub name: String,
    /// The relation description (column names and types) of the CTE's body.
    pub desc: RelationDesc,
}
6520
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// A name manager, for interning column names that will be stored in HIR and MIR.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// Limits the depth of recursive planning, to protect against stack
    /// overflow on deeply nested queries; see the `CheckedRecursion` impl.
    pub recursion_guard: RecursionGuard,
}
6538
// Exposes the query context's recursion guard so that recursive planning
// traversals over it can be depth-checked via `CheckedRecursion`.
impl CheckedRecursion for QueryContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6544
6545impl<'a> QueryContext<'a> {
6546    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6547        QueryContext {
6548            scx,
6549            lifetime,
6550            outer_scopes: vec![],
6551            outer_relation_types: vec![],
6552            ctes: BTreeMap::new(),
6553            name_manager: Rc::new(RefCell::new(NameManager::new())),
6554            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6555        }
6556    }
6557
6558    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6559        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6560    }
6561
6562    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6563    /// `self`.
6564    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6565        let ctes = self.ctes.clone();
6566        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6567        let outer_relation_types = iter::once(relation_type)
6568            .chain(self.outer_relation_types.clone())
6569            .collect();
6570        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6571        let name_manager = Rc::clone(&self.name_manager);
6572
6573        QueryContext {
6574            scx: self.scx,
6575            lifetime: self.lifetime,
6576            outer_scopes,
6577            outer_relation_types,
6578            ctes,
6579            name_manager,
6580            recursion_guard: self.recursion_guard.clone(),
6581        }
6582    }
6583
6584    /// Derives a `QueryContext` for a scope that contains no columns.
6585    fn empty_derived_context(&self) -> QueryContext<'a> {
6586        let scope = Scope::empty();
6587        let ty = SqlRelationType::empty();
6588        self.derived_context(scope, ty)
6589    }
6590
6591    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6592    /// CTE.
6593    pub fn resolve_table_name(
6594        &self,
6595        object: ResolvedItemName,
6596    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6597        match object {
6598            ResolvedItemName::Item {
6599                id,
6600                full_name,
6601                version,
6602                ..
6603            } => {
6604                let item = self.scx.get_item(&id).at_version(version);
6605                let desc = match item.relation_desc() {
6606                    Some(desc) => desc.clone(),
6607                    None => {
6608                        return Err(PlanError::InvalidDependency {
6609                            name: full_name.to_string(),
6610                            item_type: item.item_type().to_string(),
6611                        });
6612                    }
6613                };
6614                let expr = HirRelationExpr::Get {
6615                    id: Id::Global(item.global_id()),
6616                    typ: desc.typ().clone(),
6617                };
6618
6619                let name = full_name.into();
6620                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6621
6622                Ok((expr, scope))
6623            }
6624            ResolvedItemName::Cte { id, name } => {
6625                let name = name.into();
6626                let cte = self.ctes.get(&id).unwrap();
6627                let expr = HirRelationExpr::Get {
6628                    id: Id::Local(id),
6629                    typ: cte.desc.typ().clone(),
6630                };
6631
6632                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6633
6634                Ok((expr, scope))
6635            }
6636            ResolvedItemName::ContinualTask { id, name } => {
6637                let cte = self.ctes.get(&id).unwrap();
6638                let expr = HirRelationExpr::Get {
6639                    id: Id::Local(id),
6640                    typ: cte.desc.typ().clone(),
6641                };
6642
6643                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6644
6645                Ok((expr, scope))
6646            }
6647            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6648        }
6649    }
6650
6651    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6652    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6653    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6654        self.scx.humanize_scalar_type(typ, postgres_compat)
6655    }
6656}
6657
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a SqlRelationType,
    /// Are aggregate functions allowed in this context
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context.
    pub allow_parameters: bool,
    /// Are window functions allowed in this context
    pub allow_windows: bool,
}
6679
// Exposes the enclosing query context's recursion guard so that recursive
// expression planning can be depth-checked via `CheckedRecursion`.
impl CheckedRecursion for ExprContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6685
6686impl<'a> ExprContext<'a> {
6687    pub fn catalog(&self) -> &dyn SessionCatalog {
6688        self.qcx.scx.catalog
6689    }
6690
6691    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6692        let mut ecx = self.clone();
6693        ecx.name = name;
6694        ecx
6695    }
6696
6697    pub fn column_type<E>(&self, expr: &E) -> E::Type
6698    where
6699        E: AbstractExpr,
6700    {
6701        expr.typ(
6702            &self.qcx.outer_relation_types,
6703            self.relation_type,
6704            &self.qcx.scx.param_types.borrow(),
6705        )
6706    }
6707
6708    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6709    where
6710        E: AbstractExpr,
6711    {
6712        self.column_type(expr).scalar_type()
6713    }
6714
6715    fn derived_query_context(&self) -> QueryContext<'_> {
6716        let mut scope = self.scope.clone();
6717        scope.lateral_barrier = true;
6718        self.qcx.derived_context(scope, self.relation_type.clone())
6719    }
6720
6721    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6722        self.qcx.scx.require_feature_flag(flag)
6723    }
6724
6725    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6726        &self.qcx.scx.param_types
6727    }
6728
6729    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6730    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6731    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6732        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6733    }
6734
6735    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6736        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6737    }
6738}
6739
/// Manages column names, doing lightweight string internment.
///
/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
/// to ensure maximal sharing.
///
/// The set holds one canonical `Arc<str>` per distinct string interned so far;
/// `intern` hands out clones (pointer copies) of those canonical values.
#[derive(Debug, Clone)]
pub struct NameManager(BTreeSet<Arc<str>>);
6747
6748impl NameManager {
6749    /// Creates a new `NameManager`, with no interned names
6750    pub fn new() -> Self {
6751        Self(BTreeSet::new())
6752    }
6753
6754    /// Interns a string, returning a reference-counted pointer to the interned
6755    /// string.
6756    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6757        let s = s.as_ref();
6758        if let Some(interned) = self.0.get(s) {
6759            Arc::clone(interned)
6760        } else {
6761            let interned: Arc<str> = Arc::from(s);
6762            self.0.insert(Arc::clone(&interned));
6763            interned
6764        }
6765    }
6766
6767    /// Interns a string representing a reference to a `ScopeItem`, returning a
6768    /// reference-counted pointer to the interned string.
6769    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6770        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6771        //
6772        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6773        // name information. Note that as of 2025-04-09, we don't support column
6774        // renames.
6775        //
6776        // A few bad alternatives:
6777        //
6778        // (1) Store it but don't write it down. This fails because the expression
6779        //     cache will erase our names on restart.
6780        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6781        //     get the right names. But the world now and the world when we made
6782        //     those objects may be different.
6783        // (3) Just don't write down the table name. Nothing fails... for now.
6784
6785        self.intern(item.column_name.as_str())
6786    }
6787}
6788
#[cfg(test)]
mod test {
    use super::*;

    /// Ensure that `NameManager`'s string interning works as expected.
    ///
    /// In particular, structurally but not referentially identical strings should
    /// be interned to the same `Arc`ed pointer.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut nm = NameManager::new();

        let original = "hi";
        let first = nm.intern(original);
        let other = nm.intern("hello");

        // Distinct strings must intern to distinct pointers.
        assert_ne!(first.as_ptr(), other.as_ptr());

        // This static string is _likely_ the very same `&str` as `original`;
        // either way, it must intern to the same pointer as before.
        let second = nm.intern("hi");
        assert_eq!(first.as_ptr(), second.as_ptr());

        // Build a "hi" at runtime so it can't share the static string's storage.
        let fresh = format!(
            "{}{}",
            first.chars().nth(0).unwrap(),
            second.chars().nth(1).unwrap()
        );
        // Make sure that we're testing with a fresh string!
        assert_ne!(original.as_ptr(), fresh.as_ptr());

        let third = nm.intern(fresh);
        assert_eq!(first.as_ptr(), third.as_ptr());
    }
}