mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52    func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67    RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103    literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111    pub expr: E,
112    pub desc: RelationDesc,
113    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114    pub scope: Scope,
115}
116
117/// Plans a top-level query, returning the `HirRelationExpr` describing the query
118/// plan, the `RelationDesc` describing the shape of the result set, a
119/// `RowSetFinishing` describing post-processing that must occur before results
120/// are sent to the client, and the types of the parameters in the query, if any
121/// were present.
122///
123/// Note that the returned `RelationDesc` describes the expression after
124/// applying the returned `RowSetFinishing`.
125#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127    scx: &StatementContext,
128    mut query: Query<Aug>,
129    lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131    transform_ast::transform(scx, &mut query)?;
132    let mut qcx = QueryContext::root(scx, lifetime);
133    let PlannedQuery {
134        mut expr,
135        scope,
136        order_by,
137        limit,
138        offset,
139        project,
140        group_size_hints,
141    } = plan_query(&mut qcx, &query)?;
142
143    let mut finishing = RowSetFinishing {
144        limit,
145        offset,
146        project,
147        order_by,
148    };
149
150    // Attempt to push the finishing's ordering past its projection. This allows
151    // data to be projected down on the workers rather than the coordinator. It
152    // also improves the optimizer's demand analysis, as the optimizer can only
153    // reason about demand information in `expr` (i.e., it can't see
154    // `finishing.project`).
155    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157    if lifetime.is_maintained() {
158        expr.finish_maintained(&mut finishing, group_size_hints);
159    }
160
161    let typ = qcx.relation_type(&expr);
162    let typ = SqlRelationType::new(
163        finishing
164            .project
165            .iter()
166            .map(|i| typ.column_types[*i].clone())
167            .collect(),
168    );
169    let desc = RelationDesc::new(typ, scope.column_names());
170
171    Ok(PlannedRootQuery {
172        expr,
173        desc,
174        finishing,
175        scope,
176    })
177}
178
179/// TODO(ct2): Dedup this with [plan_root_query].
180#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182    qcx: &mut QueryContext,
183    mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185    transform_ast::transform(qcx.scx, &mut query)?;
186    let PlannedQuery {
187        mut expr,
188        scope,
189        order_by,
190        limit,
191        offset,
192        project,
193        group_size_hints,
194    } = plan_query(qcx, &query)?;
195
196    let mut finishing = RowSetFinishing {
197        limit,
198        offset,
199        project,
200        order_by,
201    };
202
203    // Attempt to push the finishing's ordering past its projection. This allows
204    // data to be projected down on the workers rather than the coordinator. It
205    // also improves the optimizer's demand analysis, as the optimizer can only
206    // reason about demand information in `expr` (i.e., it can't see
207    // `finishing.project`).
208    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210    expr.finish_maintained(&mut finishing, group_size_hints);
211
212    let typ = qcx.relation_type(&expr);
213    let typ = SqlRelationType::new(
214        finishing
215            .project
216            .iter()
217            .map(|i| typ.column_types[*i].clone())
218            .collect(),
219    );
220    let desc = RelationDesc::new(typ, scope.column_names());
221
222    Ok(PlannedRootQuery {
223        expr,
224        desc,
225        finishing,
226        scope,
227    })
228}
229
230/// Attempts to push a projection through an order by.
231///
232/// The returned bool indicates whether the pushdown was successful or not.
233/// Successful pushdown requires that all the columns referenced in `order_by`
234/// are included in `project`.
235///
236/// When successful, `expr` is wrapped in a projection node, `order_by` is
237/// rewritten to account for the pushed-down projection, and `project` is
238/// replaced with the trivial projection. When unsuccessful, no changes are made
239/// to any of the inputs.
240fn try_push_projection_order_by(
241    expr: &mut HirRelationExpr,
242    project: &mut Vec<usize>,
243    order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245    let mut unproject = vec![None; expr.arity()];
246    for (out_i, in_i) in project.iter().copied().enumerate() {
247        unproject[in_i] = Some(out_i);
248    }
249    if order_by
250        .iter()
251        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252    {
253        let trivial_project = (0..project.len()).collect();
254        *expr = expr.take().project(mem::replace(project, trivial_project));
255        for ob in order_by {
256            ob.column = unproject[ob.column].unwrap();
257        }
258        true
259    } else {
260        false
261    }
262}
263
264pub fn plan_insert_query(
265    scx: &StatementContext,
266    table_name: ResolvedItemName,
267    columns: Vec<Ident>,
268    source: InsertSource<Aug>,
269    returning: Vec<SelectItem<Aug>>,
270) -> Result<
271    (
272        CatalogItemId,
273        HirRelationExpr,
274        PlannedRootQuery<Vec<HirScalarExpr>>,
275    ),
276    PlanError,
277> {
278    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279    let table = scx.get_item_by_resolved_name(&table_name)?;
280
281    // Validate the target of the insert.
282    if table.item_type() != CatalogItemType::Table {
283        sql_bail!(
284            "cannot insert into {} '{}'",
285            table.item_type(),
286            table_name.full_name_str()
287        );
288    }
289    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
290    let mut defaults = table
291        .writable_table_details()
292        .ok_or_else(|| {
293            sql_err!(
294                "cannot insert into non-writeable table '{}'",
295                table_name.full_name_str()
296            )
297        })?
298        .to_vec();
299
300    for default in &mut defaults {
301        transform_ast::transform(scx, default)?;
302    }
303
304    if table.id().is_system() {
305        sql_bail!(
306            "cannot insert into system table '{}'",
307            table_name.full_name_str()
308        );
309    }
310
311    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313    // Validate target column order.
314    let mut source_types = Vec::with_capacity(columns.len());
315    let mut ordering = Vec::with_capacity(columns.len());
316
317    if columns.is_empty() {
318        // Columns in source query must be in order. Let's guess the full shape and truncate to the
319        // right size later after planning the source query
320        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321        ordering.extend(0..desc.arity());
322    } else {
323        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324            .iter()
325            .enumerate()
326            .map(|(idx, (name, typ))| (name, (idx, typ)))
327            .collect();
328
329        for c in &columns {
330            if let Some((idx, typ)) = column_by_name.get(c) {
331                ordering.push(*idx);
332                source_types.push(&typ.scalar_type);
333            } else {
334                sql_bail!(
335                    "column {} of relation {} does not exist",
336                    c.quoted(),
337                    table_name.full_name_str().quoted()
338                );
339            }
340        }
341        if let Some(dup) = columns.iter().duplicates().next() {
342            sql_bail!("column {} specified more than once", dup.quoted());
343        }
344    };
345
346    // Plan the source.
347    let expr = match source {
348        InsertSource::Query(mut query) => {
349            transform_ast::transform(scx, &mut query)?;
350
351            match query {
352                // Special-case simple VALUES clauses as PostgreSQL does.
353                Query {
354                    body: SetExpr::Values(Values(values)),
355                    ctes,
356                    order_by,
357                    limit: None,
358                    offset: None,
359                } if ctes.is_empty() && order_by.is_empty() => {
360                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361                    plan_values_insert(&qcx, &names, &source_types, &values)?
362                }
363                _ => {
364                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365                    expr
366                }
367            }
368        }
369        InsertSource::DefaultValues => {
370            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371        }
372    };
373
374    let expr_arity = expr.arity();
375
376    // Validate that the arity of the source query is at most the size of declared columns or the
377    // size of the table if none are declared
378    let max_columns = if columns.is_empty() {
379        desc.arity()
380    } else {
381        columns.len()
382    };
383    if expr_arity > max_columns {
384        sql_bail!("INSERT has more expressions than target columns");
385    }
386    // But it should never have less than the declared columns (or zero)
387    if expr_arity < columns.len() {
388        sql_bail!("INSERT has more target columns than expressions");
389    }
390
391    // Trim now that we know for sure the correct arity of the source query
392    source_types.truncate(expr_arity);
393    ordering.truncate(expr_arity);
394
395    // Ensure the types of the source query match the types of the target table,
396    // installing assignment casts where necessary and possible.
397    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398        sql_err!(
399            "column {} is of type {} but expression is of type {}",
400            desc.get_name(ordering[e.column]).quoted(),
401            qcx.humanize_scalar_type(&e.target_type, false),
402            qcx.humanize_scalar_type(&e.source_type, false),
403        )
404    })?;
405
406    // Fill in any omitted columns and rearrange into correct order
407    let mut map_exprs = vec![];
408    let mut project_key = Vec::with_capacity(desc.arity());
409
410    // Maps from table column index to position in the source query
411    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414    for (col_idx, (col_typ, default)) in column_details {
415        if let Some(src_idx) = col_to_source.get(&col_idx) {
416            project_key.push(*src_idx);
417        } else {
418            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419            project_key.push(expr_arity + map_exprs.len());
420            map_exprs.push(hir);
421        }
422    }
423
424    let returning = {
425        let (scope, typ) = if let ResolvedItemName::Item {
426            full_name,
427            version: _,
428            ..
429        } = table_name
430        {
431            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432            let typ = desc.typ().clone();
433            (scope, typ)
434        } else {
435            (Scope::empty(), SqlRelationType::empty())
436        };
437        let ecx = &ExprContext {
438            qcx: &qcx,
439            name: "RETURNING clause",
440            scope: &scope,
441            relation_type: &typ,
442            allow_aggregates: false,
443            allow_subqueries: false,
444            allow_parameters: true,
445            allow_windows: false,
446        };
447        let table_func_names = BTreeMap::new();
448        let mut output_columns = vec![];
449        let mut new_exprs = vec![];
450        let mut new_type = SqlRelationType::empty();
451        for mut si in returning {
452            transform_ast::transform(scx, &mut si)?;
453            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454                let expr = match &select_item {
455                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457                };
458                output_columns.push(column_name);
459                let typ = ecx.column_type(&expr);
460                new_type.column_types.push(typ);
461                new_exprs.push(expr);
462            }
463        }
464        let desc = RelationDesc::new(new_type, output_columns);
465        let desc_arity = desc.arity();
466        PlannedRootQuery {
467            expr: new_exprs,
468            desc,
469            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470            scope,
471        }
472    };
473
474    Ok((
475        table.id(),
476        expr.map(map_exprs).project(project_key),
477        returning,
478    ))
479}
480
481/// Determines the mapping between some external data and a Materialize relation.
482///
483/// Returns the following:
484/// * [`CatalogItemId`] for the destination table.
485/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
486/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
487///   since we now return a [`MapFilterProject`].
488/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
489///   destination table.
490///
491pub fn plan_copy_item(
492    scx: &StatementContext,
493    item_name: ResolvedItemName,
494    columns: Vec<Ident>,
495) -> Result<
496    (
497        CatalogItemId,
498        RelationDesc,
499        Vec<ColumnIndex>,
500        Option<MapFilterProject>,
501    ),
502    PlanError,
503> {
504    let item = scx.get_item_by_resolved_name(&item_name)?;
505    let fullname = scx.catalog.resolve_full_name(item.name());
506    let table_desc = item.desc(&fullname)?.into_owned();
507    let mut ordering = Vec::with_capacity(columns.len());
508
509    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
510    // should be simplified. The reason they are currently separate code paths is so we can roll
511    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.
512
513    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
514    // FROM SOURCE ...`), then we generate an MFP.
515    //
516    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
517    // so it's not always guaranteed that our `item` is a table.
518    let mfp = if let Some(table_defaults) = item.writable_table_details() {
519        let mut table_defaults = table_defaults.to_vec();
520
521        for default in &mut table_defaults {
522            transform_ast::transform(scx, default)?;
523        }
524
525        // Fill in any omitted columns and rearrange into correct order
526        let source_column_names: Vec<_> = columns
527            .iter()
528            .cloned()
529            .map(normalize::column_name)
530            .collect();
531
532        let mut default_exprs = Vec::new();
533        let mut project_keys = Vec::with_capacity(table_desc.arity());
534
535        // For each column in the destination table, either project it from the source data, or provide
536        // an expression to fill in a default value.
537        let column_details = table_desc.iter().zip_eq(table_defaults);
538        for ((col_name, col_type), col_default) in column_details {
539            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
540            if let Some(src_idx) = maybe_src_idx {
541                project_keys.push(src_idx);
542            } else {
543                // If one a column from the table does not exist in the source data, then a default
544                // value will get appended to the end of the input Row from the source data.
545                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
546                let mir = hir.lower_uncorrelated()?;
547                project_keys.push(source_column_names.len() + default_exprs.len());
548                default_exprs.push(mir);
549            }
550        }
551
552        let mfp = MapFilterProject::new(source_column_names.len())
553            .map(default_exprs)
554            .project(project_keys);
555        Some(mfp)
556    } else {
557        None
558    };
559
560    // Create a mapping from input data to the table we're copying into.
561    let source_desc = if columns.is_empty() {
562        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
563        ordering.extend(indexes);
564
565        // The source data should be in the same order as the table.
566        table_desc
567    } else {
568        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
569        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
570            .iter_all()
571            .map(|(idx, name, typ)| (name, (*idx, typ)))
572            .collect();
573
574        let mut names = Vec::with_capacity(columns.len());
575        let mut source_types = Vec::with_capacity(columns.len());
576
577        for c in &columns {
578            if let Some((idx, typ)) = column_by_name.get(c) {
579                ordering.push(*idx);
580                source_types.push((*typ).clone());
581                names.push(c.clone());
582            } else {
583                sql_bail!(
584                    "column {} of relation {} does not exist",
585                    c.quoted(),
586                    item_name.full_name_str().quoted()
587                );
588            }
589        }
590        if let Some(dup) = columns.iter().duplicates().next() {
591            sql_bail!("column {} specified more than once", dup.quoted());
592        }
593
594        // The source data is a different shape than the destination table.
595        RelationDesc::new(SqlRelationType::new(source_types), names)
596    };
597
598    Ok((item.id(), source_desc, ordering, mfp))
599}
600
601/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
602///
603/// TODO(cf3): Merge this method with [`plan_copy_item`].
604pub fn plan_copy_from(
605    scx: &StatementContext,
606    table_name: ResolvedItemName,
607    columns: Vec<Ident>,
608) -> Result<
609    (
610        CatalogItemId,
611        RelationDesc,
612        Vec<ColumnIndex>,
613        Option<MapFilterProject>,
614    ),
615    PlanError,
616> {
617    let table = scx.get_item_by_resolved_name(&table_name)?;
618
619    // Validate the target of the insert.
620    if table.item_type() != CatalogItemType::Table {
621        sql_bail!(
622            "cannot insert into {} '{}'",
623            table.item_type(),
624            table_name.full_name_str()
625        );
626    }
627
628    let _ = table.writable_table_details().ok_or_else(|| {
629        sql_err!(
630            "cannot insert into non-writeable table '{}'",
631            table_name.full_name_str()
632        )
633    })?;
634
635    if table.id().is_system() {
636        sql_bail!(
637            "cannot insert into system table '{}'",
638            table_name.full_name_str()
639        );
640    }
641    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643    Ok((id, desc, ordering, mfp))
644}
645
646/// Builds a plan that adds the default values for the missing columns and re-orders
647/// the datums in the given rows to match the order in the target table.
648pub fn plan_copy_from_rows(
649    pcx: &PlanContext,
650    catalog: &dyn SessionCatalog,
651    target_id: CatalogItemId,
652    target_name: String,
653    columns: Vec<ColumnIndex>,
654    rows: Vec<mz_repr::Row>,
655) -> Result<HirRelationExpr, PlanError> {
656    let scx = StatementContext::new(Some(pcx), catalog);
657
658    // Always copy at the latest version of the table.
659    let table = catalog
660        .try_get_item(&target_id)
661        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
662        .at_version(RelationVersionSelector::Latest);
663    let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
664
665    let mut defaults = table
666        .writable_table_details()
667        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
668        .to_vec();
669
670    for default in &mut defaults {
671        transform_ast::transform(&scx, default)?;
672    }
673
674    let column_types = columns
675        .iter()
676        .map(|x| desc.get_type(x).clone())
677        .map(|mut x| {
678            // Null constraint is enforced later, when inserting the row in the table.
679            // Without this, an assert is hit during lowering.
680            x.nullable = true;
681            x
682        })
683        .collect();
684    let typ = SqlRelationType::new(column_types);
685    let expr = HirRelationExpr::Constant {
686        rows,
687        typ: typ.clone(),
688    };
689
690    // Exit early with just the raw constant if we know that all columns are present
691    // and in the correct order. This lets us bypass expensive downstream optimizations
692    // more easily, as at every stage we know this expression is nothing more than
693    // a constant (as opposed to e.g. a constant with with an identity map and identity
694    // projection).
695    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
696    if columns == default {
697        return Ok(expr);
698    }
699
700    // Fill in any omitted columns and rearrange into correct order
701    let mut map_exprs = vec![];
702    let mut project_key = Vec::with_capacity(desc.arity());
703
704    // Maps from table column index to position in the source query
705    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
706
707    let column_details = desc.iter_all().zip_eq(defaults);
708    for ((col_idx, _col_name, col_typ), default) in column_details {
709        if let Some(src_idx) = col_to_source.get(&col_idx) {
710            project_key.push(*src_idx);
711        } else {
712            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
713            project_key.push(typ.arity() + map_exprs.len());
714            map_exprs.push(hir);
715        }
716    }
717
718    Ok(expr.map(map_exprs).project(project_key))
719}
720
721/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
722pub struct ReadThenWritePlan {
723    pub id: CatalogItemId,
724    /// Read portion of query.
725    ///
726    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
727    /// retractions.
728    pub selection: HirRelationExpr,
729    /// Map from column index to SET expression. Empty for DELETE statements.
730    pub assignments: BTreeMap<usize, HirScalarExpr>,
731    pub finishing: RowSetFinishing,
732}
733
734pub fn plan_delete_query(
735    scx: &StatementContext,
736    mut delete_stmt: DeleteStatement<Aug>,
737) -> Result<ReadThenWritePlan, PlanError> {
738    transform_ast::transform(scx, &mut delete_stmt)?;
739
740    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
741    plan_mutation_query_inner(
742        qcx,
743        delete_stmt.table_name,
744        delete_stmt.alias,
745        delete_stmt.using,
746        vec![],
747        delete_stmt.selection,
748    )
749}
750
751pub fn plan_update_query(
752    scx: &StatementContext,
753    mut update_stmt: UpdateStatement<Aug>,
754) -> Result<ReadThenWritePlan, PlanError> {
755    transform_ast::transform(scx, &mut update_stmt)?;
756
757    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
758
759    plan_mutation_query_inner(
760        qcx,
761        update_stmt.table_name,
762        update_stmt.alias,
763        vec![],
764        update_stmt.assignments,
765        update_stmt.selection,
766    )
767}
768
769pub fn plan_mutation_query_inner(
770    qcx: QueryContext,
771    table_name: ResolvedItemName,
772    alias: Option<TableAlias>,
773    using: Vec<TableWithJoins<Aug>>,
774    assignments: Vec<Assignment<Aug>>,
775    selection: Option<Expr<Aug>>,
776) -> Result<ReadThenWritePlan, PlanError> {
777    // Get ID and version of the relation desc.
778    let (id, version) = match table_name {
779        ResolvedItemName::Item { id, version, .. } => (id, version),
780        _ => sql_bail!("cannot mutate non-user table"),
781    };
782
783    // Perform checks on item with given ID.
784    let item = qcx.scx.get_item(&id).at_version(version);
785    if item.item_type() != CatalogItemType::Table {
786        sql_bail!(
787            "cannot mutate {} '{}'",
788            item.item_type(),
789            table_name.full_name_str()
790        );
791    }
792    let _ = item.writable_table_details().ok_or_else(|| {
793        sql_err!(
794            "cannot mutate non-writeable table '{}'",
795            table_name.full_name_str()
796        )
797    })?;
798    if id.is_system() {
799        sql_bail!(
800            "cannot mutate system table '{}'",
801            table_name.full_name_str()
802        );
803    }
804
805    // Derive structs for operation from validated table
806    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
807    let scope = plan_table_alias(scope, alias.as_ref())?;
808    let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
809    let relation_type = qcx.relation_type(&get);
810
811    if using.is_empty() {
812        if let Some(expr) = selection {
813            let ecx = &ExprContext {
814                qcx: &qcx,
815                name: "WHERE clause",
816                scope: &scope,
817                relation_type: &relation_type,
818                allow_aggregates: false,
819                allow_subqueries: true,
820                allow_parameters: true,
821                allow_windows: false,
822            };
823            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
824            get = get.filter(vec![expr]);
825        }
826    } else {
827        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
828    }
829
830    let mut sets = BTreeMap::new();
831    for Assignment { id, value } in assignments {
832        // Get the index and type of the column.
833        let name = normalize::column_name(id);
834        match desc.get_by_name(&name) {
835            Some((idx, typ)) => {
836                let ecx = &ExprContext {
837                    qcx: &qcx,
838                    name: "SET clause",
839                    scope: &scope,
840                    relation_type: &relation_type,
841                    allow_aggregates: false,
842                    allow_subqueries: false,
843                    allow_parameters: true,
844                    allow_windows: false,
845                };
846                let expr = plan_expr(ecx, &value)?.cast_to(
847                    ecx,
848                    CastContext::Assignment,
849                    &typ.scalar_type,
850                )?;
851
852                if sets.insert(idx, expr).is_some() {
853                    sql_bail!("column {} set twice", name)
854                }
855            }
856            None => sql_bail!("unknown column {}", name),
857        };
858    }
859
860    let finishing = RowSetFinishing {
861        order_by: vec![],
862        limit: None,
863        offset: 0,
864        project: (0..desc.arity()).collect(),
865    };
866
867    Ok(ReadThenWritePlan {
868        id,
869        selection: get,
870        finishing,
871        assignments: sets,
872    })
873}
874
875// Adjust `get` to perform an existential subquery on `using` accounting for
876// `selection`.
877//
878// If `USING`, we essentially want to rewrite the query as a correlated
879// existential subquery, i.e.
880// ```
881// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
882// ```
883// However, we can't do that directly because of esoteric rules w/r/t `lateral`
884// subqueries.
885// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
886fn handle_mutation_using_clause(
887    qcx: &QueryContext,
888    selection: Option<Expr<Aug>>,
889    using: Vec<TableWithJoins<Aug>>,
890    get: HirRelationExpr,
891    outer_scope: Scope,
892) -> Result<HirRelationExpr, PlanError> {
893    // Plan `USING` as a cross-joined `FROM` without knowledge of the
894    // statement's `FROM` target. This prevents `lateral` subqueries from
895    // "seeing" the `FROM` target.
896    let (mut using_rel_expr, using_scope) =
897        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
898            let (left, left_scope) = l;
899            plan_join(
900                qcx,
901                left,
902                left_scope,
903                &Join {
904                    relation: TableFactor::NestedJoin {
905                        join: Box::new(twj),
906                        alias: None,
907                    },
908                    join_operator: JoinOperator::CrossJoin,
909                },
910            )
911        })?;
912
913    if let Some(expr) = selection {
914        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
915        // PG-like semantics e.g. expressing ambiguous column references. We put
916        // `USING...` first for no real reason, but making a different decision
917        // would require adjusting the column references on this relation
918        // differently.
919        let on = HirScalarExpr::literal_true();
920        let joined = using_rel_expr
921            .clone()
922            .join(get.clone(), on, JoinKind::Inner);
923        let joined_scope = using_scope.product(outer_scope)?;
924        let joined_relation_type = qcx.relation_type(&joined);
925
926        let ecx = &ExprContext {
927            qcx,
928            name: "WHERE clause",
929            scope: &joined_scope,
930            relation_type: &joined_relation_type,
931            allow_aggregates: false,
932            allow_subqueries: true,
933            allow_parameters: true,
934            allow_windows: false,
935        };
936
937        // Plan the filter expression on `FROM, USING...`.
938        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
939
940        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
941        // those to the right of `using_rel_expr`) to instead be correlated to
942        // the outer relation, i.e. `get`.
943        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
944        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
945        use mz_expr::visit::Visit;
946        expr.visit_mut_post(&mut |e| {
947            if let HirScalarExpr::Column(c, _name) = e {
948                if c.column >= using_rel_arity {
949                    c.level += 1;
950                    c.column -= using_rel_arity;
951                };
952            }
953        })?;
954
955        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
956        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
957        // relation.
958        using_rel_expr = using_rel_expr.filter(vec![expr]);
959    } else {
960        // Check that scopes are at compatible (i.e. do not double-reference
961        // same table), despite lack of selection
962        let _joined_scope = using_scope.product(outer_scope)?;
963    }
964    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
965    // whether any rows are returned, and not on the contents of those rows,
966    // the output list of the subquery is normally unimportant.
967    //
968    // This means we don't need to worry about projecting/mapping any
969    // additional expressions here.
970    //
971    // https://www.postgresql.org/docs/14/functions-subquery.html
972
973    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
974    Ok(get.filter(vec![using_rel_expr.exists()]))
975}
976
977#[derive(Debug)]
978pub(crate) struct CastRelationError {
979    pub(crate) column: usize,
980    pub(crate) source_type: SqlScalarType,
981    pub(crate) target_type: SqlScalarType,
982}
983
984/// Cast a relation from one type to another using the specified type of cast.
985///
986/// The length of `target_types` must match the arity of `expr`.
987pub(crate) fn cast_relation<'a, I>(
988    qcx: &QueryContext,
989    ccx: CastContext,
990    expr: HirRelationExpr,
991    target_types: I,
992) -> Result<HirRelationExpr, CastRelationError>
993where
994    I: IntoIterator<Item = &'a SqlScalarType>,
995{
996    let ecx = &ExprContext {
997        qcx,
998        name: "values",
999        scope: &Scope::empty(),
1000        relation_type: &qcx.relation_type(&expr),
1001        allow_aggregates: false,
1002        allow_subqueries: true,
1003        allow_parameters: true,
1004        allow_windows: false,
1005    };
1006    let mut map_exprs = vec![];
1007    let mut project_key = vec![];
1008    for (i, target_typ) in target_types.into_iter().enumerate() {
1009        let expr = HirScalarExpr::column(i);
1010        // We plan every cast and check the evaluated expressions rather than
1011        // checking the types directly because of some complex casting rules
1012        // between types not expressed in `SqlScalarType` equality.
1013        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1014            Ok(cast_expr) => {
1015                if expr == cast_expr {
1016                    // Cast between types was unnecessary
1017                    project_key.push(i);
1018                } else {
1019                    // Cast between types required
1020                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
1021                    map_exprs.push(cast_expr);
1022                }
1023            }
1024            Err(_) => {
1025                return Err(CastRelationError {
1026                    column: i,
1027                    source_type: ecx.scalar_type(&expr),
1028                    target_type: target_typ.clone(),
1029                });
1030            }
1031        }
1032    }
1033    Ok(expr.map(map_exprs).project(project_key))
1034}
1035
1036/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1037/// VIEW` statement.
1038pub fn plan_as_of(
1039    scx: &StatementContext,
1040    as_of: Option<AsOf<Aug>>,
1041) -> Result<QueryWhen, PlanError> {
1042    match as_of {
1043        None => Ok(QueryWhen::Immediately),
1044        Some(as_of) => match as_of {
1045            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1046            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1047        },
1048    }
1049}
1050
1051/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1052///
1053/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1054/// - not a constant,
1055/// - not castable to MzTimestamp,
1056/// - is null,
1057/// - contains an unmaterializable function,
1058/// - some other evaluation error occurs, e.g., a division by 0,
1059/// - contains aggregates, subqueries, parameters, or window function calls.
1060pub fn plan_as_of_or_up_to(
1061    scx: &StatementContext,
1062    mut expr: Expr<Aug>,
1063) -> Result<mz_repr::Timestamp, PlanError> {
1064    let scope = Scope::empty();
1065    let desc = RelationDesc::empty();
1066    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1067    // evaluated only once.)
1068    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1069    transform_ast::transform(scx, &mut expr)?;
1070    let ecx = &ExprContext {
1071        qcx: &qcx,
1072        name: "AS OF or UP TO",
1073        scope: &scope,
1074        relation_type: desc.typ(),
1075        allow_aggregates: false,
1076        allow_subqueries: false,
1077        allow_parameters: false,
1078        allow_windows: false,
1079    };
1080    let hir = plan_expr(ecx, &expr)?.cast_to(
1081        ecx,
1082        CastContext::Assignment,
1083        &SqlScalarType::MzTimestamp,
1084    )?;
1085    if hir.contains_unmaterializable() {
1086        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1087    }
1088    // At this point, we definitely have a constant expression:
1089    // - it can't contain any unmaterializable functions;
1090    // - it can't refer to any columns.
1091    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1092    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1093    // division by 0.
1094    let timestamp = hir
1095        .into_literal_mz_timestamp()
1096        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1097    Ok(timestamp)
1098}
1099
1100/// Plans an expression in the AS position of a `CREATE SECRET`.
1101pub fn plan_secret_as(
1102    scx: &StatementContext,
1103    mut expr: Expr<Aug>,
1104) -> Result<MirScalarExpr, PlanError> {
1105    let scope = Scope::empty();
1106    let desc = RelationDesc::empty();
1107    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1108
1109    transform_ast::transform(scx, &mut expr)?;
1110
1111    let ecx = &ExprContext {
1112        qcx: &qcx,
1113        name: "AS",
1114        scope: &scope,
1115        relation_type: desc.typ(),
1116        allow_aggregates: false,
1117        allow_subqueries: false,
1118        allow_parameters: false,
1119        allow_windows: false,
1120    };
1121    let expr = plan_expr(ecx, &expr)?
1122        .type_as(ecx, &SqlScalarType::Bytes)?
1123        .lower_uncorrelated()?;
1124    Ok(expr)
1125}
1126
1127/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1128pub fn plan_webhook_validate_using(
1129    scx: &StatementContext,
1130    validate_using: CreateWebhookSourceCheck<Aug>,
1131) -> Result<WebhookValidation, PlanError> {
1132    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1133
1134    let CreateWebhookSourceCheck {
1135        options,
1136        using: mut expr,
1137    } = validate_using;
1138
1139    let mut column_typs = vec![];
1140    let mut column_names = vec![];
1141
1142    let (bodies, headers, secrets) = options
1143        .map(|o| (o.bodies, o.headers, o.secrets))
1144        .unwrap_or_default();
1145
1146    // Append all of the bodies so they can be used in the expression.
1147    let mut body_tuples = vec![];
1148    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1149        let scalar_type = use_bytes
1150            .then_some(SqlScalarType::Bytes)
1151            .unwrap_or(SqlScalarType::String);
1152        let name = alias
1153            .map(|a| a.into_string())
1154            .unwrap_or_else(|| "body".to_string());
1155
1156        column_typs.push(SqlColumnType {
1157            scalar_type,
1158            nullable: false,
1159        });
1160        column_names.push(name);
1161
1162        // Store the column index so we can be sure to provide this body correctly.
1163        let column_idx = column_typs.len() - 1;
1164        // Double check we're consistent with column names.
1165        assert_eq!(
1166            column_idx,
1167            column_names.len() - 1,
1168            "body column names and types don't match"
1169        );
1170        body_tuples.push((column_idx, use_bytes));
1171    }
1172
1173    // Append all of the headers so they can be used in the expression.
1174    let mut header_tuples = vec![];
1175
1176    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1177        let value_type = use_bytes
1178            .then_some(SqlScalarType::Bytes)
1179            .unwrap_or(SqlScalarType::String);
1180        let name = alias
1181            .map(|a| a.into_string())
1182            .unwrap_or_else(|| "headers".to_string());
1183
1184        column_typs.push(SqlColumnType {
1185            scalar_type: SqlScalarType::Map {
1186                value_type: Box::new(value_type),
1187                custom_id: None,
1188            },
1189            nullable: false,
1190        });
1191        column_names.push(name);
1192
1193        // Store the column index so we can be sure to provide this body correctly.
1194        let column_idx = column_typs.len() - 1;
1195        // Double check we're consistent with column names.
1196        assert_eq!(
1197            column_idx,
1198            column_names.len() - 1,
1199            "header column names and types don't match"
1200        );
1201        header_tuples.push((column_idx, use_bytes));
1202    }
1203
1204    // Append all secrets so they can be used in the expression.
1205    let mut validation_secrets = vec![];
1206
1207    for CreateWebhookSourceSecret {
1208        secret,
1209        alias,
1210        use_bytes,
1211    } in secrets
1212    {
1213        // Either provide the secret to the validation expression as Bytes or a String.
1214        let scalar_type = use_bytes
1215            .then_some(SqlScalarType::Bytes)
1216            .unwrap_or(SqlScalarType::String);
1217
1218        column_typs.push(SqlColumnType {
1219            scalar_type,
1220            nullable: false,
1221        });
1222        let ResolvedItemName::Item {
1223            id,
1224            full_name: FullItemName { item, .. },
1225            ..
1226        } = secret
1227        else {
1228            return Err(PlanError::InvalidSecret(Box::new(secret)));
1229        };
1230
1231        // Plan the expression using the secret's alias, if one is provided.
1232        let name = if let Some(alias) = alias {
1233            alias.into_string()
1234        } else {
1235            item
1236        };
1237        column_names.push(name);
1238
1239        // Get the column index that corresponds for this secret, so we can make sure to provide the
1240        // secrets in the correct order during evaluation.
1241        let column_idx = column_typs.len() - 1;
1242        // Double check that our column names and types match.
1243        assert_eq!(
1244            column_idx,
1245            column_names.len() - 1,
1246            "column names and types don't match"
1247        );
1248
1249        validation_secrets.push(WebhookValidationSecret {
1250            id,
1251            column_idx,
1252            use_bytes,
1253        });
1254    }
1255
1256    let relation_typ = SqlRelationType::new(column_typs);
1257    let desc = RelationDesc::new(relation_typ, column_names.clone());
1258    let scope = Scope::from_source(None, column_names);
1259
1260    transform_ast::transform(scx, &mut expr)?;
1261
1262    let ecx = &ExprContext {
1263        qcx: &qcx,
1264        name: "CHECK",
1265        scope: &scope,
1266        relation_type: desc.typ(),
1267        allow_aggregates: false,
1268        allow_subqueries: false,
1269        allow_parameters: false,
1270        allow_windows: false,
1271    };
1272    let expr = plan_expr(ecx, &expr)?
1273        .type_as(ecx, &SqlScalarType::Bool)?
1274        .lower_uncorrelated()?;
1275    let validation = WebhookValidation {
1276        expression: expr,
1277        relation_desc: desc,
1278        bodies: body_tuples,
1279        headers: header_tuples,
1280        secrets: validation_secrets,
1281    };
1282    Ok(validation)
1283}
1284
1285pub fn plan_default_expr(
1286    scx: &StatementContext,
1287    expr: &Expr<Aug>,
1288    target_ty: &SqlScalarType,
1289) -> Result<HirScalarExpr, PlanError> {
1290    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1291    let ecx = &ExprContext {
1292        qcx: &qcx,
1293        name: "DEFAULT expression",
1294        scope: &Scope::empty(),
1295        relation_type: &SqlRelationType::empty(),
1296        allow_aggregates: false,
1297        allow_subqueries: false,
1298        allow_parameters: false,
1299        allow_windows: false,
1300    };
1301    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1302    Ok(hir)
1303}
1304
1305pub fn plan_params<'a>(
1306    scx: &'a StatementContext,
1307    params: Vec<Expr<Aug>>,
1308    desc: &StatementDesc,
1309) -> Result<Params, PlanError> {
1310    if params.len() != desc.param_types.len() {
1311        sql_bail!(
1312            "expected {} params, got {}",
1313            desc.param_types.len(),
1314            params.len()
1315        );
1316    }
1317
1318    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1319
1320    let mut datums = Row::default();
1321    let mut packer = datums.packer();
1322    let mut actual_types = Vec::new();
1323    let temp_storage = &RowArena::new();
1324    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1325        transform_ast::transform(scx, &mut expr)?;
1326
1327        let ecx = execute_expr_context(&qcx);
1328        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1329        let actual_ty = ecx.scalar_type(&ex);
1330        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1331            return Err(PlanError::WrongParameterType(
1332                i + 1,
1333                ecx.humanize_scalar_type(expected_ty, false),
1334                ecx.humanize_scalar_type(&actual_ty, false),
1335            ));
1336        }
1337        let ex = ex.lower_uncorrelated()?;
1338        let evaled = ex.eval(&[], temp_storage)?;
1339        packer.push(evaled);
1340        actual_types.push(actual_ty);
1341    }
1342    Ok(Params {
1343        datums,
1344        execute_types: actual_types,
1345        expected_types: desc.param_types.clone(),
1346    })
1347}
1348
1349static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1350static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1351
1352/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
1353pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1354    ExprContext {
1355        qcx,
1356        name: "EXECUTE",
1357        scope: &EXECUTE_CONTEXT_SCOPE,
1358        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1359        allow_aggregates: false,
1360        allow_subqueries: false,
1361        allow_parameters: false,
1362        allow_windows: false,
1363    }
1364}
1365
1366/// The CastContext used when matching up the types of parameters passed to EXECUTE.
1367///
1368/// This is an assignment cast also in Postgres, see
1369/// <https://github.com/MaterializeInc/database-issues/issues/9266>
1370pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1371    LazyLock::new(|| CastContext::Assignment);
1372
1373pub fn plan_index_exprs<'a>(
1374    scx: &'a StatementContext,
1375    on_desc: &RelationDesc,
1376    exprs: Vec<Expr<Aug>>,
1377) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1378    let scope = Scope::from_source(None, on_desc.iter_names());
1379    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1380
1381    let ecx = &ExprContext {
1382        qcx: &qcx,
1383        name: "CREATE INDEX",
1384        scope: &scope,
1385        relation_type: on_desc.typ(),
1386        allow_aggregates: false,
1387        allow_subqueries: false,
1388        allow_parameters: false,
1389        allow_windows: false,
1390    };
1391    let mut out = vec![];
1392    for mut expr in exprs {
1393        transform_ast::transform(scx, &mut expr)?;
1394        let expr = plan_expr_or_col_index(ecx, &expr)?;
1395        let mut expr = expr.lower_uncorrelated()?;
1396        expr.reduce(&on_desc.typ().column_types);
1397        out.push(expr);
1398    }
1399    Ok(out)
1400}
1401
1402fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1403    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1404        Some(column) => Ok(HirScalarExpr::column(column)),
1405        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1406    }
1407}
1408
1409fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1410    match e {
1411        Expr::Value(Value::Number(n)) => {
1412            let n = n.parse::<usize>().map_err(|e| {
1413                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1414            })?;
1415            if n < 1 || n > max {
1416                sql_bail!(
1417                    "column reference {} in {} is out of range (1 - {})",
1418                    n,
1419                    name,
1420                    max
1421                );
1422            }
1423            Ok(Some(n - 1))
1424        }
1425        _ => Ok(None),
1426    }
1427}
1428
1429struct PlannedQuery {
1430    expr: HirRelationExpr,
1431    scope: Scope,
1432    order_by: Vec<ColumnOrder>,
1433    limit: Option<HirScalarExpr>,
1434    /// `offset` is either
1435    /// - an Int64 literal
1436    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
1437    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
1438    ///   later.)
1439    offset: HirScalarExpr,
1440    project: Vec<usize>,
1441    group_size_hints: GroupSizeHints,
1442}
1443
1444fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1445    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1446}
1447
1448fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1449    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1450    // for the identifiers, so that they can be re-installed before returning.
1451    let cte_bindings = plan_ctes(qcx, q)?;
1452
1453    let limit = match &q.limit {
1454        None => None,
1455        Some(Limit {
1456            quantity,
1457            with_ties: false,
1458        }) => {
1459            let ecx = &ExprContext {
1460                qcx,
1461                name: "LIMIT",
1462                scope: &Scope::empty(),
1463                relation_type: &SqlRelationType::empty(),
1464                allow_aggregates: false,
1465                allow_subqueries: true,
1466                allow_parameters: true,
1467                allow_windows: false,
1468            };
1469            let limit = plan_expr(ecx, quantity)?;
1470            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1471
1472            let limit = if limit.is_constant() {
1473                let arena = RowArena::new();
1474                let limit = limit.lower_uncorrelated()?;
1475
1476                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1477                // that the error happened in a LIMIT clause, so that we have better error msg for
1478                // something like `SELECT 5 LIMIT 'aaa'`.
1479                match limit.eval(&[], &arena)? {
1480                    d @ Datum::Int64(v) if v >= 0 => {
1481                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1482                    }
1483                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1484                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1485                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1486                }
1487            } else {
1488                // Gate non-constant LIMIT expressions behind a feature flag
1489                qcx.scx
1490                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1491                limit
1492            };
1493
1494            Some(limit)
1495        }
1496        Some(Limit {
1497            quantity: _,
1498            with_ties: true,
1499        }) => bail_unsupported!("FETCH ... WITH TIES"),
1500    };
1501
1502    let offset = match &q.offset {
1503        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1504        Some(offset) => {
1505            let ecx = &ExprContext {
1506                qcx,
1507                name: "OFFSET",
1508                scope: &Scope::empty(),
1509                relation_type: &SqlRelationType::empty(),
1510                allow_aggregates: false,
1511                allow_subqueries: false,
1512                allow_parameters: true,
1513                allow_windows: false,
1514            };
1515            let offset = plan_expr(ecx, offset)?;
1516            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1517
1518            let offset = if offset.is_constant() {
1519                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1520                let offset_value = offset_into_value(offset)?;
1521                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1522            } else {
1523                // The only case when this is allowed to not be a constant is if it contains
1524                // parameters. (In which case, we'll later check that it's a constant after
1525                // parameter binding.)
1526                if !offset.contains_parameters() {
1527                    return Err(PlanError::InvalidOffset(format!(
1528                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1529                        offset
1530                    )));
1531                }
1532                offset
1533            };
1534            offset
1535        }
1536    };
1537
1538    let mut planned_query = match &q.body {
1539        SetExpr::Select(s) => {
1540            // Extract query options.
1541            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1542            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1543
1544            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1545            PlannedQuery {
1546                expr: plan.expr,
1547                scope: plan.scope,
1548                order_by: plan.order_by,
1549                project: plan.project,
1550                limit,
1551                offset,
1552                group_size_hints,
1553            }
1554        }
1555        _ => {
1556            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1557            let ecx = &ExprContext {
1558                qcx,
1559                name: "ORDER BY clause of a set expression",
1560                scope: &scope,
1561                relation_type: &qcx.relation_type(&expr),
1562                allow_aggregates: false,
1563                allow_subqueries: true,
1564                allow_parameters: true,
1565                allow_windows: false,
1566            };
1567            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1568            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1569            let project = (0..ecx.relation_type.arity()).collect();
1570            PlannedQuery {
1571                expr: expr.map(map_exprs),
1572                scope,
1573                order_by,
1574                limit,
1575                project,
1576                offset,
1577                group_size_hints: GroupSizeHints::default(),
1578            }
1579        }
1580    };
1581
1582    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1583    match &q.ctes {
1584        CteBlock::Simple(_) => {
1585            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1586                if let Some(cte) = qcx.ctes.remove(&id) {
1587                    planned_query.expr = HirRelationExpr::Let {
1588                        name: cte.name,
1589                        id: id.clone(),
1590                        value: Box::new(value),
1591                        body: Box::new(planned_query.expr),
1592                    };
1593                }
1594                if let Some(shadowed_val) = shadowed_val {
1595                    qcx.ctes.insert(id, shadowed_val);
1596                }
1597            }
1598        }
1599        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1600            let MutRecBlockOptionExtracted {
1601                recursion_limit,
1602                return_at_recursion_limit,
1603                error_at_recursion_limit,
1604                seen: _,
1605            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1606            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1607                (None, None, None) => None,
1608                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1609                (None, Some(max_iters), None) => Some((max_iters, true)),
1610                (None, None, Some(max_iters)) => Some((max_iters, false)),
1611                _ => {
1612                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1613                }
1614            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1615                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1616                return_at_limit: *return_at_limit,
1617            }))?;
1618
1619            let mut bindings = Vec::new();
1620            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1621                if let Some(cte) = qcx.ctes.remove(&id) {
1622                    bindings.push((cte.name, id, value, cte.desc.typ().clone()));
1623                }
1624                if let Some(shadowed_val) = shadowed_val {
1625                    qcx.ctes.insert(id, shadowed_val);
1626                }
1627            }
1628            if !bindings.is_empty() {
1629                planned_query.expr = HirRelationExpr::LetRec {
1630                    limit,
1631                    bindings,
1632                    body: Box::new(planned_query.expr),
1633                }
1634            }
1635        }
1636    }
1637
1638    Ok(planned_query)
1639}
1640
1641/// Converts an OFFSET expression into a value.
1642pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1643    let offset = offset
1644        .try_into_literal_int64()
1645        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1646    if offset < 0 {
1647        return Err(PlanError::InvalidOffset(format!(
1648            "must not be negative, got {}",
1649            offset
1650        )));
1651    }
1652    Ok(offset)
1653}
1654
1655generate_extracted_config!(
1656    MutRecBlockOption,
1657    (RecursionLimit, u64),
1658    (ReturnAtRecursionLimit, u64),
1659    (ErrorAtRecursionLimit, u64)
1660);
1661
1662/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1663///
1664/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1665/// shadowed value that can be reinstalled once the planning has completed.
1666pub fn plan_ctes(
1667    qcx: &mut QueryContext,
1668    q: &Query<Aug>,
1669) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1670    // Accumulate planned expressions and shadowed descriptions.
1671    let mut result = Vec::new();
1672    // Retain the old descriptions of CTE bindings so that we can restore them
1673    // after we're done planning this SELECT.
1674    let mut shadowed_descs = BTreeMap::new();
1675
1676    // A reused identifier indicates a reused name.
1677    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1678        sql_bail!(
1679            "WITH query name {} specified more than once",
1680            normalize::ident_ref(ident).quoted()
1681        )
1682    }
1683
1684    match &q.ctes {
1685        CteBlock::Simple(ctes) => {
1686            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1687            for cte in ctes.iter() {
1688                let cte_name = normalize::ident(cte.alias.name.clone());
1689                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1690                let typ = qcx.relation_type(&val);
1691                let mut desc = RelationDesc::new(typ, scope.column_names());
1692                plan_utils::maybe_rename_columns(
1693                    format!("CTE {}", cte.alias.name),
1694                    &mut desc,
1695                    &cte.alias.columns,
1696                )?;
1697                // Capture the prior value if it exists, so that it can be re-installed.
1698                let shadowed = qcx.ctes.insert(
1699                    cte.id,
1700                    CteDesc {
1701                        name: cte_name,
1702                        desc,
1703                    },
1704                );
1705
1706                result.push((cte.id, val, shadowed));
1707            }
1708        }
1709        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1710            // Insert column types into `qcx.ctes` first for recursive bindings.
1711            for cte in ctes.iter() {
1712                let cte_name = normalize::ident(cte.name.clone());
1713                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1714                for column in cte.columns.iter() {
1715                    desc_columns.push((
1716                        normalize::column_name(column.name.clone()),
1717                        SqlColumnType {
1718                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1719                            nullable: true,
1720                        },
1721                    ));
1722                }
1723                let desc = RelationDesc::from_names_and_types(desc_columns);
1724                let shadowed = qcx.ctes.insert(
1725                    cte.id,
1726                    CteDesc {
1727                        name: cte_name,
1728                        desc,
1729                    },
1730                );
1731                // Capture the prior value if it exists, so that it can be re-installed.
1732                if let Some(shadowed) = shadowed {
1733                    shadowed_descs.insert(cte.id, shadowed);
1734                }
1735            }
1736
1737            // Plan all CTEs and validate the proposed types.
1738            for cte in ctes.iter() {
1739                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1740
1741                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1742
1743                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1744                    // Once WMR CTEs support NOT NULL constraints, check that
1745                    // nullability of derived column types are compatible.
1746                    sql_bail!(
1747                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1748                    );
1749                }
1750
1751                if !proposed_typ.keys.is_empty() {
1752                    // Once WMR CTEs support keys, check that keys exactly
1753                    // overlap.
1754                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1755                }
1756
1757                // Validate that the derived and proposed types are the same.
1758                let derived_typ = qcx.relation_type(&val);
1759
1760                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1761                    let cte_name = normalize::ident(cte.name.clone());
1762                    let proposed_typ = proposed_typ
1763                        .column_types
1764                        .iter()
1765                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1766                        .collect::<Vec<_>>();
1767                    let inferred_typ = derived_typ
1768                        .column_types
1769                        .iter()
1770                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1771                        .collect::<Vec<_>>();
1772                    Err(PlanError::RecursiveTypeMismatch(
1773                        cte_name,
1774                        proposed_typ,
1775                        inferred_typ,
1776                    ))
1777                };
1778
1779                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1780                    return type_err(proposed_typ, derived_typ);
1781                }
1782
1783                // Cast derived types to proposed types or error.
1784                let val = match cast_relation(
1785                    qcx,
1786                    // Choose `CastContext::Assignment`` because the user has
1787                    // been explicit about the types they expect. Choosing
1788                    // `CastContext::Implicit` is not "strong" enough to impose
1789                    // typmods from proposed types onto values.
1790                    CastContext::Assignment,
1791                    val,
1792                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1793                ) {
1794                    Ok(val) => val,
1795                    Err(_) => return type_err(proposed_typ, derived_typ),
1796                };
1797
1798                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1799            }
1800        }
1801    }
1802
1803    Ok(result)
1804}
1805
1806pub fn plan_nested_query(
1807    qcx: &mut QueryContext,
1808    q: &Query<Aug>,
1809) -> Result<(HirRelationExpr, Scope), PlanError> {
1810    let PlannedQuery {
1811        mut expr,
1812        scope,
1813        order_by,
1814        limit,
1815        offset,
1816        project,
1817        group_size_hints,
1818    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1819    if limit.is_some()
1820        || !offset
1821            .clone()
1822            .try_into_literal_int64()
1823            .is_ok_and(|offset| offset == 0)
1824    {
1825        expr = HirRelationExpr::top_k(
1826            expr,
1827            vec![],
1828            order_by,
1829            limit,
1830            offset,
1831            group_size_hints.limit_input_group_size,
1832        );
1833    }
1834    Ok((expr.project(project), scope))
1835}
1836
1837fn plan_set_expr(
1838    qcx: &mut QueryContext,
1839    q: &SetExpr<Aug>,
1840) -> Result<(HirRelationExpr, Scope), PlanError> {
1841    match q {
1842        SetExpr::Select(select) => {
1843            let order_by_exprs = Vec::new();
1844            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1845            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
1846            // should not have planned any ordering.
1847            assert!(plan.order_by.is_empty());
1848            Ok((plan.expr.project(plan.project), plan.scope))
1849        }
1850        SetExpr::SetOperation {
1851            op,
1852            all,
1853            left,
1854            right,
1855        } => {
1856            // Plan the LHS and RHS.
1857            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1858            let (right_expr, right_scope) =
1859                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1860
1861            // Validate that the LHS and RHS are the same width.
1862            let left_type = qcx.relation_type(&left_expr);
1863            let right_type = qcx.relation_type(&right_expr);
1864            if left_type.arity() != right_type.arity() {
1865                sql_bail!(
1866                    "each {} query must have the same number of columns: {} vs {}",
1867                    op,
1868                    left_type.arity(),
1869                    right_type.arity(),
1870                );
1871            }
1872
1873            // Match the types of the corresponding columns on the LHS and RHS
1874            // using the normal type coercion rules. This is equivalent to
1875            // `coerce_homogeneous_exprs`, but implemented in terms of
1876            // `HirRelationExpr` rather than `HirScalarExpr`.
1877            let left_ecx = &ExprContext {
1878                qcx,
1879                name: &op.to_string(),
1880                scope: &left_scope,
1881                relation_type: &left_type,
1882                allow_aggregates: false,
1883                allow_subqueries: false,
1884                allow_parameters: false,
1885                allow_windows: false,
1886            };
1887            let right_ecx = &ExprContext {
1888                qcx,
1889                name: &op.to_string(),
1890                scope: &right_scope,
1891                relation_type: &right_type,
1892                allow_aggregates: false,
1893                allow_subqueries: false,
1894                allow_parameters: false,
1895                allow_windows: false,
1896            };
1897            let mut left_casts = vec![];
1898            let mut right_casts = vec![];
1899            for (i, (left_type, right_type)) in left_type
1900                .column_types
1901                .iter()
1902                .zip_eq(right_type.column_types.iter())
1903                .enumerate()
1904            {
1905                let types = &[
1906                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1907                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1908                ];
1909                let target =
1910                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1911                match typeconv::plan_cast(
1912                    left_ecx,
1913                    CastContext::Implicit,
1914                    HirScalarExpr::column(i),
1915                    &target,
1916                ) {
1917                    Ok(expr) => left_casts.push(expr),
1918                    Err(_) => sql_bail!(
1919                        "{} types {} and {} cannot be matched",
1920                        op,
1921                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
1922                        qcx.humanize_scalar_type(&target, false),
1923                    ),
1924                }
1925                match typeconv::plan_cast(
1926                    right_ecx,
1927                    CastContext::Implicit,
1928                    HirScalarExpr::column(i),
1929                    &target,
1930                ) {
1931                    Ok(expr) => right_casts.push(expr),
1932                    Err(_) => sql_bail!(
1933                        "{} types {} and {} cannot be matched",
1934                        op,
1935                        qcx.humanize_scalar_type(&target, false),
1936                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
1937                    ),
1938                }
1939            }
1940            let lhs = if left_casts
1941                .iter()
1942                .enumerate()
1943                .any(|(i, e)| e != &HirScalarExpr::column(i))
1944            {
1945                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1946                left_expr.map(left_casts).project(project_key)
1947            } else {
1948                left_expr
1949            };
1950            let rhs = if right_casts
1951                .iter()
1952                .enumerate()
1953                .any(|(i, e)| e != &HirScalarExpr::column(i))
1954            {
1955                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1956                right_expr.map(right_casts).project(project_key)
1957            } else {
1958                right_expr
1959            };
1960
1961            let relation_expr = match op {
1962                SetOperator::Union => {
1963                    if *all {
1964                        lhs.union(rhs)
1965                    } else {
1966                        lhs.union(rhs).distinct()
1967                    }
1968                }
1969                SetOperator::Except => Hir::except(all, lhs, rhs),
1970                SetOperator::Intersect => {
1971                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
1972                    // Though we believe that render() does The Right Thing (TM)
1973                    // Also note that we do *not* need another threshold() at the end of the method chain
1974                    // because the right-hand side of the outer union only produces existing records,
1975                    // i.e., the record counts for differential data flow definitely remain non-negative.
1976                    let left_clone = lhs.clone();
1977                    if *all {
1978                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1979                    } else {
1980                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1981                            .distinct()
1982                    }
1983                }
1984            };
1985            let scope = Scope::from_source(
1986                None,
1987                // Column names are taken from the left, as in Postgres.
1988                left_scope.column_names(),
1989            );
1990
1991            Ok((relation_expr, scope))
1992        }
1993        SetExpr::Values(Values(values)) => plan_values(qcx, values),
1994        SetExpr::Table(name) => {
1995            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1996            Ok((expr, scope))
1997        }
1998        SetExpr::Query(query) => {
1999            let (expr, scope) = plan_nested_query(qcx, query)?;
2000            Ok((expr, scope))
2001        }
2002        SetExpr::Show(stmt) => {
2003            // The create SQL definition of involving this query, will have the explicit `SHOW`
2004            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
2005            // current schema of the executing user. When Materialize restarts and tries to re-plan
2006            // these queries, it will only have access to the raw `SHOW` command and have no idea
2007            // what schema to use. As a result Materialize will fail to boot.
2008            //
2009            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
2010            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
2011            // However, banning show commands in views gives us more flexibility to change their
2012            // output.
2013            //
2014            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
2015            // with all show commands expanded into their equivalent SELECT statements.
2016            if !qcx.lifetime.allow_show() {
2017                return Err(PlanError::ShowCommandInView);
2018            }
2019
2020            // Some SHOW statements are a SELECT query. Others produces Rows
2021            // directly. Convert both of these to the needed Hir and Scope.
2022            fn to_hirscope(
2023                plan: ShowCreatePlan,
2024                desc: StatementDesc,
2025            ) -> Result<(HirRelationExpr, Scope), PlanError> {
2026                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2027                let desc = desc.relation_desc.expect("must exist");
2028                let expr = HirRelationExpr::constant(rows, desc.typ().clone());
2029                let scope = Scope::from_source(None, desc.iter_names());
2030                Ok((expr, scope))
2031            }
2032
2033            match stmt.clone() {
2034                ShowStatement::ShowColumns(stmt) => {
2035                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2036                }
2037                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2038                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2039                    show::describe_show_create_connection(qcx.scx, stmt)?,
2040                ),
2041                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2042                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2043                    show::describe_show_create_cluster(qcx.scx, stmt)?,
2044                ),
2045                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2046                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
2047                    show::describe_show_create_index(qcx.scx, stmt)?,
2048                ),
2049                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2050                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2051                    show::describe_show_create_sink(qcx.scx, stmt)?,
2052                ),
2053                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2054                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
2055                    show::describe_show_create_source(qcx.scx, stmt)?,
2056                ),
2057                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2058                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
2059                    show::describe_show_create_table(qcx.scx, stmt)?,
2060                ),
2061                ShowStatement::ShowCreateView(stmt) => to_hirscope(
2062                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
2063                    show::describe_show_create_view(qcx.scx, stmt)?,
2064                ),
2065                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2066                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2067                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2068                ),
2069                ShowStatement::ShowCreateType(stmt) => to_hirscope(
2070                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
2071                    show::describe_show_create_type(qcx.scx, stmt)?,
2072                ),
2073                ShowStatement::ShowObjects(stmt) => {
2074                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2075                }
2076                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2077                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2078            }
2079        }
2080    }
2081}
2082
2083/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2084fn plan_values(
2085    qcx: &QueryContext,
2086    values: &[Vec<Expr<Aug>>],
2087) -> Result<(HirRelationExpr, Scope), PlanError> {
2088    assert!(!values.is_empty());
2089
2090    let ecx = &ExprContext {
2091        qcx,
2092        name: "VALUES",
2093        scope: &Scope::empty(),
2094        relation_type: &SqlRelationType::empty(),
2095        allow_aggregates: false,
2096        allow_subqueries: true,
2097        allow_parameters: true,
2098        allow_windows: false,
2099    };
2100
2101    let ncols = values[0].len();
2102    let nrows = values.len();
2103
2104    // Arrange input expressions by columns, not rows, so that we can
2105    // call `coerce_homogeneous_exprs` on each column.
2106    let mut cols = vec![vec![]; ncols];
2107    for row in values {
2108        if row.len() != ncols {
2109            sql_bail!(
2110                "VALUES expression has varying number of columns: {} vs {}",
2111                row.len(),
2112                ncols
2113            );
2114        }
2115        for (i, v) in row.iter().enumerate() {
2116            cols[i].push(v);
2117        }
2118    }
2119
2120    // Plan each column.
2121    let mut col_iters = Vec::with_capacity(ncols);
2122    let mut col_types = Vec::with_capacity(ncols);
2123    for col in &cols {
2124        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2125        let mut col_type = ecx.column_type(&col[0]);
2126        for val in &col[1..] {
2127            col_type = col_type.union(&ecx.column_type(val))?;
2128        }
2129        col_types.push(col_type);
2130        col_iters.push(col.into_iter());
2131    }
2132
2133    // Build constant relation.
2134    let mut exprs = vec![];
2135    for _ in 0..nrows {
2136        for i in 0..ncols {
2137            exprs.push(col_iters[i].next().unwrap());
2138        }
2139    }
2140    let out = HirRelationExpr::CallTable {
2141        func: TableFunc::Wrap {
2142            width: ncols,
2143            types: col_types,
2144        },
2145        exprs,
2146    };
2147
2148    // Build column names.
2149    let mut scope = Scope::empty();
2150    for i in 0..ncols {
2151        let name = format!("column{}", i + 1);
2152        scope.items.push(ScopeItem::from_column_name(name));
2153    }
2154
2155    Ok((out, scope))
2156}
2157
2158/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2159/// statement.
2160///
2161/// This is special-cased in PostgreSQL and different enough from `plan_values`
2162/// that it is easier to use a separate function entirely. Unlike a normal
2163/// `VALUES` clause, each value is coerced to the type of the target table
2164/// via an assignment cast.
2165///
2166/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2167fn plan_values_insert(
2168    qcx: &QueryContext,
2169    target_names: &[&ColumnName],
2170    target_types: &[&SqlScalarType],
2171    values: &[Vec<Expr<Aug>>],
2172) -> Result<HirRelationExpr, PlanError> {
2173    assert!(!values.is_empty());
2174
2175    if !values.iter().map(|row| row.len()).all_equal() {
2176        sql_bail!("VALUES lists must all be the same length");
2177    }
2178
2179    let ecx = &ExprContext {
2180        qcx,
2181        name: "VALUES",
2182        scope: &Scope::empty(),
2183        relation_type: &SqlRelationType::empty(),
2184        allow_aggregates: false,
2185        allow_subqueries: true,
2186        allow_parameters: true,
2187        allow_windows: false,
2188    };
2189
2190    let mut exprs = vec![];
2191    let mut types = vec![];
2192    for row in values {
2193        if row.len() > target_names.len() {
2194            sql_bail!("INSERT has more expressions than target columns");
2195        }
2196        for (column, val) in row.into_iter().enumerate() {
2197            let target_type = &target_types[column];
2198            let val = plan_expr(ecx, val)?;
2199            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2200            let source_type = &ecx.scalar_type(&val);
2201            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2202                Ok(val) => val,
2203                Err(_) => sql_bail!(
2204                    "column {} is of type {} but expression is of type {}",
2205                    target_names[column].quoted(),
2206                    qcx.humanize_scalar_type(target_type, false),
2207                    qcx.humanize_scalar_type(source_type, false),
2208                ),
2209            };
2210            if column >= types.len() {
2211                types.push(ecx.column_type(&val));
2212            } else {
2213                types[column] = types[column].union(&ecx.column_type(&val))?;
2214            }
2215            exprs.push(val);
2216        }
2217    }
2218
2219    Ok(HirRelationExpr::CallTable {
2220        func: TableFunc::Wrap {
2221            width: values[0].len(),
2222            types,
2223        },
2224        exprs,
2225    })
2226}
2227
2228fn plan_join_identity() -> (HirRelationExpr, Scope) {
2229    let typ = SqlRelationType::new(vec![]);
2230    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2231    let scope = Scope::empty();
2232    (expr, scope)
2233}
2234
2235/// Describes how to execute a SELECT query.
2236///
2237/// `order_by` describes how to order the rows in `expr` *before* applying the
2238/// projection. The `scope` describes the columns in `expr` *after* the
2239/// projection has been applied.
2240#[derive(Debug)]
2241struct SelectPlan {
2242    expr: HirRelationExpr,
2243    scope: Scope,
2244    order_by: Vec<ColumnOrder>,
2245    project: Vec<usize>,
2246}
2247
2248generate_extracted_config!(
2249    SelectOption,
2250    (ExpectedGroupSize, u64),
2251    (AggregateInputGroupSize, u64),
2252    (DistinctOnInputGroupSize, u64),
2253    (LimitInputGroupSize, u64)
2254);
2255
2256/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2257///
2258/// Normally, the ORDER BY clause occurs after the columns specified in the
2259/// SELECT list have been projected. In a query like
2260///
2261///   CREATE TABLE (a int, b int)
2262///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2263///
2264/// it is valid to refer to `a`, because it is explicitly selected, but it would
2265/// not be valid to refer to unselected column `b`.
2266///
2267/// But PostgreSQL extends the standard to permit queries like
2268///
2269///   SELECT a FROM t ORDER BY b
2270///
2271/// where expressions in the ORDER BY clause can refer to *both* input columns
2272/// and output columns.
2273fn plan_select_from_where(
2274    qcx: &QueryContext,
2275    mut s: Select<Aug>,
2276    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2277) -> Result<SelectPlan, PlanError> {
2278    // TODO: Both `s` and `order_by_exprs` are not references because the
2279    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2280    // table function support (the UUID mapping). Attempt to change this so callers
2281    // don't need to clone the Select.
2282
2283    // Extract query options.
2284    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2285    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2286
2287    // Step 1. Handle FROM clause, including joins.
2288    let (mut relation_expr, mut from_scope) =
2289        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2290            let (left, left_scope) = l;
2291            plan_join(
2292                qcx,
2293                left,
2294                left_scope,
2295                &Join {
2296                    relation: TableFactor::NestedJoin {
2297                        join: Box::new(twj.clone()),
2298                        alias: None,
2299                    },
2300                    join_operator: JoinOperator::CrossJoin,
2301                },
2302            )
2303        })?;
2304
2305    // Step 2. Handle WHERE clause.
2306    if let Some(selection) = &s.selection {
2307        let ecx = &ExprContext {
2308            qcx,
2309            name: "WHERE clause",
2310            scope: &from_scope,
2311            relation_type: &qcx.relation_type(&relation_expr),
2312            allow_aggregates: false,
2313            allow_subqueries: true,
2314            allow_parameters: true,
2315            allow_windows: false,
2316        };
2317        let expr = plan_expr(ecx, selection)
2318            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2319            .type_as(ecx, &SqlScalarType::Bool)?;
2320        relation_expr = relation_expr.filter(vec![expr]);
2321    }
2322
2323    // Step 3. Gather aggregates and table functions.
2324    // (But skip window aggregates.)
2325    let (aggregates, table_funcs) = {
2326        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2327        visitor.visit_select_mut(&mut s);
2328        for o in order_by_exprs.iter_mut() {
2329            visitor.visit_order_by_expr_mut(o);
2330        }
2331        visitor.into_result()?
2332    };
2333    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2334    if !table_funcs.is_empty() {
2335        let (expr, scope) = plan_scalar_table_funcs(
2336            qcx,
2337            table_funcs,
2338            &mut table_func_names,
2339            &relation_expr,
2340            &from_scope,
2341        )?;
2342        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2343        from_scope = from_scope.product(scope)?;
2344    }
2345
2346    // Step 4. Expand SELECT clause.
2347    let projection = {
2348        let ecx = &ExprContext {
2349            qcx,
2350            name: "SELECT clause",
2351            scope: &from_scope,
2352            relation_type: &qcx.relation_type(&relation_expr),
2353            allow_aggregates: true,
2354            allow_subqueries: true,
2355            allow_parameters: true,
2356            allow_windows: true,
2357        };
2358        let mut out = vec![];
2359        for si in &s.projection {
2360            if *si == SelectItem::Wildcard && s.from.is_empty() {
2361                sql_bail!("SELECT * with no tables specified is not valid");
2362            }
2363            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2364        }
2365        out
2366    };
2367
2368    // Step 5. Handle GROUP BY clause.
2369    // This will also plan the aggregates gathered in Step 3.
2370    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2371    let (mut group_scope, select_all_mapping) = {
2372        // Compute GROUP BY expressions.
2373        let ecx = &ExprContext {
2374            qcx,
2375            name: "GROUP BY clause",
2376            scope: &from_scope,
2377            relation_type: &qcx.relation_type(&relation_expr),
2378            allow_aggregates: false,
2379            allow_subqueries: true,
2380            allow_parameters: true,
2381            allow_windows: false,
2382        };
2383        let mut group_key = vec![];
2384        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2385        let mut group_hir_exprs = vec![];
2386        let mut group_scope = Scope::empty();
2387        let mut select_all_mapping = BTreeMap::new();
2388
2389        for group_expr in &s.group_by {
2390            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2391            let new_column = group_key.len();
2392
2393            if let Some(group_expr) = group_expr {
2394                // Multiple AST expressions can map to the same HIR expression.
2395                // If we already have a ScopeItem for this HIR, we can add this
2396                // next AST expression to its set
2397                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2398                    existing_scope_item.exprs.insert(group_expr.clone());
2399                    continue;
2400                }
2401            }
2402
2403            let mut scope_item = if let HirScalarExpr::Column(
2404                ColumnRef {
2405                    level: 0,
2406                    column: old_column,
2407                },
2408                _name,
2409            ) = &expr
2410            {
2411                // If we later have `SELECT foo.*` then we have to find all
2412                // the `foo` items in `from_scope` and figure out where they
2413                // ended up in `group_scope`. This is really hard to do
2414                // right using SQL name resolution, so instead we just track
2415                // the movement here.
2416                select_all_mapping.insert(*old_column, new_column);
2417                let scope_item = ecx.scope.items[*old_column].clone();
2418                scope_item
2419            } else {
2420                ScopeItem::empty()
2421            };
2422
2423            if let Some(group_expr) = group_expr.cloned() {
2424                scope_item.exprs.insert(group_expr);
2425            }
2426
2427            group_key.push(from_scope.len() + group_exprs.len());
2428            group_hir_exprs.push(expr.clone());
2429            group_exprs.insert(expr, scope_item);
2430        }
2431
2432        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2433        for expr in &group_hir_exprs {
2434            if let Some(scope_item) = group_exprs.remove(expr) {
2435                group_scope.items.push(scope_item);
2436            }
2437        }
2438
2439        // Plan aggregates.
2440        let ecx = &ExprContext {
2441            qcx,
2442            name: "aggregate function",
2443            scope: &from_scope,
2444            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2445            allow_aggregates: false,
2446            allow_subqueries: true,
2447            allow_parameters: true,
2448            allow_windows: false,
2449        };
2450        let mut agg_exprs = vec![];
2451        for sql_function in aggregates {
2452            if sql_function.over.is_some() {
2453                unreachable!(
2454                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2455                );
2456            }
2457            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2458            group_scope
2459                .items
2460                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2461        }
2462        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2463            // apply GROUP BY / aggregates
2464            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2465                group_key,
2466                agg_exprs,
2467                group_size_hints.aggregate_input_group_size,
2468            );
2469
2470            // For every old column that wasn't a group key, add a scope item
2471            // that errors when referenced. We can't simply drop these items
2472            // from scope. These items need to *exist* because they might shadow
2473            // variables in outer scopes that would otherwise be valid to
2474            // reference, but accessing them needs to produce an error.
2475            for i in 0..from_scope.len() {
2476                if !select_all_mapping.contains_key(&i) {
2477                    let scope_item = &ecx.scope.items[i];
2478                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2479                        table_name: scope_item.table_name.clone(),
2480                        column_name: scope_item.column_name.clone(),
2481                        allow_unqualified_references: scope_item.allow_unqualified_references,
2482                    });
2483                }
2484            }
2485
2486            (group_scope, select_all_mapping)
2487        } else {
2488            // if no GROUP BY, aggregates or having then all columns remain in scope
2489            (
2490                from_scope.clone(),
2491                (0..from_scope.len()).map(|i| (i, i)).collect(),
2492            )
2493        }
2494    };
2495
2496    // Step 6. Handle HAVING clause.
2497    if let Some(ref having) = s.having {
2498        let ecx = &ExprContext {
2499            qcx,
2500            name: "HAVING clause",
2501            scope: &group_scope,
2502            relation_type: &qcx.relation_type(&relation_expr),
2503            allow_aggregates: true,
2504            allow_subqueries: true,
2505            allow_parameters: true,
2506            allow_windows: false,
2507        };
2508        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2509        relation_expr = relation_expr.filter(vec![expr]);
2510    }
2511
2512    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2513    // (This includes window aggregations.)
2514    //
2515    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2516    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2517    //
2518    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2519    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2520    // and can't be part of a bigger expression.
2521    // See https://www.postgresql.org/docs/current/queries-order.html:
2522    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2523    // expression"
2524    let window_funcs = {
2525        let mut visitor = WindowFuncCollector::default();
2526        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2527        // window functions are excluded from other things by `allow_windows` being false when
2528        // planning those before this code).
2529        visitor.visit_select(&s);
2530        for o in order_by_exprs.iter() {
2531            visitor.visit_order_by_expr(o);
2532        }
2533        visitor.into_result()
2534    };
2535    for window_func in window_funcs {
2536        let ecx = &ExprContext {
2537            qcx,
2538            name: "window function",
2539            scope: &group_scope,
2540            relation_type: &qcx.relation_type(&relation_expr),
2541            allow_aggregates: true,
2542            allow_subqueries: true,
2543            allow_parameters: true,
2544            allow_windows: true,
2545        };
2546        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2547        group_scope.items.push(ScopeItem::from_expr(window_func));
2548    }
2549    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2550    // been already planned now. However, we should still set `allow_windows: true` for the
2551    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2552    // msg if an OVER clause is missing from a window function.
2553
2554    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2555    if let Some(ref qualify) = s.qualify {
2556        let ecx = &ExprContext {
2557            qcx,
2558            name: "QUALIFY clause",
2559            scope: &group_scope,
2560            relation_type: &qcx.relation_type(&relation_expr),
2561            allow_aggregates: true,
2562            allow_subqueries: true,
2563            allow_parameters: true,
2564            allow_windows: true,
2565        };
2566        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2567        relation_expr = relation_expr.filter(vec![expr]);
2568    }
2569
2570    // Step 9. Handle SELECT clause.
2571    let output_columns = {
2572        let mut new_exprs = vec![];
2573        let mut new_type = qcx.relation_type(&relation_expr);
2574        let mut output_columns = vec![];
2575        for (select_item, column_name) in &projection {
2576            let ecx = &ExprContext {
2577                qcx,
2578                name: "SELECT clause",
2579                scope: &group_scope,
2580                relation_type: &new_type,
2581                allow_aggregates: true,
2582                allow_subqueries: true,
2583                allow_parameters: true,
2584                allow_windows: true,
2585            };
2586            let expr = match select_item {
2587                ExpandedSelectItem::InputOrdinal(i) => {
2588                    if let Some(column) = select_all_mapping.get(i).copied() {
2589                        HirScalarExpr::column(column)
2590                    } else {
2591                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2592                    }
2593                }
2594                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2595            };
2596            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2597                // Simple column reference; no need to map on a new expression.
2598                output_columns.push((column, column_name));
2599            } else {
2600                // Complicated expression that requires a map expression. We
2601                // update `group_scope` as we go so that future expressions that
2602                // are textually identical to this one can reuse it. This
2603                // duplicate detection is required for proper determination of
2604                // ambiguous column references with SQL92-style `ORDER BY`
2605                // items. See `plan_order_by_or_distinct_expr` for more.
2606                let typ = ecx.column_type(&expr);
2607                new_type.column_types.push(typ);
2608                new_exprs.push(expr);
2609                output_columns.push((group_scope.len(), column_name));
2610                group_scope
2611                    .items
2612                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2613            }
2614        }
2615        relation_expr = relation_expr.map(new_exprs);
2616        output_columns
2617    };
2618    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2619
2620    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2621    let order_by = {
2622        let relation_type = qcx.relation_type(&relation_expr);
2623        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2624            &ExprContext {
2625                qcx,
2626                name: "ORDER BY clause",
2627                scope: &group_scope,
2628                relation_type: &relation_type,
2629                allow_aggregates: true,
2630                allow_subqueries: true,
2631                allow_parameters: true,
2632                allow_windows: true,
2633            },
2634            &order_by_exprs,
2635            &output_columns,
2636        )?;
2637
2638        match s.distinct {
2639            None => relation_expr = relation_expr.map(map_exprs),
2640            Some(Distinct::EntireRow) => {
2641                if relation_type.arity() == 0 {
2642                    sql_bail!("SELECT DISTINCT must have at least one column");
2643                }
2644                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2645                // list, so we can't proceed if `ORDER BY` has introduced any
2646                // columns for arbitrary expressions. This matches PostgreSQL.
2647                if !try_push_projection_order_by(
2648                    &mut relation_expr,
2649                    &mut project_key,
2650                    &mut order_by,
2651                ) {
2652                    sql_bail!(
2653                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2654                    );
2655                }
2656                assert!(map_exprs.is_empty());
2657                relation_expr = relation_expr.distinct();
2658            }
2659            Some(Distinct::On(exprs)) => {
2660                let ecx = &ExprContext {
2661                    qcx,
2662                    name: "DISTINCT ON clause",
2663                    scope: &group_scope,
2664                    relation_type: &qcx.relation_type(&relation_expr),
2665                    allow_aggregates: true,
2666                    allow_subqueries: true,
2667                    allow_parameters: true,
2668                    allow_windows: true,
2669                };
2670
2671                let mut distinct_exprs = vec![];
2672                for expr in &exprs {
2673                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2674                    distinct_exprs.push(expr);
2675                }
2676
2677                let mut distinct_key = vec![];
2678
2679                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2680                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2681                // expressions, though the order of `DISTINCT ON` expressions
2682                // does not matter. This matches PostgreSQL and leaves the door
2683                // open to a future optimization where the `DISTINCT ON` and
2684                // `ORDER BY` operations happen in one pass.
2685                //
2686                // On the bright side, any columns that have already been
2687                // computed by `ORDER BY` can be reused in the distinct key.
2688                let arity = relation_type.arity();
2689                for ord in order_by.iter().take(distinct_exprs.len()) {
2690                    // The unusual construction of `expr` here is to ensure the
2691                    // temporary column expression lives long enough.
2692                    let mut expr = &HirScalarExpr::column(ord.column);
2693                    if ord.column >= arity {
2694                        expr = &map_exprs[ord.column - arity];
2695                    };
2696                    match distinct_exprs.iter().position(move |e| e == expr) {
2697                        None => sql_bail!(
2698                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2699                        ),
2700                        Some(pos) => {
2701                            distinct_exprs.remove(pos);
2702                        }
2703                    }
2704                    distinct_key.push(ord.column);
2705                }
2706
2707                // Add any remaining `DISTINCT ON` expressions to the key.
2708                for expr in distinct_exprs {
2709                    // If the expression is a reference to an existing column,
2710                    // do not introduce a new column to support it.
2711                    let column = match expr {
2712                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2713                        _ => {
2714                            map_exprs.push(expr);
2715                            arity + map_exprs.len() - 1
2716                        }
2717                    };
2718                    distinct_key.push(column);
2719                }
2720
2721                // `DISTINCT ON` is semantically a TopK with limit 1. The
2722                // columns in `ORDER BY` that are not part of the distinct key,
2723                // if there are any, determine the ordering within each group,
2724                // per PostgreSQL semantics.
2725                let distinct_len = distinct_key.len();
2726                relation_expr = HirRelationExpr::top_k(
2727                    relation_expr.map(map_exprs),
2728                    distinct_key,
2729                    order_by.iter().skip(distinct_len).cloned().collect(),
2730                    Some(HirScalarExpr::literal(
2731                        Datum::Int64(1),
2732                        SqlScalarType::Int64,
2733                    )),
2734                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2735                    group_size_hints.distinct_on_input_group_size,
2736                );
2737            }
2738        }
2739
2740        order_by
2741    };
2742
2743    // Construct a clean scope to expose outwards, where all of the state that
2744    // accumulated in the scope during planning of this SELECT is erased. The
2745    // clean scope has at most one name for each column, and the names are not
2746    // associated with any table.
2747    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2748
2749    Ok(SelectPlan {
2750        expr: relation_expr,
2751        scope,
2752        order_by,
2753        project: project_key,
2754    })
2755}
2756
2757fn plan_scalar_table_funcs(
2758    qcx: &QueryContext,
2759    table_funcs: BTreeMap<Function<Aug>, String>,
2760    table_func_names: &mut BTreeMap<String, Ident>,
2761    relation_expr: &HirRelationExpr,
2762    from_scope: &Scope,
2763) -> Result<(HirRelationExpr, Scope), PlanError> {
2764    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2765
2766    for (table_func, id) in table_funcs.iter() {
2767        table_func_names.insert(
2768            id.clone(),
2769            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
2770            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2771        );
2772    }
2773    // If there's only a single table function, we can skip generating
2774    // ordinality columns.
2775    if table_funcs.len() == 1 {
2776        let (table_func, id) = table_funcs.iter().next().unwrap();
2777        let (expr, mut scope) =
2778            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2779
2780        // A single table-function might return several columns as a record
2781        let num_cols = scope.len();
2782        for i in 0..scope.len() {
2783            scope.items[i].table_name = Some(PartialItemName {
2784                database: None,
2785                schema: None,
2786                item: id.clone(),
2787            });
2788            scope.items[i].from_single_column_function = num_cols == 1;
2789            scope.items[i].allow_unqualified_references = false;
2790        }
2791        return Ok((expr, scope));
2792    }
2793    // Otherwise, plan as usual, emulating the ROWS FROM behavior
2794    let (expr, mut scope, num_cols) =
2795        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2796
2797    // Munge the scope so table names match with the generated ids.
2798    let mut i = 0;
2799    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2800        for _ in 0..num_cols {
2801            scope.items[i].table_name = Some(PartialItemName {
2802                database: None,
2803                schema: None,
2804                item: id.clone(),
2805            });
2806            scope.items[i].from_single_column_function = num_cols == 1;
2807            scope.items[i].allow_unqualified_references = false;
2808            i += 1;
2809        }
2810        // Ordinality column. This doubles as the
2811        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
2812        // because it only needs to be NULL or not.
2813        scope.items[i].table_name = Some(PartialItemName {
2814            database: None,
2815            schema: None,
2816            item: id.clone(),
2817        });
2818        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2819        scope.items[i].allow_unqualified_references = false;
2820        i += 1;
2821    }
2822    // Coalesced ordinality column.
2823    scope.items[i].allow_unqualified_references = false;
2824    Ok((expr, scope))
2825}
2826
2827/// Plans an expression in a `GROUP BY` clause.
2828///
2829/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2830/// names/expressions defined in the `SELECT` clause. These special cases are
2831/// handled by this function; see comments within the implementation for
2832/// details.
2833fn plan_group_by_expr<'a>(
2834    ecx: &ExprContext,
2835    group_expr: &'a Expr<Aug>,
2836    projection: &'a [(ExpandedSelectItem, ColumnName)],
2837) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2838    let plan_projection = |column: usize| match &projection[column].0 {
2839        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2840        ExpandedSelectItem::Expr(expr) => {
2841            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2842        }
2843    };
2844
2845    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2846    // a special case that means to use the ith item in the SELECT clause.
2847    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2848        return plan_projection(column);
2849    }
2850
2851    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2852    // The `foo` can refer to *either* an input column or an output column. If
2853    // both exist, the input column is preferred.
2854    match group_expr {
2855        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2856            Err(PlanError::UnknownColumn {
2857                table: None,
2858                column,
2859                similar,
2860            }) => {
2861                // The expression was a simple identifier that did not match an
2862                // input column. See if it matches an output column.
2863                let mut iter = projection.iter().map(|(_expr, name)| name);
2864                if let Some(i) = iter.position(|n| *n == column) {
2865                    if iter.any(|n| *n == column) {
2866                        Err(PlanError::AmbiguousColumn(column))
2867                    } else {
2868                        plan_projection(i)
2869                    }
2870                } else {
2871                    // The name didn't match an output column either. Return the
2872                    // "unknown column" error.
2873                    Err(PlanError::UnknownColumn {
2874                        table: None,
2875                        column,
2876                        similar,
2877                    })
2878                }
2879            }
2880            res => Ok((Some(group_expr), res?)),
2881        },
2882        _ => Ok((
2883            Some(group_expr),
2884            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2885        )),
2886    }
2887}
2888
2889/// Plans a slice of `ORDER BY` expressions.
2890///
2891/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2892/// parameter.
2893///
2894/// Returns the determined column orderings and a list of scalar expressions
2895/// that must be mapped onto the underlying relation expression.
2896pub(crate) fn plan_order_by_exprs(
2897    ecx: &ExprContext,
2898    order_by_exprs: &[OrderByExpr<Aug>],
2899    output_columns: &[(usize, &ColumnName)],
2900) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2901    let mut order_by = vec![];
2902    let mut map_exprs = vec![];
2903    for obe in order_by_exprs {
2904        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2905        // If the expression is a reference to an existing column,
2906        // do not introduce a new column to support it.
2907        let column = match expr {
2908            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2909            _ => {
2910                map_exprs.push(expr);
2911                ecx.relation_type.arity() + map_exprs.len() - 1
2912            }
2913        };
2914        order_by.push(resolve_desc_and_nulls_last(obe, column));
2915    }
2916    Ok((order_by, map_exprs))
2917}
2918
2919/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2920///
2921/// The `output_columns` parameter describes, in order, the physical index and
2922/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2923/// corresponds to a `SELECT` list with a single entry named "a" that can be
2924/// found at index 3 in the underlying relation expression.
2925///
2926/// There are three cases to handle.
2927///
2928///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2929///       reference to the specified output column.
2930///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2931///       an output column, if it exists; otherwise it is a reference to an
2932///       input column.
2933///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2934///       arbitrary expressions exclusively refer to input columns, never output
2935///       columns.
2936fn plan_order_by_or_distinct_expr(
2937    ecx: &ExprContext,
2938    expr: &Expr<Aug>,
2939    output_columns: &[(usize, &ColumnName)],
2940) -> Result<HirScalarExpr, PlanError> {
2941    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2942        return Ok(HirScalarExpr::column(output_columns[i].0));
2943    }
2944
2945    if let Expr::Identifier(names) = expr {
2946        if let [name] = &names[..] {
2947            let name = normalize::column_name(name.clone());
2948            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2949            if let Some((i, _)) = iter.next() {
2950                match iter.next() {
2951                    // Per SQL92, names are not considered ambiguous if they
2952                    // refer to identical target list expressions, as in
2953                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2954                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2955                    _ => return Ok(HirScalarExpr::column(*i)),
2956                }
2957            }
2958        }
2959    }
2960
2961    plan_expr(ecx, expr)?.type_as_any(ecx)
2962}
2963
2964fn plan_table_with_joins(
2965    qcx: &QueryContext,
2966    table_with_joins: &TableWithJoins<Aug>,
2967) -> Result<(HirRelationExpr, Scope), PlanError> {
2968    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2969    for join in &table_with_joins.joins {
2970        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2971        expr = new_expr;
2972        scope = new_scope;
2973    }
2974    Ok((expr, scope))
2975}
2976
2977fn plan_table_factor(
2978    qcx: &QueryContext,
2979    table_factor: &TableFactor<Aug>,
2980) -> Result<(HirRelationExpr, Scope), PlanError> {
2981    match table_factor {
2982        TableFactor::Table { name, alias } => {
2983            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2984            let scope = plan_table_alias(scope, alias.as_ref())?;
2985            Ok((expr, scope))
2986        }
2987
2988        TableFactor::Function {
2989            function,
2990            alias,
2991            with_ordinality,
2992        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2993
2994        TableFactor::RowsFrom {
2995            functions,
2996            alias,
2997            with_ordinality,
2998        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2999
3000        TableFactor::Derived {
3001            lateral,
3002            subquery,
3003            alias,
3004        } => {
3005            let mut qcx = (*qcx).clone();
3006            if !lateral {
3007                // Since this derived table was not marked as `LATERAL`,
3008                // make elements in outer scopes invisible until we reach the
3009                // next lateral barrier.
3010                for scope in &mut qcx.outer_scopes {
3011                    if scope.lateral_barrier {
3012                        break;
3013                    }
3014                    scope.items.clear();
3015                }
3016            }
3017            qcx.outer_scopes[0].lateral_barrier = true;
3018            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3019            let scope = plan_table_alias(scope, alias.as_ref())?;
3020            Ok((expr, scope))
3021        }
3022
3023        TableFactor::NestedJoin { join, alias } => {
3024            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3025            let scope = plan_table_alias(scope, alias.as_ref())?;
3026            Ok((expr, scope))
3027        }
3028    }
3029}
3030
3031/// Plans a `ROWS FROM` expression.
3032///
3033/// `ROWS FROM` concatenates table functions into a single table, filling in
3034/// `NULL`s in places where one table function has fewer rows than another. We
3035/// can achieve this by augmenting each table function with a row number, doing
3036/// a `FULL JOIN` between each table function on the row number and eventually
3037/// projecting away the row number columns. Concretely, the following query
3038/// using `ROWS FROM`
3039///
3040/// ```sql
3041/// SELECT
3042///     *
3043/// FROM
3044///     ROWS FROM (
3045///         generate_series(1, 2),
3046///         information_schema._pg_expandarray(ARRAY[9]),
3047///         generate_series(3, 6)
3048///     );
3049/// ```
3050///
3051/// is equivalent to the following query that does not use `ROWS FROM`:
3052///
3053/// ```sql
3054/// SELECT
3055///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3056/// FROM
3057///     generate_series(1, 2) WITH ORDINALITY AS gs1
3058///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3059///         ON gs1.ordinality = expand.ordinality
3060///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3061///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3062/// ```
3063///
3064/// Note the call to `coalesce` in the last join condition, which ensures that
3065/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3066///
3067/// This function creates a HirRelationExpr that follows the structure of the
3068/// latter query.
3069///
3070/// `with_ordinality` can be used to have the output expression contain a
3071/// single coalesced ordinality column at the end of the entire expression.
3072fn plan_rows_from(
3073    qcx: &QueryContext,
3074    functions: &[Function<Aug>],
3075    alias: Option<&TableAlias>,
3076    with_ordinality: bool,
3077) -> Result<(HirRelationExpr, Scope), PlanError> {
3078    // If there's only a single table function, planning proceeds as if `ROWS
3079    // FROM` hadn't been written at all.
3080    if let [function] = functions {
3081        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3082    }
3083
3084    // Per PostgreSQL, all scope items take the name of the first function
3085    // (unless aliased).
3086    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3087    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3088        qcx,
3089        functions,
3090        Some(functions[0].name.full_item_name().clone()),
3091    )?;
3092
3093    // Columns tracks the set of columns we will keep in the projection.
3094    let mut columns = Vec::new();
3095    let mut offset = 0;
3096    // Retain table function's non-ordinality columns.
3097    for (idx, cols) in num_cols.into_iter().enumerate() {
3098        for i in 0..cols {
3099            columns.push(offset + i);
3100        }
3101        offset += cols + 1;
3102
3103        // Remove the ordinality column from the scope, accounting for previous scope
3104        // changes from this loop.
3105        scope.items.remove(offset - idx - 1);
3106    }
3107
3108    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3109    // column. Otherwise remove it from the scope.
3110    if with_ordinality {
3111        columns.push(offset);
3112    } else {
3113        scope.items.pop();
3114    }
3115
3116    let expr = expr.project(columns);
3117
3118    let scope = plan_table_alias(scope, alias)?;
3119    Ok((expr, scope))
3120}
3121
3122/// Plans an expression coalescing multiple table functions. Each table
3123/// function is followed by its row ordinality. The entire expression is
3124/// followed by the coalesced row ordinality.
3125///
3126/// The returned Scope will set all item's table_name's to the `table_name`
3127/// parameter if it is `Some`. If `None`, they will be the name of each table
3128/// function.
3129///
3130/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3131/// each table function.
3132///
3133/// For example, with table functions tf1 returning 1 column (a) and tf2
3134/// returning 2 columns (b, c), this function will return an expr 6 columns:
3135///
3136/// - tf1.a
3137/// - tf1.ordinality
3138/// - tf2.b
3139/// - tf2.c
3140/// - tf2.ordinality
3141/// - coalesced_ordinality
3142///
3143/// And a `Vec<usize>` of `[1, 2]`.
3144fn plan_rows_from_internal<'a>(
3145    qcx: &QueryContext,
3146    functions: impl IntoIterator<Item = &'a Function<Aug>>,
3147    table_name: Option<FullItemName>,
3148) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3149    let mut functions = functions.into_iter();
3150    let mut num_cols = Vec::new();
3151
3152    // Join together each of the table functions in turn. The last column is
3153    // always the column to join against and is maintained to be the coalescence
3154    // of the row number column for all prior functions.
3155    let (mut left_expr, mut left_scope) =
3156        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3157    num_cols.push(left_scope.len() - 1);
3158    // Create the coalesced ordinality column.
3159    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3160    left_scope
3161        .items
3162        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3163
3164    for function in functions {
3165        // The right hand side of a join must be planned in a new scope.
3166        let qcx = qcx.empty_derived_context();
3167        let (right_expr, mut right_scope) =
3168            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3169        num_cols.push(right_scope.len() - 1);
3170        let left_col = left_scope.len() - 1;
3171        let right_col = left_scope.len() + right_scope.len() - 1;
3172        let on = HirScalarExpr::call_binary(
3173            HirScalarExpr::column(left_col),
3174            HirScalarExpr::column(right_col),
3175            expr_func::Eq,
3176        );
3177        left_expr = left_expr
3178            .join(right_expr, on, JoinKind::FullOuter)
3179            .map(vec![HirScalarExpr::call_variadic(
3180                VariadicFunc::Coalesce,
3181                vec![
3182                    HirScalarExpr::column(left_col),
3183                    HirScalarExpr::column(right_col),
3184                ],
3185            )]);
3186
3187        // Project off the previous iteration's coalesced column, but keep both of this
3188        // iteration's ordinality columns.
3189        left_expr = left_expr.project(
3190            (0..left_col) // non-coalesced ordinality columns from left function
3191                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
3192                .collect(),
3193        );
3194        // Move the coalesced ordinality column.
3195        right_scope.items.push(left_scope.items.pop().unwrap());
3196
3197        left_scope.items.extend(right_scope.items);
3198    }
3199
3200    Ok((left_expr, left_scope, num_cols))
3201}
3202
3203/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3204/// FROM` clause that contains other table functions. Special aliasing rules
3205/// apply.
3206fn plan_solitary_table_function(
3207    qcx: &QueryContext,
3208    function: &Function<Aug>,
3209    alias: Option<&TableAlias>,
3210    with_ordinality: bool,
3211) -> Result<(HirRelationExpr, Scope), PlanError> {
3212    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3213
3214    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3215    if single_column_function {
3216        let item = &mut scope.items[0];
3217
3218        // Mark that the function only produced a single column. This impacts
3219        // whole-row references.
3220        item.from_single_column_function = true;
3221
3222        // Strange special case for solitary table functions that output one
3223        // column whose name matches the name of the table function. If a table
3224        // alias is provided, the column name is changed to the table alias's
3225        // name. Concretely, the following query returns a column named `x`
3226        // rather than a column named `generate_series`:
3227        //
3228        //     SELECT * FROM generate_series(1, 5) AS x
3229        //
3230        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3231        // since its output column is explicitly named `value`, not
3232        // `jsonb_array_elements`.
3233        //
3234        // Note also that we may (correctly) change the column name again when
3235        // we plan the table alias below if the `alias.columns` is non-empty.
3236        if let Some(alias) = alias {
3237            if let ScopeItem {
3238                table_name: Some(table_name),
3239                column_name,
3240                ..
3241            } = item
3242            {
3243                if table_name.item.as_str() == column_name.as_str() {
3244                    *column_name = normalize::column_name(alias.name.clone());
3245                }
3246            }
3247        }
3248    }
3249
3250    let scope = plan_table_alias(scope, alias)?;
3251    Ok((expr, scope))
3252}
3253
3254/// Plans a table function.
3255///
3256/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3257/// instead to get the appropriate aliasing behavior.
3258fn plan_table_function_internal(
3259    qcx: &QueryContext,
3260    Function {
3261        name,
3262        args,
3263        filter,
3264        over,
3265        distinct,
3266    }: &Function<Aug>,
3267    with_ordinality: bool,
3268    table_name: Option<FullItemName>,
3269) -> Result<(HirRelationExpr, Scope), PlanError> {
3270    assert_none!(filter, "cannot parse table function with FILTER");
3271    assert_none!(over, "cannot parse table function with OVER");
3272    assert!(!*distinct, "cannot parse table function with DISTINCT");
3273
3274    let ecx = &ExprContext {
3275        qcx,
3276        name: "table function arguments",
3277        scope: &Scope::empty(),
3278        relation_type: &SqlRelationType::empty(),
3279        allow_aggregates: false,
3280        allow_subqueries: true,
3281        allow_parameters: true,
3282        allow_windows: false,
3283    };
3284
3285    let scalar_args = match args {
3286        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3287        FunctionArgs::Args { args, order_by } => {
3288            if !order_by.is_empty() {
3289                sql_bail!(
3290                    "ORDER BY specified, but {} is not an aggregate function",
3291                    name
3292                );
3293            }
3294            plan_exprs(ecx, args)?
3295        }
3296    };
3297
3298    let table_name = match table_name {
3299        Some(table_name) => table_name.item,
3300        None => name.full_item_name().item.clone(),
3301    };
3302
3303    let scope_name = Some(PartialItemName {
3304        database: None,
3305        schema: None,
3306        item: table_name,
3307    });
3308
3309    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3310        Func::Table(impls) => {
3311            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3312            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3313            let expr = match tf.imp {
3314                TableFuncImpl::CallTable { mut func, exprs } => {
3315                    if with_ordinality {
3316                        func = TableFunc::with_ordinality(func.clone()).ok_or(
3317                            PlanError::Unsupported {
3318                                feature: format!("WITH ORDINALITY on {}", func),
3319                                discussion_no: None,
3320                            },
3321                        )?;
3322                    }
3323                    HirRelationExpr::CallTable { func, exprs }
3324                }
3325                TableFuncImpl::Expr(expr) => {
3326                    if !with_ordinality {
3327                        expr
3328                    } else {
3329                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
3330                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
3331                        // back to the legacy implementation or error out the query.
3332                        if qcx
3333                            .scx
3334                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3335                        {
3336                            // Note that this can give an incorrect ordering, and also has an extreme
3337                            // performance problem in some cases. See the doc comment of
3338                            // `TableFuncImpl`.
3339                            tracing::error!(
3340                                %name,
3341                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3342                            );
3343                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3344                                func: WindowExprType::Scalar(ScalarWindowExpr {
3345                                    func: ScalarWindowFunc::RowNumber,
3346                                    order_by: vec![],
3347                                }),
3348                                partition_by: vec![],
3349                                order_by: vec![],
3350                            })])
3351                        } else {
3352                            bail_unsupported!(format!(
3353                                "WITH ORDINALITY or ROWS FROM with {}",
3354                                name
3355                            ));
3356                        }
3357                    }
3358                }
3359            };
3360            (expr, scope)
3361        }
3362        Func::Scalar(impls) => {
3363            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3364            let output = expr.typ(
3365                &qcx.outer_relation_types,
3366                &SqlRelationType::new(vec![]),
3367                &qcx.scx.param_types.borrow(),
3368            );
3369
3370            let relation = SqlRelationType::new(vec![output]);
3371
3372            let function_ident = Ident::new(name.full_item_name().item.clone())?;
3373            let column_name = normalize::column_name(function_ident);
3374            let name = column_name.to_string();
3375
3376            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3377
3378            let mut func = TableFunc::TabletizedScalar { relation, name };
3379            if with_ordinality {
3380                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3381                    feature: format!("WITH ORDINALITY on {}", func),
3382                    discussion_no: None,
3383                })?;
3384            }
3385            (
3386                HirRelationExpr::CallTable {
3387                    func,
3388                    exprs: vec![expr],
3389                },
3390                scope,
3391            )
3392        }
3393        o => sql_bail!(
3394            "{} functions are not supported in functions in FROM",
3395            o.class()
3396        ),
3397    };
3398
3399    if with_ordinality {
3400        scope
3401            .items
3402            .push(ScopeItem::from_name(scope_name, "ordinality"));
3403    }
3404
3405    Ok((expr, scope))
3406}
3407
3408fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3409    if let Some(TableAlias {
3410        name,
3411        columns,
3412        strict,
3413    }) = alias
3414    {
3415        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3416            sql_bail!(
3417                "{} has {} columns available but {} columns specified",
3418                name,
3419                scope.items.len(),
3420                columns.len()
3421            );
3422        }
3423
3424        let table_name = normalize::ident(name.to_owned());
3425        for (i, item) in scope.items.iter_mut().enumerate() {
3426            item.table_name = if item.allow_unqualified_references {
3427                Some(PartialItemName {
3428                    database: None,
3429                    schema: None,
3430                    item: table_name.clone(),
3431                })
3432            } else {
3433                // Columns that prohibit unqualified references are special
3434                // columns from the output of a NATURAL or USING join that can
3435                // only be referenced by their full, pre-join name. Applying an
3436                // alias to the output of that join renders those columns
3437                // inaccessible, which we accomplish here by setting the
3438                // table name to `None`.
3439                //
3440                // Concretely, consider:
3441                //
3442                //      CREATE TABLE t1 (a int);
3443                //      CREATE TABLE t2 (a int);
3444                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3445                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3446                //
3447                // In (1), the join has no alias. The underlying columns from
3448                // either side of the join can be referenced as `t1.a` and
3449                // `t2.a`, respectively, and the unqualified name `a` refers to
3450                // a column whose value is `coalesce(t1.a, t2.a)`.
3451                //
3452                // In (2), the join is aliased as `t`. The columns from either
3453                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3454                // the coalesced column can be named as either `a` or `t.a`.
3455                //
3456                // We previously had a bug [0] that mishandled this subtle
3457                // logic.
3458                //
3459                // NOTE(benesch): We could in theory choose to project away
3460                // those inaccessible columns and drop them from the scope
3461                // entirely, but that would require that this function also
3462                // take and return the `HirRelationExpr` that is being aliased,
3463                // which is a rather large refactor.
3464                //
3465                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3466                None
3467            };
3468            item.column_name = columns
3469                .get(i)
3470                .map(|a| normalize::column_name(a.clone()))
3471                .unwrap_or_else(|| item.column_name.clone());
3472        }
3473    }
3474    Ok(scope)
3475}
3476
3477// `table_func_names` is a mapping from a UUID to the original function
3478// name. The UUIDs are identifiers that have been rewritten from some table
3479// function expression, and this mapping restores the original names.
3480fn invent_column_name(
3481    ecx: &ExprContext,
3482    expr: &Expr<Aug>,
3483    table_func_names: &BTreeMap<String, Ident>,
3484) -> Option<ColumnName> {
3485    // We follow PostgreSQL exactly here, which has some complicated rules
3486    // around "high" and "low" quality names. Low quality names override other
3487    // low quality names but not high quality names.
3488    //
3489    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3490
3491    #[derive(Debug)]
3492    enum NameQuality {
3493        Low,
3494        High,
3495    }
3496
3497    fn invent(
3498        ecx: &ExprContext,
3499        expr: &Expr<Aug>,
3500        table_func_names: &BTreeMap<String, Ident>,
3501    ) -> Option<(ColumnName, NameQuality)> {
3502        match expr {
3503            Expr::Identifier(names) => {
3504                if let [name] = names.as_slice() {
3505                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3506                        return Some((
3507                            normalize::column_name(table_func_name.clone()),
3508                            NameQuality::High,
3509                        ));
3510                    }
3511                }
3512                names
3513                    .last()
3514                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3515            }
3516            Expr::Value(v) => match v {
3517                // Per PostgreSQL, `bool` and `interval` literals take on the name
3518                // of their type, but not other literal types.
3519                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3520                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3521                _ => None,
3522            },
3523            Expr::Function(func) => {
3524                let (schema, item) = match &func.name {
3525                    ResolvedItemName::Item {
3526                        qualifiers,
3527                        full_name,
3528                        ..
3529                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3530                    _ => unreachable!(),
3531                };
3532
3533                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3534                    || schema
3535                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3536                {
3537                    None
3538                } else {
3539                    Some((item.into(), NameQuality::High))
3540                }
3541            }
3542            Expr::HomogenizingFunction { function, .. } => Some((
3543                function.to_string().to_lowercase().into(),
3544                NameQuality::High,
3545            )),
3546            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3547            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3548            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3549            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3550            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3551                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3552                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3553            },
3554            Expr::Case { else_result, .. } => {
3555                match else_result
3556                    .as_ref()
3557                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
3558                {
3559                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3560                    _ => Some(("case".into(), NameQuality::Low)),
3561                }
3562            }
3563            Expr::FieldAccess { field, .. } => {
3564                Some((normalize::column_name(field.clone()), NameQuality::High))
3565            }
3566            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3567            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3568            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3569                // A bit silly to have to plan the query here just to get its column
3570                // name, since we throw away the planned expression, but fixing this
3571                // requires a separate semantic analysis phase.
3572                let (_expr, scope) =
3573                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3574                scope
3575                    .items
3576                    .first()
3577                    .map(|name| (name.column_name.clone(), NameQuality::High))
3578            }
3579            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3580            _ => None,
3581        }
3582    }
3583
3584    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3585}
3586
3587#[derive(Debug)]
3588enum ExpandedSelectItem<'a> {
3589    InputOrdinal(usize),
3590    Expr(Cow<'a, Expr<Aug>>),
3591}
3592
3593impl ExpandedSelectItem<'_> {
3594    fn as_expr(&self) -> Option<&Expr<Aug>> {
3595        match self {
3596            ExpandedSelectItem::InputOrdinal(_) => None,
3597            ExpandedSelectItem::Expr(expr) => Some(expr),
3598        }
3599    }
3600}
3601
3602fn expand_select_item<'a>(
3603    ecx: &ExprContext,
3604    s: &'a SelectItem<Aug>,
3605    table_func_names: &BTreeMap<String, Ident>,
3606) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3607    match s {
3608        SelectItem::Expr {
3609            expr: Expr::QualifiedWildcard(table_name),
3610            alias: _,
3611        } => {
3612            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3613            let table_name =
3614                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3615            let out: Vec<_> = ecx
3616                .scope
3617                .items
3618                .iter()
3619                .enumerate()
3620                .filter(|(_i, item)| item.is_from_table(&table_name))
3621                .map(|(i, item)| {
3622                    let name = item.column_name.clone();
3623                    (ExpandedSelectItem::InputOrdinal(i), name)
3624                })
3625                .collect();
3626            if out.is_empty() {
3627                sql_bail!("no table named '{}' in scope", table_name);
3628            }
3629            Ok(out)
3630        }
3631        SelectItem::Expr {
3632            expr: Expr::WildcardAccess(sql_expr),
3633            alias: _,
3634        } => {
3635            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3636            // A bit silly to have to plan the expression here just to get its
3637            // type, since we throw away the planned expression, but fixing this
3638            // requires a separate semantic analysis phase. Luckily this is an
3639            // uncommon operation and the PostgreSQL docs have a warning that
3640            // this operation is slow in Postgres too.
3641            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3642            let fields = match ecx.scalar_type(&expr) {
3643                SqlScalarType::Record { fields, .. } => fields,
3644                ty => sql_bail!(
3645                    "type {} is not composite",
3646                    ecx.humanize_scalar_type(&ty, false)
3647                ),
3648            };
3649            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3650            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3651                if let [name] = ident.as_slice() {
3652                    if let Ok(items) = ecx.scope.items_from_table(
3653                        &[],
3654                        &PartialItemName {
3655                            database: None,
3656                            schema: None,
3657                            item: name.as_str().to_string(),
3658                        },
3659                    ) {
3660                        for (_, item) in items {
3661                            if item
3662                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3663                            {
3664                                skip_cols.insert(item.column_name.clone());
3665                            }
3666                        }
3667                    }
3668                }
3669            }
3670            let items = fields
3671                .iter()
3672                .filter_map(|(name, _ty)| {
3673                    if skip_cols.contains(name) {
3674                        None
3675                    } else {
3676                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3677                            expr: sql_expr.clone(),
3678                            field: name.clone().into(),
3679                        }));
3680                        Some((item, name.clone()))
3681                    }
3682                })
3683                .collect();
3684            Ok(items)
3685        }
3686        SelectItem::Wildcard => {
3687            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3688            let items: Vec<_> = ecx
3689                .scope
3690                .items
3691                .iter()
3692                .enumerate()
3693                .filter(|(_i, item)| item.allow_unqualified_references)
3694                .map(|(i, item)| {
3695                    let name = item.column_name.clone();
3696                    (ExpandedSelectItem::InputOrdinal(i), name)
3697                })
3698                .collect();
3699
3700            Ok(items)
3701        }
3702        SelectItem::Expr { expr, alias } => {
3703            let name = alias
3704                .clone()
3705                .map(normalize::column_name)
3706                .or_else(|| invent_column_name(ecx, expr, table_func_names))
3707                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3708            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3709        }
3710    }
3711}
3712
3713fn plan_join(
3714    left_qcx: &QueryContext,
3715    left: HirRelationExpr,
3716    left_scope: Scope,
3717    join: &Join<Aug>,
3718) -> Result<(HirRelationExpr, Scope), PlanError> {
3719    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3720    let (kind, constraint) = match &join.join_operator {
3721        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3722        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3723        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3724        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3725        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3726    };
3727
3728    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3729    if !kind.can_be_correlated() {
3730        for item in &mut right_qcx.outer_scopes[0].items {
3731            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3732            // these items from scope. These items need to *exist* because they
3733            // might shadow variables in outer scopes that would otherwise be
3734            // valid to reference, but accessing them needs to produce an error.
3735            item.error_if_referenced =
3736                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3737                    table: table.cloned(),
3738                    column: column.clone(),
3739                });
3740        }
3741    }
3742    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3743
3744    let (expr, scope) = match constraint {
3745        JoinConstraint::On(expr) => {
3746            let product_scope = left_scope.product(right_scope)?;
3747            let ecx = &ExprContext {
3748                qcx: left_qcx,
3749                name: "ON clause",
3750                scope: &product_scope,
3751                relation_type: &SqlRelationType::new(
3752                    left_qcx
3753                        .relation_type(&left)
3754                        .column_types
3755                        .into_iter()
3756                        .chain(right_qcx.relation_type(&right).column_types)
3757                        .collect(),
3758                ),
3759                allow_aggregates: false,
3760                allow_subqueries: true,
3761                allow_parameters: true,
3762                allow_windows: false,
3763            };
3764            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3765            let joined = left.join(right, on, kind);
3766            (joined, product_scope)
3767        }
3768        JoinConstraint::Using { columns, alias } => {
3769            let column_names = columns
3770                .iter()
3771                .map(|ident| normalize::column_name(ident.clone()))
3772                .collect::<Vec<_>>();
3773
3774            plan_using_constraint(
3775                &column_names,
3776                left_qcx,
3777                left,
3778                left_scope,
3779                &right_qcx,
3780                right,
3781                right_scope,
3782                kind,
3783                alias.as_ref(),
3784            )?
3785        }
3786        JoinConstraint::Natural => {
3787            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3788            // have the same scx. However, it doesn't hurt to be safe.
3789            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3790            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3791            let left_column_names = left_scope.column_names();
3792            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3793            let column_names: Vec<_> = left_column_names
3794                .filter(|col| right_column_names.contains(col))
3795                .cloned()
3796                .collect();
3797            plan_using_constraint(
3798                &column_names,
3799                left_qcx,
3800                left,
3801                left_scope,
3802                &right_qcx,
3803                right,
3804                right_scope,
3805                kind,
3806                None,
3807            )?
3808        }
3809    };
3810    Ok((expr, scope))
3811}
3812
3813// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3814#[allow(clippy::too_many_arguments)]
3815fn plan_using_constraint(
3816    column_names: &[ColumnName],
3817    left_qcx: &QueryContext,
3818    left: HirRelationExpr,
3819    left_scope: Scope,
3820    right_qcx: &QueryContext,
3821    right: HirRelationExpr,
3822    right_scope: Scope,
3823    kind: JoinKind,
3824    alias: Option<&Ident>,
3825) -> Result<(HirRelationExpr, Scope), PlanError> {
3826    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3827
3828    // Cargo culting PG here; no discernable reason this must fail, but PG does
3829    // so we do, as well.
3830    let mut unique_column_names = BTreeSet::new();
3831    for c in column_names {
3832        if !unique_column_names.insert(c) {
3833            return Err(PlanError::Unsupported {
3834                feature: format!(
3835                    "column name {} appears more than once in USING clause",
3836                    c.quoted()
3837                ),
3838                discussion_no: None,
3839            });
3840        }
3841    }
3842
3843    let alias_item_name = alias.map(|alias| PartialItemName {
3844        database: None,
3845        schema: None,
3846        item: alias.clone().to_string(),
3847    });
3848
3849    if let Some(alias_item_name) = &alias_item_name {
3850        for partial_item_name in both_scope.table_names() {
3851            if partial_item_name.matches(alias_item_name) {
3852                sql_bail!(
3853                    "table name \"{}\" specified more than once",
3854                    alias_item_name
3855                )
3856            }
3857        }
3858    }
3859
3860    let ecx = &ExprContext {
3861        qcx: right_qcx,
3862        name: "USING clause",
3863        scope: &both_scope,
3864        relation_type: &SqlRelationType::new(
3865            left_qcx
3866                .relation_type(&left)
3867                .column_types
3868                .into_iter()
3869                .chain(right_qcx.relation_type(&right).column_types)
3870                .collect(),
3871        ),
3872        allow_aggregates: false,
3873        allow_subqueries: false,
3874        allow_parameters: false,
3875        allow_windows: false,
3876    };
3877
3878    let mut join_exprs = vec![];
3879    let mut map_exprs = vec![];
3880    let mut new_items = vec![];
3881    let mut join_cols = vec![];
3882    let mut hidden_cols = vec![];
3883
3884    for column_name in column_names {
3885        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3886        let (lhs, lhs_name) = left_scope.resolve_using_column(
3887            column_name,
3888            JoinSide::Left,
3889            &mut left_qcx.name_manager.borrow_mut(),
3890        )?;
3891        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3892            column_name,
3893            JoinSide::Right,
3894            &mut right_qcx.name_manager.borrow_mut(),
3895        )?;
3896
3897        // Adjust the RHS reference to its post-join location.
3898        rhs.column += left_scope.len();
3899
3900        // Join keys must be resolved to same type.
3901        let mut exprs = coerce_homogeneous_exprs(
3902            &ecx.with_name(&format!(
3903                "NATURAL/USING join column {}",
3904                column_name.quoted()
3905            )),
3906            vec![
3907                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3908                    lhs,
3909                    Arc::clone(&lhs_name),
3910                )),
3911                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3912                    rhs,
3913                    Arc::clone(&rhs_name),
3914                )),
3915            ],
3916            None,
3917        )?;
3918        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3919
3920        match kind {
3921            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3922                join_cols.push(lhs.column);
3923                hidden_cols.push(rhs.column);
3924            }
3925            JoinKind::RightOuter => {
3926                join_cols.push(rhs.column);
3927                hidden_cols.push(lhs.column);
3928            }
3929            JoinKind::FullOuter => {
3930                // Create a new column that will be the coalesced value of left
3931                // and right.
3932                join_cols.push(both_scope.items.len() + map_exprs.len());
3933                hidden_cols.push(lhs.column);
3934                hidden_cols.push(rhs.column);
3935                map_exprs.push(HirScalarExpr::call_variadic(
3936                    VariadicFunc::Coalesce,
3937                    vec![expr1.clone(), expr2.clone()],
3938                ));
3939                new_items.push(ScopeItem::from_column_name(column_name));
3940            }
3941        }
3942
3943        // If a `join_using_alias` is present, add a new scope item that accepts
3944        // only table-qualified references for each specified join column.
3945        // Unlike regular table aliases, a `join_using_alias` should not hide the
3946        // names of the joined relations.
3947        if alias_item_name.is_some() {
3948            let new_item_col = both_scope.items.len() + new_items.len();
3949            join_cols.push(new_item_col);
3950            hidden_cols.push(new_item_col);
3951
3952            new_items.push(ScopeItem::from_name(
3953                alias_item_name.clone(),
3954                column_name.clone().to_string(),
3955            ));
3956
3957            // Should be safe to use either `lhs` or `rhs` here since the column
3958            // is available in both scopes and must have the same type of the new item.
3959            // We (arbitrarily) choose the left name.
3960            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3961        }
3962
3963        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3964    }
3965    both_scope.items.extend(new_items);
3966
3967    // The columns from the secondary side of the join remain accessible by
3968    // their table-qualified name, but not by their column name alone. They are
3969    // also excluded from `SELECT *`.
3970    for c in hidden_cols {
3971        both_scope.items[c].allow_unqualified_references = false;
3972    }
3973
3974    // Reproject all returned elements to the front of the list.
3975    let project_key = join_cols
3976        .into_iter()
3977        .chain(0..both_scope.items.len())
3978        .unique()
3979        .collect::<Vec<_>>();
3980
3981    both_scope = both_scope.project(&project_key);
3982
3983    let on = HirScalarExpr::variadic_and(join_exprs);
3984
3985    let both = left
3986        .join(right, on, kind)
3987        .map(map_exprs)
3988        .project(project_key);
3989    Ok((both, both_scope))
3990}
3991
3992pub fn plan_expr<'a>(
3993    ecx: &'a ExprContext,
3994    e: &Expr<Aug>,
3995) -> Result<CoercibleScalarExpr, PlanError> {
3996    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3997}
3998
3999fn plan_expr_inner<'a>(
4000    ecx: &'a ExprContext,
4001    e: &Expr<Aug>,
4002) -> Result<CoercibleScalarExpr, PlanError> {
4003    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4004        // We've already calculated this expression.
4005        return Ok(HirScalarExpr::named_column(
4006            i,
4007            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4008        )
4009        .into());
4010    }
4011
4012    match e {
4013        // Names.
4014        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4015            Ok(plan_identifier(ecx, names)?.into())
4016        }
4017
4018        // Literals.
4019        Expr::Value(val) => plan_literal(val),
4020        Expr::Parameter(n) => plan_parameter(ecx, *n),
4021        Expr::Array(exprs) => plan_array(ecx, exprs, None),
4022        Expr::List(exprs) => plan_list(ecx, exprs, None),
4023        Expr::Map(exprs) => plan_map(ecx, exprs, None),
4024        Expr::Row { exprs } => plan_row(ecx, exprs),
4025
4026        // Generalized functions, operators, and casts.
4027        Expr::Op { op, expr1, expr2 } => {
4028            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4029        }
4030        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4031        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4032
4033        // Special functions and operators.
4034        Expr::Not { expr } => plan_not(ecx, expr),
4035        Expr::And { left, right } => plan_and(ecx, left, right),
4036        Expr::Or { left, right } => plan_or(ecx, left, right),
4037        Expr::IsExpr {
4038            expr,
4039            construct,
4040            negated,
4041        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4042        Expr::Case {
4043            operand,
4044            conditions,
4045            results,
4046            else_result,
4047        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4048        Expr::HomogenizingFunction { function, exprs } => {
4049            plan_homogenizing_function(ecx, function, exprs)
4050        }
4051        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4052            ecx,
4053            &None,
4054            &[l_expr.clone().equals(*r_expr.clone())],
4055            &[Expr::null()],
4056            &Some(Box::new(*l_expr.clone())),
4057        )?
4058        .into()),
4059        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4060        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4061        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4062        Expr::Like {
4063            expr,
4064            pattern,
4065            escape,
4066            case_insensitive,
4067            negated,
4068        } => Ok(plan_like(
4069            ecx,
4070            expr,
4071            pattern,
4072            escape.as_deref(),
4073            *case_insensitive,
4074            *negated,
4075        )?
4076        .into()),
4077
4078        Expr::InList {
4079            expr,
4080            list,
4081            negated,
4082        } => plan_in_list(ecx, expr, list, negated),
4083
4084        // Subqueries.
4085        Expr::Exists(query) => plan_exists(ecx, query),
4086        Expr::Subquery(query) => plan_subquery(ecx, query),
4087        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4088        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4089        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4090        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4091        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4092        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4093        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4094        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4095        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4096        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4097        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4098    }
4099}
4100
4101fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4102    if !ecx.allow_parameters {
4103        // It might be clearer to return an error like "cannot use parameter
4104        // here", but this is how PostgreSQL does it, and so for now we follow
4105        // PostgreSQL.
4106        return Err(PlanError::UnknownParameter(n));
4107    }
4108    if n == 0 || n > 65536 {
4109        return Err(PlanError::UnknownParameter(n));
4110    }
4111    if ecx.param_types().borrow().contains_key(&n) {
4112        Ok(HirScalarExpr::parameter(n).into())
4113    } else {
4114        Ok(CoercibleScalarExpr::Parameter(n))
4115    }
4116}
4117
4118fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4119    let mut out = vec![];
4120    for e in exprs {
4121        out.push(plan_expr(ecx, e)?);
4122    }
4123    Ok(CoercibleScalarExpr::LiteralRecord(out))
4124}
4125
4126fn plan_cast(
4127    ecx: &ExprContext,
4128    expr: &Expr<Aug>,
4129    data_type: &ResolvedDataType,
4130) -> Result<CoercibleScalarExpr, PlanError> {
4131    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4132    let expr = match expr {
4133        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4134        // we can pass in the target type as a type hint. This is
4135        // a limited form of the coercion that we do for string literals
4136        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4137        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4138        // PostgreSQL compatibility trouble.
4139        //
4140        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4141        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4142        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4143        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4144        _ => plan_expr(ecx, expr)?,
4145    };
4146    let ecx = &ecx.with_name("CAST");
4147    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4148    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4149    Ok(expr.into())
4150}
4151
4152fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4153    let ecx = ecx.with_name("NOT argument");
4154    Ok(plan_expr(&ecx, expr)?
4155        .type_as(&ecx, &SqlScalarType::Bool)?
4156        .call_unary(UnaryFunc::Not(expr_func::Not))
4157        .into())
4158}
4159
4160fn plan_and(
4161    ecx: &ExprContext,
4162    left: &Expr<Aug>,
4163    right: &Expr<Aug>,
4164) -> Result<CoercibleScalarExpr, PlanError> {
4165    let ecx = ecx.with_name("AND argument");
4166    Ok(HirScalarExpr::variadic_and(vec![
4167        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4168        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4169    ])
4170    .into())
4171}
4172
4173fn plan_or(
4174    ecx: &ExprContext,
4175    left: &Expr<Aug>,
4176    right: &Expr<Aug>,
4177) -> Result<CoercibleScalarExpr, PlanError> {
4178    let ecx = ecx.with_name("OR argument");
4179    Ok(HirScalarExpr::variadic_or(vec![
4180        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4181        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4182    ])
4183    .into())
4184}
4185
4186fn plan_in_list(
4187    ecx: &ExprContext,
4188    lhs: &Expr<Aug>,
4189    list: &Vec<Expr<Aug>>,
4190    negated: &bool,
4191) -> Result<CoercibleScalarExpr, PlanError> {
4192    let ecx = ecx.with_name("IN list");
4193    let or = HirScalarExpr::variadic_or(
4194        list.into_iter()
4195            .map(|e| {
4196                let eq = lhs.clone().equals(e.clone());
4197                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4198            })
4199            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4200    );
4201    Ok(if *negated {
4202        or.call_unary(UnaryFunc::Not(expr_func::Not))
4203    } else {
4204        or
4205    }
4206    .into())
4207}
4208
4209fn plan_homogenizing_function(
4210    ecx: &ExprContext,
4211    function: &HomogenizingFunction,
4212    exprs: &[Expr<Aug>],
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4215    let expr = HirScalarExpr::call_variadic(
4216        match function {
4217            HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4218            HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4219            HomogenizingFunction::Least => VariadicFunc::Least,
4220        },
4221        coerce_homogeneous_exprs(
4222            &ecx.with_name(&function.to_string().to_lowercase()),
4223            plan_exprs(ecx, exprs)?,
4224            None,
4225        )?,
4226    );
4227    Ok(expr.into())
4228}
4229
4230fn plan_field_access(
4231    ecx: &ExprContext,
4232    expr: &Expr<Aug>,
4233    field: &Ident,
4234) -> Result<CoercibleScalarExpr, PlanError> {
4235    let field = normalize::column_name(field.clone());
4236    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4237    let ty = ecx.scalar_type(&expr);
4238    let i = match &ty {
4239        SqlScalarType::Record { fields, .. } => {
4240            fields.iter().position(|(name, _ty)| *name == field)
4241        }
4242        ty => sql_bail!(
4243            "column notation applied to type {}, which is not a composite type",
4244            ecx.humanize_scalar_type(ty, false)
4245        ),
4246    };
4247    match i {
4248        None => sql_bail!(
4249            "field {} not found in data type {}",
4250            field,
4251            ecx.humanize_scalar_type(&ty, false)
4252        ),
4253        Some(i) => Ok(expr
4254            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4255            .into()),
4256    }
4257}
4258
4259fn plan_subscript(
4260    ecx: &ExprContext,
4261    expr: &Expr<Aug>,
4262    positions: &[SubscriptPosition<Aug>],
4263) -> Result<CoercibleScalarExpr, PlanError> {
4264    assert!(
4265        !positions.is_empty(),
4266        "subscript expression must contain at least one position"
4267    );
4268
4269    let ecx = &ecx.with_name("subscripting");
4270    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4271    let ty = ecx.scalar_type(&expr);
4272    match &ty {
4273        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4274            ecx,
4275            expr,
4276            positions,
4277            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4278            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4279            // track in its backing data).
4280            if ty == SqlScalarType::Int2Vector {
4281                1
4282            } else {
4283                0
4284            },
4285        ),
4286        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4287        SqlScalarType::List { element_type, .. } => {
4288            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4289            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4290            let n_layers = ty.unwrap_list_n_layers();
4291            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4292        }
4293        ty => sql_bail!(
4294            "cannot subscript type {}",
4295            ecx.humanize_scalar_type(ty, false)
4296        ),
4297    }
4298}
4299
4300// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4301// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4302// any were slices (i.e. included colon).
4303fn extract_scalar_subscript_from_positions<'a>(
4304    positions: &'a [SubscriptPosition<Aug>],
4305    expr_type_name: &str,
4306) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4307    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4308    for p in positions {
4309        if p.explicit_slice {
4310            sql_bail!("{} subscript does not support slices", expr_type_name);
4311        }
4312        assert!(
4313            p.end.is_none(),
4314            "index-appearing subscripts cannot have end value"
4315        );
4316        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4317    }
4318    Ok(scalar_subscripts)
4319}
4320
4321fn plan_subscript_array(
4322    ecx: &ExprContext,
4323    expr: HirScalarExpr,
4324    positions: &[SubscriptPosition<Aug>],
4325    offset: i64,
4326) -> Result<CoercibleScalarExpr, PlanError> {
4327    let mut exprs = Vec::with_capacity(positions.len() + 1);
4328    exprs.push(expr);
4329
4330    // Subscripting arrays doesn't yet support slicing, so we always want to
4331    // extract scalars or error.
4332    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4333
4334    for i in indexes {
4335        exprs.push(plan_expr(ecx, i)?.cast_to(
4336            ecx,
4337            CastContext::Explicit,
4338            &SqlScalarType::Int64,
4339        )?);
4340    }
4341
4342    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4343}
4344
4345fn plan_subscript_list(
4346    ecx: &ExprContext,
4347    mut expr: HirScalarExpr,
4348    positions: &[SubscriptPosition<Aug>],
4349    mut remaining_layers: usize,
4350    elem_type_name: &str,
4351) -> Result<CoercibleScalarExpr, PlanError> {
4352    let mut i = 0;
4353
4354    while i < positions.len() {
4355        // Take all contiguous index operations, i.e. find next slice operation.
4356        let j = positions[i..]
4357            .iter()
4358            .position(|p| p.explicit_slice)
4359            .unwrap_or(positions.len() - i);
4360        if j != 0 {
4361            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4362            let (n, e) = plan_index_list(
4363                ecx,
4364                expr,
4365                indexes.as_slice(),
4366                remaining_layers,
4367                elem_type_name,
4368            )?;
4369            remaining_layers = n;
4370            expr = e;
4371            i += j;
4372        }
4373
4374        // Take all contiguous slice operations, i.e. find next index operation.
4375        let j = positions[i..]
4376            .iter()
4377            .position(|p| !p.explicit_slice)
4378            .unwrap_or(positions.len() - i);
4379        if j != 0 {
4380            expr = plan_slice_list(
4381                ecx,
4382                expr,
4383                &positions[i..i + j],
4384                remaining_layers,
4385                elem_type_name,
4386            )?;
4387            i += j;
4388        }
4389    }
4390
4391    Ok(expr.into())
4392}
4393
4394fn plan_index_list(
4395    ecx: &ExprContext,
4396    expr: HirScalarExpr,
4397    indexes: &[&Expr<Aug>],
4398    n_layers: usize,
4399    elem_type_name: &str,
4400) -> Result<(usize, HirScalarExpr), PlanError> {
4401    let depth = indexes.len();
4402
4403    if depth > n_layers {
4404        if n_layers == 0 {
4405            sql_bail!("cannot subscript type {}", elem_type_name)
4406        } else {
4407            sql_bail!(
4408                "cannot index into {} layers; list only has {} layer{}",
4409                depth,
4410                n_layers,
4411                if n_layers == 1 { "" } else { "s" }
4412            )
4413        }
4414    }
4415
4416    let mut exprs = Vec::with_capacity(depth + 1);
4417    exprs.push(expr);
4418
4419    for i in indexes {
4420        exprs.push(plan_expr(ecx, i)?.cast_to(
4421            ecx,
4422            CastContext::Explicit,
4423            &SqlScalarType::Int64,
4424        )?);
4425    }
4426
4427    Ok((
4428        n_layers - depth,
4429        HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4430    ))
4431}
4432
4433fn plan_slice_list(
4434    ecx: &ExprContext,
4435    expr: HirScalarExpr,
4436    slices: &[SubscriptPosition<Aug>],
4437    n_layers: usize,
4438    elem_type_name: &str,
4439) -> Result<HirScalarExpr, PlanError> {
4440    if n_layers == 0 {
4441        sql_bail!("cannot subscript type {}", elem_type_name)
4442    }
4443
4444    // first arg will be list
4445    let mut exprs = Vec::with_capacity(slices.len() + 1);
4446    exprs.push(expr);
4447    // extract (start, end) parts from collected slices
4448    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4449        Ok(match position {
4450            Some(p) => {
4451                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4452            }
4453            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4454        })
4455    };
4456    for p in slices {
4457        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4458        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4459        exprs.push(start);
4460        exprs.push(end);
4461    }
4462
4463    Ok(HirScalarExpr::call_variadic(
4464        VariadicFunc::ListSliceLinear,
4465        exprs,
4466    ))
4467}
4468
4469fn plan_like(
4470    ecx: &ExprContext,
4471    expr: &Expr<Aug>,
4472    pattern: &Expr<Aug>,
4473    escape: Option<&Expr<Aug>>,
4474    case_insensitive: bool,
4475    not: bool,
4476) -> Result<HirScalarExpr, PlanError> {
4477    use CastContext::Implicit;
4478    let ecx = ecx.with_name("LIKE argument");
4479    let expr = plan_expr(&ecx, expr)?;
4480    let haystack = match ecx.scalar_type(&expr) {
4481        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4482            .type_as(&ecx, ty)?
4483            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4484        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4485    };
4486    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4487    if let Some(escape) = escape {
4488        pattern = pattern.call_binary(
4489            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4490            expr_func::LikeEscape,
4491        );
4492    }
4493    let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
4494    if not {
4495        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4496    } else {
4497        Ok(like)
4498    }
4499}
4500
4501fn plan_subscript_jsonb(
4502    ecx: &ExprContext,
4503    expr: HirScalarExpr,
4504    positions: &[SubscriptPosition<Aug>],
4505) -> Result<CoercibleScalarExpr, PlanError> {
4506    use CastContext::Implicit;
4507    use SqlScalarType::{Int64, String};
4508
4509    // JSONB doesn't support the slicing syntax, so simply error if you
4510    // encounter any explicit slices.
4511    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4512
4513    let mut exprs = Vec::with_capacity(subscripts.len());
4514    for s in subscripts {
4515        let subscript = plan_expr(ecx, s)?;
4516        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4517            subscript
4518        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4519            // Integers are converted to a string here and then re-parsed as an
4520            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4521            // do it.
4522            typeconv::to_string(ecx, subscript)
4523        } else {
4524            sql_bail!("jsonb subscript type must be coercible to integer or text");
4525        };
4526        exprs.push(subscript);
4527    }
4528
4529    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4530    // `expr->subscript` as you might expect.
4531    let expr = expr.call_binary(
4532        HirScalarExpr::call_variadic(
4533            VariadicFunc::ArrayCreate {
4534                elem_type: SqlScalarType::String,
4535            },
4536            exprs,
4537        ),
4538        BinaryFunc::JsonbGetPath,
4539    );
4540    Ok(expr.into())
4541}
4542
4543fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4544    if !ecx.allow_subqueries {
4545        sql_bail!("{} does not allow subqueries", ecx.name)
4546    }
4547    let mut qcx = ecx.derived_query_context();
4548    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4549    Ok(expr.exists().into())
4550}
4551
4552fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4553    if !ecx.allow_subqueries {
4554        sql_bail!("{} does not allow subqueries", ecx.name)
4555    }
4556    let mut qcx = ecx.derived_query_context();
4557    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4558    let column_types = qcx.relation_type(&expr).column_types;
4559    if column_types.len() != 1 {
4560        sql_bail!(
4561            "Expected subselect to return 1 column, got {} columns",
4562            column_types.len()
4563        );
4564    }
4565    Ok(expr.select().into())
4566}
4567
4568fn plan_list_subquery(
4569    ecx: &ExprContext,
4570    query: &Query<Aug>,
4571) -> Result<CoercibleScalarExpr, PlanError> {
4572    plan_vector_like_subquery(
4573        ecx,
4574        query,
4575        |_| false,
4576        |elem_type| VariadicFunc::ListCreate { elem_type },
4577        |order_by| AggregateFunc::ListConcat { order_by },
4578        expr_func::ListListConcat.into(),
4579        |elem_type| {
4580            HirScalarExpr::literal(
4581                Datum::empty_list(),
4582                SqlScalarType::List {
4583                    element_type: Box::new(elem_type),
4584                    custom_id: None,
4585                },
4586            )
4587        },
4588        "list",
4589    )
4590}
4591
4592fn plan_array_subquery(
4593    ecx: &ExprContext,
4594    query: &Query<Aug>,
4595) -> Result<CoercibleScalarExpr, PlanError> {
4596    plan_vector_like_subquery(
4597        ecx,
4598        query,
4599        |elem_type| {
4600            matches!(
4601                elem_type,
4602                SqlScalarType::Char { .. }
4603                    | SqlScalarType::Array { .. }
4604                    | SqlScalarType::List { .. }
4605                    | SqlScalarType::Map { .. }
4606            )
4607        },
4608        |elem_type| VariadicFunc::ArrayCreate { elem_type },
4609        |order_by| AggregateFunc::ArrayConcat { order_by },
4610        expr_func::ArrayArrayConcat.into(),
4611        |elem_type| {
4612            HirScalarExpr::literal(
4613                Datum::empty_array(),
4614                SqlScalarType::Array(Box::new(elem_type)),
4615            )
4616        },
4617        "[]",
4618    )
4619}
4620
4621/// Generic function used to plan both array subqueries and list subqueries
4622fn plan_vector_like_subquery<F1, F2, F3, F4>(
4623    ecx: &ExprContext,
4624    query: &Query<Aug>,
4625    is_unsupported_type: F1,
4626    vector_create: F2,
4627    aggregate_concat: F3,
4628    binary_concat: BinaryFunc,
4629    empty_literal: F4,
4630    vector_type_string: &str,
4631) -> Result<CoercibleScalarExpr, PlanError>
4632where
4633    F1: Fn(&SqlScalarType) -> bool,
4634    F2: Fn(SqlScalarType) -> VariadicFunc,
4635    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4636    F4: Fn(SqlScalarType) -> HirScalarExpr,
4637{
4638    if !ecx.allow_subqueries {
4639        sql_bail!("{} does not allow subqueries", ecx.name)
4640    }
4641
4642    let mut qcx = ecx.derived_query_context();
4643    let mut planned_query = plan_query(&mut qcx, query)?;
4644    if planned_query.limit.is_some()
4645        || !planned_query
4646            .offset
4647            .clone()
4648            .try_into_literal_int64()
4649            .is_ok_and(|offset| offset == 0)
4650    {
4651        planned_query.expr = HirRelationExpr::top_k(
4652            planned_query.expr,
4653            vec![],
4654            planned_query.order_by.clone(),
4655            planned_query.limit,
4656            planned_query.offset,
4657            planned_query.group_size_hints.limit_input_group_size,
4658        );
4659    }
4660
4661    if planned_query.project.len() != 1 {
4662        sql_bail!(
4663            "Expected subselect to return 1 column, got {} columns",
4664            planned_query.project.len()
4665        );
4666    }
4667
4668    let project_column = *planned_query.project.get(0).unwrap();
4669    let elem_type = qcx
4670        .relation_type(&planned_query.expr)
4671        .column_types
4672        .get(project_column)
4673        .cloned()
4674        .unwrap()
4675        .scalar_type();
4676
4677    if is_unsupported_type(&elem_type) {
4678        bail_unsupported!(format!(
4679            "cannot build array from subquery because return type {}{}",
4680            ecx.humanize_scalar_type(&elem_type, false),
4681            vector_type_string
4682        ));
4683    }
4684
4685    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4686    // subquery above.
4687    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4688        vector_create(elem_type.clone()),
4689        vec![HirScalarExpr::column(project_column)],
4690    ))
4691    .chain(
4692        planned_query
4693            .order_by
4694            .iter()
4695            .map(|co| HirScalarExpr::column(co.column)),
4696    )
4697    .collect();
4698
4699    // However, column references for `aggregation_projection` and `aggregation_order_by`
4700    // are with reference to the `exprs` of the aggregation expression.  Here that is
4701    // `aggregation_exprs`.
4702    let aggregation_projection = vec![0];
4703    let aggregation_order_by = planned_query
4704        .order_by
4705        .into_iter()
4706        .enumerate()
4707        .map(|(i, order)| ColumnOrder { column: i, ..order })
4708        .collect();
4709
4710    let reduced_expr = planned_query
4711        .expr
4712        .reduce(
4713            vec![],
4714            vec![AggregateExpr {
4715                func: aggregate_concat(aggregation_order_by),
4716                expr: Box::new(HirScalarExpr::call_variadic(
4717                    VariadicFunc::RecordCreate {
4718                        field_names: iter::repeat(ColumnName::from(""))
4719                            .take(aggregation_exprs.len())
4720                            .collect(),
4721                    },
4722                    aggregation_exprs,
4723                )),
4724                distinct: false,
4725            }],
4726            None,
4727        )
4728        .project(aggregation_projection);
4729
4730    // If `expr` has no rows, return an empty array/list rather than NULL.
4731    Ok(reduced_expr
4732        .select()
4733        .call_binary(empty_literal(elem_type), binary_concat)
4734        .into())
4735}
4736
4737fn plan_map_subquery(
4738    ecx: &ExprContext,
4739    query: &Query<Aug>,
4740) -> Result<CoercibleScalarExpr, PlanError> {
4741    if !ecx.allow_subqueries {
4742        sql_bail!("{} does not allow subqueries", ecx.name)
4743    }
4744
4745    let mut qcx = ecx.derived_query_context();
4746    let mut query = plan_query(&mut qcx, query)?;
4747    if query.limit.is_some()
4748        || !query
4749            .offset
4750            .clone()
4751            .try_into_literal_int64()
4752            .is_ok_and(|offset| offset == 0)
4753    {
4754        query.expr = HirRelationExpr::top_k(
4755            query.expr,
4756            vec![],
4757            query.order_by.clone(),
4758            query.limit,
4759            query.offset,
4760            query.group_size_hints.limit_input_group_size,
4761        );
4762    }
4763    if query.project.len() != 2 {
4764        sql_bail!(
4765            "expected map subquery to return 2 columns, got {} columns",
4766            query.project.len()
4767        );
4768    }
4769
4770    let query_types = qcx.relation_type(&query.expr).column_types;
4771    let key_column = query.project[0];
4772    let key_type = query_types[key_column].clone().scalar_type();
4773    let value_column = query.project[1];
4774    let value_type = query_types[value_column].clone().scalar_type();
4775
4776    if key_type != SqlScalarType::String {
4777        sql_bail!("cannot build map from subquery because first column is not of type text");
4778    }
4779
4780    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4781        VariadicFunc::RecordCreate {
4782            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4783        },
4784        vec![
4785            HirScalarExpr::column(key_column),
4786            HirScalarExpr::column(value_column),
4787        ],
4788    ))
4789    .chain(
4790        query
4791            .order_by
4792            .iter()
4793            .map(|co| HirScalarExpr::column(co.column)),
4794    )
4795    .collect();
4796
4797    let expr = query
4798        .expr
4799        .reduce(
4800            vec![],
4801            vec![AggregateExpr {
4802                func: AggregateFunc::MapAgg {
4803                    order_by: query
4804                        .order_by
4805                        .into_iter()
4806                        .enumerate()
4807                        .map(|(i, order)| ColumnOrder { column: i, ..order })
4808                        .collect(),
4809                    value_type: value_type.clone(),
4810                },
4811                expr: Box::new(HirScalarExpr::call_variadic(
4812                    VariadicFunc::RecordCreate {
4813                        field_names: iter::repeat(ColumnName::from(""))
4814                            .take(aggregation_exprs.len())
4815                            .collect(),
4816                    },
4817                    aggregation_exprs,
4818                )),
4819                distinct: false,
4820            }],
4821            None,
4822        )
4823        .project(vec![0]);
4824
4825    // If `expr` has no rows, return an empty map rather than NULL.
4826    let expr = HirScalarExpr::call_variadic(
4827        VariadicFunc::Coalesce,
4828        vec![
4829            expr.select(),
4830            HirScalarExpr::literal(
4831                Datum::empty_map(),
4832                SqlScalarType::Map {
4833                    value_type: Box::new(value_type),
4834                    custom_id: None,
4835                },
4836            ),
4837        ],
4838    );
4839
4840    Ok(expr.into())
4841}
4842
4843fn plan_collate(
4844    ecx: &ExprContext,
4845    expr: &Expr<Aug>,
4846    collation: &UnresolvedItemName,
4847) -> Result<CoercibleScalarExpr, PlanError> {
4848    if collation.0.len() == 2
4849        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4850        && collation.0[1] == ident!("default")
4851    {
4852        plan_expr(ecx, expr)
4853    } else {
4854        bail_unsupported!("COLLATE");
4855    }
4856}
4857
4858/// Plans a slice of expressions.
4859///
4860/// This function is a simple convenience function for mapping [`plan_expr`]
4861/// over a slice of expressions. The planned expressions are returned in the
4862/// same order as the input. If any of the expressions fail to plan, returns an
4863/// error instead.
4864fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4865where
4866    E: std::borrow::Borrow<Expr<Aug>>,
4867{
4868    let mut out = vec![];
4869    for expr in exprs {
4870        out.push(plan_expr(ecx, expr.borrow())?);
4871    }
4872    Ok(out)
4873}
4874
4875/// Plans an `ARRAY` expression.
4876fn plan_array(
4877    ecx: &ExprContext,
4878    exprs: &[Expr<Aug>],
4879    type_hint: Option<&SqlScalarType>,
4880) -> Result<CoercibleScalarExpr, PlanError> {
4881    // Plan each element expression.
4882    let mut out = vec![];
4883    for expr in exprs {
4884        out.push(match expr {
4885            // Special case nested ARRAY expressions so we can plumb
4886            // the type hint through.
4887            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4888            _ => plan_expr(ecx, expr)?,
4889        });
4890    }
4891
4892    // Attempt to make use of the type hint.
4893    let type_hint = match type_hint {
4894        // The user has provided an explicit cast to an array type. We know the
4895        // element type to coerce to. Need to be careful, though: if there's
4896        // evidence that any of the array elements are themselves arrays, we
4897        // want to coerce to the array type, not the element type.
4898        Some(SqlScalarType::Array(elem_type)) => {
4899            let multidimensional = out.iter().any(|e| {
4900                matches!(
4901                    ecx.scalar_type(e),
4902                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4903                )
4904            });
4905            if multidimensional {
4906                type_hint
4907            } else {
4908                Some(&**elem_type)
4909            }
4910        }
4911        // The user provided an explicit cast to a non-array type. We'll have to
4912        // guess what the correct type for the array. Our caller will then
4913        // handle converting that array type to the desired non-array type.
4914        Some(_) => None,
4915        // No type hint. We'll have to guess the correct type for the array.
4916        None => None,
4917    };
4918
4919    // Coerce all elements to the same type.
4920    let (elem_type, exprs) = if exprs.is_empty() {
4921        if let Some(elem_type) = type_hint {
4922            (elem_type.clone(), vec![])
4923        } else {
4924            sql_bail!("cannot determine type of empty array");
4925        }
4926    } else {
4927        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4928        (ecx.scalar_type(&out[0]), out)
4929    };
4930
4931    // Arrays of `char` type are disallowed due to a known limitation:
4932    // https://github.com/MaterializeInc/database-issues/issues/2360.
4933    //
4934    // Arrays of `list` and `map` types are disallowed due to mind-bending
4935    // semantics.
4936    if matches!(
4937        elem_type,
4938        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4939    ) {
4940        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4941    }
4942
4943    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4944}
4945
4946fn plan_list(
4947    ecx: &ExprContext,
4948    exprs: &[Expr<Aug>],
4949    type_hint: Option<&SqlScalarType>,
4950) -> Result<CoercibleScalarExpr, PlanError> {
4951    let (elem_type, exprs) = if exprs.is_empty() {
4952        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4953            (element_type.without_modifiers(), vec![])
4954        } else {
4955            sql_bail!("cannot determine type of empty list");
4956        }
4957    } else {
4958        let type_hint = match type_hint {
4959            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4960            _ => None,
4961        };
4962
4963        let mut out = vec![];
4964        for expr in exprs {
4965            out.push(match expr {
4966                // Special case nested LIST expressions so we can plumb
4967                // the type hint through.
4968                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4969                _ => plan_expr(ecx, expr)?,
4970            });
4971        }
4972        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4973        (ecx.scalar_type(&out[0]).without_modifiers(), out)
4974    };
4975
4976    if matches!(elem_type, SqlScalarType::Char { .. }) {
4977        bail_unsupported!("char list");
4978    }
4979
4980    Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4981}
4982
4983fn plan_map(
4984    ecx: &ExprContext,
4985    entries: &[MapEntry<Aug>],
4986    type_hint: Option<&SqlScalarType>,
4987) -> Result<CoercibleScalarExpr, PlanError> {
4988    let (value_type, exprs) = if entries.is_empty() {
4989        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
4990            (value_type.without_modifiers(), vec![])
4991        } else {
4992            sql_bail!("cannot determine type of empty map");
4993        }
4994    } else {
4995        let type_hint = match type_hint {
4996            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
4997            _ => None,
4998        };
4999
5000        let mut keys = vec![];
5001        let mut values = vec![];
5002        for MapEntry { key, value } in entries {
5003            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5004            let value = match value {
5005                // Special case nested MAP expressions so we can plumb
5006                // the type hint through.
5007                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5008                _ => plan_expr(ecx, value)?,
5009            };
5010            keys.push(key);
5011            values.push(value);
5012        }
5013        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5014        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5015        let out = itertools::interleave(keys, values).collect();
5016        (value_type, out)
5017    };
5018
5019    if matches!(value_type, SqlScalarType::Char { .. }) {
5020        bail_unsupported!("char map");
5021    }
5022
5023    let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5024    Ok(expr.into())
5025}
5026
5027/// Coerces a list of expressions such that all input expressions will be cast
5028/// to the same type. If successful, returns a new list of expressions in the
5029/// same order as the input, where each expression has the appropriate casts to
5030/// make them all of a uniform type.
5031///
5032/// If `force_type` is `Some`, the expressions are forced to the specified type
5033/// via an explicit cast. Otherwise the best common type is guessed via
5034/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5035/// implicit casts
5036///
5037/// Note that this is our implementation of Postgres' type conversion for
5038/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5039/// isn't yet used in all of those cases.
5040///
5041/// [union-type-conv]:
5042/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5043pub fn coerce_homogeneous_exprs(
5044    ecx: &ExprContext,
5045    exprs: Vec<CoercibleScalarExpr>,
5046    force_type: Option<&SqlScalarType>,
5047) -> Result<Vec<HirScalarExpr>, PlanError> {
5048    assert!(!exprs.is_empty());
5049
5050    let target_holder;
5051    let target = match force_type {
5052        Some(t) => t,
5053        None => {
5054            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5055            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5056            &target_holder
5057        }
5058    };
5059
5060    // Try to cast all expressions to `target`.
5061    let mut out = Vec::new();
5062    for expr in exprs {
5063        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5064        let ccx = match force_type {
5065            None => CastContext::Implicit,
5066            Some(_) => CastContext::Explicit,
5067        };
5068        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5069            Ok(expr) => out.push(expr),
5070            Err(_) => sql_bail!(
5071                "{} could not convert type {} to {}",
5072                ecx.name,
5073                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5074                ecx.humanize_scalar_type(target, false),
5075            ),
5076        }
5077    }
5078    Ok(out)
5079}
5080
5081/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5082/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5083pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5084    obe: &OrderByExpr<T>,
5085    column: usize,
5086) -> ColumnOrder {
5087    let desc = !obe.asc.unwrap_or(true);
5088    ColumnOrder {
5089        column,
5090        desc,
5091        // https://www.postgresql.org/docs/14/queries-order.html
5092        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5093        nulls_last: obe.nulls_last.unwrap_or(!desc),
5094    }
5095}
5096
5097/// Plans the ORDER BY clause of a window function.
5098///
5099/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5100/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5101/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5102/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5103/// `Vec<HirScalarExpr>` that we return.
5104fn plan_function_order_by(
5105    ecx: &ExprContext,
5106    order_by: &[OrderByExpr<Aug>],
5107) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5108    let mut order_by_exprs = vec![];
5109    let mut col_orders = vec![];
5110    {
5111        for (i, obe) in order_by.iter().enumerate() {
5112            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5113            // do not support ordinal references in PostgreSQL. So we use
5114            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5115            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5116            order_by_exprs.push(expr);
5117            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5118        }
5119    }
5120    Ok((order_by_exprs, col_orders))
5121}
5122
5123/// Common part of the planning of windowed and non-windowed aggregation functions.
5124fn plan_aggregate_common(
5125    ecx: &ExprContext,
5126    Function::<Aug> {
5127        name,
5128        args,
5129        filter,
5130        over: _,
5131        distinct,
5132    }: &Function<Aug>,
5133) -> Result<AggregateExpr, PlanError> {
5134    // Normal aggregate functions, like `sum`, expect as input a single expression
5135    // which yields the datum to aggregate. Order sensitive aggregate functions,
5136    // like `jsonb_agg`, are special, and instead expect a Record whose first
5137    // element yields the datum to aggregate and whose successive elements yield
5138    // keys to order by. This expectation is hard coded within the implementation
5139    // of each of the order-sensitive aggregates. The specification of how many
5140    // order by keys to consider, and in what order, is passed via the `order_by`
5141    // field on the `AggregateFunc` variant.
5142
5143    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5144    // most, so explicitly drop it if the function doesn't care about order. This
5145    // prevents the projection into Record below from triggering on unsupported
5146    // functions.
5147
5148    let impls = match resolve_func(ecx, name, args)? {
5149        Func::Aggregate(impls) => impls,
5150        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5151    };
5152
5153    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5154    // generalized function selection framework. The rule is simple: the user
5155    // must type `count(*)`, but the function selection framework sees an empty
5156    // parameter list, as if the user had typed `count()`. But if the user types
5157    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5158    // rules to all aggregates, not just `count`, since we may one day support
5159    // user-defined aggregates, including user-defined aggregates that take no
5160    // parameters.
5161    let (args, order_by) = match &args {
5162        FunctionArgs::Star => (vec![], vec![]),
5163        FunctionArgs::Args { args, order_by } => {
5164            if args.is_empty() {
5165                sql_bail!(
5166                    "{}(*) must be used to call a parameterless aggregate function",
5167                    ecx.qcx
5168                        .scx
5169                        .humanize_resolved_name(name)
5170                        .expect("name actually resolved")
5171                );
5172            }
5173            let args = plan_exprs(ecx, args)?;
5174            (args, order_by.clone())
5175        }
5176    };
5177
5178    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5179
5180    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5181    if let Some(filter) = &filter {
5182        // If a filter is present, as in
5183        //
5184        //     <agg>(<expr>) FILTER (WHERE <cond>)
5185        //
5186        // we plan it by essentially rewriting the expression to
5187        //
5188        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5189        //
5190        // where <identity> is the identity input for <agg>.
5191        let cond =
5192            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5193        let expr_typ = ecx.scalar_type(&expr);
5194        expr = HirScalarExpr::if_then_else(
5195            cond,
5196            expr,
5197            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5198        );
5199    }
5200
5201    let mut seen_outer = false;
5202    let mut seen_inner = false;
5203    #[allow(deprecated)]
5204    expr.visit_columns(0, &mut |depth, col| {
5205        if depth == 0 && col.level == 0 {
5206            seen_inner = true;
5207        } else if col.level > depth {
5208            seen_outer = true;
5209        }
5210    });
5211    if seen_outer && !seen_inner {
5212        bail_unsupported!(
5213            3720,
5214            "aggregate functions that refer exclusively to outer columns"
5215        );
5216    }
5217
5218    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5219    // map the needed expressions into the aggregate datum.
5220    if func.is_order_sensitive() {
5221        let field_names = iter::repeat(ColumnName::from(""))
5222            .take(1 + order_by_exprs.len())
5223            .collect();
5224        let mut exprs = vec![expr];
5225        exprs.extend(order_by_exprs);
5226        expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5227    }
5228
5229    Ok(AggregateExpr {
5230        func,
5231        expr: Box::new(expr),
5232        distinct: *distinct,
5233    })
5234}
5235
5236fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5237    let mut names = names.to_vec();
5238    let col_name = normalize::column_name(names.pop().unwrap());
5239
5240    // If the name is qualified, it must refer to a column in a table.
5241    if !names.is_empty() {
5242        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5243        let (i, i_name) = ecx.scope.resolve_table_column(
5244            &ecx.qcx.outer_scopes,
5245            &table_name,
5246            &col_name,
5247            &mut ecx.qcx.name_manager.borrow_mut(),
5248        )?;
5249        return Ok(HirScalarExpr::named_column(i, i_name));
5250    }
5251
5252    // If the name is unqualified, first check if it refers to a column. Track any similar names
5253    // that might exist for a better error message.
5254    let similar_names = match ecx.scope.resolve_column(
5255        &ecx.qcx.outer_scopes,
5256        &col_name,
5257        &mut ecx.qcx.name_manager.borrow_mut(),
5258    ) {
5259        Ok((i, i_name)) => {
5260            return Ok(HirScalarExpr::named_column(i, i_name));
5261        }
5262        Err(PlanError::UnknownColumn { similar, .. }) => similar,
5263        Err(e) => return Err(e),
5264    };
5265
5266    // The name doesn't refer to a column. Check if it is a whole-row reference
5267    // to a table.
5268    let items = ecx.scope.items_from_table(
5269        &ecx.qcx.outer_scopes,
5270        &PartialItemName {
5271            database: None,
5272            schema: None,
5273            item: col_name.as_str().to_owned(),
5274        },
5275    )?;
5276    match items.as_slice() {
5277        // The name doesn't refer to a table either. Return an error.
5278        [] => Err(PlanError::UnknownColumn {
5279            table: None,
5280            column: col_name,
5281            similar: similar_names,
5282        }),
5283        // The name refers to a table that is the result of a function that
5284        // returned a single column. Per PostgreSQL, this is a special case
5285        // that returns the value directly.
5286        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
5287        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5288            *column,
5289            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5290        )),
5291        // The name refers to a normal table. Return a record containing all the
5292        // columns of the table.
5293        _ => {
5294            let mut has_exists_column = None;
5295            let (exprs, field_names): (Vec<_>, Vec<_>) = items
5296                .into_iter()
5297                .filter_map(|(column, item)| {
5298                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5299                        has_exists_column = Some(column);
5300                        None
5301                    } else {
5302                        let expr = HirScalarExpr::named_column(
5303                            column,
5304                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5305                        );
5306                        let name = item.column_name.clone();
5307                        Some((expr, name))
5308                    }
5309                })
5310                .unzip();
5311            // For the special case of a table function with a single column, the single column is instead not wrapped.
5312            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5313                exprs.into_element()
5314            } else {
5315                HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5316            };
5317            if let Some(has_exists_column) = has_exists_column {
5318                Ok(HirScalarExpr::if_then_else(
5319                    HirScalarExpr::unnamed_column(has_exists_column)
5320                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5321                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5322                    expr,
5323                ))
5324            } else {
5325                Ok(expr)
5326            }
5327        }
5328    }
5329}
5330
5331fn plan_op(
5332    ecx: &ExprContext,
5333    op: &str,
5334    expr1: &Expr<Aug>,
5335    expr2: Option<&Expr<Aug>>,
5336) -> Result<HirScalarExpr, PlanError> {
5337    let impls = func::resolve_op(op)?;
5338    let args = match expr2 {
5339        None => plan_exprs(ecx, &[expr1])?,
5340        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5341    };
5342    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5343}
5344
5345fn plan_function<'a>(
5346    ecx: &ExprContext,
5347    f @ Function {
5348        name,
5349        args,
5350        filter,
5351        over,
5352        distinct,
5353    }: &'a Function<Aug>,
5354) -> Result<HirScalarExpr, PlanError> {
5355    let impls = match resolve_func(ecx, name, args)? {
5356        Func::Table(_) => {
5357            sql_bail!(
5358                "table functions are not allowed in {} (function {})",
5359                ecx.name,
5360                name
5361            );
5362        }
5363        Func::Scalar(impls) => {
5364            if over.is_some() {
5365                sql_bail!(
5366                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5367                );
5368            }
5369            impls
5370        }
5371        Func::ScalarWindow(impls) => {
5372            let (
5373                ignore_nulls,
5374                order_by_exprs,
5375                col_orders,
5376                _window_frame,
5377                partition_by,
5378                scalar_args,
5379            ) = plan_window_function_non_aggr(ecx, f)?;
5380
5381            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5382            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5383            // msg there is less informative.)
5384            if !scalar_args.is_empty() {
5385                if let ResolvedItemName::Item {
5386                    full_name: FullItemName { item, .. },
5387                    ..
5388                } = name
5389                {
5390                    sql_bail!(
5391                        "function {} has 0 parameters, but was called with {}",
5392                        item,
5393                        scalar_args.len()
5394                    );
5395                }
5396            }
5397
5398            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5399            // accept a window frame here without an error msg. (Postgres also does this.)
5400            // TODO: maybe we should give a notice
5401
5402            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5403
5404            if ignore_nulls {
5405                // If we ever add a scalar window function that supports ignore, then don't forget
5406                // to also update HIR EXPLAIN.
5407                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5408            }
5409
5410            return Ok(HirScalarExpr::windowing(WindowExpr {
5411                func: WindowExprType::Scalar(ScalarWindowExpr {
5412                    func,
5413                    order_by: col_orders,
5414                }),
5415                partition_by,
5416                order_by: order_by_exprs,
5417            }));
5418        }
5419        Func::ValueWindow(impls) => {
5420            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5421                plan_window_function_non_aggr(ecx, f)?;
5422
5423            let (args_encoded, func) =
5424                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5425
5426            if ignore_nulls {
5427                match func {
5428                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5429                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5430                }
5431            }
5432
5433            return Ok(HirScalarExpr::windowing(WindowExpr {
5434                func: WindowExprType::Value(ValueWindowExpr {
5435                    func,
5436                    args: Box::new(args_encoded),
5437                    order_by: col_orders,
5438                    window_frame,
5439                    ignore_nulls, // (RESPECT NULLS is the default)
5440                }),
5441                partition_by,
5442                order_by: order_by_exprs,
5443            }));
5444        }
5445        Func::Aggregate(_) => {
5446            if f.over.is_none() {
5447                // Not a window aggregate. Something is wrong.
5448                if ecx.allow_aggregates {
5449                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5450                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5451                    sql_bail!(
5452                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5453                        name,
5454                    );
5455                } else {
5456                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5457                    // because it was in an unsupported context.
5458                    sql_bail!(
5459                        "aggregate functions are not allowed in {} (function {})",
5460                        ecx.name,
5461                        name
5462                    );
5463                }
5464            } else {
5465                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5466                    plan_window_function_common(ecx, &f.name, &f.over)?;
5467
5468                // https://github.com/MaterializeInc/database-issues/issues/6720
5469                match (&window_frame.start_bound, &window_frame.end_bound) {
5470                    (
5471                        mz_expr::WindowFrameBound::UnboundedPreceding,
5472                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5473                    )
5474                    | (
5475                        mz_expr::WindowFrameBound::UnboundedPreceding,
5476                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5477                    )
5478                    | (
5479                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5480                        mz_expr::WindowFrameBound::UnboundedFollowing,
5481                    )
5482                    | (
5483                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5484                        mz_expr::WindowFrameBound::UnboundedFollowing,
5485                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5486                    (_, _) => {} // other cases are ok
5487                }
5488
5489                if ignore_nulls {
5490                    // https://github.com/MaterializeInc/database-issues/issues/6722
5491                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5492                    // forget to also update HIR EXPLAIN.
5493                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5494                }
5495
5496                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5497
5498                if aggregate_expr.distinct {
5499                    // https://github.com/MaterializeInc/database-issues/issues/6626
5500                    bail_unsupported!("DISTINCT in window aggregates");
5501                }
5502
5503                return Ok(HirScalarExpr::windowing(WindowExpr {
5504                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5505                        aggregate_expr,
5506                        order_by: col_orders,
5507                        window_frame,
5508                    }),
5509                    partition_by,
5510                    order_by: order_by_exprs,
5511                }));
5512            }
5513        }
5514    };
5515
5516    if over.is_some() {
5517        unreachable!("If there is an OVER clause, we should have returned already above.");
5518    }
5519
5520    if *distinct {
5521        sql_bail!(
5522            "DISTINCT specified, but {} is not an aggregate function",
5523            ecx.qcx
5524                .scx
5525                .humanize_resolved_name(name)
5526                .expect("already resolved")
5527        );
5528    }
5529    if filter.is_some() {
5530        sql_bail!(
5531            "FILTER specified, but {} is not an aggregate function",
5532            ecx.qcx
5533                .scx
5534                .humanize_resolved_name(name)
5535                .expect("already resolved")
5536        );
5537    }
5538
5539    let scalar_args = match &args {
5540        FunctionArgs::Star => {
5541            sql_bail!(
5542                "* argument is invalid with non-aggregate function {}",
5543                ecx.qcx
5544                    .scx
5545                    .humanize_resolved_name(name)
5546                    .expect("already resolved")
5547            )
5548        }
5549        FunctionArgs::Args { args, order_by } => {
5550            if !order_by.is_empty() {
5551                sql_bail!(
5552                    "ORDER BY specified, but {} is not an aggregate function",
5553                    ecx.qcx
5554                        .scx
5555                        .humanize_resolved_name(name)
5556                        .expect("already resolved")
5557                );
5558            }
5559            plan_exprs(ecx, args)?
5560        }
5561    };
5562
5563    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5564}
5565
5566pub const IGNORE_NULLS_ERROR_MSG: &str =
5567    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5568
5569/// Resolves the name to a set of function implementations.
5570///
5571/// If the name does not specify a known built-in function, returns an error.
5572pub fn resolve_func(
5573    ecx: &ExprContext,
5574    name: &ResolvedItemName,
5575    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5576) -> Result<&'static Func, PlanError> {
5577    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5578        if let Ok(f) = i.func() {
5579            return Ok(f);
5580        }
5581    }
5582
5583    // Couldn't resolve function with this name, so generate verbose error
5584    // message.
5585    let cexprs = match args {
5586        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5587        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5588            if !order_by.is_empty() {
5589                sql_bail!(
5590                    "ORDER BY specified, but {} is not an aggregate function",
5591                    name
5592                );
5593            }
5594            plan_exprs(ecx, args)?
5595        }
5596    };
5597
5598    let arg_types: Vec<_> = cexprs
5599        .into_iter()
5600        .map(|ty| match ecx.scalar_type(&ty) {
5601            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5602            CoercibleScalarType::Record(_) => "record".to_string(),
5603            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5604        })
5605        .collect();
5606
5607    Err(PlanError::UnknownFunction {
5608        name: name.to_string(),
5609        arg_types,
5610    })
5611}
5612
5613fn plan_is_expr<'a>(
5614    ecx: &ExprContext,
5615    expr: &'a Expr<Aug>,
5616    construct: &IsExprConstruct<Aug>,
5617    not: bool,
5618) -> Result<HirScalarExpr, PlanError> {
5619    let expr = plan_expr(ecx, expr)?;
5620    let mut expr = match construct {
5621        IsExprConstruct::Null => {
5622            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5623            // at odds with our type coercion rules, which treat `NULL` literals
5624            // and unconstrained parameters identically. Providing a type hint
5625            // of string means we wind up supporting both.
5626            let expr = expr.type_as_any(ecx)?;
5627            expr.call_is_null()
5628        }
5629        IsExprConstruct::Unknown => {
5630            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5631            expr.call_is_null()
5632        }
5633        IsExprConstruct::True => {
5634            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5635            expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5636        }
5637        IsExprConstruct::False => {
5638            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5639            expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5640        }
5641        IsExprConstruct::DistinctFrom(expr2) => {
5642            let expr1 = expr.type_as_any(ecx)?;
5643            let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5644            // There are three cases:
5645            // 1. Both terms are non-null, in which case the result should be `a != b`.
5646            // 2. Exactly one term is null, in which case the result should be true.
5647            // 3. Both terms are null, in which case the result should be false.
5648            //
5649            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5650            let term1 = HirScalarExpr::variadic_or(vec![
5651                expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5652                expr1.clone().call_is_null(),
5653                expr2.clone().call_is_null(),
5654            ]);
5655            let term2 = HirScalarExpr::variadic_or(vec![
5656                expr1.call_is_null().not(),
5657                expr2.call_is_null().not(),
5658            ]);
5659            term1.and(term2)
5660        }
5661    };
5662    if not {
5663        expr = expr.not();
5664    }
5665    Ok(expr)
5666}
5667
5668fn plan_case<'a>(
5669    ecx: &ExprContext,
5670    operand: &'a Option<Box<Expr<Aug>>>,
5671    conditions: &'a [Expr<Aug>],
5672    results: &'a [Expr<Aug>],
5673    else_result: &'a Option<Box<Expr<Aug>>>,
5674) -> Result<HirScalarExpr, PlanError> {
5675    let mut cond_exprs = Vec::new();
5676    let mut result_exprs = Vec::new();
5677    for (c, r) in conditions.iter().zip_eq(results) {
5678        let c = match operand {
5679            Some(operand) => operand.clone().equals(c.clone()),
5680            None => c.clone(),
5681        };
5682        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5683        cond_exprs.push(cexpr);
5684        result_exprs.push(r);
5685    }
5686    result_exprs.push(match else_result {
5687        Some(else_result) => else_result,
5688        None => &Expr::Value(Value::Null),
5689    });
5690    let mut result_exprs = coerce_homogeneous_exprs(
5691        &ecx.with_name("CASE"),
5692        plan_exprs(ecx, &result_exprs)?,
5693        None,
5694    )?;
5695    let mut expr = result_exprs.pop().unwrap();
5696    assert_eq!(cond_exprs.len(), result_exprs.len());
5697    for (cexpr, rexpr) in cond_exprs
5698        .into_iter()
5699        .rev()
5700        .zip_eq(result_exprs.into_iter().rev())
5701    {
5702        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5703    }
5704    Ok(expr)
5705}
5706
5707fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5708    let (datum, scalar_type) = match l {
5709        Value::Number(s) => {
5710            let d = strconv::parse_numeric(s.as_str())?;
5711            if !s.contains(&['E', '.'][..]) {
5712                // Maybe representable as an int?
5713                if let Ok(n) = d.0.try_into() {
5714                    (Datum::Int32(n), SqlScalarType::Int32)
5715                } else if let Ok(n) = d.0.try_into() {
5716                    (Datum::Int64(n), SqlScalarType::Int64)
5717                } else {
5718                    (
5719                        Datum::Numeric(d),
5720                        SqlScalarType::Numeric { max_scale: None },
5721                    )
5722                }
5723            } else {
5724                (
5725                    Datum::Numeric(d),
5726                    SqlScalarType::Numeric { max_scale: None },
5727                )
5728            }
5729        }
5730        Value::HexString(_) => bail_unsupported!("hex string literals"),
5731        Value::Boolean(b) => match b {
5732            false => (Datum::False, SqlScalarType::Bool),
5733            true => (Datum::True, SqlScalarType::Bool),
5734        },
5735        Value::Interval(i) => {
5736            let i = literal::plan_interval(i)?;
5737            (Datum::Interval(i), SqlScalarType::Interval)
5738        }
5739        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5740        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5741    };
5742    let expr = HirScalarExpr::literal(datum, scalar_type);
5743    Ok(expr.into())
5744}
5745
5746/// The common part of the planning of non-aggregate window functions, i.e.,
5747/// scalar window functions and value window functions.
5748fn plan_window_function_non_aggr<'a>(
5749    ecx: &ExprContext,
5750    Function {
5751        name,
5752        args,
5753        filter,
5754        over,
5755        distinct,
5756    }: &'a Function<Aug>,
5757) -> Result<
5758    (
5759        bool,
5760        Vec<HirScalarExpr>,
5761        Vec<ColumnOrder>,
5762        mz_expr::WindowFrame,
5763        Vec<HirScalarExpr>,
5764        Vec<CoercibleScalarExpr>,
5765    ),
5766    PlanError,
5767> {
5768    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5769        plan_window_function_common(ecx, name, over)?;
5770
5771    if *distinct {
5772        sql_bail!(
5773            "DISTINCT specified, but {} is not an aggregate function",
5774            name
5775        );
5776    }
5777
5778    if filter.is_some() {
5779        bail_unsupported!("FILTER in non-aggregate window functions");
5780    }
5781
5782    let scalar_args = match &args {
5783        FunctionArgs::Star => {
5784            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5785        }
5786        FunctionArgs::Args { args, order_by } => {
5787            if !order_by.is_empty() {
5788                sql_bail!(
5789                    "ORDER BY specified, but {} is not an aggregate function",
5790                    name
5791                );
5792            }
5793            plan_exprs(ecx, args)?
5794        }
5795    };
5796
5797    Ok((
5798        ignore_nulls,
5799        order_by_exprs,
5800        col_orders,
5801        window_frame,
5802        partition,
5803        scalar_args,
5804    ))
5805}
5806
5807/// The common part of the planning of all window functions.
5808fn plan_window_function_common(
5809    ecx: &ExprContext,
5810    name: &<Aug as AstInfo>::ItemName,
5811    over: &Option<WindowSpec<Aug>>,
5812) -> Result<
5813    (
5814        bool,
5815        Vec<HirScalarExpr>,
5816        Vec<ColumnOrder>,
5817        mz_expr::WindowFrame,
5818        Vec<HirScalarExpr>,
5819    ),
5820    PlanError,
5821> {
5822    if !ecx.allow_windows {
5823        sql_bail!(
5824            "window functions are not allowed in {} (function {})",
5825            ecx.name,
5826            name
5827        );
5828    }
5829
5830    let window_spec = match over.as_ref() {
5831        Some(over) => over,
5832        None => sql_bail!("window function {} requires an OVER clause", name),
5833    };
5834    if window_spec.ignore_nulls && window_spec.respect_nulls {
5835        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5836    }
5837    let window_frame = match window_spec.window_frame.as_ref() {
5838        Some(frame) => plan_window_frame(frame)?,
5839        None => mz_expr::WindowFrame::default(),
5840    };
5841    let mut partition = Vec::new();
5842    for expr in &window_spec.partition_by {
5843        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5844    }
5845
5846    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5847
5848    Ok((
5849        window_spec.ignore_nulls,
5850        order_by_exprs,
5851        col_orders,
5852        window_frame,
5853        partition,
5854    ))
5855}
5856
5857fn plan_window_frame(
5858    WindowFrame {
5859        units,
5860        start_bound,
5861        end_bound,
5862    }: &WindowFrame,
5863) -> Result<mz_expr::WindowFrame, PlanError> {
5864    use mz_expr::WindowFrameBound::*;
5865    let units = window_frame_unit_ast_to_expr(units)?;
5866    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5867    let end_bound = end_bound
5868        .as_ref()
5869        .map(window_frame_bound_ast_to_expr)
5870        .unwrap_or(CurrentRow);
5871
5872    // Validate bounds according to Postgres rules
5873    match (&start_bound, &end_bound) {
5874        // Start bound can't be UNBOUNDED FOLLOWING
5875        (UnboundedFollowing, _) => {
5876            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5877        }
5878        // End bound can't be UNBOUNDED PRECEDING
5879        (_, UnboundedPreceding) => {
5880            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5881        }
5882        // Start bound should come before end bound in the list of bound definitions
5883        (CurrentRow, OffsetPreceding(_)) => {
5884            sql_bail!("frame starting from current row cannot have preceding rows")
5885        }
5886        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5887            sql_bail!("frame starting from following row cannot have preceding rows")
5888        }
5889        // The above rules are adopted from Postgres.
5890        // The following rules are Materialize-specific.
5891        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5892            // Note that the only hard limit is that partition size + offset should fit in i64, so
5893            // in theory, we could support much larger offsets than this. But for our current
5894            // performance, even 1000000 is quite big.
5895            if *o1 > 1000000 || *o2 > 1000000 {
5896                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5897            }
5898        }
5899        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5900            if *o1 > 1000000 || *o2 > 1000000 {
5901                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5902            }
5903        }
5904        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5905            if *o1 > 1000000 || *o2 > 1000000 {
5906                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5907            }
5908        }
5909        (OffsetPreceding(o), CurrentRow) => {
5910            if *o > 1000000 {
5911                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5912            }
5913        }
5914        (CurrentRow, OffsetFollowing(o)) => {
5915            if *o > 1000000 {
5916                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5917            }
5918        }
5919        // Other bounds are valid
5920        (_, _) => (),
5921    }
5922
5923    // RANGE is only supported in the default frame
5924    // https://github.com/MaterializeInc/database-issues/issues/6585
5925    if units == mz_expr::WindowFrameUnits::Range
5926        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5927    {
5928        bail_unsupported!("RANGE in non-default window frames")
5929    }
5930
5931    let frame = mz_expr::WindowFrame {
5932        units,
5933        start_bound,
5934        end_bound,
5935    };
5936    Ok(frame)
5937}
5938
5939fn window_frame_unit_ast_to_expr(
5940    unit: &WindowFrameUnits,
5941) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5942    match unit {
5943        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5944        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5945        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5946    }
5947}
5948
5949fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5950    match bound {
5951        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5952        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5953        WindowFrameBound::Preceding(Some(offset)) => {
5954            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5955        }
5956        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5957        WindowFrameBound::Following(Some(offset)) => {
5958            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5959        }
5960    }
5961}
5962
5963pub fn scalar_type_from_sql(
5964    scx: &StatementContext,
5965    data_type: &ResolvedDataType,
5966) -> Result<SqlScalarType, PlanError> {
5967    match data_type {
5968        ResolvedDataType::AnonymousList(elem_type) => {
5969            let elem_type = scalar_type_from_sql(scx, elem_type)?;
5970            if matches!(elem_type, SqlScalarType::Char { .. }) {
5971                bail_unsupported!("char list");
5972            }
5973            Ok(SqlScalarType::List {
5974                element_type: Box::new(elem_type),
5975                custom_id: None,
5976            })
5977        }
5978        ResolvedDataType::AnonymousMap {
5979            key_type,
5980            value_type,
5981        } => {
5982            match scalar_type_from_sql(scx, key_type)? {
5983                SqlScalarType::String => {}
5984                other => sql_bail!(
5985                    "map key type must be {}, got {}",
5986                    scx.humanize_scalar_type(&SqlScalarType::String, false),
5987                    scx.humanize_scalar_type(&other, false)
5988                ),
5989            }
5990            Ok(SqlScalarType::Map {
5991                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5992                custom_id: None,
5993            })
5994        }
5995        ResolvedDataType::Named { id, modifiers, .. } => {
5996            scalar_type_from_catalog(scx.catalog, *id, modifiers)
5997        }
5998        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
5999    }
6000}
6001
6002pub fn scalar_type_from_catalog(
6003    catalog: &dyn SessionCatalog,
6004    id: CatalogItemId,
6005    modifiers: &[i64],
6006) -> Result<SqlScalarType, PlanError> {
6007    let entry = catalog.get_item(&id);
6008    let type_details = match entry.type_details() {
6009        Some(type_details) => type_details,
6010        None => {
6011            // Resolution should never produce a `ResolvedDataType::Named` with
6012            // an ID of a non-type, but we error gracefully just in case.
6013            sql_bail!(
6014                "internal error: {} does not refer to a type",
6015                catalog.resolve_full_name(entry.name()).to_string().quoted()
6016            );
6017        }
6018    };
6019    match &type_details.typ {
6020        CatalogType::Numeric => {
6021            let mut modifiers = modifiers.iter().fuse();
6022            let precision = match modifiers.next() {
6023                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6024                    sql_bail!(
6025                        "precision for type numeric must be between 1 and {}",
6026                        NUMERIC_DATUM_MAX_PRECISION,
6027                    );
6028                }
6029                Some(p) => Some(*p),
6030                None => None,
6031            };
6032            let scale = match modifiers.next() {
6033                Some(scale) => {
6034                    if let Some(precision) = precision {
6035                        if *scale > precision {
6036                            sql_bail!(
6037                                "scale for type numeric must be between 0 and precision {}",
6038                                precision
6039                            );
6040                        }
6041                    }
6042                    Some(NumericMaxScale::try_from(*scale)?)
6043                }
6044                None => None,
6045            };
6046            if modifiers.next().is_some() {
6047                sql_bail!("type numeric supports at most two type modifiers");
6048            }
6049            Ok(SqlScalarType::Numeric { max_scale: scale })
6050        }
6051        CatalogType::Char => {
6052            let mut modifiers = modifiers.iter().fuse();
6053            let length = match modifiers.next() {
6054                Some(l) => Some(CharLength::try_from(*l)?),
6055                None => Some(CharLength::ONE),
6056            };
6057            if modifiers.next().is_some() {
6058                sql_bail!("type character supports at most one type modifier");
6059            }
6060            Ok(SqlScalarType::Char { length })
6061        }
6062        CatalogType::VarChar => {
6063            let mut modifiers = modifiers.iter().fuse();
6064            let length = match modifiers.next() {
6065                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6066                None => None,
6067            };
6068            if modifiers.next().is_some() {
6069                sql_bail!("type character varying supports at most one type modifier");
6070            }
6071            Ok(SqlScalarType::VarChar { max_length: length })
6072        }
6073        CatalogType::Timestamp => {
6074            let mut modifiers = modifiers.iter().fuse();
6075            let precision = match modifiers.next() {
6076                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6077                None => None,
6078            };
6079            if modifiers.next().is_some() {
6080                sql_bail!("type timestamp supports at most one type modifier");
6081            }
6082            Ok(SqlScalarType::Timestamp { precision })
6083        }
6084        CatalogType::TimestampTz => {
6085            let mut modifiers = modifiers.iter().fuse();
6086            let precision = match modifiers.next() {
6087                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6088                None => None,
6089            };
6090            if modifiers.next().is_some() {
6091                sql_bail!("type timestamp with time zone supports at most one type modifier");
6092            }
6093            Ok(SqlScalarType::TimestampTz { precision })
6094        }
6095        t => {
6096            if !modifiers.is_empty() {
6097                sql_bail!(
6098                    "{} does not support type modifiers",
6099                    catalog.resolve_full_name(entry.name()).to_string()
6100                );
6101            }
6102            match t {
6103                CatalogType::Array {
6104                    element_reference: element_id,
6105                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6106                    catalog,
6107                    *element_id,
6108                    modifiers,
6109                )?))),
6110                CatalogType::List {
6111                    element_reference: element_id,
6112                    element_modifiers,
6113                } => Ok(SqlScalarType::List {
6114                    element_type: Box::new(scalar_type_from_catalog(
6115                        catalog,
6116                        *element_id,
6117                        element_modifiers,
6118                    )?),
6119                    custom_id: Some(id),
6120                }),
6121                CatalogType::Map {
6122                    key_reference: _,
6123                    key_modifiers: _,
6124                    value_reference: value_id,
6125                    value_modifiers,
6126                } => Ok(SqlScalarType::Map {
6127                    value_type: Box::new(scalar_type_from_catalog(
6128                        catalog,
6129                        *value_id,
6130                        value_modifiers,
6131                    )?),
6132                    custom_id: Some(id),
6133                }),
6134                CatalogType::Range {
6135                    element_reference: element_id,
6136                } => Ok(SqlScalarType::Range {
6137                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6138                }),
6139                CatalogType::Record { fields } => {
6140                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6141                        .iter()
6142                        .map(|f| {
6143                            let scalar_type = scalar_type_from_catalog(
6144                                catalog,
6145                                f.type_reference,
6146                                &f.type_modifiers,
6147                            )?;
6148                            Ok((
6149                                f.name.clone(),
6150                                SqlColumnType {
6151                                    scalar_type,
6152                                    nullable: true,
6153                                },
6154                            ))
6155                        })
6156                        .collect::<Result<Box<_>, PlanError>>()?;
6157                    Ok(SqlScalarType::Record {
6158                        fields: scalars,
6159                        custom_id: Some(id),
6160                    })
6161                }
6162                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6163                CatalogType::Bool => Ok(SqlScalarType::Bool),
6164                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6165                CatalogType::Date => Ok(SqlScalarType::Date),
6166                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6167                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6168                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6169                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6170                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6171                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6172                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6173                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6174                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6175                CatalogType::Interval => Ok(SqlScalarType::Interval),
6176                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6177                CatalogType::Oid => Ok(SqlScalarType::Oid),
6178                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6179                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6180                CatalogType::Pseudo => {
6181                    sql_bail!(
6182                        "cannot reference pseudo type {}",
6183                        catalog.resolve_full_name(entry.name()).to_string()
6184                    )
6185                }
6186                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6187                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6188                CatalogType::RegType => Ok(SqlScalarType::RegType),
6189                CatalogType::String => Ok(SqlScalarType::String),
6190                CatalogType::Time => Ok(SqlScalarType::Time),
6191                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6192                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6193                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6194                CatalogType::Numeric => unreachable!("handled above"),
6195                CatalogType::Char => unreachable!("handled above"),
6196                CatalogType::VarChar => unreachable!("handled above"),
6197                CatalogType::Timestamp => unreachable!("handled above"),
6198                CatalogType::TimestampTz => unreachable!("handled above"),
6199            }
6200        }
6201    }
6202}
6203
6204/// This is used to collect aggregates and table functions from within an `Expr`.
6205/// See the explanation of aggregate handling at the top of the file for more details.
6206struct AggregateTableFuncVisitor<'a> {
6207    scx: &'a StatementContext<'a>,
6208    aggs: Vec<Function<Aug>>,
6209    within_aggregate: bool,
6210    tables: BTreeMap<Function<Aug>, String>,
6211    table_disallowed_context: Vec<&'static str>,
6212    in_select_item: bool,
6213    id_gen: IdGen,
6214    err: Option<PlanError>,
6215}
6216
6217impl<'a> AggregateTableFuncVisitor<'a> {
6218    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6219        AggregateTableFuncVisitor {
6220            scx,
6221            aggs: Vec::new(),
6222            within_aggregate: false,
6223            tables: BTreeMap::new(),
6224            table_disallowed_context: Vec::new(),
6225            in_select_item: false,
6226            id_gen: Default::default(),
6227            err: None,
6228        }
6229    }
6230
6231    fn into_result(
6232        self,
6233    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6234        match self.err {
6235            Some(err) => Err(err),
6236            None => {
6237                // Dedup while preserving the order. We don't care what the order is, but it
6238                // has to be reproducible so that EXPLAIN PLAN tests work.
6239                let mut seen = BTreeSet::new();
6240                let aggs = self
6241                    .aggs
6242                    .into_iter()
6243                    .filter(move |agg| seen.insert(agg.clone()))
6244                    .collect();
6245                Ok((aggs, self.tables))
6246            }
6247        }
6248    }
6249}
6250
6251impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6252    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6253        let item = match self.scx.get_item_by_resolved_name(&func.name) {
6254            Ok(i) => i,
6255            // Catching missing functions later in planning improves error messages.
6256            Err(_) => return,
6257        };
6258
6259        match item.func() {
6260            // We don't want to collect window aggregations, because these will be handled not by
6261            // plan_aggregate, but by plan_function.
6262            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6263                if self.within_aggregate {
6264                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6265                    return;
6266                }
6267                self.aggs.push(func.clone());
6268                let Function {
6269                    name: _,
6270                    args,
6271                    filter,
6272                    over: _,
6273                    distinct: _,
6274                } = func;
6275                if let Some(filter) = filter {
6276                    self.visit_expr_mut(filter);
6277                }
6278                let old_within_aggregate = self.within_aggregate;
6279                self.within_aggregate = true;
6280                self.table_disallowed_context
6281                    .push("aggregate function calls");
6282
6283                self.visit_function_args_mut(args);
6284
6285                self.within_aggregate = old_within_aggregate;
6286                self.table_disallowed_context.pop();
6287            }
6288            Ok(Func::Table { .. }) => {
6289                self.table_disallowed_context.push("other table functions");
6290                visit_mut::visit_function_mut(self, func);
6291                self.table_disallowed_context.pop();
6292            }
6293            _ => visit_mut::visit_function_mut(self, func),
6294        }
6295    }
6296
6297    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6298        // Don't go into subqueries.
6299    }
6300
6301    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6302        let (disallowed_context, func) = match expr {
6303            Expr::Case { .. } => (Some("CASE"), None),
6304            Expr::HomogenizingFunction {
6305                function: HomogenizingFunction::Coalesce,
6306                ..
6307            } => (Some("COALESCE"), None),
6308            Expr::Function(func) if self.in_select_item => {
6309                // If we're in a SELECT list, replace table functions with a uuid identifier
6310                // and save the table func so it can be planned elsewhere.
6311                let mut table_func = None;
6312                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6313                    if let Ok(Func::Table { .. }) = item.func() {
6314                        if let Some(context) = self.table_disallowed_context.last() {
6315                            self.err = Some(sql_err!(
6316                                "table functions are not allowed in {} (function {})",
6317                                context,
6318                                func.name
6319                            ));
6320                            return;
6321                        }
6322                        table_func = Some(func.clone());
6323                    }
6324                }
6325                // Since we will descend into the table func below, don't add its own disallow
6326                // context here, instead use visit_function to set that.
6327                (None, table_func)
6328            }
6329            _ => (None, None),
6330        };
6331        if let Some(func) = func {
6332            // Since we are trading out expr, we need to visit the table func here.
6333            visit_mut::visit_expr_mut(self, expr);
6334            // Don't attempt to replace table functions with unsupported syntax.
6335            if let Function {
6336                name: _,
6337                args: _,
6338                filter: None,
6339                over: None,
6340                distinct: false,
6341            } = &func
6342            {
6343                // Identical table functions can be de-duplicated.
6344                let unique_id = self.id_gen.allocate_id();
6345                let id = self
6346                    .tables
6347                    .entry(func)
6348                    .or_insert_with(|| format!("table_func_{unique_id}"));
6349                // We know this is okay because id is is 11 characters + <=20 characters, which is
6350                // less than our max length.
6351                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6352            }
6353        }
6354        if let Some(context) = disallowed_context {
6355            self.table_disallowed_context.push(context);
6356        }
6357
6358        visit_mut::visit_expr_mut(self, expr);
6359
6360        if disallowed_context.is_some() {
6361            self.table_disallowed_context.pop();
6362        }
6363    }
6364
6365    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6366        let old = self.in_select_item;
6367        self.in_select_item = true;
6368        visit_mut::visit_select_item_mut(self, si);
6369        self.in_select_item = old;
6370    }
6371}
6372
6373#[derive(Default)]
6374struct WindowFuncCollector {
6375    window_funcs: Vec<Expr<Aug>>,
6376}
6377
6378impl WindowFuncCollector {
6379    fn into_result(self) -> Vec<Expr<Aug>> {
6380        // Dedup while preserving the order.
6381        let mut seen = BTreeSet::new();
6382        let window_funcs_dedupped = self
6383            .window_funcs
6384            .into_iter()
6385            .filter(move |expr| seen.insert(expr.clone()))
6386            // Reverse the order, so that in case of a nested window function call, the
6387            // inner one is evaluated first.
6388            .rev()
6389            .collect();
6390        window_funcs_dedupped
6391    }
6392}
6393
6394impl Visit<'_, Aug> for WindowFuncCollector {
6395    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6396        match expr {
6397            Expr::Function(func) => {
6398                if func.over.is_some() {
6399                    self.window_funcs.push(expr.clone());
6400                }
6401            }
6402            _ => (),
6403        }
6404        visit::visit_expr(self, expr);
6405    }
6406
6407    fn visit_query(&mut self, _query: &Query<Aug>) {
6408        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6409    }
6410}
6411
6412/// Specifies how long a query will live.
6413#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6414pub enum QueryLifetime {
6415    /// The query's (or the expression's) result will be computed at one point in time.
6416    OneShot,
6417    /// The query (or expression) is used in a dataflow that maintains an index.
6418    Index,
6419    /// The query (or expression) is used in a dataflow that maintains a materialized view.
6420    MaterializedView,
6421    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
6422    Subscribe,
6423    /// The query (or expression) is part of a (non-materialized) view.
6424    View,
6425    /// The expression is part of a source definition.
6426    Source,
6427}
6428
6429impl QueryLifetime {
6430    /// (This used to impact whether the query is allowed to reason about the time at which it is
6431    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
6432    /// mechanism, see `ExprPrepStyle`.)
6433    pub fn is_one_shot(&self) -> bool {
6434        let result = match self {
6435            QueryLifetime::OneShot => true,
6436            QueryLifetime::Index => false,
6437            QueryLifetime::MaterializedView => false,
6438            QueryLifetime::Subscribe => false,
6439            QueryLifetime::View => false,
6440            QueryLifetime::Source => false,
6441        };
6442        assert_eq!(!result, self.is_maintained());
6443        result
6444    }
6445
6446    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
6447    /// turned into a `TopK`.
6448    pub fn is_maintained(&self) -> bool {
6449        match self {
6450            QueryLifetime::OneShot => false,
6451            QueryLifetime::Index => true,
6452            QueryLifetime::MaterializedView => true,
6453            QueryLifetime::Subscribe => true,
6454            QueryLifetime::View => true,
6455            QueryLifetime::Source => true,
6456        }
6457    }
6458
6459    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
6460    pub fn allow_show(&self) -> bool {
6461        match self {
6462            QueryLifetime::OneShot => true,
6463            QueryLifetime::Index => false,
6464            QueryLifetime::MaterializedView => false,
6465            QueryLifetime::Subscribe => true, // SUBSCRIBE allows SHOW commands!
6466            QueryLifetime::View => false,
6467            QueryLifetime::Source => false,
6468        }
6469    }
6470}
6471
6472/// Description of a CTE sufficient for query planning.
6473#[derive(Debug, Clone)]
6474pub struct CteDesc {
6475    pub name: String,
6476    pub desc: RelationDesc,
6477}
6478
6479/// The state required when planning a `Query`.
6480#[derive(Debug, Clone)]
6481pub struct QueryContext<'a> {
6482    /// The context for the containing `Statement`.
6483    pub scx: &'a StatementContext<'a>,
6484    /// The lifetime that the planned query will have.
6485    pub lifetime: QueryLifetime,
6486    /// The scopes of the outer relation expression.
6487    pub outer_scopes: Vec<Scope>,
6488    /// The type of the outer relation expressions.
6489    pub outer_relation_types: Vec<SqlRelationType>,
6490    /// CTEs for this query, mapping their assigned LocalIds to their definition.
6491    pub ctes: BTreeMap<LocalId, CteDesc>,
6492    /// A name manager, for interning column names that will be stored in HIR and MIR.
6493    pub name_manager: Rc<RefCell<NameManager>>,
6494    pub recursion_guard: RecursionGuard,
6495}
6496
6497impl CheckedRecursion for QueryContext<'_> {
6498    fn recursion_guard(&self) -> &RecursionGuard {
6499        &self.recursion_guard
6500    }
6501}
6502
6503impl<'a> QueryContext<'a> {
6504    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6505        QueryContext {
6506            scx,
6507            lifetime,
6508            outer_scopes: vec![],
6509            outer_relation_types: vec![],
6510            ctes: BTreeMap::new(),
6511            name_manager: Rc::new(RefCell::new(NameManager::new())),
6512            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6513        }
6514    }
6515
6516    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6517        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6518    }
6519
6520    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6521    /// `self`.
6522    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6523        let ctes = self.ctes.clone();
6524        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6525        let outer_relation_types = iter::once(relation_type)
6526            .chain(self.outer_relation_types.clone())
6527            .collect();
6528        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6529        let name_manager = Rc::clone(&self.name_manager);
6530
6531        QueryContext {
6532            scx: self.scx,
6533            lifetime: self.lifetime,
6534            outer_scopes,
6535            outer_relation_types,
6536            ctes,
6537            name_manager,
6538            recursion_guard: self.recursion_guard.clone(),
6539        }
6540    }
6541
6542    /// Derives a `QueryContext` for a scope that contains no columns.
6543    fn empty_derived_context(&self) -> QueryContext<'a> {
6544        let scope = Scope::empty();
6545        let ty = SqlRelationType::empty();
6546        self.derived_context(scope, ty)
6547    }
6548
6549    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6550    /// CTE.
6551    pub fn resolve_table_name(
6552        &self,
6553        object: ResolvedItemName,
6554    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6555        match object {
6556            ResolvedItemName::Item {
6557                id,
6558                full_name,
6559                version,
6560                ..
6561            } => {
6562                let name = full_name.into();
6563                let item = self.scx.get_item(&id).at_version(version);
6564                let desc = item
6565                    .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6566                    .clone();
6567                let expr = HirRelationExpr::Get {
6568                    id: Id::Global(item.global_id()),
6569                    typ: desc.typ().clone(),
6570                };
6571
6572                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6573
6574                Ok((expr, scope))
6575            }
6576            ResolvedItemName::Cte { id, name } => {
6577                let name = name.into();
6578                let cte = self.ctes.get(&id).unwrap();
6579                let expr = HirRelationExpr::Get {
6580                    id: Id::Local(id),
6581                    typ: cte.desc.typ().clone(),
6582                };
6583
6584                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6585
6586                Ok((expr, scope))
6587            }
6588            ResolvedItemName::ContinualTask { id, name } => {
6589                let cte = self.ctes.get(&id).unwrap();
6590                let expr = HirRelationExpr::Get {
6591                    id: Id::Local(id),
6592                    typ: cte.desc.typ().clone(),
6593                };
6594
6595                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6596
6597                Ok((expr, scope))
6598            }
6599            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6600        }
6601    }
6602
6603    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6604    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6605    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6606        self.scx.humanize_scalar_type(typ, postgres_compat)
6607    }
6608}
6609
6610/// A bundle of unrelated things that we need for planning `Expr`s.
6611#[derive(Debug, Clone)]
6612pub struct ExprContext<'a> {
6613    pub qcx: &'a QueryContext<'a>,
6614    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
6615    pub name: &'a str,
6616    /// The context for the `Query` that contains this `Expr`.
6617    /// The current scope.
6618    pub scope: &'a Scope,
6619    /// The type of the current relation expression upon which this scalar
6620    /// expression will be evaluated.
6621    pub relation_type: &'a SqlRelationType,
6622    /// Are aggregate functions allowed in this context
6623    pub allow_aggregates: bool,
6624    /// Are subqueries allowed in this context
6625    pub allow_subqueries: bool,
6626    /// Are parameters allowed in this context.
6627    pub allow_parameters: bool,
6628    /// Are window functions allowed in this context
6629    pub allow_windows: bool,
6630}
6631
6632impl CheckedRecursion for ExprContext<'_> {
6633    fn recursion_guard(&self) -> &RecursionGuard {
6634        &self.qcx.recursion_guard
6635    }
6636}
6637
6638impl<'a> ExprContext<'a> {
6639    pub fn catalog(&self) -> &dyn SessionCatalog {
6640        self.qcx.scx.catalog
6641    }
6642
6643    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6644        let mut ecx = self.clone();
6645        ecx.name = name;
6646        ecx
6647    }
6648
6649    pub fn column_type<E>(&self, expr: &E) -> E::Type
6650    where
6651        E: AbstractExpr,
6652    {
6653        expr.typ(
6654            &self.qcx.outer_relation_types,
6655            self.relation_type,
6656            &self.qcx.scx.param_types.borrow(),
6657        )
6658    }
6659
6660    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6661    where
6662        E: AbstractExpr,
6663    {
6664        self.column_type(expr).scalar_type()
6665    }
6666
6667    fn derived_query_context(&self) -> QueryContext<'_> {
6668        let mut scope = self.scope.clone();
6669        scope.lateral_barrier = true;
6670        self.qcx.derived_context(scope, self.relation_type.clone())
6671    }
6672
6673    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6674        self.qcx.scx.require_feature_flag(flag)
6675    }
6676
6677    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6678        &self.qcx.scx.param_types
6679    }
6680
6681    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6682    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6683    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6684        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6685    }
6686
6687    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6688        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6689    }
6690}
6691
6692/// Manages column names, doing lightweight string internment.
6693///
6694/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
6695/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
6696/// to ensure maximal sharing.
6697#[derive(Debug, Clone)]
6698pub struct NameManager(BTreeSet<Arc<str>>);
6699
6700impl NameManager {
6701    /// Creates a new `NameManager`, with no interned names
6702    pub fn new() -> Self {
6703        Self(BTreeSet::new())
6704    }
6705
6706    /// Interns a string, returning a reference-counted pointer to the interned
6707    /// string.
6708    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6709        let s = s.as_ref();
6710        if let Some(interned) = self.0.get(s) {
6711            Arc::clone(interned)
6712        } else {
6713            let interned: Arc<str> = Arc::from(s);
6714            self.0.insert(Arc::clone(&interned));
6715            interned
6716        }
6717    }
6718
6719    /// Interns a string representing a reference to a `ScopeItem`, returning a
6720    /// reference-counted pointer to the interned string.
6721    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6722        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6723        //
6724        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6725        // name information. Note that as of 2025-04-09, we don't support column
6726        // renames.
6727        //
6728        // A few bad alternatives:
6729        //
6730        // (1) Store it but don't write it down. This fails because the expression
6731        //     cache will erase our names on restart.
6732        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6733        //     get the right names. But the world now and the world when we made
6734        //     those objects may be different.
6735        // (3) Just don't write down the table name. Nothing fails... for now.
6736
6737        self.intern(item.column_name.as_str())
6738    }
6739}
6740
6741#[cfg(test)]
6742mod test {
6743    use super::*;
6744
6745    /// Ensure that `NameManager`'s string interning works as expected.
6746    ///
6747    /// In particular, structurally but not referentially identical strings should
6748    /// be interned to the same `Arc`ed pointer.
6749    #[mz_ore::test]
6750    pub fn test_name_manager_string_interning() {
6751        let mut nm = NameManager::new();
6752
6753        let orig_hi = "hi";
6754        let hi = nm.intern(orig_hi);
6755        let hello = nm.intern("hello");
6756
6757        assert_ne!(hi.as_ptr(), hello.as_ptr());
6758
6759        // this static string is _likely_ the same as `orig_hi``
6760        let hi2 = nm.intern("hi");
6761        assert_eq!(hi.as_ptr(), hi2.as_ptr());
6762
6763        // generate a "hi" string that doesn't get optimized to the same static string
6764        let s = format!(
6765            "{}{}",
6766            hi.chars().nth(0).unwrap(),
6767            hi2.chars().nth(1).unwrap()
6768        );
6769        // make sure that we're testing with a fresh string!
6770        assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6771
6772        let hi3 = nm.intern(s);
6773        assert_eq!(hi.as_ptr(), hi3.as_ptr());
6774    }
6775}