mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52    func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67    RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103    literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
/// The result of planning a top-level query or an `INSERT ... RETURNING` clause.
///
/// `E` is the representation of the planned expression: [`plan_root_query`]
/// and [`plan_ct_query`] produce a `HirRelationExpr`, while
/// [`plan_insert_query`] produces a `Vec<HirScalarExpr>` holding one scalar
/// expression per `RETURNING` output column.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression(s).
    pub expr: E,
    /// The shape of the result set. For the query planners this describes the
    /// expression after `finishing` has been applied.
    pub desc: RelationDesc,
    /// Post-processing (ordering, limit, offset, projection) that must occur
    /// before results are sent to the client.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// The scope produced by planning. For `INSERT`, this is the scope of the
    /// target table against which the `RETURNING` clause was resolved.
    pub scope: Scope,
}
116
117/// Plans a top-level query, returning the `HirRelationExpr` describing the query
118/// plan, the `RelationDesc` describing the shape of the result set, a
119/// `RowSetFinishing` describing post-processing that must occur before results
120/// are sent to the client, and the types of the parameters in the query, if any
121/// were present.
122///
123/// Note that the returned `RelationDesc` describes the expression after
124/// applying the returned `RowSetFinishing`.
125#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127    scx: &StatementContext,
128    mut query: Query<Aug>,
129    lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131    transform_ast::transform(scx, &mut query)?;
132    let mut qcx = QueryContext::root(scx, lifetime);
133    let PlannedQuery {
134        mut expr,
135        scope,
136        order_by,
137        limit,
138        offset,
139        project,
140        group_size_hints,
141    } = plan_query(&mut qcx, &query)?;
142
143    let mut finishing = RowSetFinishing {
144        limit,
145        offset,
146        project,
147        order_by,
148    };
149
150    // Attempt to push the finishing's ordering past its projection. This allows
151    // data to be projected down on the workers rather than the coordinator. It
152    // also improves the optimizer's demand analysis, as the optimizer can only
153    // reason about demand information in `expr` (i.e., it can't see
154    // `finishing.project`).
155    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157    if lifetime.is_maintained() {
158        expr.finish_maintained(&mut finishing, group_size_hints);
159    }
160
161    let typ = qcx.relation_type(&expr);
162    let typ = SqlRelationType::new(
163        finishing
164            .project
165            .iter()
166            .map(|i| typ.column_types[*i].clone())
167            .collect(),
168    );
169    let desc = RelationDesc::new(typ, scope.column_names());
170
171    Ok(PlannedRootQuery {
172        expr,
173        desc,
174        finishing,
175        scope,
176    })
177}
178
179/// TODO(ct2): Dedup this with [plan_root_query].
180#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182    qcx: &mut QueryContext,
183    mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185    transform_ast::transform(qcx.scx, &mut query)?;
186    let PlannedQuery {
187        mut expr,
188        scope,
189        order_by,
190        limit,
191        offset,
192        project,
193        group_size_hints,
194    } = plan_query(qcx, &query)?;
195
196    let mut finishing = RowSetFinishing {
197        limit,
198        offset,
199        project,
200        order_by,
201    };
202
203    // Attempt to push the finishing's ordering past its projection. This allows
204    // data to be projected down on the workers rather than the coordinator. It
205    // also improves the optimizer's demand analysis, as the optimizer can only
206    // reason about demand information in `expr` (i.e., it can't see
207    // `finishing.project`).
208    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210    expr.finish_maintained(&mut finishing, group_size_hints);
211
212    let typ = qcx.relation_type(&expr);
213    let typ = SqlRelationType::new(
214        finishing
215            .project
216            .iter()
217            .map(|i| typ.column_types[*i].clone())
218            .collect(),
219    );
220    let desc = RelationDesc::new(typ, scope.column_names());
221
222    Ok(PlannedRootQuery {
223        expr,
224        desc,
225        finishing,
226        scope,
227    })
228}
229
230/// Attempts to push a projection through an order by.
231///
232/// The returned bool indicates whether the pushdown was successful or not.
233/// Successful pushdown requires that all the columns referenced in `order_by`
234/// are included in `project`.
235///
236/// When successful, `expr` is wrapped in a projection node, `order_by` is
237/// rewritten to account for the pushed-down projection, and `project` is
238/// replaced with the trivial projection. When unsuccessful, no changes are made
239/// to any of the inputs.
240fn try_push_projection_order_by(
241    expr: &mut HirRelationExpr,
242    project: &mut Vec<usize>,
243    order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245    let mut unproject = vec![None; expr.arity()];
246    for (out_i, in_i) in project.iter().copied().enumerate() {
247        unproject[in_i] = Some(out_i);
248    }
249    if order_by
250        .iter()
251        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252    {
253        let trivial_project = (0..project.len()).collect();
254        *expr = expr.take().project(mem::replace(project, trivial_project));
255        for ob in order_by {
256            ob.column = unproject[ob.column].unwrap();
257        }
258        true
259    } else {
260        false
261    }
262}
263
/// Plans an `INSERT` into the table identified by `table_name`.
///
/// `columns` is the (possibly empty) explicit list of target columns,
/// `source` is the data being inserted, and `returning` holds the items of the
/// `RETURNING` clause.
///
/// On success returns:
/// * the [`CatalogItemId`] of the target table,
/// * a [`HirRelationExpr`] that produces the rows to insert, with the source
///   columns cast to the target column types, any omitted columns filled in
///   from their default expressions, and the result rearranged into the
///   table's column order, and
/// * the planned `RETURNING` clause: one scalar expression per output column
///   together with the `RelationDesc` describing those columns.
///
/// # Errors
///
/// Returns an error if the target is not a table, is not writable, or is a
/// system table; if a named column does not exist or is specified more than
/// once; if the source has more columns than the target or fewer than the
/// explicitly named columns; or if a source column cannot be assignment-cast
/// to the type of its target column.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
    // The writable table details are used below as the per-column default
    // expressions (one per column of `desc`).
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order.
    // `source_types[i]` is the target type of the i-th source column and
    // `ordering[i]` is the index of the table column it is written to.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        // Unknown columns are reported before duplicated ones.
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        // `DEFAULT VALUES` is a single row with no columns; every table column
        // is then filled in from its default expression below.
        InsertSource::DefaultValues => {
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Omitted column: its default is appended after the source columns
            // by the `map` below, so it lands at `expr_arity + map_exprs.len()`.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against the columns of the target table.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // A single select item may expand to several output columns.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
480
481/// Determines the mapping between some external data and a Materialize relation.
482///
483/// Returns the following:
484/// * [`CatalogItemId`] for the destination table.
485/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
486/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
487///   since we now return a [`MapFilterProject`].
488/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
489///   destination table.
490///
491pub fn plan_copy_item(
492    scx: &StatementContext,
493    item_name: ResolvedItemName,
494    columns: Vec<Ident>,
495) -> Result<
496    (
497        CatalogItemId,
498        RelationDesc,
499        Vec<ColumnIndex>,
500        Option<MapFilterProject>,
501    ),
502    PlanError,
503> {
504    let item = scx.get_item_by_resolved_name(&item_name)?;
505    let fullname = scx.catalog.resolve_full_name(item.name());
506    let table_desc = item.desc(&fullname)?.into_owned();
507    let mut ordering = Vec::with_capacity(columns.len());
508
509    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
510    // should be simplified. The reason they are currently separate code paths is so we can roll
511    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.
512
513    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
514    // FROM SOURCE ...`), then we generate an MFP.
515    //
516    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
517    // so it's not always guaranteed that our `item` is a table.
518    let mfp = if let Some(table_defaults) = item.writable_table_details() {
519        let mut table_defaults = table_defaults.to_vec();
520
521        for default in &mut table_defaults {
522            transform_ast::transform(scx, default)?;
523        }
524
525        // Fill in any omitted columns and rearrange into correct order
526        let source_column_names: Vec<_> = columns
527            .iter()
528            .cloned()
529            .map(normalize::column_name)
530            .collect();
531
532        let mut default_exprs = Vec::new();
533        let mut project_keys = Vec::with_capacity(table_desc.arity());
534
535        // For each column in the destination table, either project it from the source data, or provide
536        // an expression to fill in a default value.
537        let column_details = table_desc.iter().zip_eq(table_defaults);
538        for ((col_name, col_type), col_default) in column_details {
539            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
540            if let Some(src_idx) = maybe_src_idx {
541                project_keys.push(src_idx);
542            } else {
543                // If one a column from the table does not exist in the source data, then a default
544                // value will get appended to the end of the input Row from the source data.
545                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
546                let mir = hir.lower_uncorrelated()?;
547                project_keys.push(source_column_names.len() + default_exprs.len());
548                default_exprs.push(mir);
549            }
550        }
551
552        let mfp = MapFilterProject::new(source_column_names.len())
553            .map(default_exprs)
554            .project(project_keys);
555        Some(mfp)
556    } else {
557        None
558    };
559
560    // Create a mapping from input data to the table we're copying into.
561    let source_desc = if columns.is_empty() {
562        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
563        ordering.extend(indexes);
564
565        // The source data should be in the same order as the table.
566        table_desc
567    } else {
568        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
569        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
570            .iter_all()
571            .map(|(idx, name, typ)| (name, (*idx, typ)))
572            .collect();
573
574        let mut names = Vec::with_capacity(columns.len());
575        let mut source_types = Vec::with_capacity(columns.len());
576
577        for c in &columns {
578            if let Some((idx, typ)) = column_by_name.get(c) {
579                ordering.push(*idx);
580                source_types.push((*typ).clone());
581                names.push(c.clone());
582            } else {
583                sql_bail!(
584                    "column {} of relation {} does not exist",
585                    c.quoted(),
586                    item_name.full_name_str().quoted()
587                );
588            }
589        }
590        if let Some(dup) = columns.iter().duplicates().next() {
591            sql_bail!("column {} specified more than once", dup.quoted());
592        }
593
594        // The source data is a different shape than the destination table.
595        RelationDesc::new(SqlRelationType::new(source_types), names)
596    };
597
598    Ok((item.id(), source_desc, ordering, mfp))
599}
600
601/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
602///
603/// TODO(cf3): Merge this method with [`plan_copy_item`].
604pub fn plan_copy_from(
605    scx: &StatementContext,
606    table_name: ResolvedItemName,
607    columns: Vec<Ident>,
608) -> Result<
609    (
610        CatalogItemId,
611        RelationDesc,
612        Vec<ColumnIndex>,
613        Option<MapFilterProject>,
614    ),
615    PlanError,
616> {
617    let table = scx.get_item_by_resolved_name(&table_name)?;
618
619    // Validate the target of the insert.
620    if table.item_type() != CatalogItemType::Table {
621        sql_bail!(
622            "cannot insert into {} '{}'",
623            table.item_type(),
624            table_name.full_name_str()
625        );
626    }
627
628    let _ = table.writable_table_details().ok_or_else(|| {
629        sql_err!(
630            "cannot insert into non-writeable table '{}'",
631            table_name.full_name_str()
632        )
633    })?;
634
635    if table.id().is_system() {
636        sql_bail!(
637            "cannot insert into system table '{}'",
638            table_name.full_name_str()
639        );
640    }
641    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643    Ok((id, desc, ordering, mfp))
644}
645
646/// Builds a plan that adds the default values for the missing columns and re-orders
647/// the datums in the given rows to match the order in the target table.
648pub fn plan_copy_from_rows(
649    pcx: &PlanContext,
650    catalog: &dyn SessionCatalog,
651    target_id: CatalogItemId,
652    target_name: String,
653    columns: Vec<ColumnIndex>,
654    rows: Vec<mz_repr::Row>,
655) -> Result<HirRelationExpr, PlanError> {
656    let scx = StatementContext::new(Some(pcx), catalog);
657
658    // Always copy at the latest version of the table.
659    let table = catalog
660        .try_get_item(&target_id)
661        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
662        .at_version(RelationVersionSelector::Latest);
663    let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
664
665    let mut defaults = table
666        .writable_table_details()
667        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
668        .to_vec();
669
670    for default in &mut defaults {
671        transform_ast::transform(&scx, default)?;
672    }
673
674    let column_types = columns
675        .iter()
676        .map(|x| desc.get_type(x).clone())
677        .map(|mut x| {
678            // Null constraint is enforced later, when inserting the row in the table.
679            // Without this, an assert is hit during lowering.
680            x.nullable = true;
681            x
682        })
683        .collect();
684    let typ = SqlRelationType::new(column_types);
685    let expr = HirRelationExpr::Constant {
686        rows,
687        typ: typ.clone(),
688    };
689
690    // Exit early with just the raw constant if we know that all columns are present
691    // and in the correct order. This lets us bypass expensive downstream optimizations
692    // more easily, as at every stage we know this expression is nothing more than
693    // a constant (as opposed to e.g. a constant with with an identity map and identity
694    // projection).
695    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
696    if columns == default {
697        return Ok(expr);
698    }
699
700    // Fill in any omitted columns and rearrange into correct order
701    let mut map_exprs = vec![];
702    let mut project_key = Vec::with_capacity(desc.arity());
703
704    // Maps from table column index to position in the source query
705    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
706
707    let column_details = desc.iter_all().zip_eq(defaults);
708    for ((col_idx, _col_name, col_typ), default) in column_details {
709        if let Some(src_idx) = col_to_source.get(&col_idx) {
710            project_key.push(*src_idx);
711        } else {
712            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
713            project_key.push(typ.arity() + map_exprs.len());
714            map_exprs.push(hir);
715        }
716    }
717
718    Ok(expr.map(map_exprs).project(project_key))
719}
720
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    /// Catalog ID of the item being written to.
    /// NOTE(review): presumably always a writable user table, per the checks in
    /// `plan_mutation_query_inner` — confirm against other constructors.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Finishing (ordering, limit, offset, projection) associated with `selection`.
    pub finishing: RowSetFinishing,
}
733
734pub fn plan_delete_query(
735    scx: &StatementContext,
736    mut delete_stmt: DeleteStatement<Aug>,
737) -> Result<ReadThenWritePlan, PlanError> {
738    transform_ast::transform(scx, &mut delete_stmt)?;
739
740    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
741    plan_mutation_query_inner(
742        qcx,
743        delete_stmt.table_name,
744        delete_stmt.alias,
745        delete_stmt.using,
746        vec![],
747        delete_stmt.selection,
748    )
749}
750
751pub fn plan_update_query(
752    scx: &StatementContext,
753    mut update_stmt: UpdateStatement<Aug>,
754) -> Result<ReadThenWritePlan, PlanError> {
755    transform_ast::transform(scx, &mut update_stmt)?;
756
757    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
758
759    plan_mutation_query_inner(
760        qcx,
761        update_stmt.table_name,
762        update_stmt.alias,
763        vec![],
764        update_stmt.assignments,
765        update_stmt.selection,
766    )
767}
768
769pub fn plan_mutation_query_inner(
770    qcx: QueryContext,
771    table_name: ResolvedItemName,
772    alias: Option<TableAlias>,
773    using: Vec<TableWithJoins<Aug>>,
774    assignments: Vec<Assignment<Aug>>,
775    selection: Option<Expr<Aug>>,
776) -> Result<ReadThenWritePlan, PlanError> {
777    // Get ID and version of the relation desc.
778    let (id, version) = match table_name {
779        ResolvedItemName::Item { id, version, .. } => (id, version),
780        _ => sql_bail!("cannot mutate non-user table"),
781    };
782
783    // Perform checks on item with given ID.
784    let item = qcx.scx.get_item(&id).at_version(version);
785    if item.item_type() != CatalogItemType::Table {
786        sql_bail!(
787            "cannot mutate {} '{}'",
788            item.item_type(),
789            table_name.full_name_str()
790        );
791    }
792    let _ = item.writable_table_details().ok_or_else(|| {
793        sql_err!(
794            "cannot mutate non-writeable table '{}'",
795            table_name.full_name_str()
796        )
797    })?;
798    if id.is_system() {
799        sql_bail!(
800            "cannot mutate system table '{}'",
801            table_name.full_name_str()
802        );
803    }
804
805    // Derive structs for operation from validated table
806    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
807    let scope = plan_table_alias(scope, alias.as_ref())?;
808    let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
809    let relation_type = qcx.relation_type(&get);
810
811    if using.is_empty() {
812        if let Some(expr) = selection {
813            let ecx = &ExprContext {
814                qcx: &qcx,
815                name: "WHERE clause",
816                scope: &scope,
817                relation_type: &relation_type,
818                allow_aggregates: false,
819                allow_subqueries: true,
820                allow_parameters: true,
821                allow_windows: false,
822            };
823            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
824            get = get.filter(vec![expr]);
825        }
826    } else {
827        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
828    }
829
830    let mut sets = BTreeMap::new();
831    for Assignment { id, value } in assignments {
832        // Get the index and type of the column.
833        let name = normalize::column_name(id);
834        match desc.get_by_name(&name) {
835            Some((idx, typ)) => {
836                let ecx = &ExprContext {
837                    qcx: &qcx,
838                    name: "SET clause",
839                    scope: &scope,
840                    relation_type: &relation_type,
841                    allow_aggregates: false,
842                    allow_subqueries: false,
843                    allow_parameters: true,
844                    allow_windows: false,
845                };
846                let expr = plan_expr(ecx, &value)?.cast_to(
847                    ecx,
848                    CastContext::Assignment,
849                    &typ.scalar_type,
850                )?;
851
852                if sets.insert(idx, expr).is_some() {
853                    sql_bail!("column {} set twice", name)
854                }
855            }
856            None => sql_bail!("unknown column {}", name),
857        };
858    }
859
860    let finishing = RowSetFinishing {
861        order_by: vec![],
862        limit: None,
863        offset: 0,
864        project: (0..desc.arity()).collect(),
865    };
866
867    Ok(ReadThenWritePlan {
868        id,
869        selection: get,
870        finishing,
871        assignments: sets,
872    })
873}
874
875// Adjust `get` to perform an existential subquery on `using` accounting for
876// `selection`.
877//
878// If `USING`, we essentially want to rewrite the query as a correlated
879// existential subquery, i.e.
880// ```
881// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
882// ```
883// However, we can't do that directly because of esoteric rules w/r/t `lateral`
884// subqueries.
885// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
886fn handle_mutation_using_clause(
887    qcx: &QueryContext,
888    selection: Option<Expr<Aug>>,
889    using: Vec<TableWithJoins<Aug>>,
890    get: HirRelationExpr,
891    outer_scope: Scope,
892) -> Result<HirRelationExpr, PlanError> {
893    // Plan `USING` as a cross-joined `FROM` without knowledge of the
894    // statement's `FROM` target. This prevents `lateral` subqueries from
895    // "seeing" the `FROM` target.
896    let (mut using_rel_expr, using_scope) =
897        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
898            let (left, left_scope) = l;
899            plan_join(
900                qcx,
901                left,
902                left_scope,
903                &Join {
904                    relation: TableFactor::NestedJoin {
905                        join: Box::new(twj),
906                        alias: None,
907                    },
908                    join_operator: JoinOperator::CrossJoin,
909                },
910            )
911        })?;
912
913    if let Some(expr) = selection {
914        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
915        // PG-like semantics e.g. expressing ambiguous column references. We put
916        // `USING...` first for no real reason, but making a different decision
917        // would require adjusting the column references on this relation
918        // differently.
919        let on = HirScalarExpr::literal_true();
920        let joined = using_rel_expr
921            .clone()
922            .join(get.clone(), on, JoinKind::Inner);
923        let joined_scope = using_scope.product(outer_scope)?;
924        let joined_relation_type = qcx.relation_type(&joined);
925
926        let ecx = &ExprContext {
927            qcx,
928            name: "WHERE clause",
929            scope: &joined_scope,
930            relation_type: &joined_relation_type,
931            allow_aggregates: false,
932            allow_subqueries: true,
933            allow_parameters: true,
934            allow_windows: false,
935        };
936
937        // Plan the filter expression on `FROM, USING...`.
938        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
939
940        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
941        // those to the right of `using_rel_expr`) to instead be correlated to
942        // the outer relation, i.e. `get`.
943        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
944        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
945        use mz_expr::visit::Visit;
946        expr.visit_mut_post(&mut |e| {
947            if let HirScalarExpr::Column(c, _name) = e {
948                if c.column >= using_rel_arity {
949                    c.level += 1;
950                    c.column -= using_rel_arity;
951                };
952            }
953        })?;
954
955        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
956        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
957        // relation.
958        using_rel_expr = using_rel_expr.filter(vec![expr]);
959    } else {
960        // Check that scopes are at compatible (i.e. do not double-reference
961        // same table), despite lack of selection
962        let _joined_scope = using_scope.product(outer_scope)?;
963    }
964    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
965    // whether any rows are returned, and not on the contents of those rows,
966    // the output list of the subquery is normally unimportant.
967    //
968    // This means we don't need to worry about projecting/mapping any
969    // additional expressions here.
970    //
971    // https://www.postgresql.org/docs/14/functions-subquery.html
972
973    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
974    Ok(get.filter(vec![using_rel_expr.exists()]))
975}
976
977#[derive(Debug)]
978pub(crate) struct CastRelationError {
979    pub(crate) column: usize,
980    pub(crate) source_type: SqlScalarType,
981    pub(crate) target_type: SqlScalarType,
982}
983
984/// Cast a relation from one type to another using the specified type of cast.
985///
986/// The length of `target_types` must match the arity of `expr`.
987pub(crate) fn cast_relation<'a, I>(
988    qcx: &QueryContext,
989    ccx: CastContext,
990    expr: HirRelationExpr,
991    target_types: I,
992) -> Result<HirRelationExpr, CastRelationError>
993where
994    I: IntoIterator<Item = &'a SqlScalarType>,
995{
996    let ecx = &ExprContext {
997        qcx,
998        name: "values",
999        scope: &Scope::empty(),
1000        relation_type: &qcx.relation_type(&expr),
1001        allow_aggregates: false,
1002        allow_subqueries: true,
1003        allow_parameters: true,
1004        allow_windows: false,
1005    };
1006    let mut map_exprs = vec![];
1007    let mut project_key = vec![];
1008    for (i, target_typ) in target_types.into_iter().enumerate() {
1009        let expr = HirScalarExpr::column(i);
1010        // We plan every cast and check the evaluated expressions rather than
1011        // checking the types directly because of some complex casting rules
1012        // between types not expressed in `SqlScalarType` equality.
1013        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1014            Ok(cast_expr) => {
1015                if expr == cast_expr {
1016                    // Cast between types was unnecessary
1017                    project_key.push(i);
1018                } else {
1019                    // Cast between types required
1020                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
1021                    map_exprs.push(cast_expr);
1022                }
1023            }
1024            Err(_) => {
1025                return Err(CastRelationError {
1026                    column: i,
1027                    source_type: ecx.scalar_type(&expr),
1028                    target_type: target_typ.clone(),
1029                });
1030            }
1031        }
1032    }
1033    Ok(expr.map(map_exprs).project(project_key))
1034}
1035
1036/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1037/// VIEW` statement.
1038pub fn plan_as_of(
1039    scx: &StatementContext,
1040    as_of: Option<AsOf<Aug>>,
1041) -> Result<QueryWhen, PlanError> {
1042    match as_of {
1043        None => Ok(QueryWhen::Immediately),
1044        Some(as_of) => match as_of {
1045            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1046            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1047        },
1048    }
1049}
1050
1051/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1052///
1053/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1054/// - not a constant,
1055/// - not castable to MzTimestamp,
1056/// - is null,
1057/// - contains an unmaterializable function,
1058/// - some other evaluation error occurs, e.g., a division by 0,
1059/// - contains aggregates, subqueries, parameters, or window function calls.
1060pub fn plan_as_of_or_up_to(
1061    scx: &StatementContext,
1062    mut expr: Expr<Aug>,
1063) -> Result<mz_repr::Timestamp, PlanError> {
1064    let scope = Scope::empty();
1065    let desc = RelationDesc::empty();
1066    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1067    // evaluated only once.)
1068    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1069    transform_ast::transform(scx, &mut expr)?;
1070    let ecx = &ExprContext {
1071        qcx: &qcx,
1072        name: "AS OF or UP TO",
1073        scope: &scope,
1074        relation_type: desc.typ(),
1075        allow_aggregates: false,
1076        allow_subqueries: false,
1077        allow_parameters: false,
1078        allow_windows: false,
1079    };
1080    let hir = plan_expr(ecx, &expr)?.cast_to(
1081        ecx,
1082        CastContext::Assignment,
1083        &SqlScalarType::MzTimestamp,
1084    )?;
1085    if hir.contains_unmaterializable() {
1086        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1087    }
1088    // At this point, we definitely have a constant expression:
1089    // - it can't contain any unmaterializable functions;
1090    // - it can't refer to any columns.
1091    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1092    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1093    // division by 0.
1094    let timestamp = hir
1095        .into_literal_mz_timestamp()
1096        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1097    Ok(timestamp)
1098}
1099
1100/// Plans an expression in the AS position of a `CREATE SECRET`.
1101pub fn plan_secret_as(
1102    scx: &StatementContext,
1103    mut expr: Expr<Aug>,
1104) -> Result<MirScalarExpr, PlanError> {
1105    let scope = Scope::empty();
1106    let desc = RelationDesc::empty();
1107    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1108
1109    transform_ast::transform(scx, &mut expr)?;
1110
1111    let ecx = &ExprContext {
1112        qcx: &qcx,
1113        name: "AS",
1114        scope: &scope,
1115        relation_type: desc.typ(),
1116        allow_aggregates: false,
1117        allow_subqueries: false,
1118        allow_parameters: false,
1119        allow_windows: false,
1120    };
1121    let expr = plan_expr(ecx, &expr)?
1122        .type_as(ecx, &SqlScalarType::Bytes)?
1123        .lower_uncorrelated()?;
1124    Ok(expr)
1125}
1126
1127/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1128pub fn plan_webhook_validate_using(
1129    scx: &StatementContext,
1130    validate_using: CreateWebhookSourceCheck<Aug>,
1131) -> Result<WebhookValidation, PlanError> {
1132    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1133
1134    let CreateWebhookSourceCheck {
1135        options,
1136        using: mut expr,
1137    } = validate_using;
1138
1139    let mut column_typs = vec![];
1140    let mut column_names = vec![];
1141
1142    let (bodies, headers, secrets) = options
1143        .map(|o| (o.bodies, o.headers, o.secrets))
1144        .unwrap_or_default();
1145
1146    // Append all of the bodies so they can be used in the expression.
1147    let mut body_tuples = vec![];
1148    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1149        let scalar_type = use_bytes
1150            .then_some(SqlScalarType::Bytes)
1151            .unwrap_or(SqlScalarType::String);
1152        let name = alias
1153            .map(|a| a.into_string())
1154            .unwrap_or_else(|| "body".to_string());
1155
1156        column_typs.push(SqlColumnType {
1157            scalar_type,
1158            nullable: false,
1159        });
1160        column_names.push(name);
1161
1162        // Store the column index so we can be sure to provide this body correctly.
1163        let column_idx = column_typs.len() - 1;
1164        // Double check we're consistent with column names.
1165        assert_eq!(
1166            column_idx,
1167            column_names.len() - 1,
1168            "body column names and types don't match"
1169        );
1170        body_tuples.push((column_idx, use_bytes));
1171    }
1172
1173    // Append all of the headers so they can be used in the expression.
1174    let mut header_tuples = vec![];
1175
1176    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1177        let value_type = use_bytes
1178            .then_some(SqlScalarType::Bytes)
1179            .unwrap_or(SqlScalarType::String);
1180        let name = alias
1181            .map(|a| a.into_string())
1182            .unwrap_or_else(|| "headers".to_string());
1183
1184        column_typs.push(SqlColumnType {
1185            scalar_type: SqlScalarType::Map {
1186                value_type: Box::new(value_type),
1187                custom_id: None,
1188            },
1189            nullable: false,
1190        });
1191        column_names.push(name);
1192
1193        // Store the column index so we can be sure to provide this body correctly.
1194        let column_idx = column_typs.len() - 1;
1195        // Double check we're consistent with column names.
1196        assert_eq!(
1197            column_idx,
1198            column_names.len() - 1,
1199            "header column names and types don't match"
1200        );
1201        header_tuples.push((column_idx, use_bytes));
1202    }
1203
1204    // Append all secrets so they can be used in the expression.
1205    let mut validation_secrets = vec![];
1206
1207    for CreateWebhookSourceSecret {
1208        secret,
1209        alias,
1210        use_bytes,
1211    } in secrets
1212    {
1213        // Either provide the secret to the validation expression as Bytes or a String.
1214        let scalar_type = use_bytes
1215            .then_some(SqlScalarType::Bytes)
1216            .unwrap_or(SqlScalarType::String);
1217
1218        column_typs.push(SqlColumnType {
1219            scalar_type,
1220            nullable: false,
1221        });
1222        let ResolvedItemName::Item {
1223            id,
1224            full_name: FullItemName { item, .. },
1225            ..
1226        } = secret
1227        else {
1228            return Err(PlanError::InvalidSecret(Box::new(secret)));
1229        };
1230
1231        // Plan the expression using the secret's alias, if one is provided.
1232        let name = if let Some(alias) = alias {
1233            alias.into_string()
1234        } else {
1235            item
1236        };
1237        column_names.push(name);
1238
1239        // Get the column index that corresponds for this secret, so we can make sure to provide the
1240        // secrets in the correct order during evaluation.
1241        let column_idx = column_typs.len() - 1;
1242        // Double check that our column names and types match.
1243        assert_eq!(
1244            column_idx,
1245            column_names.len() - 1,
1246            "column names and types don't match"
1247        );
1248
1249        validation_secrets.push(WebhookValidationSecret {
1250            id,
1251            column_idx,
1252            use_bytes,
1253        });
1254    }
1255
1256    let relation_typ = SqlRelationType::new(column_typs);
1257    let desc = RelationDesc::new(relation_typ, column_names.clone());
1258    let scope = Scope::from_source(None, column_names);
1259
1260    transform_ast::transform(scx, &mut expr)?;
1261
1262    let ecx = &ExprContext {
1263        qcx: &qcx,
1264        name: "CHECK",
1265        scope: &scope,
1266        relation_type: desc.typ(),
1267        allow_aggregates: false,
1268        allow_subqueries: false,
1269        allow_parameters: false,
1270        allow_windows: false,
1271    };
1272    let expr = plan_expr(ecx, &expr)?
1273        .type_as(ecx, &SqlScalarType::Bool)?
1274        .lower_uncorrelated()?;
1275    let validation = WebhookValidation {
1276        expression: expr,
1277        relation_desc: desc,
1278        bodies: body_tuples,
1279        headers: header_tuples,
1280        secrets: validation_secrets,
1281    };
1282    Ok(validation)
1283}
1284
1285pub fn plan_default_expr(
1286    scx: &StatementContext,
1287    expr: &Expr<Aug>,
1288    target_ty: &SqlScalarType,
1289) -> Result<HirScalarExpr, PlanError> {
1290    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1291    let ecx = &ExprContext {
1292        qcx: &qcx,
1293        name: "DEFAULT expression",
1294        scope: &Scope::empty(),
1295        relation_type: &SqlRelationType::empty(),
1296        allow_aggregates: false,
1297        allow_subqueries: false,
1298        allow_parameters: false,
1299        allow_windows: false,
1300    };
1301    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1302    Ok(hir)
1303}
1304
1305pub fn plan_params<'a>(
1306    scx: &'a StatementContext,
1307    params: Vec<Expr<Aug>>,
1308    desc: &StatementDesc,
1309) -> Result<Params, PlanError> {
1310    if params.len() != desc.param_types.len() {
1311        sql_bail!(
1312            "expected {} params, got {}",
1313            desc.param_types.len(),
1314            params.len()
1315        );
1316    }
1317
1318    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1319
1320    let mut datums = Row::default();
1321    let mut packer = datums.packer();
1322    let mut actual_types = Vec::new();
1323    let temp_storage = &RowArena::new();
1324    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1325        transform_ast::transform(scx, &mut expr)?;
1326
1327        let ecx = execute_expr_context(&qcx);
1328        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1329        let actual_ty = ecx.scalar_type(&ex);
1330        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1331            return Err(PlanError::WrongParameterType(
1332                i + 1,
1333                ecx.humanize_scalar_type(expected_ty, false),
1334                ecx.humanize_scalar_type(&actual_ty, false),
1335            ));
1336        }
1337        let ex = ex.lower_uncorrelated()?;
1338        let evaled = ex.eval(&[], temp_storage)?;
1339        packer.push(evaled);
1340        actual_types.push(actual_ty);
1341    }
1342    Ok(Params {
1343        datums,
1344        execute_types: actual_types,
1345        expected_types: desc.param_types.clone(),
1346    })
1347}
1348
1349static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1350static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1351
1352/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
1353pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1354    ExprContext {
1355        qcx,
1356        name: "EXECUTE",
1357        scope: &EXECUTE_CONTEXT_SCOPE,
1358        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1359        allow_aggregates: false,
1360        allow_subqueries: false,
1361        allow_parameters: false,
1362        allow_windows: false,
1363    }
1364}
1365
1366/// The CastContext used when matching up the types of parameters passed to EXECUTE.
1367///
1368/// This is an assignment cast also in Postgres, see
1369/// <https://github.com/MaterializeInc/database-issues/issues/9266>
1370pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1371    LazyLock::new(|| CastContext::Assignment);
1372
1373pub fn plan_index_exprs<'a>(
1374    scx: &'a StatementContext,
1375    on_desc: &RelationDesc,
1376    exprs: Vec<Expr<Aug>>,
1377) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1378    let scope = Scope::from_source(None, on_desc.iter_names());
1379    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1380
1381    let ecx = &ExprContext {
1382        qcx: &qcx,
1383        name: "CREATE INDEX",
1384        scope: &scope,
1385        relation_type: on_desc.typ(),
1386        allow_aggregates: false,
1387        allow_subqueries: false,
1388        allow_parameters: false,
1389        allow_windows: false,
1390    };
1391    let mut out = vec![];
1392    for mut expr in exprs {
1393        transform_ast::transform(scx, &mut expr)?;
1394        let expr = plan_expr_or_col_index(ecx, &expr)?;
1395        let mut expr = expr.lower_uncorrelated()?;
1396        expr.reduce(&on_desc.typ().column_types);
1397        out.push(expr);
1398    }
1399    Ok(out)
1400}
1401
1402fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1403    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1404        Some(column) => Ok(HirScalarExpr::column(column)),
1405        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1406    }
1407}
1408
1409fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1410    match e {
1411        Expr::Value(Value::Number(n)) => {
1412            let n = n.parse::<usize>().map_err(|e| {
1413                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1414            })?;
1415            if n < 1 || n > max {
1416                sql_bail!(
1417                    "column reference {} in {} is out of range (1 - {})",
1418                    n,
1419                    name,
1420                    max
1421                );
1422            }
1423            Ok(Some(n - 1))
1424        }
1425        _ => Ok(None),
1426    }
1427}
1428
1429struct PlannedQuery {
1430    expr: HirRelationExpr,
1431    scope: Scope,
1432    order_by: Vec<ColumnOrder>,
1433    limit: Option<HirScalarExpr>,
1434    /// `offset` is either
1435    /// - an Int64 literal
1436    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
1437    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
1438    ///   later.)
1439    offset: HirScalarExpr,
1440    project: Vec<usize>,
1441    group_size_hints: GroupSizeHints,
1442}
1443
1444fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1445    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1446}
1447
1448fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1449    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1450    // for the identifiers, so that they can be re-installed before returning.
1451    let cte_bindings = plan_ctes(qcx, q)?;
1452
1453    let limit = match &q.limit {
1454        None => None,
1455        Some(Limit {
1456            quantity,
1457            with_ties: false,
1458        }) => {
1459            let ecx = &ExprContext {
1460                qcx,
1461                name: "LIMIT",
1462                scope: &Scope::empty(),
1463                relation_type: &SqlRelationType::empty(),
1464                allow_aggregates: false,
1465                allow_subqueries: true,
1466                allow_parameters: true,
1467                allow_windows: false,
1468            };
1469            let limit = plan_expr(ecx, quantity)?;
1470            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1471
1472            let limit = if limit.is_constant() {
1473                let arena = RowArena::new();
1474                let limit = limit.lower_uncorrelated()?;
1475
1476                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1477                // that the error happened in a LIMIT clause, so that we have better error msg for
1478                // something like `SELECT 5 LIMIT 'aaa'`.
1479                match limit.eval(&[], &arena)? {
1480                    d @ Datum::Int64(v) if v >= 0 => {
1481                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1482                    }
1483                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1484                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1485                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1486                }
1487            } else {
1488                // Gate non-constant LIMIT expressions behind a feature flag
1489                qcx.scx
1490                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1491                limit
1492            };
1493
1494            Some(limit)
1495        }
1496        Some(Limit {
1497            quantity: _,
1498            with_ties: true,
1499        }) => bail_unsupported!("FETCH ... WITH TIES"),
1500    };
1501
1502    let offset = match &q.offset {
1503        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1504        Some(offset) => {
1505            let ecx = &ExprContext {
1506                qcx,
1507                name: "OFFSET",
1508                scope: &Scope::empty(),
1509                relation_type: &SqlRelationType::empty(),
1510                allow_aggregates: false,
1511                allow_subqueries: false,
1512                allow_parameters: true,
1513                allow_windows: false,
1514            };
1515            let offset = plan_expr(ecx, offset)?;
1516            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1517
1518            let offset = if offset.is_constant() {
1519                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1520                let offset_value = offset_into_value(offset)?;
1521                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1522            } else {
1523                // The only case when this is allowed to not be a constant is if it contains
1524                // parameters. (In which case, we'll later check that it's a constant after
1525                // parameter binding.)
1526                if !offset.contains_parameters() {
1527                    return Err(PlanError::InvalidOffset(format!(
1528                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1529                        offset
1530                    )));
1531                }
1532                offset
1533            };
1534            offset
1535        }
1536    };
1537
1538    let mut planned_query = match &q.body {
1539        SetExpr::Select(s) => {
1540            // Extract query options.
1541            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1542            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1543
1544            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1545            PlannedQuery {
1546                expr: plan.expr,
1547                scope: plan.scope,
1548                order_by: plan.order_by,
1549                project: plan.project,
1550                limit,
1551                offset,
1552                group_size_hints,
1553            }
1554        }
1555        _ => {
1556            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1557            let ecx = &ExprContext {
1558                qcx,
1559                name: "ORDER BY clause of a set expression",
1560                scope: &scope,
1561                relation_type: &qcx.relation_type(&expr),
1562                allow_aggregates: false,
1563                allow_subqueries: true,
1564                allow_parameters: true,
1565                allow_windows: false,
1566            };
1567            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1568            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1569            let project = (0..ecx.relation_type.arity()).collect();
1570            PlannedQuery {
1571                expr: expr.map(map_exprs),
1572                scope,
1573                order_by,
1574                limit,
1575                project,
1576                offset,
1577                group_size_hints: GroupSizeHints::default(),
1578            }
1579        }
1580    };
1581
1582    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1583    match &q.ctes {
1584        CteBlock::Simple(_) => {
1585            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1586                if let Some(cte) = qcx.ctes.remove(&id) {
1587                    planned_query.expr = HirRelationExpr::Let {
1588                        name: cte.name,
1589                        id: id.clone(),
1590                        value: Box::new(value),
1591                        body: Box::new(planned_query.expr),
1592                    };
1593                }
1594                if let Some(shadowed_val) = shadowed_val {
1595                    qcx.ctes.insert(id, shadowed_val);
1596                }
1597            }
1598        }
1599        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1600            let MutRecBlockOptionExtracted {
1601                recursion_limit,
1602                return_at_recursion_limit,
1603                error_at_recursion_limit,
1604                seen: _,
1605            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1606            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1607                (None, None, None) => None,
1608                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1609                (None, Some(max_iters), None) => Some((max_iters, true)),
1610                (None, None, Some(max_iters)) => Some((max_iters, false)),
1611                _ => {
1612                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1613                }
1614            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1615                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1616                return_at_limit: *return_at_limit,
1617            }))?;
1618
1619            let mut bindings = Vec::new();
1620            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1621                if let Some(cte) = qcx.ctes.remove(&id) {
1622                    bindings.push((cte.name, id, value, cte.desc.typ().clone()));
1623                }
1624                if let Some(shadowed_val) = shadowed_val {
1625                    qcx.ctes.insert(id, shadowed_val);
1626                }
1627            }
1628            if !bindings.is_empty() {
1629                planned_query.expr = HirRelationExpr::LetRec {
1630                    limit,
1631                    bindings,
1632                    body: Box::new(planned_query.expr),
1633                }
1634            }
1635        }
1636    }
1637
1638    Ok(planned_query)
1639}
1640
1641/// Converts an OFFSET expression into a value.
1642pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1643    let offset = offset
1644        .try_into_literal_int64()
1645        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1646    if offset < 0 {
1647        return Err(PlanError::InvalidOffset(format!(
1648            "must not be negative, got {}",
1649            offset
1650        )));
1651    }
1652    Ok(offset)
1653}
1654
// Generates `MutRecBlockOptionExtracted`, which the query planner above builds
// from the options of a `MutRecBlock` via `try_from` and then destructures
// into `recursion_limit`, `return_at_recursion_limit`,
// `error_at_recursion_limit` and `seen`. Each option carries a `u64`; the
// planner accepts at most one of the three and rejects a limit of zero.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1661
1662/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1663///
1664/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1665/// shadowed value that can be reinstalled once the planning has completed.
1666pub fn plan_ctes(
1667    qcx: &mut QueryContext,
1668    q: &Query<Aug>,
1669) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1670    // Accumulate planned expressions and shadowed descriptions.
1671    let mut result = Vec::new();
1672    // Retain the old descriptions of CTE bindings so that we can restore them
1673    // after we're done planning this SELECT.
1674    let mut shadowed_descs = BTreeMap::new();
1675
1676    // A reused identifier indicates a reused name.
1677    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1678        sql_bail!(
1679            "WITH query name {} specified more than once",
1680            normalize::ident_ref(ident).quoted()
1681        )
1682    }
1683
1684    match &q.ctes {
1685        CteBlock::Simple(ctes) => {
1686            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1687            for cte in ctes.iter() {
1688                let cte_name = normalize::ident(cte.alias.name.clone());
1689                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1690                let typ = qcx.relation_type(&val);
1691                let mut desc = RelationDesc::new(typ, scope.column_names());
1692                plan_utils::maybe_rename_columns(
1693                    format!("CTE {}", cte.alias.name),
1694                    &mut desc,
1695                    &cte.alias.columns,
1696                )?;
1697                // Capture the prior value if it exists, so that it can be re-installed.
1698                let shadowed = qcx.ctes.insert(
1699                    cte.id,
1700                    CteDesc {
1701                        name: cte_name,
1702                        desc,
1703                    },
1704                );
1705
1706                result.push((cte.id, val, shadowed));
1707            }
1708        }
1709        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1710            // Insert column types into `qcx.ctes` first for recursive bindings.
1711            for cte in ctes.iter() {
1712                let cte_name = normalize::ident(cte.name.clone());
1713                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1714                for column in cte.columns.iter() {
1715                    desc_columns.push((
1716                        normalize::column_name(column.name.clone()),
1717                        SqlColumnType {
1718                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1719                            nullable: true,
1720                        },
1721                    ));
1722                }
1723                let desc = RelationDesc::from_names_and_types(desc_columns);
1724                let shadowed = qcx.ctes.insert(
1725                    cte.id,
1726                    CteDesc {
1727                        name: cte_name,
1728                        desc,
1729                    },
1730                );
1731                // Capture the prior value if it exists, so that it can be re-installed.
1732                if let Some(shadowed) = shadowed {
1733                    shadowed_descs.insert(cte.id, shadowed);
1734                }
1735            }
1736
1737            // Plan all CTEs and validate the proposed types.
1738            for cte in ctes.iter() {
1739                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1740
1741                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1742
1743                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1744                    // Once WMR CTEs support NOT NULL constraints, check that
1745                    // nullability of derived column types are compatible.
1746                    sql_bail!(
1747                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1748                    );
1749                }
1750
1751                if !proposed_typ.keys.is_empty() {
1752                    // Once WMR CTEs support keys, check that keys exactly
1753                    // overlap.
1754                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1755                }
1756
1757                // Validate that the derived and proposed types are the same.
1758                let derived_typ = qcx.relation_type(&val);
1759
1760                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1761                    let cte_name = normalize::ident(cte.name.clone());
1762                    let proposed_typ = proposed_typ
1763                        .column_types
1764                        .iter()
1765                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1766                        .collect::<Vec<_>>();
1767                    let inferred_typ = derived_typ
1768                        .column_types
1769                        .iter()
1770                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1771                        .collect::<Vec<_>>();
1772                    Err(PlanError::RecursiveTypeMismatch(
1773                        cte_name,
1774                        proposed_typ,
1775                        inferred_typ,
1776                    ))
1777                };
1778
1779                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1780                    return type_err(proposed_typ, derived_typ);
1781                }
1782
1783                // Cast derived types to proposed types or error.
1784                let val = match cast_relation(
1785                    qcx,
1786                    // Choose `CastContext::Assignment`` because the user has
1787                    // been explicit about the types they expect. Choosing
1788                    // `CastContext::Implicit` is not "strong" enough to impose
1789                    // typmods from proposed types onto values.
1790                    CastContext::Assignment,
1791                    val,
1792                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1793                ) {
1794                    Ok(val) => val,
1795                    Err(_) => return type_err(proposed_typ, derived_typ),
1796                };
1797
1798                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1799            }
1800        }
1801    }
1802
1803    Ok(result)
1804}
1805
1806pub fn plan_nested_query(
1807    qcx: &mut QueryContext,
1808    q: &Query<Aug>,
1809) -> Result<(HirRelationExpr, Scope), PlanError> {
1810    let PlannedQuery {
1811        mut expr,
1812        scope,
1813        order_by,
1814        limit,
1815        offset,
1816        project,
1817        group_size_hints,
1818    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1819    if limit.is_some()
1820        || !offset
1821            .clone()
1822            .try_into_literal_int64()
1823            .is_ok_and(|offset| offset == 0)
1824    {
1825        expr = HirRelationExpr::top_k(
1826            expr,
1827            vec![],
1828            order_by,
1829            limit,
1830            offset,
1831            group_size_hints.limit_input_group_size,
1832        );
1833    }
1834    Ok((expr.project(project), scope))
1835}
1836
1837fn plan_set_expr(
1838    qcx: &mut QueryContext,
1839    q: &SetExpr<Aug>,
1840) -> Result<(HirRelationExpr, Scope), PlanError> {
1841    match q {
1842        SetExpr::Select(select) => {
1843            let order_by_exprs = Vec::new();
1844            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1845            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
1846            // should not have planned any ordering.
1847            assert!(plan.order_by.is_empty());
1848            Ok((plan.expr.project(plan.project), plan.scope))
1849        }
1850        SetExpr::SetOperation {
1851            op,
1852            all,
1853            left,
1854            right,
1855        } => {
1856            // Plan the LHS and RHS.
1857            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1858            let (right_expr, right_scope) =
1859                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1860
1861            // Validate that the LHS and RHS are the same width.
1862            let left_type = qcx.relation_type(&left_expr);
1863            let right_type = qcx.relation_type(&right_expr);
1864            if left_type.arity() != right_type.arity() {
1865                sql_bail!(
1866                    "each {} query must have the same number of columns: {} vs {}",
1867                    op,
1868                    left_type.arity(),
1869                    right_type.arity(),
1870                );
1871            }
1872
1873            // Match the types of the corresponding columns on the LHS and RHS
1874            // using the normal type coercion rules. This is equivalent to
1875            // `coerce_homogeneous_exprs`, but implemented in terms of
1876            // `HirRelationExpr` rather than `HirScalarExpr`.
1877            let left_ecx = &ExprContext {
1878                qcx,
1879                name: &op.to_string(),
1880                scope: &left_scope,
1881                relation_type: &left_type,
1882                allow_aggregates: false,
1883                allow_subqueries: false,
1884                allow_parameters: false,
1885                allow_windows: false,
1886            };
1887            let right_ecx = &ExprContext {
1888                qcx,
1889                name: &op.to_string(),
1890                scope: &right_scope,
1891                relation_type: &right_type,
1892                allow_aggregates: false,
1893                allow_subqueries: false,
1894                allow_parameters: false,
1895                allow_windows: false,
1896            };
1897            let mut left_casts = vec![];
1898            let mut right_casts = vec![];
1899            for (i, (left_type, right_type)) in left_type
1900                .column_types
1901                .iter()
1902                .zip_eq(right_type.column_types.iter())
1903                .enumerate()
1904            {
1905                let types = &[
1906                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1907                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1908                ];
1909                let target =
1910                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1911                match typeconv::plan_cast(
1912                    left_ecx,
1913                    CastContext::Implicit,
1914                    HirScalarExpr::column(i),
1915                    &target,
1916                ) {
1917                    Ok(expr) => left_casts.push(expr),
1918                    Err(_) => sql_bail!(
1919                        "{} types {} and {} cannot be matched",
1920                        op,
1921                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
1922                        qcx.humanize_scalar_type(&target, false),
1923                    ),
1924                }
1925                match typeconv::plan_cast(
1926                    right_ecx,
1927                    CastContext::Implicit,
1928                    HirScalarExpr::column(i),
1929                    &target,
1930                ) {
1931                    Ok(expr) => right_casts.push(expr),
1932                    Err(_) => sql_bail!(
1933                        "{} types {} and {} cannot be matched",
1934                        op,
1935                        qcx.humanize_scalar_type(&target, false),
1936                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
1937                    ),
1938                }
1939            }
1940            let lhs = if left_casts
1941                .iter()
1942                .enumerate()
1943                .any(|(i, e)| e != &HirScalarExpr::column(i))
1944            {
1945                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1946                left_expr.map(left_casts).project(project_key)
1947            } else {
1948                left_expr
1949            };
1950            let rhs = if right_casts
1951                .iter()
1952                .enumerate()
1953                .any(|(i, e)| e != &HirScalarExpr::column(i))
1954            {
1955                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1956                right_expr.map(right_casts).project(project_key)
1957            } else {
1958                right_expr
1959            };
1960
1961            let relation_expr = match op {
1962                SetOperator::Union => {
1963                    if *all {
1964                        lhs.union(rhs)
1965                    } else {
1966                        lhs.union(rhs).distinct()
1967                    }
1968                }
1969                SetOperator::Except => Hir::except(all, lhs, rhs),
1970                SetOperator::Intersect => {
1971                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
1972                    // Though we believe that render() does The Right Thing (TM)
1973                    // Also note that we do *not* need another threshold() at the end of the method chain
1974                    // because the right-hand side of the outer union only produces existing records,
1975                    // i.e., the record counts for differential data flow definitely remain non-negative.
1976                    let left_clone = lhs.clone();
1977                    if *all {
1978                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1979                    } else {
1980                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1981                            .distinct()
1982                    }
1983                }
1984            };
1985            let scope = Scope::from_source(
1986                None,
1987                // Column names are taken from the left, as in Postgres.
1988                left_scope.column_names(),
1989            );
1990
1991            Ok((relation_expr, scope))
1992        }
1993        SetExpr::Values(Values(values)) => plan_values(qcx, values),
1994        SetExpr::Table(name) => {
1995            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1996            Ok((expr, scope))
1997        }
1998        SetExpr::Query(query) => {
1999            let (expr, scope) = plan_nested_query(qcx, query)?;
2000            Ok((expr, scope))
2001        }
2002        SetExpr::Show(stmt) => {
2003            // The create SQL definition of involving this query, will have the explicit `SHOW`
2004            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
2005            // current schema of the executing user. When Materialize restarts and tries to re-plan
2006            // these queries, it will only have access to the raw `SHOW` command and have no idea
2007            // what schema to use. As a result Materialize will fail to boot.
2008            //
2009            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
2010            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
2011            // However, banning show commands in views gives us more flexibility to change their
2012            // output.
2013            //
2014            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
2015            // with all show commands expanded into their equivalent SELECT statements.
2016            if !qcx.lifetime.allow_show() {
2017                return Err(PlanError::ShowCommandInView);
2018            }
2019
2020            // Some SHOW statements are a SELECT query. Others produces Rows
2021            // directly. Convert both of these to the needed Hir and Scope.
2022            fn to_hirscope(
2023                plan: ShowCreatePlan,
2024                desc: StatementDesc,
2025            ) -> Result<(HirRelationExpr, Scope), PlanError> {
2026                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2027                let desc = desc.relation_desc.expect("must exist");
2028                let expr = HirRelationExpr::constant(rows, desc.typ().clone());
2029                let scope = Scope::from_source(None, desc.iter_names());
2030                Ok((expr, scope))
2031            }
2032
2033            match stmt.clone() {
2034                ShowStatement::ShowColumns(stmt) => {
2035                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2036                }
2037                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2038                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2039                    show::describe_show_create_connection(qcx.scx, stmt)?,
2040                ),
2041                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2042                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2043                    show::describe_show_create_cluster(qcx.scx, stmt)?,
2044                ),
2045                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2046                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
2047                    show::describe_show_create_index(qcx.scx, stmt)?,
2048                ),
2049                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2050                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2051                    show::describe_show_create_sink(qcx.scx, stmt)?,
2052                ),
2053                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2054                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
2055                    show::describe_show_create_source(qcx.scx, stmt)?,
2056                ),
2057                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2058                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
2059                    show::describe_show_create_table(qcx.scx, stmt)?,
2060                ),
2061                ShowStatement::ShowCreateView(stmt) => to_hirscope(
2062                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
2063                    show::describe_show_create_view(qcx.scx, stmt)?,
2064                ),
2065                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2066                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2067                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2068                ),
2069                ShowStatement::ShowObjects(stmt) => {
2070                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2071                }
2072                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2073                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2074            }
2075        }
2076    }
2077}
2078
2079/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2080fn plan_values(
2081    qcx: &QueryContext,
2082    values: &[Vec<Expr<Aug>>],
2083) -> Result<(HirRelationExpr, Scope), PlanError> {
2084    assert!(!values.is_empty());
2085
2086    let ecx = &ExprContext {
2087        qcx,
2088        name: "VALUES",
2089        scope: &Scope::empty(),
2090        relation_type: &SqlRelationType::empty(),
2091        allow_aggregates: false,
2092        allow_subqueries: true,
2093        allow_parameters: true,
2094        allow_windows: false,
2095    };
2096
2097    let ncols = values[0].len();
2098    let nrows = values.len();
2099
2100    // Arrange input expressions by columns, not rows, so that we can
2101    // call `coerce_homogeneous_exprs` on each column.
2102    let mut cols = vec![vec![]; ncols];
2103    for row in values {
2104        if row.len() != ncols {
2105            sql_bail!(
2106                "VALUES expression has varying number of columns: {} vs {}",
2107                row.len(),
2108                ncols
2109            );
2110        }
2111        for (i, v) in row.iter().enumerate() {
2112            cols[i].push(v);
2113        }
2114    }
2115
2116    // Plan each column.
2117    let mut col_iters = Vec::with_capacity(ncols);
2118    let mut col_types = Vec::with_capacity(ncols);
2119    for col in &cols {
2120        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2121        let mut col_type = ecx.column_type(&col[0]);
2122        for val in &col[1..] {
2123            col_type = col_type.union(&ecx.column_type(val))?;
2124        }
2125        col_types.push(col_type);
2126        col_iters.push(col.into_iter());
2127    }
2128
2129    // Build constant relation.
2130    let mut exprs = vec![];
2131    for _ in 0..nrows {
2132        for i in 0..ncols {
2133            exprs.push(col_iters[i].next().unwrap());
2134        }
2135    }
2136    let out = HirRelationExpr::CallTable {
2137        func: TableFunc::Wrap {
2138            width: ncols,
2139            types: col_types,
2140        },
2141        exprs,
2142    };
2143
2144    // Build column names.
2145    let mut scope = Scope::empty();
2146    for i in 0..ncols {
2147        let name = format!("column{}", i + 1);
2148        scope.items.push(ScopeItem::from_column_name(name));
2149    }
2150
2151    Ok((out, scope))
2152}
2153
2154/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2155/// statement.
2156///
2157/// This is special-cased in PostgreSQL and different enough from `plan_values`
2158/// that it is easier to use a separate function entirely. Unlike a normal
2159/// `VALUES` clause, each value is coerced to the type of the target table
2160/// via an assignment cast.
2161///
2162/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2163fn plan_values_insert(
2164    qcx: &QueryContext,
2165    target_names: &[&ColumnName],
2166    target_types: &[&SqlScalarType],
2167    values: &[Vec<Expr<Aug>>],
2168) -> Result<HirRelationExpr, PlanError> {
2169    assert!(!values.is_empty());
2170
2171    if !values.iter().map(|row| row.len()).all_equal() {
2172        sql_bail!("VALUES lists must all be the same length");
2173    }
2174
2175    let ecx = &ExprContext {
2176        qcx,
2177        name: "VALUES",
2178        scope: &Scope::empty(),
2179        relation_type: &SqlRelationType::empty(),
2180        allow_aggregates: false,
2181        allow_subqueries: true,
2182        allow_parameters: true,
2183        allow_windows: false,
2184    };
2185
2186    let mut exprs = vec![];
2187    let mut types = vec![];
2188    for row in values {
2189        if row.len() > target_names.len() {
2190            sql_bail!("INSERT has more expressions than target columns");
2191        }
2192        for (column, val) in row.into_iter().enumerate() {
2193            let target_type = &target_types[column];
2194            let val = plan_expr(ecx, val)?;
2195            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2196            let source_type = &ecx.scalar_type(&val);
2197            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2198                Ok(val) => val,
2199                Err(_) => sql_bail!(
2200                    "column {} is of type {} but expression is of type {}",
2201                    target_names[column].quoted(),
2202                    qcx.humanize_scalar_type(target_type, false),
2203                    qcx.humanize_scalar_type(source_type, false),
2204                ),
2205            };
2206            if column >= types.len() {
2207                types.push(ecx.column_type(&val));
2208            } else {
2209                types[column] = types[column].union(&ecx.column_type(&val))?;
2210            }
2211            exprs.push(val);
2212        }
2213    }
2214
2215    Ok(HirRelationExpr::CallTable {
2216        func: TableFunc::Wrap {
2217            width: values[0].len(),
2218            types,
2219        },
2220        exprs,
2221    })
2222}
2223
2224fn plan_join_identity() -> (HirRelationExpr, Scope) {
2225    let typ = SqlRelationType::new(vec![]);
2226    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2227    let scope = Scope::empty();
2228    (expr, scope)
2229}
2230
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The relation expression for the query, before `project` is applied.
    expr: HirRelationExpr,
    /// Describes the columns of `expr` after `project` has been applied.
    scope: Scope,
    /// Ordering of the rows of `expr`, expressed before projection.
    order_by: Vec<ColumnOrder>,
    /// Column indices that consumers pass to `HirRelationExpr::project`.
    project: Vec<usize>,
}
2243
// Generates `SelectOptionExtracted`, which `plan_select_from_where` builds
// from a `SELECT`'s options via `try_from` and then converts into
// `GroupSizeHints`. Every option carries a `u64`.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2251
2252/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2253///
2254/// Normally, the ORDER BY clause occurs after the columns specified in the
2255/// SELECT list have been projected. In a query like
2256///
2257///   CREATE TABLE (a int, b int)
2258///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2259///
2260/// it is valid to refer to `a`, because it is explicitly selected, but it would
2261/// not be valid to refer to unselected column `b`.
2262///
2263/// But PostgreSQL extends the standard to permit queries like
2264///
2265///   SELECT a FROM t ORDER BY b
2266///
2267/// where expressions in the ORDER BY clause can refer to *both* input columns
2268/// and output columns.
2269fn plan_select_from_where(
2270    qcx: &QueryContext,
2271    mut s: Select<Aug>,
2272    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2273) -> Result<SelectPlan, PlanError> {
2274    // TODO: Both `s` and `order_by_exprs` are not references because the
2275    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2276    // table function support (the UUID mapping). Attempt to change this so callers
2277    // don't need to clone the Select.
2278
2279    // Extract query options.
2280    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2281    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2282
2283    // Step 1. Handle FROM clause, including joins.
2284    let (mut relation_expr, mut from_scope) =
2285        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2286            let (left, left_scope) = l;
2287            plan_join(
2288                qcx,
2289                left,
2290                left_scope,
2291                &Join {
2292                    relation: TableFactor::NestedJoin {
2293                        join: Box::new(twj.clone()),
2294                        alias: None,
2295                    },
2296                    join_operator: JoinOperator::CrossJoin,
2297                },
2298            )
2299        })?;
2300
2301    // Step 2. Handle WHERE clause.
2302    if let Some(selection) = &s.selection {
2303        let ecx = &ExprContext {
2304            qcx,
2305            name: "WHERE clause",
2306            scope: &from_scope,
2307            relation_type: &qcx.relation_type(&relation_expr),
2308            allow_aggregates: false,
2309            allow_subqueries: true,
2310            allow_parameters: true,
2311            allow_windows: false,
2312        };
2313        let expr = plan_expr(ecx, selection)
2314            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2315            .type_as(ecx, &SqlScalarType::Bool)?;
2316        relation_expr = relation_expr.filter(vec![expr]);
2317    }
2318
2319    // Step 3. Gather aggregates and table functions.
2320    // (But skip window aggregates.)
2321    let (aggregates, table_funcs) = {
2322        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2323        visitor.visit_select_mut(&mut s);
2324        for o in order_by_exprs.iter_mut() {
2325            visitor.visit_order_by_expr_mut(o);
2326        }
2327        visitor.into_result()?
2328    };
2329    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2330    if !table_funcs.is_empty() {
2331        let (expr, scope) = plan_scalar_table_funcs(
2332            qcx,
2333            table_funcs,
2334            &mut table_func_names,
2335            &relation_expr,
2336            &from_scope,
2337        )?;
2338        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2339        from_scope = from_scope.product(scope)?;
2340    }
2341
2342    // Step 4. Expand SELECT clause.
2343    let projection = {
2344        let ecx = &ExprContext {
2345            qcx,
2346            name: "SELECT clause",
2347            scope: &from_scope,
2348            relation_type: &qcx.relation_type(&relation_expr),
2349            allow_aggregates: true,
2350            allow_subqueries: true,
2351            allow_parameters: true,
2352            allow_windows: true,
2353        };
2354        let mut out = vec![];
2355        for si in &s.projection {
2356            if *si == SelectItem::Wildcard && s.from.is_empty() {
2357                sql_bail!("SELECT * with no tables specified is not valid");
2358            }
2359            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2360        }
2361        out
2362    };
2363
2364    // Step 5. Handle GROUP BY clause.
2365    // This will also plan the aggregates gathered in Step 3.
2366    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2367    let (mut group_scope, select_all_mapping) = {
2368        // Compute GROUP BY expressions.
2369        let ecx = &ExprContext {
2370            qcx,
2371            name: "GROUP BY clause",
2372            scope: &from_scope,
2373            relation_type: &qcx.relation_type(&relation_expr),
2374            allow_aggregates: false,
2375            allow_subqueries: true,
2376            allow_parameters: true,
2377            allow_windows: false,
2378        };
2379        let mut group_key = vec![];
2380        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2381        let mut group_hir_exprs = vec![];
2382        let mut group_scope = Scope::empty();
2383        let mut select_all_mapping = BTreeMap::new();
2384
2385        for group_expr in &s.group_by {
2386            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2387            let new_column = group_key.len();
2388
2389            if let Some(group_expr) = group_expr {
2390                // Multiple AST expressions can map to the same HIR expression.
2391                // If we already have a ScopeItem for this HIR, we can add this
2392                // next AST expression to its set
2393                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2394                    existing_scope_item.exprs.insert(group_expr.clone());
2395                    continue;
2396                }
2397            }
2398
2399            let mut scope_item = if let HirScalarExpr::Column(
2400                ColumnRef {
2401                    level: 0,
2402                    column: old_column,
2403                },
2404                _name,
2405            ) = &expr
2406            {
2407                // If we later have `SELECT foo.*` then we have to find all
2408                // the `foo` items in `from_scope` and figure out where they
2409                // ended up in `group_scope`. This is really hard to do
2410                // right using SQL name resolution, so instead we just track
2411                // the movement here.
2412                select_all_mapping.insert(*old_column, new_column);
2413                let scope_item = ecx.scope.items[*old_column].clone();
2414                scope_item
2415            } else {
2416                ScopeItem::empty()
2417            };
2418
2419            if let Some(group_expr) = group_expr.cloned() {
2420                scope_item.exprs.insert(group_expr);
2421            }
2422
2423            group_key.push(from_scope.len() + group_exprs.len());
2424            group_hir_exprs.push(expr.clone());
2425            group_exprs.insert(expr, scope_item);
2426        }
2427
2428        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2429        for expr in &group_hir_exprs {
2430            if let Some(scope_item) = group_exprs.remove(expr) {
2431                group_scope.items.push(scope_item);
2432            }
2433        }
2434
2435        // Plan aggregates.
2436        let ecx = &ExprContext {
2437            qcx,
2438            name: "aggregate function",
2439            scope: &from_scope,
2440            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2441            allow_aggregates: false,
2442            allow_subqueries: true,
2443            allow_parameters: true,
2444            allow_windows: false,
2445        };
2446        let mut agg_exprs = vec![];
2447        for sql_function in aggregates {
2448            if sql_function.over.is_some() {
2449                unreachable!(
2450                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2451                );
2452            }
2453            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2454            group_scope
2455                .items
2456                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2457        }
2458        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2459            // apply GROUP BY / aggregates
2460            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2461                group_key,
2462                agg_exprs,
2463                group_size_hints.aggregate_input_group_size,
2464            );
2465
2466            // For every old column that wasn't a group key, add a scope item
2467            // that errors when referenced. We can't simply drop these items
2468            // from scope. These items need to *exist* because they might shadow
2469            // variables in outer scopes that would otherwise be valid to
2470            // reference, but accessing them needs to produce an error.
2471            for i in 0..from_scope.len() {
2472                if !select_all_mapping.contains_key(&i) {
2473                    let scope_item = &ecx.scope.items[i];
2474                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2475                        table_name: scope_item.table_name.clone(),
2476                        column_name: scope_item.column_name.clone(),
2477                        allow_unqualified_references: scope_item.allow_unqualified_references,
2478                    });
2479                }
2480            }
2481
2482            (group_scope, select_all_mapping)
2483        } else {
2484            // if no GROUP BY, aggregates or having then all columns remain in scope
2485            (
2486                from_scope.clone(),
2487                (0..from_scope.len()).map(|i| (i, i)).collect(),
2488            )
2489        }
2490    };
2491
2492    // Step 6. Handle HAVING clause.
2493    if let Some(ref having) = s.having {
2494        let ecx = &ExprContext {
2495            qcx,
2496            name: "HAVING clause",
2497            scope: &group_scope,
2498            relation_type: &qcx.relation_type(&relation_expr),
2499            allow_aggregates: true,
2500            allow_subqueries: true,
2501            allow_parameters: true,
2502            allow_windows: false,
2503        };
2504        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2505        relation_expr = relation_expr.filter(vec![expr]);
2506    }
2507
2508    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2509    // (This includes window aggregations.)
2510    //
2511    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2512    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2513    //
2514    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2515    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2516    // and can't be part of a bigger expression.
2517    // See https://www.postgresql.org/docs/current/queries-order.html:
2518    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2519    // expression"
2520    let window_funcs = {
2521        let mut visitor = WindowFuncCollector::default();
2522        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2523        // window functions are excluded from other things by `allow_windows` being false when
2524        // planning those before this code).
2525        visitor.visit_select(&s);
2526        for o in order_by_exprs.iter() {
2527            visitor.visit_order_by_expr(o);
2528        }
2529        visitor.into_result()
2530    };
2531    for window_func in window_funcs {
2532        let ecx = &ExprContext {
2533            qcx,
2534            name: "window function",
2535            scope: &group_scope,
2536            relation_type: &qcx.relation_type(&relation_expr),
2537            allow_aggregates: true,
2538            allow_subqueries: true,
2539            allow_parameters: true,
2540            allow_windows: true,
2541        };
2542        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2543        group_scope.items.push(ScopeItem::from_expr(window_func));
2544    }
2545    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2546    // been already planned now. However, we should still set `allow_windows: true` for the
2547    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2548    // msg if an OVER clause is missing from a window function.
2549
2550    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2551    if let Some(ref qualify) = s.qualify {
2552        let ecx = &ExprContext {
2553            qcx,
2554            name: "QUALIFY clause",
2555            scope: &group_scope,
2556            relation_type: &qcx.relation_type(&relation_expr),
2557            allow_aggregates: true,
2558            allow_subqueries: true,
2559            allow_parameters: true,
2560            allow_windows: true,
2561        };
2562        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2563        relation_expr = relation_expr.filter(vec![expr]);
2564    }
2565
2566    // Step 9. Handle SELECT clause.
2567    let output_columns = {
2568        let mut new_exprs = vec![];
2569        let mut new_type = qcx.relation_type(&relation_expr);
2570        let mut output_columns = vec![];
2571        for (select_item, column_name) in &projection {
2572            let ecx = &ExprContext {
2573                qcx,
2574                name: "SELECT clause",
2575                scope: &group_scope,
2576                relation_type: &new_type,
2577                allow_aggregates: true,
2578                allow_subqueries: true,
2579                allow_parameters: true,
2580                allow_windows: true,
2581            };
2582            let expr = match select_item {
2583                ExpandedSelectItem::InputOrdinal(i) => {
2584                    if let Some(column) = select_all_mapping.get(i).copied() {
2585                        HirScalarExpr::column(column)
2586                    } else {
2587                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2588                    }
2589                }
2590                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2591            };
2592            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2593                // Simple column reference; no need to map on a new expression.
2594                output_columns.push((column, column_name));
2595            } else {
2596                // Complicated expression that requires a map expression. We
2597                // update `group_scope` as we go so that future expressions that
2598                // are textually identical to this one can reuse it. This
2599                // duplicate detection is required for proper determination of
2600                // ambiguous column references with SQL92-style `ORDER BY`
2601                // items. See `plan_order_by_or_distinct_expr` for more.
2602                let typ = ecx.column_type(&expr);
2603                new_type.column_types.push(typ);
2604                new_exprs.push(expr);
2605                output_columns.push((group_scope.len(), column_name));
2606                group_scope
2607                    .items
2608                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2609            }
2610        }
2611        relation_expr = relation_expr.map(new_exprs);
2612        output_columns
2613    };
2614    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2615
2616    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2617    let order_by = {
2618        let relation_type = qcx.relation_type(&relation_expr);
2619        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2620            &ExprContext {
2621                qcx,
2622                name: "ORDER BY clause",
2623                scope: &group_scope,
2624                relation_type: &relation_type,
2625                allow_aggregates: true,
2626                allow_subqueries: true,
2627                allow_parameters: true,
2628                allow_windows: true,
2629            },
2630            &order_by_exprs,
2631            &output_columns,
2632        )?;
2633
2634        match s.distinct {
2635            None => relation_expr = relation_expr.map(map_exprs),
2636            Some(Distinct::EntireRow) => {
2637                if relation_type.arity() == 0 {
2638                    sql_bail!("SELECT DISTINCT must have at least one column");
2639                }
2640                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2641                // list, so we can't proceed if `ORDER BY` has introduced any
2642                // columns for arbitrary expressions. This matches PostgreSQL.
2643                if !try_push_projection_order_by(
2644                    &mut relation_expr,
2645                    &mut project_key,
2646                    &mut order_by,
2647                ) {
2648                    sql_bail!(
2649                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2650                    );
2651                }
2652                assert!(map_exprs.is_empty());
2653                relation_expr = relation_expr.distinct();
2654            }
2655            Some(Distinct::On(exprs)) => {
2656                let ecx = &ExprContext {
2657                    qcx,
2658                    name: "DISTINCT ON clause",
2659                    scope: &group_scope,
2660                    relation_type: &qcx.relation_type(&relation_expr),
2661                    allow_aggregates: true,
2662                    allow_subqueries: true,
2663                    allow_parameters: true,
2664                    allow_windows: true,
2665                };
2666
2667                let mut distinct_exprs = vec![];
2668                for expr in &exprs {
2669                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2670                    distinct_exprs.push(expr);
2671                }
2672
2673                let mut distinct_key = vec![];
2674
2675                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2676                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2677                // expressions, though the order of `DISTINCT ON` expressions
2678                // does not matter. This matches PostgreSQL and leaves the door
2679                // open to a future optimization where the `DISTINCT ON` and
2680                // `ORDER BY` operations happen in one pass.
2681                //
2682                // On the bright side, any columns that have already been
2683                // computed by `ORDER BY` can be reused in the distinct key.
2684                let arity = relation_type.arity();
2685                for ord in order_by.iter().take(distinct_exprs.len()) {
2686                    // The unusual construction of `expr` here is to ensure the
2687                    // temporary column expression lives long enough.
2688                    let mut expr = &HirScalarExpr::column(ord.column);
2689                    if ord.column >= arity {
2690                        expr = &map_exprs[ord.column - arity];
2691                    };
2692                    match distinct_exprs.iter().position(move |e| e == expr) {
2693                        None => sql_bail!(
2694                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2695                        ),
2696                        Some(pos) => {
2697                            distinct_exprs.remove(pos);
2698                        }
2699                    }
2700                    distinct_key.push(ord.column);
2701                }
2702
2703                // Add any remaining `DISTINCT ON` expressions to the key.
2704                for expr in distinct_exprs {
2705                    // If the expression is a reference to an existing column,
2706                    // do not introduce a new column to support it.
2707                    let column = match expr {
2708                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2709                        _ => {
2710                            map_exprs.push(expr);
2711                            arity + map_exprs.len() - 1
2712                        }
2713                    };
2714                    distinct_key.push(column);
2715                }
2716
2717                // `DISTINCT ON` is semantically a TopK with limit 1. The
2718                // columns in `ORDER BY` that are not part of the distinct key,
2719                // if there are any, determine the ordering within each group,
2720                // per PostgreSQL semantics.
2721                let distinct_len = distinct_key.len();
2722                relation_expr = HirRelationExpr::top_k(
2723                    relation_expr.map(map_exprs),
2724                    distinct_key,
2725                    order_by.iter().skip(distinct_len).cloned().collect(),
2726                    Some(HirScalarExpr::literal(
2727                        Datum::Int64(1),
2728                        SqlScalarType::Int64,
2729                    )),
2730                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2731                    group_size_hints.distinct_on_input_group_size,
2732                );
2733            }
2734        }
2735
2736        order_by
2737    };
2738
2739    // Construct a clean scope to expose outwards, where all of the state that
2740    // accumulated in the scope during planning of this SELECT is erased. The
2741    // clean scope has at most one name for each column, and the names are not
2742    // associated with any table.
2743    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2744
2745    Ok(SelectPlan {
2746        expr: relation_expr,
2747        scope,
2748        order_by,
2749        project: project_key,
2750    })
2751}
2752
2753fn plan_scalar_table_funcs(
2754    qcx: &QueryContext,
2755    table_funcs: BTreeMap<Function<Aug>, String>,
2756    table_func_names: &mut BTreeMap<String, Ident>,
2757    relation_expr: &HirRelationExpr,
2758    from_scope: &Scope,
2759) -> Result<(HirRelationExpr, Scope), PlanError> {
2760    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2761
2762    for (table_func, id) in table_funcs.iter() {
2763        table_func_names.insert(
2764            id.clone(),
2765            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
2766            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2767        );
2768    }
2769    // If there's only a single table function, we can skip generating
2770    // ordinality columns.
2771    if table_funcs.len() == 1 {
2772        let (table_func, id) = table_funcs.iter().next().unwrap();
2773        let (expr, mut scope) =
2774            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2775
2776        // A single table-function might return several columns as a record
2777        let num_cols = scope.len();
2778        for i in 0..scope.len() {
2779            scope.items[i].table_name = Some(PartialItemName {
2780                database: None,
2781                schema: None,
2782                item: id.clone(),
2783            });
2784            scope.items[i].from_single_column_function = num_cols == 1;
2785            scope.items[i].allow_unqualified_references = false;
2786        }
2787        return Ok((expr, scope));
2788    }
2789    // Otherwise, plan as usual, emulating the ROWS FROM behavior
2790    let (expr, mut scope, num_cols) =
2791        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2792
2793    // Munge the scope so table names match with the generated ids.
2794    let mut i = 0;
2795    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2796        for _ in 0..num_cols {
2797            scope.items[i].table_name = Some(PartialItemName {
2798                database: None,
2799                schema: None,
2800                item: id.clone(),
2801            });
2802            scope.items[i].from_single_column_function = num_cols == 1;
2803            scope.items[i].allow_unqualified_references = false;
2804            i += 1;
2805        }
2806        // Ordinality column. This doubles as the
2807        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
2808        // because it only needs to be NULL or not.
2809        scope.items[i].table_name = Some(PartialItemName {
2810            database: None,
2811            schema: None,
2812            item: id.clone(),
2813        });
2814        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2815        scope.items[i].allow_unqualified_references = false;
2816        i += 1;
2817    }
2818    // Coalesced ordinality column.
2819    scope.items[i].allow_unqualified_references = false;
2820    Ok((expr, scope))
2821}
2822
2823/// Plans an expression in a `GROUP BY` clause.
2824///
2825/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2826/// names/expressions defined in the `SELECT` clause. These special cases are
2827/// handled by this function; see comments within the implementation for
2828/// details.
2829fn plan_group_by_expr<'a>(
2830    ecx: &ExprContext,
2831    group_expr: &'a Expr<Aug>,
2832    projection: &'a [(ExpandedSelectItem, ColumnName)],
2833) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2834    let plan_projection = |column: usize| match &projection[column].0 {
2835        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2836        ExpandedSelectItem::Expr(expr) => {
2837            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2838        }
2839    };
2840
2841    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2842    // a special case that means to use the ith item in the SELECT clause.
2843    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2844        return plan_projection(column);
2845    }
2846
2847    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2848    // The `foo` can refer to *either* an input column or an output column. If
2849    // both exist, the input column is preferred.
2850    match group_expr {
2851        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2852            Err(PlanError::UnknownColumn {
2853                table: None,
2854                column,
2855                similar,
2856            }) => {
2857                // The expression was a simple identifier that did not match an
2858                // input column. See if it matches an output column.
2859                let mut iter = projection.iter().map(|(_expr, name)| name);
2860                if let Some(i) = iter.position(|n| *n == column) {
2861                    if iter.any(|n| *n == column) {
2862                        Err(PlanError::AmbiguousColumn(column))
2863                    } else {
2864                        plan_projection(i)
2865                    }
2866                } else {
2867                    // The name didn't match an output column either. Return the
2868                    // "unknown column" error.
2869                    Err(PlanError::UnknownColumn {
2870                        table: None,
2871                        column,
2872                        similar,
2873                    })
2874                }
2875            }
2876            res => Ok((Some(group_expr), res?)),
2877        },
2878        _ => Ok((
2879            Some(group_expr),
2880            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2881        )),
2882    }
2883}
2884
2885/// Plans a slice of `ORDER BY` expressions.
2886///
2887/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2888/// parameter.
2889///
2890/// Returns the determined column orderings and a list of scalar expressions
2891/// that must be mapped onto the underlying relation expression.
2892pub(crate) fn plan_order_by_exprs(
2893    ecx: &ExprContext,
2894    order_by_exprs: &[OrderByExpr<Aug>],
2895    output_columns: &[(usize, &ColumnName)],
2896) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2897    let mut order_by = vec![];
2898    let mut map_exprs = vec![];
2899    for obe in order_by_exprs {
2900        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2901        // If the expression is a reference to an existing column,
2902        // do not introduce a new column to support it.
2903        let column = match expr {
2904            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2905            _ => {
2906                map_exprs.push(expr);
2907                ecx.relation_type.arity() + map_exprs.len() - 1
2908            }
2909        };
2910        order_by.push(resolve_desc_and_nulls_last(obe, column));
2911    }
2912    Ok((order_by, map_exprs))
2913}
2914
2915/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2916///
2917/// The `output_columns` parameter describes, in order, the physical index and
2918/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2919/// corresponds to a `SELECT` list with a single entry named "a" that can be
2920/// found at index 3 in the underlying relation expression.
2921///
2922/// There are three cases to handle.
2923///
2924///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2925///       reference to the specified output column.
2926///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2927///       an output column, if it exists; otherwise it is a reference to an
2928///       input column.
2929///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2930///       arbitrary expressions exclusively refer to input columns, never output
2931///       columns.
2932fn plan_order_by_or_distinct_expr(
2933    ecx: &ExprContext,
2934    expr: &Expr<Aug>,
2935    output_columns: &[(usize, &ColumnName)],
2936) -> Result<HirScalarExpr, PlanError> {
2937    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2938        return Ok(HirScalarExpr::column(output_columns[i].0));
2939    }
2940
2941    if let Expr::Identifier(names) = expr {
2942        if let [name] = &names[..] {
2943            let name = normalize::column_name(name.clone());
2944            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2945            if let Some((i, _)) = iter.next() {
2946                match iter.next() {
2947                    // Per SQL92, names are not considered ambiguous if they
2948                    // refer to identical target list expressions, as in
2949                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2950                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2951                    _ => return Ok(HirScalarExpr::column(*i)),
2952                }
2953            }
2954        }
2955    }
2956
2957    plan_expr(ecx, expr)?.type_as_any(ecx)
2958}
2959
2960fn plan_table_with_joins(
2961    qcx: &QueryContext,
2962    table_with_joins: &TableWithJoins<Aug>,
2963) -> Result<(HirRelationExpr, Scope), PlanError> {
2964    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2965    for join in &table_with_joins.joins {
2966        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2967        expr = new_expr;
2968        scope = new_scope;
2969    }
2970    Ok((expr, scope))
2971}
2972
2973fn plan_table_factor(
2974    qcx: &QueryContext,
2975    table_factor: &TableFactor<Aug>,
2976) -> Result<(HirRelationExpr, Scope), PlanError> {
2977    match table_factor {
2978        TableFactor::Table { name, alias } => {
2979            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2980            let scope = plan_table_alias(scope, alias.as_ref())?;
2981            Ok((expr, scope))
2982        }
2983
2984        TableFactor::Function {
2985            function,
2986            alias,
2987            with_ordinality,
2988        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2989
2990        TableFactor::RowsFrom {
2991            functions,
2992            alias,
2993            with_ordinality,
2994        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2995
2996        TableFactor::Derived {
2997            lateral,
2998            subquery,
2999            alias,
3000        } => {
3001            let mut qcx = (*qcx).clone();
3002            if !lateral {
3003                // Since this derived table was not marked as `LATERAL`,
3004                // make elements in outer scopes invisible until we reach the
3005                // next lateral barrier.
3006                for scope in &mut qcx.outer_scopes {
3007                    if scope.lateral_barrier {
3008                        break;
3009                    }
3010                    scope.items.clear();
3011                }
3012            }
3013            qcx.outer_scopes[0].lateral_barrier = true;
3014            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3015            let scope = plan_table_alias(scope, alias.as_ref())?;
3016            Ok((expr, scope))
3017        }
3018
3019        TableFactor::NestedJoin { join, alias } => {
3020            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3021            let scope = plan_table_alias(scope, alias.as_ref())?;
3022            Ok((expr, scope))
3023        }
3024    }
3025}
3026
3027/// Plans a `ROWS FROM` expression.
3028///
3029/// `ROWS FROM` concatenates table functions into a single table, filling in
3030/// `NULL`s in places where one table function has fewer rows than another. We
3031/// can achieve this by augmenting each table function with a row number, doing
3032/// a `FULL JOIN` between each table function on the row number and eventually
3033/// projecting away the row number columns. Concretely, the following query
3034/// using `ROWS FROM`
3035///
3036/// ```sql
3037/// SELECT
3038///     *
3039/// FROM
3040///     ROWS FROM (
3041///         generate_series(1, 2),
3042///         information_schema._pg_expandarray(ARRAY[9]),
3043///         generate_series(3, 6)
3044///     );
3045/// ```
3046///
3047/// is equivalent to the following query that does not use `ROWS FROM`:
3048///
3049/// ```sql
3050/// SELECT
3051///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3052/// FROM
3053///     generate_series(1, 2) WITH ORDINALITY AS gs1
3054///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3055///         ON gs1.ordinality = expand.ordinality
3056///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3057///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3058/// ```
3059///
3060/// Note the call to `coalesce` in the last join condition, which ensures that
3061/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3062///
3063/// This function creates a HirRelationExpr that follows the structure of the
3064/// latter query.
3065///
3066/// `with_ordinality` can be used to have the output expression contain a
3067/// single coalesced ordinality column at the end of the entire expression.
3068fn plan_rows_from(
3069    qcx: &QueryContext,
3070    functions: &[Function<Aug>],
3071    alias: Option<&TableAlias>,
3072    with_ordinality: bool,
3073) -> Result<(HirRelationExpr, Scope), PlanError> {
3074    // If there's only a single table function, planning proceeds as if `ROWS
3075    // FROM` hadn't been written at all.
3076    if let [function] = functions {
3077        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3078    }
3079
3080    // Per PostgreSQL, all scope items take the name of the first function
3081    // (unless aliased).
3082    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3083    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3084        qcx,
3085        functions,
3086        Some(functions[0].name.full_item_name().clone()),
3087    )?;
3088
3089    // Columns tracks the set of columns we will keep in the projection.
3090    let mut columns = Vec::new();
3091    let mut offset = 0;
3092    // Retain table function's non-ordinality columns.
3093    for (idx, cols) in num_cols.into_iter().enumerate() {
3094        for i in 0..cols {
3095            columns.push(offset + i);
3096        }
3097        offset += cols + 1;
3098
3099        // Remove the ordinality column from the scope, accounting for previous scope
3100        // changes from this loop.
3101        scope.items.remove(offset - idx - 1);
3102    }
3103
3104    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3105    // column. Otherwise remove it from the scope.
3106    if with_ordinality {
3107        columns.push(offset);
3108    } else {
3109        scope.items.pop();
3110    }
3111
3112    let expr = expr.project(columns);
3113
3114    let scope = plan_table_alias(scope, alias)?;
3115    Ok((expr, scope))
3116}
3117
3118/// Plans an expression coalescing multiple table functions. Each table
3119/// function is followed by its row ordinality. The entire expression is
3120/// followed by the coalesced row ordinality.
3121///
3122/// The returned Scope will set all item's table_name's to the `table_name`
3123/// parameter if it is `Some`. If `None`, they will be the name of each table
3124/// function.
3125///
3126/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3127/// each table function.
3128///
3129/// For example, with table functions tf1 returning 1 column (a) and tf2
3130/// returning 2 columns (b, c), this function will return an expr 6 columns:
3131///
3132/// - tf1.a
3133/// - tf1.ordinality
3134/// - tf2.b
3135/// - tf2.c
3136/// - tf2.ordinality
3137/// - coalesced_ordinality
3138///
3139/// And a `Vec<usize>` of `[1, 2]`.
3140fn plan_rows_from_internal<'a>(
3141    qcx: &QueryContext,
3142    functions: impl IntoIterator<Item = &'a Function<Aug>>,
3143    table_name: Option<FullItemName>,
3144) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3145    let mut functions = functions.into_iter();
3146    let mut num_cols = Vec::new();
3147
3148    // Join together each of the table functions in turn. The last column is
3149    // always the column to join against and is maintained to be the coalescence
3150    // of the row number column for all prior functions.
3151    let (mut left_expr, mut left_scope) =
3152        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3153    num_cols.push(left_scope.len() - 1);
3154    // Create the coalesced ordinality column.
3155    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3156    left_scope
3157        .items
3158        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3159
3160    for function in functions {
3161        // The right hand side of a join must be planned in a new scope.
3162        let qcx = qcx.empty_derived_context();
3163        let (right_expr, mut right_scope) =
3164            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3165        num_cols.push(right_scope.len() - 1);
3166        let left_col = left_scope.len() - 1;
3167        let right_col = left_scope.len() + right_scope.len() - 1;
3168        let on = HirScalarExpr::call_binary(
3169            HirScalarExpr::column(left_col),
3170            HirScalarExpr::column(right_col),
3171            expr_func::Eq,
3172        );
3173        left_expr = left_expr
3174            .join(right_expr, on, JoinKind::FullOuter)
3175            .map(vec![HirScalarExpr::call_variadic(
3176                VariadicFunc::Coalesce,
3177                vec![
3178                    HirScalarExpr::column(left_col),
3179                    HirScalarExpr::column(right_col),
3180                ],
3181            )]);
3182
3183        // Project off the previous iteration's coalesced column, but keep both of this
3184        // iteration's ordinality columns.
3185        left_expr = left_expr.project(
3186            (0..left_col) // non-coalesced ordinality columns from left function
3187                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
3188                .collect(),
3189        );
3190        // Move the coalesced ordinality column.
3191        right_scope.items.push(left_scope.items.pop().unwrap());
3192
3193        left_scope.items.extend(right_scope.items);
3194    }
3195
3196    Ok((left_expr, left_scope, num_cols))
3197}
3198
3199/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3200/// FROM` clause that contains other table functions. Special aliasing rules
3201/// apply.
3202fn plan_solitary_table_function(
3203    qcx: &QueryContext,
3204    function: &Function<Aug>,
3205    alias: Option<&TableAlias>,
3206    with_ordinality: bool,
3207) -> Result<(HirRelationExpr, Scope), PlanError> {
3208    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3209
3210    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3211    if single_column_function {
3212        let item = &mut scope.items[0];
3213
3214        // Mark that the function only produced a single column. This impacts
3215        // whole-row references.
3216        item.from_single_column_function = true;
3217
3218        // Strange special case for solitary table functions that output one
3219        // column whose name matches the name of the table function. If a table
3220        // alias is provided, the column name is changed to the table alias's
3221        // name. Concretely, the following query returns a column named `x`
3222        // rather than a column named `generate_series`:
3223        //
3224        //     SELECT * FROM generate_series(1, 5) AS x
3225        //
3226        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3227        // since its output column is explicitly named `value`, not
3228        // `jsonb_array_elements`.
3229        //
3230        // Note also that we may (correctly) change the column name again when
3231        // we plan the table alias below if the `alias.columns` is non-empty.
3232        if let Some(alias) = alias {
3233            if let ScopeItem {
3234                table_name: Some(table_name),
3235                column_name,
3236                ..
3237            } = item
3238            {
3239                if table_name.item.as_str() == column_name.as_str() {
3240                    *column_name = normalize::column_name(alias.name.clone());
3241                }
3242            }
3243        }
3244    }
3245
3246    let scope = plan_table_alias(scope, alias)?;
3247    Ok((expr, scope))
3248}
3249
3250/// Plans a table function.
3251///
3252/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3253/// instead to get the appropriate aliasing behavior.
3254fn plan_table_function_internal(
3255    qcx: &QueryContext,
3256    Function {
3257        name,
3258        args,
3259        filter,
3260        over,
3261        distinct,
3262    }: &Function<Aug>,
3263    with_ordinality: bool,
3264    table_name: Option<FullItemName>,
3265) -> Result<(HirRelationExpr, Scope), PlanError> {
3266    assert_none!(filter, "cannot parse table function with FILTER");
3267    assert_none!(over, "cannot parse table function with OVER");
3268    assert!(!*distinct, "cannot parse table function with DISTINCT");
3269
3270    let ecx = &ExprContext {
3271        qcx,
3272        name: "table function arguments",
3273        scope: &Scope::empty(),
3274        relation_type: &SqlRelationType::empty(),
3275        allow_aggregates: false,
3276        allow_subqueries: true,
3277        allow_parameters: true,
3278        allow_windows: false,
3279    };
3280
3281    let scalar_args = match args {
3282        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3283        FunctionArgs::Args { args, order_by } => {
3284            if !order_by.is_empty() {
3285                sql_bail!(
3286                    "ORDER BY specified, but {} is not an aggregate function",
3287                    name
3288                );
3289            }
3290            plan_exprs(ecx, args)?
3291        }
3292    };
3293
3294    let table_name = match table_name {
3295        Some(table_name) => table_name.item,
3296        None => name.full_item_name().item.clone(),
3297    };
3298
3299    let scope_name = Some(PartialItemName {
3300        database: None,
3301        schema: None,
3302        item: table_name,
3303    });
3304
3305    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3306        Func::Table(impls) => {
3307            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3308            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3309            let expr = match tf.imp {
3310                TableFuncImpl::CallTable { mut func, exprs } => {
3311                    if with_ordinality {
3312                        func = TableFunc::with_ordinality(func.clone()).ok_or(
3313                            PlanError::Unsupported {
3314                                feature: format!("WITH ORDINALITY on {}", func),
3315                                discussion_no: None,
3316                            },
3317                        )?;
3318                    }
3319                    HirRelationExpr::CallTable { func, exprs }
3320                }
3321                TableFuncImpl::Expr(expr) => {
3322                    if !with_ordinality {
3323                        expr
3324                    } else {
3325                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
3326                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
3327                        // back to the legacy implementation or error out the query.
3328                        if qcx
3329                            .scx
3330                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3331                        {
3332                            // Note that this can give an incorrect ordering, and also has an extreme
3333                            // performance problem in some cases. See the doc comment of
3334                            // `TableFuncImpl`.
3335                            tracing::error!(
3336                                %name,
3337                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3338                            );
3339                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3340                                func: WindowExprType::Scalar(ScalarWindowExpr {
3341                                    func: ScalarWindowFunc::RowNumber,
3342                                    order_by: vec![],
3343                                }),
3344                                partition_by: vec![],
3345                                order_by: vec![],
3346                            })])
3347                        } else {
3348                            bail_unsupported!(format!(
3349                                "WITH ORDINALITY or ROWS FROM with {}",
3350                                name
3351                            ));
3352                        }
3353                    }
3354                }
3355            };
3356            (expr, scope)
3357        }
3358        Func::Scalar(impls) => {
3359            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3360            let output = expr.typ(
3361                &qcx.outer_relation_types,
3362                &SqlRelationType::new(vec![]),
3363                &qcx.scx.param_types.borrow(),
3364            );
3365
3366            let relation = SqlRelationType::new(vec![output]);
3367
3368            let function_ident = Ident::new(name.full_item_name().item.clone())?;
3369            let column_name = normalize::column_name(function_ident);
3370            let name = column_name.to_string();
3371
3372            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3373
3374            let mut func = TableFunc::TabletizedScalar { relation, name };
3375            if with_ordinality {
3376                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3377                    feature: format!("WITH ORDINALITY on {}", func),
3378                    discussion_no: None,
3379                })?;
3380            }
3381            (
3382                HirRelationExpr::CallTable {
3383                    func,
3384                    exprs: vec![expr],
3385                },
3386                scope,
3387            )
3388        }
3389        o => sql_bail!(
3390            "{} functions are not supported in functions in FROM",
3391            o.class()
3392        ),
3393    };
3394
3395    if with_ordinality {
3396        scope
3397            .items
3398            .push(ScopeItem::from_name(scope_name, "ordinality"));
3399    }
3400
3401    Ok((expr, scope))
3402}
3403
3404fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3405    if let Some(TableAlias {
3406        name,
3407        columns,
3408        strict,
3409    }) = alias
3410    {
3411        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3412            sql_bail!(
3413                "{} has {} columns available but {} columns specified",
3414                name,
3415                scope.items.len(),
3416                columns.len()
3417            );
3418        }
3419
3420        let table_name = normalize::ident(name.to_owned());
3421        for (i, item) in scope.items.iter_mut().enumerate() {
3422            item.table_name = if item.allow_unqualified_references {
3423                Some(PartialItemName {
3424                    database: None,
3425                    schema: None,
3426                    item: table_name.clone(),
3427                })
3428            } else {
3429                // Columns that prohibit unqualified references are special
3430                // columns from the output of a NATURAL or USING join that can
3431                // only be referenced by their full, pre-join name. Applying an
3432                // alias to the output of that join renders those columns
3433                // inaccessible, which we accomplish here by setting the
3434                // table name to `None`.
3435                //
3436                // Concretely, consider:
3437                //
3438                //      CREATE TABLE t1 (a int);
3439                //      CREATE TABLE t2 (a int);
3440                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3441                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3442                //
3443                // In (1), the join has no alias. The underlying columns from
3444                // either side of the join can be referenced as `t1.a` and
3445                // `t2.a`, respectively, and the unqualified name `a` refers to
3446                // a column whose value is `coalesce(t1.a, t2.a)`.
3447                //
3448                // In (2), the join is aliased as `t`. The columns from either
3449                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3450                // the coalesced column can be named as either `a` or `t.a`.
3451                //
3452                // We previously had a bug [0] that mishandled this subtle
3453                // logic.
3454                //
3455                // NOTE(benesch): We could in theory choose to project away
3456                // those inaccessible columns and drop them from the scope
3457                // entirely, but that would require that this function also
3458                // take and return the `HirRelationExpr` that is being aliased,
3459                // which is a rather large refactor.
3460                //
3461                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3462                None
3463            };
3464            item.column_name = columns
3465                .get(i)
3466                .map(|a| normalize::column_name(a.clone()))
3467                .unwrap_or_else(|| item.column_name.clone());
3468        }
3469    }
3470    Ok(scope)
3471}
3472
3473// `table_func_names` is a mapping from a UUID to the original function
3474// name. The UUIDs are identifiers that have been rewritten from some table
3475// function expression, and this mapping restores the original names.
3476fn invent_column_name(
3477    ecx: &ExprContext,
3478    expr: &Expr<Aug>,
3479    table_func_names: &BTreeMap<String, Ident>,
3480) -> Option<ColumnName> {
3481    // We follow PostgreSQL exactly here, which has some complicated rules
3482    // around "high" and "low" quality names. Low quality names override other
3483    // low quality names but not high quality names.
3484    //
3485    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3486
3487    #[derive(Debug)]
3488    enum NameQuality {
3489        Low,
3490        High,
3491    }
3492
3493    fn invent(
3494        ecx: &ExprContext,
3495        expr: &Expr<Aug>,
3496        table_func_names: &BTreeMap<String, Ident>,
3497    ) -> Option<(ColumnName, NameQuality)> {
3498        match expr {
3499            Expr::Identifier(names) => {
3500                if let [name] = names.as_slice() {
3501                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3502                        return Some((
3503                            normalize::column_name(table_func_name.clone()),
3504                            NameQuality::High,
3505                        ));
3506                    }
3507                }
3508                names
3509                    .last()
3510                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3511            }
3512            Expr::Value(v) => match v {
3513                // Per PostgreSQL, `bool` and `interval` literals take on the name
3514                // of their type, but not other literal types.
3515                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3516                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3517                _ => None,
3518            },
3519            Expr::Function(func) => {
3520                let (schema, item) = match &func.name {
3521                    ResolvedItemName::Item {
3522                        qualifiers,
3523                        full_name,
3524                        ..
3525                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3526                    _ => unreachable!(),
3527                };
3528
3529                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3530                    || schema
3531                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3532                {
3533                    None
3534                } else {
3535                    Some((item.into(), NameQuality::High))
3536                }
3537            }
3538            Expr::HomogenizingFunction { function, .. } => Some((
3539                function.to_string().to_lowercase().into(),
3540                NameQuality::High,
3541            )),
3542            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3543            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3544            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3545            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3546            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3547                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3548                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3549            },
3550            Expr::Case { else_result, .. } => {
3551                match else_result
3552                    .as_ref()
3553                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
3554                {
3555                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3556                    _ => Some(("case".into(), NameQuality::Low)),
3557                }
3558            }
3559            Expr::FieldAccess { field, .. } => {
3560                Some((normalize::column_name(field.clone()), NameQuality::High))
3561            }
3562            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3563            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3564            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3565                // A bit silly to have to plan the query here just to get its column
3566                // name, since we throw away the planned expression, but fixing this
3567                // requires a separate semantic analysis phase.
3568                let (_expr, scope) =
3569                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3570                scope
3571                    .items
3572                    .first()
3573                    .map(|name| (name.column_name.clone(), NameQuality::High))
3574            }
3575            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3576            _ => None,
3577        }
3578    }
3579
3580    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3581}
3582
3583#[derive(Debug)]
3584enum ExpandedSelectItem<'a> {
3585    InputOrdinal(usize),
3586    Expr(Cow<'a, Expr<Aug>>),
3587}
3588
3589impl ExpandedSelectItem<'_> {
3590    fn as_expr(&self) -> Option<&Expr<Aug>> {
3591        match self {
3592            ExpandedSelectItem::InputOrdinal(_) => None,
3593            ExpandedSelectItem::Expr(expr) => Some(expr),
3594        }
3595    }
3596}
3597
3598fn expand_select_item<'a>(
3599    ecx: &ExprContext,
3600    s: &'a SelectItem<Aug>,
3601    table_func_names: &BTreeMap<String, Ident>,
3602) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3603    match s {
3604        SelectItem::Expr {
3605            expr: Expr::QualifiedWildcard(table_name),
3606            alias: _,
3607        } => {
3608            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3609            let table_name =
3610                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3611            let out: Vec<_> = ecx
3612                .scope
3613                .items
3614                .iter()
3615                .enumerate()
3616                .filter(|(_i, item)| item.is_from_table(&table_name))
3617                .map(|(i, item)| {
3618                    let name = item.column_name.clone();
3619                    (ExpandedSelectItem::InputOrdinal(i), name)
3620                })
3621                .collect();
3622            if out.is_empty() {
3623                sql_bail!("no table named '{}' in scope", table_name);
3624            }
3625            Ok(out)
3626        }
3627        SelectItem::Expr {
3628            expr: Expr::WildcardAccess(sql_expr),
3629            alias: _,
3630        } => {
3631            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3632            // A bit silly to have to plan the expression here just to get its
3633            // type, since we throw away the planned expression, but fixing this
3634            // requires a separate semantic analysis phase. Luckily this is an
3635            // uncommon operation and the PostgreSQL docs have a warning that
3636            // this operation is slow in Postgres too.
3637            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3638            let fields = match ecx.scalar_type(&expr) {
3639                SqlScalarType::Record { fields, .. } => fields,
3640                ty => sql_bail!(
3641                    "type {} is not composite",
3642                    ecx.humanize_scalar_type(&ty, false)
3643                ),
3644            };
3645            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3646            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3647                if let [name] = ident.as_slice() {
3648                    if let Ok(items) = ecx.scope.items_from_table(
3649                        &[],
3650                        &PartialItemName {
3651                            database: None,
3652                            schema: None,
3653                            item: name.as_str().to_string(),
3654                        },
3655                    ) {
3656                        for (_, item) in items {
3657                            if item
3658                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3659                            {
3660                                skip_cols.insert(item.column_name.clone());
3661                            }
3662                        }
3663                    }
3664                }
3665            }
3666            let items = fields
3667                .iter()
3668                .filter_map(|(name, _ty)| {
3669                    if skip_cols.contains(name) {
3670                        None
3671                    } else {
3672                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3673                            expr: sql_expr.clone(),
3674                            field: name.clone().into(),
3675                        }));
3676                        Some((item, name.clone()))
3677                    }
3678                })
3679                .collect();
3680            Ok(items)
3681        }
3682        SelectItem::Wildcard => {
3683            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3684            let items: Vec<_> = ecx
3685                .scope
3686                .items
3687                .iter()
3688                .enumerate()
3689                .filter(|(_i, item)| item.allow_unqualified_references)
3690                .map(|(i, item)| {
3691                    let name = item.column_name.clone();
3692                    (ExpandedSelectItem::InputOrdinal(i), name)
3693                })
3694                .collect();
3695
3696            Ok(items)
3697        }
3698        SelectItem::Expr { expr, alias } => {
3699            let name = alias
3700                .clone()
3701                .map(normalize::column_name)
3702                .or_else(|| invent_column_name(ecx, expr, table_func_names))
3703                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3704            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3705        }
3706    }
3707}
3708
3709fn plan_join(
3710    left_qcx: &QueryContext,
3711    left: HirRelationExpr,
3712    left_scope: Scope,
3713    join: &Join<Aug>,
3714) -> Result<(HirRelationExpr, Scope), PlanError> {
3715    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3716    let (kind, constraint) = match &join.join_operator {
3717        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3718        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3719        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3720        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3721        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3722    };
3723
3724    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3725    if !kind.can_be_correlated() {
3726        for item in &mut right_qcx.outer_scopes[0].items {
3727            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3728            // these items from scope. These items need to *exist* because they
3729            // might shadow variables in outer scopes that would otherwise be
3730            // valid to reference, but accessing them needs to produce an error.
3731            item.error_if_referenced =
3732                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3733                    table: table.cloned(),
3734                    column: column.clone(),
3735                });
3736        }
3737    }
3738    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3739
3740    let (expr, scope) = match constraint {
3741        JoinConstraint::On(expr) => {
3742            let product_scope = left_scope.product(right_scope)?;
3743            let ecx = &ExprContext {
3744                qcx: left_qcx,
3745                name: "ON clause",
3746                scope: &product_scope,
3747                relation_type: &SqlRelationType::new(
3748                    left_qcx
3749                        .relation_type(&left)
3750                        .column_types
3751                        .into_iter()
3752                        .chain(right_qcx.relation_type(&right).column_types)
3753                        .collect(),
3754                ),
3755                allow_aggregates: false,
3756                allow_subqueries: true,
3757                allow_parameters: true,
3758                allow_windows: false,
3759            };
3760            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3761            let joined = left.join(right, on, kind);
3762            (joined, product_scope)
3763        }
3764        JoinConstraint::Using { columns, alias } => {
3765            let column_names = columns
3766                .iter()
3767                .map(|ident| normalize::column_name(ident.clone()))
3768                .collect::<Vec<_>>();
3769
3770            plan_using_constraint(
3771                &column_names,
3772                left_qcx,
3773                left,
3774                left_scope,
3775                &right_qcx,
3776                right,
3777                right_scope,
3778                kind,
3779                alias.as_ref(),
3780            )?
3781        }
3782        JoinConstraint::Natural => {
3783            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3784            // have the same scx. However, it doesn't hurt to be safe.
3785            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3786            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3787            let left_column_names = left_scope.column_names();
3788            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3789            let column_names: Vec<_> = left_column_names
3790                .filter(|col| right_column_names.contains(col))
3791                .cloned()
3792                .collect();
3793            plan_using_constraint(
3794                &column_names,
3795                left_qcx,
3796                left,
3797                left_scope,
3798                &right_qcx,
3799                right,
3800                right_scope,
3801                kind,
3802                None,
3803            )?
3804        }
3805    };
3806    Ok((expr, scope))
3807}
3808
3809// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3810#[allow(clippy::too_many_arguments)]
3811fn plan_using_constraint(
3812    column_names: &[ColumnName],
3813    left_qcx: &QueryContext,
3814    left: HirRelationExpr,
3815    left_scope: Scope,
3816    right_qcx: &QueryContext,
3817    right: HirRelationExpr,
3818    right_scope: Scope,
3819    kind: JoinKind,
3820    alias: Option<&Ident>,
3821) -> Result<(HirRelationExpr, Scope), PlanError> {
3822    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3823
3824    // Cargo culting PG here; no discernable reason this must fail, but PG does
3825    // so we do, as well.
3826    let mut unique_column_names = BTreeSet::new();
3827    for c in column_names {
3828        if !unique_column_names.insert(c) {
3829            return Err(PlanError::Unsupported {
3830                feature: format!(
3831                    "column name {} appears more than once in USING clause",
3832                    c.quoted()
3833                ),
3834                discussion_no: None,
3835            });
3836        }
3837    }
3838
3839    let alias_item_name = alias.map(|alias| PartialItemName {
3840        database: None,
3841        schema: None,
3842        item: alias.clone().to_string(),
3843    });
3844
3845    if let Some(alias_item_name) = &alias_item_name {
3846        for partial_item_name in both_scope.table_names() {
3847            if partial_item_name.matches(alias_item_name) {
3848                sql_bail!(
3849                    "table name \"{}\" specified more than once",
3850                    alias_item_name
3851                )
3852            }
3853        }
3854    }
3855
3856    let ecx = &ExprContext {
3857        qcx: right_qcx,
3858        name: "USING clause",
3859        scope: &both_scope,
3860        relation_type: &SqlRelationType::new(
3861            left_qcx
3862                .relation_type(&left)
3863                .column_types
3864                .into_iter()
3865                .chain(right_qcx.relation_type(&right).column_types)
3866                .collect(),
3867        ),
3868        allow_aggregates: false,
3869        allow_subqueries: false,
3870        allow_parameters: false,
3871        allow_windows: false,
3872    };
3873
3874    let mut join_exprs = vec![];
3875    let mut map_exprs = vec![];
3876    let mut new_items = vec![];
3877    let mut join_cols = vec![];
3878    let mut hidden_cols = vec![];
3879
3880    for column_name in column_names {
3881        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3882        let (lhs, lhs_name) = left_scope.resolve_using_column(
3883            column_name,
3884            JoinSide::Left,
3885            &mut left_qcx.name_manager.borrow_mut(),
3886        )?;
3887        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3888            column_name,
3889            JoinSide::Right,
3890            &mut right_qcx.name_manager.borrow_mut(),
3891        )?;
3892
3893        // Adjust the RHS reference to its post-join location.
3894        rhs.column += left_scope.len();
3895
3896        // Join keys must be resolved to same type.
3897        let mut exprs = coerce_homogeneous_exprs(
3898            &ecx.with_name(&format!(
3899                "NATURAL/USING join column {}",
3900                column_name.quoted()
3901            )),
3902            vec![
3903                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3904                    lhs,
3905                    Arc::clone(&lhs_name),
3906                )),
3907                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3908                    rhs,
3909                    Arc::clone(&rhs_name),
3910                )),
3911            ],
3912            None,
3913        )?;
3914        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3915
3916        match kind {
3917            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3918                join_cols.push(lhs.column);
3919                hidden_cols.push(rhs.column);
3920            }
3921            JoinKind::RightOuter => {
3922                join_cols.push(rhs.column);
3923                hidden_cols.push(lhs.column);
3924            }
3925            JoinKind::FullOuter => {
3926                // Create a new column that will be the coalesced value of left
3927                // and right.
3928                join_cols.push(both_scope.items.len() + map_exprs.len());
3929                hidden_cols.push(lhs.column);
3930                hidden_cols.push(rhs.column);
3931                map_exprs.push(HirScalarExpr::call_variadic(
3932                    VariadicFunc::Coalesce,
3933                    vec![expr1.clone(), expr2.clone()],
3934                ));
3935                new_items.push(ScopeItem::from_column_name(column_name));
3936            }
3937        }
3938
3939        // If a `join_using_alias` is present, add a new scope item that accepts
3940        // only table-qualified references for each specified join column.
3941        // Unlike regular table aliases, a `join_using_alias` should not hide the
3942        // names of the joined relations.
3943        if alias_item_name.is_some() {
3944            let new_item_col = both_scope.items.len() + new_items.len();
3945            join_cols.push(new_item_col);
3946            hidden_cols.push(new_item_col);
3947
3948            new_items.push(ScopeItem::from_name(
3949                alias_item_name.clone(),
3950                column_name.clone().to_string(),
3951            ));
3952
3953            // Should be safe to use either `lhs` or `rhs` here since the column
3954            // is available in both scopes and must have the same type of the new item.
3955            // We (arbitrarily) choose the left name.
3956            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3957        }
3958
3959        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3960    }
3961    both_scope.items.extend(new_items);
3962
3963    // The columns from the secondary side of the join remain accessible by
3964    // their table-qualified name, but not by their column name alone. They are
3965    // also excluded from `SELECT *`.
3966    for c in hidden_cols {
3967        both_scope.items[c].allow_unqualified_references = false;
3968    }
3969
3970    // Reproject all returned elements to the front of the list.
3971    let project_key = join_cols
3972        .into_iter()
3973        .chain(0..both_scope.items.len())
3974        .unique()
3975        .collect::<Vec<_>>();
3976
3977    both_scope = both_scope.project(&project_key);
3978
3979    let on = HirScalarExpr::variadic_and(join_exprs);
3980
3981    let both = left
3982        .join(right, on, kind)
3983        .map(map_exprs)
3984        .project(project_key);
3985    Ok((both, both_scope))
3986}
3987
3988pub fn plan_expr<'a>(
3989    ecx: &'a ExprContext,
3990    e: &Expr<Aug>,
3991) -> Result<CoercibleScalarExpr, PlanError> {
3992    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3993}
3994
3995fn plan_expr_inner<'a>(
3996    ecx: &'a ExprContext,
3997    e: &Expr<Aug>,
3998) -> Result<CoercibleScalarExpr, PlanError> {
3999    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4000        // We've already calculated this expression.
4001        return Ok(HirScalarExpr::named_column(
4002            i,
4003            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4004        )
4005        .into());
4006    }
4007
4008    match e {
4009        // Names.
4010        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4011            Ok(plan_identifier(ecx, names)?.into())
4012        }
4013
4014        // Literals.
4015        Expr::Value(val) => plan_literal(val),
4016        Expr::Parameter(n) => plan_parameter(ecx, *n),
4017        Expr::Array(exprs) => plan_array(ecx, exprs, None),
4018        Expr::List(exprs) => plan_list(ecx, exprs, None),
4019        Expr::Map(exprs) => plan_map(ecx, exprs, None),
4020        Expr::Row { exprs } => plan_row(ecx, exprs),
4021
4022        // Generalized functions, operators, and casts.
4023        Expr::Op { op, expr1, expr2 } => {
4024            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4025        }
4026        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4027        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4028
4029        // Special functions and operators.
4030        Expr::Not { expr } => plan_not(ecx, expr),
4031        Expr::And { left, right } => plan_and(ecx, left, right),
4032        Expr::Or { left, right } => plan_or(ecx, left, right),
4033        Expr::IsExpr {
4034            expr,
4035            construct,
4036            negated,
4037        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4038        Expr::Case {
4039            operand,
4040            conditions,
4041            results,
4042            else_result,
4043        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4044        Expr::HomogenizingFunction { function, exprs } => {
4045            plan_homogenizing_function(ecx, function, exprs)
4046        }
4047        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4048            ecx,
4049            &None,
4050            &[l_expr.clone().equals(*r_expr.clone())],
4051            &[Expr::null()],
4052            &Some(Box::new(*l_expr.clone())),
4053        )?
4054        .into()),
4055        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4056        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4057        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4058        Expr::Like {
4059            expr,
4060            pattern,
4061            escape,
4062            case_insensitive,
4063            negated,
4064        } => Ok(plan_like(
4065            ecx,
4066            expr,
4067            pattern,
4068            escape.as_deref(),
4069            *case_insensitive,
4070            *negated,
4071        )?
4072        .into()),
4073
4074        Expr::InList {
4075            expr,
4076            list,
4077            negated,
4078        } => plan_in_list(ecx, expr, list, negated),
4079
4080        // Subqueries.
4081        Expr::Exists(query) => plan_exists(ecx, query),
4082        Expr::Subquery(query) => plan_subquery(ecx, query),
4083        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4084        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4085        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4086        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4087        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4088        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4089        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4090        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4091        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4092        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4093        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4094    }
4095}
4096
4097fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4098    if !ecx.allow_parameters {
4099        // It might be clearer to return an error like "cannot use parameter
4100        // here", but this is how PostgreSQL does it, and so for now we follow
4101        // PostgreSQL.
4102        return Err(PlanError::UnknownParameter(n));
4103    }
4104    if n == 0 || n > 65536 {
4105        return Err(PlanError::UnknownParameter(n));
4106    }
4107    if ecx.param_types().borrow().contains_key(&n) {
4108        Ok(HirScalarExpr::parameter(n).into())
4109    } else {
4110        Ok(CoercibleScalarExpr::Parameter(n))
4111    }
4112}
4113
4114fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4115    let mut out = vec![];
4116    for e in exprs {
4117        out.push(plan_expr(ecx, e)?);
4118    }
4119    Ok(CoercibleScalarExpr::LiteralRecord(out))
4120}
4121
4122fn plan_cast(
4123    ecx: &ExprContext,
4124    expr: &Expr<Aug>,
4125    data_type: &ResolvedDataType,
4126) -> Result<CoercibleScalarExpr, PlanError> {
4127    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4128    let expr = match expr {
4129        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4130        // we can pass in the target type as a type hint. This is
4131        // a limited form of the coercion that we do for string literals
4132        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4133        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4134        // PostgreSQL compatibility trouble.
4135        //
4136        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4137        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4138        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4139        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4140        _ => plan_expr(ecx, expr)?,
4141    };
4142    let ecx = &ecx.with_name("CAST");
4143    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4144    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4145    Ok(expr.into())
4146}
4147
4148fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4149    let ecx = ecx.with_name("NOT argument");
4150    Ok(plan_expr(&ecx, expr)?
4151        .type_as(&ecx, &SqlScalarType::Bool)?
4152        .call_unary(UnaryFunc::Not(expr_func::Not))
4153        .into())
4154}
4155
4156fn plan_and(
4157    ecx: &ExprContext,
4158    left: &Expr<Aug>,
4159    right: &Expr<Aug>,
4160) -> Result<CoercibleScalarExpr, PlanError> {
4161    let ecx = ecx.with_name("AND argument");
4162    Ok(HirScalarExpr::variadic_and(vec![
4163        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4164        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4165    ])
4166    .into())
4167}
4168
4169fn plan_or(
4170    ecx: &ExprContext,
4171    left: &Expr<Aug>,
4172    right: &Expr<Aug>,
4173) -> Result<CoercibleScalarExpr, PlanError> {
4174    let ecx = ecx.with_name("OR argument");
4175    Ok(HirScalarExpr::variadic_or(vec![
4176        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4177        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4178    ])
4179    .into())
4180}
4181
4182fn plan_in_list(
4183    ecx: &ExprContext,
4184    lhs: &Expr<Aug>,
4185    list: &Vec<Expr<Aug>>,
4186    negated: &bool,
4187) -> Result<CoercibleScalarExpr, PlanError> {
4188    let ecx = ecx.with_name("IN list");
4189    let or = HirScalarExpr::variadic_or(
4190        list.into_iter()
4191            .map(|e| {
4192                let eq = lhs.clone().equals(e.clone());
4193                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4194            })
4195            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4196    );
4197    Ok(if *negated {
4198        or.call_unary(UnaryFunc::Not(expr_func::Not))
4199    } else {
4200        or
4201    }
4202    .into())
4203}
4204
4205fn plan_homogenizing_function(
4206    ecx: &ExprContext,
4207    function: &HomogenizingFunction,
4208    exprs: &[Expr<Aug>],
4209) -> Result<CoercibleScalarExpr, PlanError> {
4210    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4211    let expr = HirScalarExpr::call_variadic(
4212        match function {
4213            HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4214            HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4215            HomogenizingFunction::Least => VariadicFunc::Least,
4216        },
4217        coerce_homogeneous_exprs(
4218            &ecx.with_name(&function.to_string().to_lowercase()),
4219            plan_exprs(ecx, exprs)?,
4220            None,
4221        )?,
4222    );
4223    Ok(expr.into())
4224}
4225
4226fn plan_field_access(
4227    ecx: &ExprContext,
4228    expr: &Expr<Aug>,
4229    field: &Ident,
4230) -> Result<CoercibleScalarExpr, PlanError> {
4231    let field = normalize::column_name(field.clone());
4232    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4233    let ty = ecx.scalar_type(&expr);
4234    let i = match &ty {
4235        SqlScalarType::Record { fields, .. } => {
4236            fields.iter().position(|(name, _ty)| *name == field)
4237        }
4238        ty => sql_bail!(
4239            "column notation applied to type {}, which is not a composite type",
4240            ecx.humanize_scalar_type(ty, false)
4241        ),
4242    };
4243    match i {
4244        None => sql_bail!(
4245            "field {} not found in data type {}",
4246            field,
4247            ecx.humanize_scalar_type(&ty, false)
4248        ),
4249        Some(i) => Ok(expr
4250            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4251            .into()),
4252    }
4253}
4254
4255fn plan_subscript(
4256    ecx: &ExprContext,
4257    expr: &Expr<Aug>,
4258    positions: &[SubscriptPosition<Aug>],
4259) -> Result<CoercibleScalarExpr, PlanError> {
4260    assert!(
4261        !positions.is_empty(),
4262        "subscript expression must contain at least one position"
4263    );
4264
4265    let ecx = &ecx.with_name("subscripting");
4266    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4267    let ty = ecx.scalar_type(&expr);
4268    match &ty {
4269        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4270            ecx,
4271            expr,
4272            positions,
4273            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4274            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4275            // track in its backing data).
4276            if ty == SqlScalarType::Int2Vector {
4277                1
4278            } else {
4279                0
4280            },
4281        ),
4282        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4283        SqlScalarType::List { element_type, .. } => {
4284            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4285            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4286            let n_layers = ty.unwrap_list_n_layers();
4287            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4288        }
4289        ty => sql_bail!(
4290            "cannot subscript type {}",
4291            ecx.humanize_scalar_type(ty, false)
4292        ),
4293    }
4294}
4295
4296// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4297// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4298// any were slices (i.e. included colon).
4299fn extract_scalar_subscript_from_positions<'a>(
4300    positions: &'a [SubscriptPosition<Aug>],
4301    expr_type_name: &str,
4302) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4303    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4304    for p in positions {
4305        if p.explicit_slice {
4306            sql_bail!("{} subscript does not support slices", expr_type_name);
4307        }
4308        assert!(
4309            p.end.is_none(),
4310            "index-appearing subscripts cannot have end value"
4311        );
4312        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4313    }
4314    Ok(scalar_subscripts)
4315}
4316
4317fn plan_subscript_array(
4318    ecx: &ExprContext,
4319    expr: HirScalarExpr,
4320    positions: &[SubscriptPosition<Aug>],
4321    offset: i64,
4322) -> Result<CoercibleScalarExpr, PlanError> {
4323    let mut exprs = Vec::with_capacity(positions.len() + 1);
4324    exprs.push(expr);
4325
4326    // Subscripting arrays doesn't yet support slicing, so we always want to
4327    // extract scalars or error.
4328    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4329
4330    for i in indexes {
4331        exprs.push(plan_expr(ecx, i)?.cast_to(
4332            ecx,
4333            CastContext::Explicit,
4334            &SqlScalarType::Int64,
4335        )?);
4336    }
4337
4338    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4339}
4340
4341fn plan_subscript_list(
4342    ecx: &ExprContext,
4343    mut expr: HirScalarExpr,
4344    positions: &[SubscriptPosition<Aug>],
4345    mut remaining_layers: usize,
4346    elem_type_name: &str,
4347) -> Result<CoercibleScalarExpr, PlanError> {
4348    let mut i = 0;
4349
4350    while i < positions.len() {
4351        // Take all contiguous index operations, i.e. find next slice operation.
4352        let j = positions[i..]
4353            .iter()
4354            .position(|p| p.explicit_slice)
4355            .unwrap_or(positions.len() - i);
4356        if j != 0 {
4357            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4358            let (n, e) = plan_index_list(
4359                ecx,
4360                expr,
4361                indexes.as_slice(),
4362                remaining_layers,
4363                elem_type_name,
4364            )?;
4365            remaining_layers = n;
4366            expr = e;
4367            i += j;
4368        }
4369
4370        // Take all contiguous slice operations, i.e. find next index operation.
4371        let j = positions[i..]
4372            .iter()
4373            .position(|p| !p.explicit_slice)
4374            .unwrap_or(positions.len() - i);
4375        if j != 0 {
4376            expr = plan_slice_list(
4377                ecx,
4378                expr,
4379                &positions[i..i + j],
4380                remaining_layers,
4381                elem_type_name,
4382            )?;
4383            i += j;
4384        }
4385    }
4386
4387    Ok(expr.into())
4388}
4389
4390fn plan_index_list(
4391    ecx: &ExprContext,
4392    expr: HirScalarExpr,
4393    indexes: &[&Expr<Aug>],
4394    n_layers: usize,
4395    elem_type_name: &str,
4396) -> Result<(usize, HirScalarExpr), PlanError> {
4397    let depth = indexes.len();
4398
4399    if depth > n_layers {
4400        if n_layers == 0 {
4401            sql_bail!("cannot subscript type {}", elem_type_name)
4402        } else {
4403            sql_bail!(
4404                "cannot index into {} layers; list only has {} layer{}",
4405                depth,
4406                n_layers,
4407                if n_layers == 1 { "" } else { "s" }
4408            )
4409        }
4410    }
4411
4412    let mut exprs = Vec::with_capacity(depth + 1);
4413    exprs.push(expr);
4414
4415    for i in indexes {
4416        exprs.push(plan_expr(ecx, i)?.cast_to(
4417            ecx,
4418            CastContext::Explicit,
4419            &SqlScalarType::Int64,
4420        )?);
4421    }
4422
4423    Ok((
4424        n_layers - depth,
4425        HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4426    ))
4427}
4428
4429fn plan_slice_list(
4430    ecx: &ExprContext,
4431    expr: HirScalarExpr,
4432    slices: &[SubscriptPosition<Aug>],
4433    n_layers: usize,
4434    elem_type_name: &str,
4435) -> Result<HirScalarExpr, PlanError> {
4436    if n_layers == 0 {
4437        sql_bail!("cannot subscript type {}", elem_type_name)
4438    }
4439
4440    // first arg will be list
4441    let mut exprs = Vec::with_capacity(slices.len() + 1);
4442    exprs.push(expr);
4443    // extract (start, end) parts from collected slices
4444    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4445        Ok(match position {
4446            Some(p) => {
4447                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4448            }
4449            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4450        })
4451    };
4452    for p in slices {
4453        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4454        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4455        exprs.push(start);
4456        exprs.push(end);
4457    }
4458
4459    Ok(HirScalarExpr::call_variadic(
4460        VariadicFunc::ListSliceLinear,
4461        exprs,
4462    ))
4463}
4464
4465fn plan_like(
4466    ecx: &ExprContext,
4467    expr: &Expr<Aug>,
4468    pattern: &Expr<Aug>,
4469    escape: Option<&Expr<Aug>>,
4470    case_insensitive: bool,
4471    not: bool,
4472) -> Result<HirScalarExpr, PlanError> {
4473    use CastContext::Implicit;
4474    let ecx = ecx.with_name("LIKE argument");
4475    let expr = plan_expr(&ecx, expr)?;
4476    let haystack = match ecx.scalar_type(&expr) {
4477        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4478            .type_as(&ecx, ty)?
4479            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4480        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4481    };
4482    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4483    if let Some(escape) = escape {
4484        pattern = pattern.call_binary(
4485            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4486            expr_func::LikeEscape,
4487        );
4488    }
4489    let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
4490    if not {
4491        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4492    } else {
4493        Ok(like)
4494    }
4495}
4496
4497fn plan_subscript_jsonb(
4498    ecx: &ExprContext,
4499    expr: HirScalarExpr,
4500    positions: &[SubscriptPosition<Aug>],
4501) -> Result<CoercibleScalarExpr, PlanError> {
4502    use CastContext::Implicit;
4503    use SqlScalarType::{Int64, String};
4504
4505    // JSONB doesn't support the slicing syntax, so simply error if you
4506    // encounter any explicit slices.
4507    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4508
4509    let mut exprs = Vec::with_capacity(subscripts.len());
4510    for s in subscripts {
4511        let subscript = plan_expr(ecx, s)?;
4512        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4513            subscript
4514        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4515            // Integers are converted to a string here and then re-parsed as an
4516            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4517            // do it.
4518            typeconv::to_string(ecx, subscript)
4519        } else {
4520            sql_bail!("jsonb subscript type must be coercible to integer or text");
4521        };
4522        exprs.push(subscript);
4523    }
4524
4525    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4526    // `expr->subscript` as you might expect.
4527    let expr = expr.call_binary(
4528        HirScalarExpr::call_variadic(
4529            VariadicFunc::ArrayCreate {
4530                elem_type: SqlScalarType::String,
4531            },
4532            exprs,
4533        ),
4534        BinaryFunc::JsonbGetPath,
4535    );
4536    Ok(expr.into())
4537}
4538
4539fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4540    if !ecx.allow_subqueries {
4541        sql_bail!("{} does not allow subqueries", ecx.name)
4542    }
4543    let mut qcx = ecx.derived_query_context();
4544    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4545    Ok(expr.exists().into())
4546}
4547
4548fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4549    if !ecx.allow_subqueries {
4550        sql_bail!("{} does not allow subqueries", ecx.name)
4551    }
4552    let mut qcx = ecx.derived_query_context();
4553    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4554    let column_types = qcx.relation_type(&expr).column_types;
4555    if column_types.len() != 1 {
4556        sql_bail!(
4557            "Expected subselect to return 1 column, got {} columns",
4558            column_types.len()
4559        );
4560    }
4561    Ok(expr.select().into())
4562}
4563
4564fn plan_list_subquery(
4565    ecx: &ExprContext,
4566    query: &Query<Aug>,
4567) -> Result<CoercibleScalarExpr, PlanError> {
4568    plan_vector_like_subquery(
4569        ecx,
4570        query,
4571        |_| false,
4572        |elem_type| VariadicFunc::ListCreate { elem_type },
4573        |order_by| AggregateFunc::ListConcat { order_by },
4574        expr_func::ListListConcat.into(),
4575        |elem_type| {
4576            HirScalarExpr::literal(
4577                Datum::empty_list(),
4578                SqlScalarType::List {
4579                    element_type: Box::new(elem_type),
4580                    custom_id: None,
4581                },
4582            )
4583        },
4584        "list",
4585    )
4586}
4587
4588fn plan_array_subquery(
4589    ecx: &ExprContext,
4590    query: &Query<Aug>,
4591) -> Result<CoercibleScalarExpr, PlanError> {
4592    plan_vector_like_subquery(
4593        ecx,
4594        query,
4595        |elem_type| {
4596            matches!(
4597                elem_type,
4598                SqlScalarType::Char { .. }
4599                    | SqlScalarType::Array { .. }
4600                    | SqlScalarType::List { .. }
4601                    | SqlScalarType::Map { .. }
4602            )
4603        },
4604        |elem_type| VariadicFunc::ArrayCreate { elem_type },
4605        |order_by| AggregateFunc::ArrayConcat { order_by },
4606        expr_func::ArrayArrayConcat.into(),
4607        |elem_type| {
4608            HirScalarExpr::literal(
4609                Datum::empty_array(),
4610                SqlScalarType::Array(Box::new(elem_type)),
4611            )
4612        },
4613        "[]",
4614    )
4615}
4616
4617/// Generic function used to plan both array subqueries and list subqueries
4618fn plan_vector_like_subquery<F1, F2, F3, F4>(
4619    ecx: &ExprContext,
4620    query: &Query<Aug>,
4621    is_unsupported_type: F1,
4622    vector_create: F2,
4623    aggregate_concat: F3,
4624    binary_concat: BinaryFunc,
4625    empty_literal: F4,
4626    vector_type_string: &str,
4627) -> Result<CoercibleScalarExpr, PlanError>
4628where
4629    F1: Fn(&SqlScalarType) -> bool,
4630    F2: Fn(SqlScalarType) -> VariadicFunc,
4631    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4632    F4: Fn(SqlScalarType) -> HirScalarExpr,
4633{
4634    if !ecx.allow_subqueries {
4635        sql_bail!("{} does not allow subqueries", ecx.name)
4636    }
4637
4638    let mut qcx = ecx.derived_query_context();
4639    let mut planned_query = plan_query(&mut qcx, query)?;
4640    if planned_query.limit.is_some()
4641        || !planned_query
4642            .offset
4643            .clone()
4644            .try_into_literal_int64()
4645            .is_ok_and(|offset| offset == 0)
4646    {
4647        planned_query.expr = HirRelationExpr::top_k(
4648            planned_query.expr,
4649            vec![],
4650            planned_query.order_by.clone(),
4651            planned_query.limit,
4652            planned_query.offset,
4653            planned_query.group_size_hints.limit_input_group_size,
4654        );
4655    }
4656
4657    if planned_query.project.len() != 1 {
4658        sql_bail!(
4659            "Expected subselect to return 1 column, got {} columns",
4660            planned_query.project.len()
4661        );
4662    }
4663
4664    let project_column = *planned_query.project.get(0).unwrap();
4665    let elem_type = qcx
4666        .relation_type(&planned_query.expr)
4667        .column_types
4668        .get(project_column)
4669        .cloned()
4670        .unwrap()
4671        .scalar_type();
4672
4673    if is_unsupported_type(&elem_type) {
4674        bail_unsupported!(format!(
4675            "cannot build array from subquery because return type {}{}",
4676            ecx.humanize_scalar_type(&elem_type, false),
4677            vector_type_string
4678        ));
4679    }
4680
4681    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4682    // subquery above.
4683    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4684        vector_create(elem_type.clone()),
4685        vec![HirScalarExpr::column(project_column)],
4686    ))
4687    .chain(
4688        planned_query
4689            .order_by
4690            .iter()
4691            .map(|co| HirScalarExpr::column(co.column)),
4692    )
4693    .collect();
4694
4695    // However, column references for `aggregation_projection` and `aggregation_order_by`
4696    // are with reference to the `exprs` of the aggregation expression.  Here that is
4697    // `aggregation_exprs`.
4698    let aggregation_projection = vec![0];
4699    let aggregation_order_by = planned_query
4700        .order_by
4701        .into_iter()
4702        .enumerate()
4703        .map(|(i, order)| ColumnOrder { column: i, ..order })
4704        .collect();
4705
4706    let reduced_expr = planned_query
4707        .expr
4708        .reduce(
4709            vec![],
4710            vec![AggregateExpr {
4711                func: aggregate_concat(aggregation_order_by),
4712                expr: Box::new(HirScalarExpr::call_variadic(
4713                    VariadicFunc::RecordCreate {
4714                        field_names: iter::repeat(ColumnName::from(""))
4715                            .take(aggregation_exprs.len())
4716                            .collect(),
4717                    },
4718                    aggregation_exprs,
4719                )),
4720                distinct: false,
4721            }],
4722            None,
4723        )
4724        .project(aggregation_projection);
4725
4726    // If `expr` has no rows, return an empty array/list rather than NULL.
4727    Ok(reduced_expr
4728        .select()
4729        .call_binary(empty_literal(elem_type), binary_concat)
4730        .into())
4731}
4732
4733fn plan_map_subquery(
4734    ecx: &ExprContext,
4735    query: &Query<Aug>,
4736) -> Result<CoercibleScalarExpr, PlanError> {
4737    if !ecx.allow_subqueries {
4738        sql_bail!("{} does not allow subqueries", ecx.name)
4739    }
4740
4741    let mut qcx = ecx.derived_query_context();
4742    let mut query = plan_query(&mut qcx, query)?;
4743    if query.limit.is_some()
4744        || !query
4745            .offset
4746            .clone()
4747            .try_into_literal_int64()
4748            .is_ok_and(|offset| offset == 0)
4749    {
4750        query.expr = HirRelationExpr::top_k(
4751            query.expr,
4752            vec![],
4753            query.order_by.clone(),
4754            query.limit,
4755            query.offset,
4756            query.group_size_hints.limit_input_group_size,
4757        );
4758    }
4759    if query.project.len() != 2 {
4760        sql_bail!(
4761            "expected map subquery to return 2 columns, got {} columns",
4762            query.project.len()
4763        );
4764    }
4765
4766    let query_types = qcx.relation_type(&query.expr).column_types;
4767    let key_column = query.project[0];
4768    let key_type = query_types[key_column].clone().scalar_type();
4769    let value_column = query.project[1];
4770    let value_type = query_types[value_column].clone().scalar_type();
4771
4772    if key_type != SqlScalarType::String {
4773        sql_bail!("cannot build map from subquery because first column is not of type text");
4774    }
4775
4776    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4777        VariadicFunc::RecordCreate {
4778            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4779        },
4780        vec![
4781            HirScalarExpr::column(key_column),
4782            HirScalarExpr::column(value_column),
4783        ],
4784    ))
4785    .chain(
4786        query
4787            .order_by
4788            .iter()
4789            .map(|co| HirScalarExpr::column(co.column)),
4790    )
4791    .collect();
4792
4793    let expr = query
4794        .expr
4795        .reduce(
4796            vec![],
4797            vec![AggregateExpr {
4798                func: AggregateFunc::MapAgg {
4799                    order_by: query
4800                        .order_by
4801                        .into_iter()
4802                        .enumerate()
4803                        .map(|(i, order)| ColumnOrder { column: i, ..order })
4804                        .collect(),
4805                    value_type: value_type.clone(),
4806                },
4807                expr: Box::new(HirScalarExpr::call_variadic(
4808                    VariadicFunc::RecordCreate {
4809                        field_names: iter::repeat(ColumnName::from(""))
4810                            .take(aggregation_exprs.len())
4811                            .collect(),
4812                    },
4813                    aggregation_exprs,
4814                )),
4815                distinct: false,
4816            }],
4817            None,
4818        )
4819        .project(vec![0]);
4820
4821    // If `expr` has no rows, return an empty map rather than NULL.
4822    let expr = HirScalarExpr::call_variadic(
4823        VariadicFunc::Coalesce,
4824        vec![
4825            expr.select(),
4826            HirScalarExpr::literal(
4827                Datum::empty_map(),
4828                SqlScalarType::Map {
4829                    value_type: Box::new(value_type),
4830                    custom_id: None,
4831                },
4832            ),
4833        ],
4834    );
4835
4836    Ok(expr.into())
4837}
4838
4839fn plan_collate(
4840    ecx: &ExprContext,
4841    expr: &Expr<Aug>,
4842    collation: &UnresolvedItemName,
4843) -> Result<CoercibleScalarExpr, PlanError> {
4844    if collation.0.len() == 2
4845        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4846        && collation.0[1] == ident!("default")
4847    {
4848        plan_expr(ecx, expr)
4849    } else {
4850        bail_unsupported!("COLLATE");
4851    }
4852}
4853
4854/// Plans a slice of expressions.
4855///
4856/// This function is a simple convenience function for mapping [`plan_expr`]
4857/// over a slice of expressions. The planned expressions are returned in the
4858/// same order as the input. If any of the expressions fail to plan, returns an
4859/// error instead.
4860fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4861where
4862    E: std::borrow::Borrow<Expr<Aug>>,
4863{
4864    let mut out = vec![];
4865    for expr in exprs {
4866        out.push(plan_expr(ecx, expr.borrow())?);
4867    }
4868    Ok(out)
4869}
4870
4871/// Plans an `ARRAY` expression.
4872fn plan_array(
4873    ecx: &ExprContext,
4874    exprs: &[Expr<Aug>],
4875    type_hint: Option<&SqlScalarType>,
4876) -> Result<CoercibleScalarExpr, PlanError> {
4877    // Plan each element expression.
4878    let mut out = vec![];
4879    for expr in exprs {
4880        out.push(match expr {
4881            // Special case nested ARRAY expressions so we can plumb
4882            // the type hint through.
4883            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4884            _ => plan_expr(ecx, expr)?,
4885        });
4886    }
4887
4888    // Attempt to make use of the type hint.
4889    let type_hint = match type_hint {
4890        // The user has provided an explicit cast to an array type. We know the
4891        // element type to coerce to. Need to be careful, though: if there's
4892        // evidence that any of the array elements are themselves arrays, we
4893        // want to coerce to the array type, not the element type.
4894        Some(SqlScalarType::Array(elem_type)) => {
4895            let multidimensional = out.iter().any(|e| {
4896                matches!(
4897                    ecx.scalar_type(e),
4898                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4899                )
4900            });
4901            if multidimensional {
4902                type_hint
4903            } else {
4904                Some(&**elem_type)
4905            }
4906        }
4907        // The user provided an explicit cast to a non-array type. We'll have to
4908        // guess what the correct type for the array. Our caller will then
4909        // handle converting that array type to the desired non-array type.
4910        Some(_) => None,
4911        // No type hint. We'll have to guess the correct type for the array.
4912        None => None,
4913    };
4914
4915    // Coerce all elements to the same type.
4916    let (elem_type, exprs) = if exprs.is_empty() {
4917        if let Some(elem_type) = type_hint {
4918            (elem_type.clone(), vec![])
4919        } else {
4920            sql_bail!("cannot determine type of empty array");
4921        }
4922    } else {
4923        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4924        (ecx.scalar_type(&out[0]), out)
4925    };
4926
4927    // Arrays of `char` type are disallowed due to a known limitation:
4928    // https://github.com/MaterializeInc/database-issues/issues/2360.
4929    //
4930    // Arrays of `list` and `map` types are disallowed due to mind-bending
4931    // semantics.
4932    if matches!(
4933        elem_type,
4934        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4935    ) {
4936        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4937    }
4938
4939    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4940}
4941
4942fn plan_list(
4943    ecx: &ExprContext,
4944    exprs: &[Expr<Aug>],
4945    type_hint: Option<&SqlScalarType>,
4946) -> Result<CoercibleScalarExpr, PlanError> {
4947    let (elem_type, exprs) = if exprs.is_empty() {
4948        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4949            (element_type.without_modifiers(), vec![])
4950        } else {
4951            sql_bail!("cannot determine type of empty list");
4952        }
4953    } else {
4954        let type_hint = match type_hint {
4955            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4956            _ => None,
4957        };
4958
4959        let mut out = vec![];
4960        for expr in exprs {
4961            out.push(match expr {
4962                // Special case nested LIST expressions so we can plumb
4963                // the type hint through.
4964                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4965                _ => plan_expr(ecx, expr)?,
4966            });
4967        }
4968        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4969        (ecx.scalar_type(&out[0]).without_modifiers(), out)
4970    };
4971
4972    if matches!(elem_type, SqlScalarType::Char { .. }) {
4973        bail_unsupported!("char list");
4974    }
4975
4976    Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4977}
4978
4979fn plan_map(
4980    ecx: &ExprContext,
4981    entries: &[MapEntry<Aug>],
4982    type_hint: Option<&SqlScalarType>,
4983) -> Result<CoercibleScalarExpr, PlanError> {
4984    let (value_type, exprs) = if entries.is_empty() {
4985        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
4986            (value_type.without_modifiers(), vec![])
4987        } else {
4988            sql_bail!("cannot determine type of empty map");
4989        }
4990    } else {
4991        let type_hint = match type_hint {
4992            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
4993            _ => None,
4994        };
4995
4996        let mut keys = vec![];
4997        let mut values = vec![];
4998        for MapEntry { key, value } in entries {
4999            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5000            let value = match value {
5001                // Special case nested MAP expressions so we can plumb
5002                // the type hint through.
5003                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5004                _ => plan_expr(ecx, value)?,
5005            };
5006            keys.push(key);
5007            values.push(value);
5008        }
5009        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5010        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5011        let out = itertools::interleave(keys, values).collect();
5012        (value_type, out)
5013    };
5014
5015    if matches!(value_type, SqlScalarType::Char { .. }) {
5016        bail_unsupported!("char map");
5017    }
5018
5019    let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5020    Ok(expr.into())
5021}
5022
5023/// Coerces a list of expressions such that all input expressions will be cast
5024/// to the same type. If successful, returns a new list of expressions in the
5025/// same order as the input, where each expression has the appropriate casts to
5026/// make them all of a uniform type.
5027///
5028/// If `force_type` is `Some`, the expressions are forced to the specified type
5029/// via an explicit cast. Otherwise the best common type is guessed via
5030/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5031/// implicit casts
5032///
5033/// Note that this is our implementation of Postgres' type conversion for
5034/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5035/// isn't yet used in all of those cases.
5036///
5037/// [union-type-conv]:
5038/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5039pub fn coerce_homogeneous_exprs(
5040    ecx: &ExprContext,
5041    exprs: Vec<CoercibleScalarExpr>,
5042    force_type: Option<&SqlScalarType>,
5043) -> Result<Vec<HirScalarExpr>, PlanError> {
5044    assert!(!exprs.is_empty());
5045
5046    let target_holder;
5047    let target = match force_type {
5048        Some(t) => t,
5049        None => {
5050            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5051            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5052            &target_holder
5053        }
5054    };
5055
5056    // Try to cast all expressions to `target`.
5057    let mut out = Vec::new();
5058    for expr in exprs {
5059        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5060        let ccx = match force_type {
5061            None => CastContext::Implicit,
5062            Some(_) => CastContext::Explicit,
5063        };
5064        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5065            Ok(expr) => out.push(expr),
5066            Err(_) => sql_bail!(
5067                "{} could not convert type {} to {}",
5068                ecx.name,
5069                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5070                ecx.humanize_scalar_type(target, false),
5071            ),
5072        }
5073    }
5074    Ok(out)
5075}
5076
5077/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5078/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5079pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5080    obe: &OrderByExpr<T>,
5081    column: usize,
5082) -> ColumnOrder {
5083    let desc = !obe.asc.unwrap_or(true);
5084    ColumnOrder {
5085        column,
5086        desc,
5087        // https://www.postgresql.org/docs/14/queries-order.html
5088        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5089        nulls_last: obe.nulls_last.unwrap_or(!desc),
5090    }
5091}
5092
5093/// Plans the ORDER BY clause of a window function.
5094///
5095/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5096/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5097/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5098/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5099/// `Vec<HirScalarExpr>` that we return.
5100fn plan_function_order_by(
5101    ecx: &ExprContext,
5102    order_by: &[OrderByExpr<Aug>],
5103) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5104    let mut order_by_exprs = vec![];
5105    let mut col_orders = vec![];
5106    {
5107        for (i, obe) in order_by.iter().enumerate() {
5108            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5109            // do not support ordinal references in PostgreSQL. So we use
5110            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5111            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5112            order_by_exprs.push(expr);
5113            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5114        }
5115    }
5116    Ok((order_by_exprs, col_orders))
5117}
5118
5119/// Common part of the planning of windowed and non-windowed aggregation functions.
5120fn plan_aggregate_common(
5121    ecx: &ExprContext,
5122    Function::<Aug> {
5123        name,
5124        args,
5125        filter,
5126        over: _,
5127        distinct,
5128    }: &Function<Aug>,
5129) -> Result<AggregateExpr, PlanError> {
5130    // Normal aggregate functions, like `sum`, expect as input a single expression
5131    // which yields the datum to aggregate. Order sensitive aggregate functions,
5132    // like `jsonb_agg`, are special, and instead expect a Record whose first
5133    // element yields the datum to aggregate and whose successive elements yield
5134    // keys to order by. This expectation is hard coded within the implementation
5135    // of each of the order-sensitive aggregates. The specification of how many
5136    // order by keys to consider, and in what order, is passed via the `order_by`
5137    // field on the `AggregateFunc` variant.
5138
5139    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5140    // most, so explicitly drop it if the function doesn't care about order. This
5141    // prevents the projection into Record below from triggering on unsupported
5142    // functions.
5143
5144    let impls = match resolve_func(ecx, name, args)? {
5145        Func::Aggregate(impls) => impls,
5146        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5147    };
5148
5149    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5150    // generalized function selection framework. The rule is simple: the user
5151    // must type `count(*)`, but the function selection framework sees an empty
5152    // parameter list, as if the user had typed `count()`. But if the user types
5153    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5154    // rules to all aggregates, not just `count`, since we may one day support
5155    // user-defined aggregates, including user-defined aggregates that take no
5156    // parameters.
5157    let (args, order_by) = match &args {
5158        FunctionArgs::Star => (vec![], vec![]),
5159        FunctionArgs::Args { args, order_by } => {
5160            if args.is_empty() {
5161                sql_bail!(
5162                    "{}(*) must be used to call a parameterless aggregate function",
5163                    ecx.qcx
5164                        .scx
5165                        .humanize_resolved_name(name)
5166                        .expect("name actually resolved")
5167                );
5168            }
5169            let args = plan_exprs(ecx, args)?;
5170            (args, order_by.clone())
5171        }
5172    };
5173
5174    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5175
5176    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5177    if let Some(filter) = &filter {
5178        // If a filter is present, as in
5179        //
5180        //     <agg>(<expr>) FILTER (WHERE <cond>)
5181        //
5182        // we plan it by essentially rewriting the expression to
5183        //
5184        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5185        //
5186        // where <identity> is the identity input for <agg>.
5187        let cond =
5188            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5189        let expr_typ = ecx.scalar_type(&expr);
5190        expr = HirScalarExpr::if_then_else(
5191            cond,
5192            expr,
5193            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5194        );
5195    }
5196
5197    let mut seen_outer = false;
5198    let mut seen_inner = false;
5199    #[allow(deprecated)]
5200    expr.visit_columns(0, &mut |depth, col| {
5201        if depth == 0 && col.level == 0 {
5202            seen_inner = true;
5203        } else if col.level > depth {
5204            seen_outer = true;
5205        }
5206    });
5207    if seen_outer && !seen_inner {
5208        bail_unsupported!(
5209            3720,
5210            "aggregate functions that refer exclusively to outer columns"
5211        );
5212    }
5213
5214    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5215    // map the needed expressions into the aggregate datum.
5216    if func.is_order_sensitive() {
5217        let field_names = iter::repeat(ColumnName::from(""))
5218            .take(1 + order_by_exprs.len())
5219            .collect();
5220        let mut exprs = vec![expr];
5221        exprs.extend(order_by_exprs);
5222        expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5223    }
5224
5225    Ok(AggregateExpr {
5226        func,
5227        expr: Box::new(expr),
5228        distinct: *distinct,
5229    })
5230}
5231
5232fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5233    let mut names = names.to_vec();
5234    let col_name = normalize::column_name(names.pop().unwrap());
5235
5236    // If the name is qualified, it must refer to a column in a table.
5237    if !names.is_empty() {
5238        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5239        let (i, i_name) = ecx.scope.resolve_table_column(
5240            &ecx.qcx.outer_scopes,
5241            &table_name,
5242            &col_name,
5243            &mut ecx.qcx.name_manager.borrow_mut(),
5244        )?;
5245        return Ok(HirScalarExpr::named_column(i, i_name));
5246    }
5247
5248    // If the name is unqualified, first check if it refers to a column. Track any similar names
5249    // that might exist for a better error message.
5250    let similar_names = match ecx.scope.resolve_column(
5251        &ecx.qcx.outer_scopes,
5252        &col_name,
5253        &mut ecx.qcx.name_manager.borrow_mut(),
5254    ) {
5255        Ok((i, i_name)) => {
5256            return Ok(HirScalarExpr::named_column(i, i_name));
5257        }
5258        Err(PlanError::UnknownColumn { similar, .. }) => similar,
5259        Err(e) => return Err(e),
5260    };
5261
5262    // The name doesn't refer to a column. Check if it is a whole-row reference
5263    // to a table.
5264    let items = ecx.scope.items_from_table(
5265        &ecx.qcx.outer_scopes,
5266        &PartialItemName {
5267            database: None,
5268            schema: None,
5269            item: col_name.as_str().to_owned(),
5270        },
5271    )?;
5272    match items.as_slice() {
5273        // The name doesn't refer to a table either. Return an error.
5274        [] => Err(PlanError::UnknownColumn {
5275            table: None,
5276            column: col_name,
5277            similar: similar_names,
5278        }),
5279        // The name refers to a table that is the result of a function that
5280        // returned a single column. Per PostgreSQL, this is a special case
5281        // that returns the value directly.
5282        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
5283        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5284            *column,
5285            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5286        )),
5287        // The name refers to a normal table. Return a record containing all the
5288        // columns of the table.
5289        _ => {
5290            let mut has_exists_column = None;
5291            let (exprs, field_names): (Vec<_>, Vec<_>) = items
5292                .into_iter()
5293                .filter_map(|(column, item)| {
5294                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5295                        has_exists_column = Some(column);
5296                        None
5297                    } else {
5298                        let expr = HirScalarExpr::named_column(
5299                            column,
5300                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5301                        );
5302                        let name = item.column_name.clone();
5303                        Some((expr, name))
5304                    }
5305                })
5306                .unzip();
5307            // For the special case of a table function with a single column, the single column is instead not wrapped.
5308            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5309                exprs.into_element()
5310            } else {
5311                HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5312            };
5313            if let Some(has_exists_column) = has_exists_column {
5314                Ok(HirScalarExpr::if_then_else(
5315                    HirScalarExpr::unnamed_column(has_exists_column)
5316                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5317                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5318                    expr,
5319                ))
5320            } else {
5321                Ok(expr)
5322            }
5323        }
5324    }
5325}
5326
5327fn plan_op(
5328    ecx: &ExprContext,
5329    op: &str,
5330    expr1: &Expr<Aug>,
5331    expr2: Option<&Expr<Aug>>,
5332) -> Result<HirScalarExpr, PlanError> {
5333    let impls = func::resolve_op(op)?;
5334    let args = match expr2 {
5335        None => plan_exprs(ecx, &[expr1])?,
5336        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5337    };
5338    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5339}
5340
5341fn plan_function<'a>(
5342    ecx: &ExprContext,
5343    f @ Function {
5344        name,
5345        args,
5346        filter,
5347        over,
5348        distinct,
5349    }: &'a Function<Aug>,
5350) -> Result<HirScalarExpr, PlanError> {
5351    let impls = match resolve_func(ecx, name, args)? {
5352        Func::Table(_) => {
5353            sql_bail!(
5354                "table functions are not allowed in {} (function {})",
5355                ecx.name,
5356                name
5357            );
5358        }
5359        Func::Scalar(impls) => {
5360            if over.is_some() {
5361                sql_bail!(
5362                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5363                );
5364            }
5365            impls
5366        }
5367        Func::ScalarWindow(impls) => {
5368            let (
5369                ignore_nulls,
5370                order_by_exprs,
5371                col_orders,
5372                _window_frame,
5373                partition_by,
5374                scalar_args,
5375            ) = plan_window_function_non_aggr(ecx, f)?;
5376
5377            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5378            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5379            // msg there is less informative.)
5380            if !scalar_args.is_empty() {
5381                if let ResolvedItemName::Item {
5382                    full_name: FullItemName { item, .. },
5383                    ..
5384                } = name
5385                {
5386                    sql_bail!(
5387                        "function {} has 0 parameters, but was called with {}",
5388                        item,
5389                        scalar_args.len()
5390                    );
5391                }
5392            }
5393
5394            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5395            // accept a window frame here without an error msg. (Postgres also does this.)
5396            // TODO: maybe we should give a notice
5397
5398            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5399
5400            if ignore_nulls {
5401                // If we ever add a scalar window function that supports ignore, then don't forget
5402                // to also update HIR EXPLAIN.
5403                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5404            }
5405
5406            return Ok(HirScalarExpr::windowing(WindowExpr {
5407                func: WindowExprType::Scalar(ScalarWindowExpr {
5408                    func,
5409                    order_by: col_orders,
5410                }),
5411                partition_by,
5412                order_by: order_by_exprs,
5413            }));
5414        }
5415        Func::ValueWindow(impls) => {
5416            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5417                plan_window_function_non_aggr(ecx, f)?;
5418
5419            let (args_encoded, func) =
5420                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5421
5422            if ignore_nulls {
5423                match func {
5424                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5425                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5426                }
5427            }
5428
5429            return Ok(HirScalarExpr::windowing(WindowExpr {
5430                func: WindowExprType::Value(ValueWindowExpr {
5431                    func,
5432                    args: Box::new(args_encoded),
5433                    order_by: col_orders,
5434                    window_frame,
5435                    ignore_nulls, // (RESPECT NULLS is the default)
5436                }),
5437                partition_by,
5438                order_by: order_by_exprs,
5439            }));
5440        }
5441        Func::Aggregate(_) => {
5442            if f.over.is_none() {
5443                // Not a window aggregate. Something is wrong.
5444                if ecx.allow_aggregates {
5445                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5446                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5447                    sql_bail!(
5448                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5449                        name,
5450                    );
5451                } else {
5452                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5453                    // because it was in an unsupported context.
5454                    sql_bail!(
5455                        "aggregate functions are not allowed in {} (function {})",
5456                        ecx.name,
5457                        name
5458                    );
5459                }
5460            } else {
5461                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5462                    plan_window_function_common(ecx, &f.name, &f.over)?;
5463
5464                // https://github.com/MaterializeInc/database-issues/issues/6720
5465                match (&window_frame.start_bound, &window_frame.end_bound) {
5466                    (
5467                        mz_expr::WindowFrameBound::UnboundedPreceding,
5468                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5469                    )
5470                    | (
5471                        mz_expr::WindowFrameBound::UnboundedPreceding,
5472                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5473                    )
5474                    | (
5475                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5476                        mz_expr::WindowFrameBound::UnboundedFollowing,
5477                    )
5478                    | (
5479                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5480                        mz_expr::WindowFrameBound::UnboundedFollowing,
5481                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5482                    (_, _) => {} // other cases are ok
5483                }
5484
5485                if ignore_nulls {
5486                    // https://github.com/MaterializeInc/database-issues/issues/6722
5487                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5488                    // forget to also update HIR EXPLAIN.
5489                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5490                }
5491
5492                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5493
5494                if aggregate_expr.distinct {
5495                    // https://github.com/MaterializeInc/database-issues/issues/6626
5496                    bail_unsupported!("DISTINCT in window aggregates");
5497                }
5498
5499                return Ok(HirScalarExpr::windowing(WindowExpr {
5500                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5501                        aggregate_expr,
5502                        order_by: col_orders,
5503                        window_frame,
5504                    }),
5505                    partition_by,
5506                    order_by: order_by_exprs,
5507                }));
5508            }
5509        }
5510    };
5511
5512    if over.is_some() {
5513        unreachable!("If there is an OVER clause, we should have returned already above.");
5514    }
5515
5516    if *distinct {
5517        sql_bail!(
5518            "DISTINCT specified, but {} is not an aggregate function",
5519            ecx.qcx
5520                .scx
5521                .humanize_resolved_name(name)
5522                .expect("already resolved")
5523        );
5524    }
5525    if filter.is_some() {
5526        sql_bail!(
5527            "FILTER specified, but {} is not an aggregate function",
5528            ecx.qcx
5529                .scx
5530                .humanize_resolved_name(name)
5531                .expect("already resolved")
5532        );
5533    }
5534
5535    let scalar_args = match &args {
5536        FunctionArgs::Star => {
5537            sql_bail!(
5538                "* argument is invalid with non-aggregate function {}",
5539                ecx.qcx
5540                    .scx
5541                    .humanize_resolved_name(name)
5542                    .expect("already resolved")
5543            )
5544        }
5545        FunctionArgs::Args { args, order_by } => {
5546            if !order_by.is_empty() {
5547                sql_bail!(
5548                    "ORDER BY specified, but {} is not an aggregate function",
5549                    ecx.qcx
5550                        .scx
5551                        .humanize_resolved_name(name)
5552                        .expect("already resolved")
5553                );
5554            }
5555            plan_exprs(ecx, args)?
5556        }
5557    };
5558
5559    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5560}
5561
/// Feature description handed to `bail_unsupported!` when `IGNORE NULLS` is
/// requested for a window function that does not support it.
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5564
5565/// Resolves the name to a set of function implementations.
5566///
5567/// If the name does not specify a known built-in function, returns an error.
5568pub fn resolve_func(
5569    ecx: &ExprContext,
5570    name: &ResolvedItemName,
5571    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5572) -> Result<&'static Func, PlanError> {
5573    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5574        if let Ok(f) = i.func() {
5575            return Ok(f);
5576        }
5577    }
5578
5579    // Couldn't resolve function with this name, so generate verbose error
5580    // message.
5581    let cexprs = match args {
5582        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5583        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5584            if !order_by.is_empty() {
5585                sql_bail!(
5586                    "ORDER BY specified, but {} is not an aggregate function",
5587                    name
5588                );
5589            }
5590            plan_exprs(ecx, args)?
5591        }
5592    };
5593
5594    let arg_types: Vec<_> = cexprs
5595        .into_iter()
5596        .map(|ty| match ecx.scalar_type(&ty) {
5597            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5598            CoercibleScalarType::Record(_) => "record".to_string(),
5599            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5600        })
5601        .collect();
5602
5603    Err(PlanError::UnknownFunction {
5604        name: name.to_string(),
5605        arg_types,
5606    })
5607}
5608
5609fn plan_is_expr<'a>(
5610    ecx: &ExprContext,
5611    expr: &'a Expr<Aug>,
5612    construct: &IsExprConstruct<Aug>,
5613    not: bool,
5614) -> Result<HirScalarExpr, PlanError> {
5615    let expr = plan_expr(ecx, expr)?;
5616    let mut expr = match construct {
5617        IsExprConstruct::Null => {
5618            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5619            // at odds with our type coercion rules, which treat `NULL` literals
5620            // and unconstrained parameters identically. Providing a type hint
5621            // of string means we wind up supporting both.
5622            let expr = expr.type_as_any(ecx)?;
5623            expr.call_is_null()
5624        }
5625        IsExprConstruct::Unknown => {
5626            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5627            expr.call_is_null()
5628        }
5629        IsExprConstruct::True => {
5630            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5631            expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5632        }
5633        IsExprConstruct::False => {
5634            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5635            expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5636        }
5637        IsExprConstruct::DistinctFrom(expr2) => {
5638            let expr1 = expr.type_as_any(ecx)?;
5639            let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5640            // There are three cases:
5641            // 1. Both terms are non-null, in which case the result should be `a != b`.
5642            // 2. Exactly one term is null, in which case the result should be true.
5643            // 3. Both terms are null, in which case the result should be false.
5644            //
5645            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5646            let term1 = HirScalarExpr::variadic_or(vec![
5647                expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5648                expr1.clone().call_is_null(),
5649                expr2.clone().call_is_null(),
5650            ]);
5651            let term2 = HirScalarExpr::variadic_or(vec![
5652                expr1.call_is_null().not(),
5653                expr2.call_is_null().not(),
5654            ]);
5655            term1.and(term2)
5656        }
5657    };
5658    if not {
5659        expr = expr.not();
5660    }
5661    Ok(expr)
5662}
5663
5664fn plan_case<'a>(
5665    ecx: &ExprContext,
5666    operand: &'a Option<Box<Expr<Aug>>>,
5667    conditions: &'a [Expr<Aug>],
5668    results: &'a [Expr<Aug>],
5669    else_result: &'a Option<Box<Expr<Aug>>>,
5670) -> Result<HirScalarExpr, PlanError> {
5671    let mut cond_exprs = Vec::new();
5672    let mut result_exprs = Vec::new();
5673    for (c, r) in conditions.iter().zip_eq(results) {
5674        let c = match operand {
5675            Some(operand) => operand.clone().equals(c.clone()),
5676            None => c.clone(),
5677        };
5678        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5679        cond_exprs.push(cexpr);
5680        result_exprs.push(r);
5681    }
5682    result_exprs.push(match else_result {
5683        Some(else_result) => else_result,
5684        None => &Expr::Value(Value::Null),
5685    });
5686    let mut result_exprs = coerce_homogeneous_exprs(
5687        &ecx.with_name("CASE"),
5688        plan_exprs(ecx, &result_exprs)?,
5689        None,
5690    )?;
5691    let mut expr = result_exprs.pop().unwrap();
5692    assert_eq!(cond_exprs.len(), result_exprs.len());
5693    for (cexpr, rexpr) in cond_exprs
5694        .into_iter()
5695        .rev()
5696        .zip_eq(result_exprs.into_iter().rev())
5697    {
5698        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5699    }
5700    Ok(expr)
5701}
5702
5703fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5704    let (datum, scalar_type) = match l {
5705        Value::Number(s) => {
5706            let d = strconv::parse_numeric(s.as_str())?;
5707            if !s.contains(&['E', '.'][..]) {
5708                // Maybe representable as an int?
5709                if let Ok(n) = d.0.try_into() {
5710                    (Datum::Int32(n), SqlScalarType::Int32)
5711                } else if let Ok(n) = d.0.try_into() {
5712                    (Datum::Int64(n), SqlScalarType::Int64)
5713                } else {
5714                    (
5715                        Datum::Numeric(d),
5716                        SqlScalarType::Numeric { max_scale: None },
5717                    )
5718                }
5719            } else {
5720                (
5721                    Datum::Numeric(d),
5722                    SqlScalarType::Numeric { max_scale: None },
5723                )
5724            }
5725        }
5726        Value::HexString(_) => bail_unsupported!("hex string literals"),
5727        Value::Boolean(b) => match b {
5728            false => (Datum::False, SqlScalarType::Bool),
5729            true => (Datum::True, SqlScalarType::Bool),
5730        },
5731        Value::Interval(i) => {
5732            let i = literal::plan_interval(i)?;
5733            (Datum::Interval(i), SqlScalarType::Interval)
5734        }
5735        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5736        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5737    };
5738    let expr = HirScalarExpr::literal(datum, scalar_type);
5739    Ok(expr.into())
5740}
5741
5742/// The common part of the planning of non-aggregate window functions, i.e.,
5743/// scalar window functions and value window functions.
5744fn plan_window_function_non_aggr<'a>(
5745    ecx: &ExprContext,
5746    Function {
5747        name,
5748        args,
5749        filter,
5750        over,
5751        distinct,
5752    }: &'a Function<Aug>,
5753) -> Result<
5754    (
5755        bool,
5756        Vec<HirScalarExpr>,
5757        Vec<ColumnOrder>,
5758        mz_expr::WindowFrame,
5759        Vec<HirScalarExpr>,
5760        Vec<CoercibleScalarExpr>,
5761    ),
5762    PlanError,
5763> {
5764    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5765        plan_window_function_common(ecx, name, over)?;
5766
5767    if *distinct {
5768        sql_bail!(
5769            "DISTINCT specified, but {} is not an aggregate function",
5770            name
5771        );
5772    }
5773
5774    if filter.is_some() {
5775        bail_unsupported!("FILTER in non-aggregate window functions");
5776    }
5777
5778    let scalar_args = match &args {
5779        FunctionArgs::Star => {
5780            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5781        }
5782        FunctionArgs::Args { args, order_by } => {
5783            if !order_by.is_empty() {
5784                sql_bail!(
5785                    "ORDER BY specified, but {} is not an aggregate function",
5786                    name
5787                );
5788            }
5789            plan_exprs(ecx, args)?
5790        }
5791    };
5792
5793    Ok((
5794        ignore_nulls,
5795        order_by_exprs,
5796        col_orders,
5797        window_frame,
5798        partition,
5799        scalar_args,
5800    ))
5801}
5802
5803/// The common part of the planning of all window functions.
5804fn plan_window_function_common(
5805    ecx: &ExprContext,
5806    name: &<Aug as AstInfo>::ItemName,
5807    over: &Option<WindowSpec<Aug>>,
5808) -> Result<
5809    (
5810        bool,
5811        Vec<HirScalarExpr>,
5812        Vec<ColumnOrder>,
5813        mz_expr::WindowFrame,
5814        Vec<HirScalarExpr>,
5815    ),
5816    PlanError,
5817> {
5818    if !ecx.allow_windows {
5819        sql_bail!(
5820            "window functions are not allowed in {} (function {})",
5821            ecx.name,
5822            name
5823        );
5824    }
5825
5826    let window_spec = match over.as_ref() {
5827        Some(over) => over,
5828        None => sql_bail!("window function {} requires an OVER clause", name),
5829    };
5830    if window_spec.ignore_nulls && window_spec.respect_nulls {
5831        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5832    }
5833    let window_frame = match window_spec.window_frame.as_ref() {
5834        Some(frame) => plan_window_frame(frame)?,
5835        None => mz_expr::WindowFrame::default(),
5836    };
5837    let mut partition = Vec::new();
5838    for expr in &window_spec.partition_by {
5839        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5840    }
5841
5842    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5843
5844    Ok((
5845        window_spec.ignore_nulls,
5846        order_by_exprs,
5847        col_orders,
5848        window_frame,
5849        partition,
5850    ))
5851}
5852
5853fn plan_window_frame(
5854    WindowFrame {
5855        units,
5856        start_bound,
5857        end_bound,
5858    }: &WindowFrame,
5859) -> Result<mz_expr::WindowFrame, PlanError> {
5860    use mz_expr::WindowFrameBound::*;
5861    let units = window_frame_unit_ast_to_expr(units)?;
5862    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5863    let end_bound = end_bound
5864        .as_ref()
5865        .map(window_frame_bound_ast_to_expr)
5866        .unwrap_or(CurrentRow);
5867
5868    // Validate bounds according to Postgres rules
5869    match (&start_bound, &end_bound) {
5870        // Start bound can't be UNBOUNDED FOLLOWING
5871        (UnboundedFollowing, _) => {
5872            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5873        }
5874        // End bound can't be UNBOUNDED PRECEDING
5875        (_, UnboundedPreceding) => {
5876            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5877        }
5878        // Start bound should come before end bound in the list of bound definitions
5879        (CurrentRow, OffsetPreceding(_)) => {
5880            sql_bail!("frame starting from current row cannot have preceding rows")
5881        }
5882        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5883            sql_bail!("frame starting from following row cannot have preceding rows")
5884        }
5885        // The above rules are adopted from Postgres.
5886        // The following rules are Materialize-specific.
5887        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5888            // Note that the only hard limit is that partition size + offset should fit in i64, so
5889            // in theory, we could support much larger offsets than this. But for our current
5890            // performance, even 1000000 is quite big.
5891            if *o1 > 1000000 || *o2 > 1000000 {
5892                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5893            }
5894        }
5895        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5896            if *o1 > 1000000 || *o2 > 1000000 {
5897                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5898            }
5899        }
5900        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5901            if *o1 > 1000000 || *o2 > 1000000 {
5902                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5903            }
5904        }
5905        (OffsetPreceding(o), CurrentRow) => {
5906            if *o > 1000000 {
5907                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5908            }
5909        }
5910        (CurrentRow, OffsetFollowing(o)) => {
5911            if *o > 1000000 {
5912                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5913            }
5914        }
5915        // Other bounds are valid
5916        (_, _) => (),
5917    }
5918
5919    // RANGE is only supported in the default frame
5920    // https://github.com/MaterializeInc/database-issues/issues/6585
5921    if units == mz_expr::WindowFrameUnits::Range
5922        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5923    {
5924        bail_unsupported!("RANGE in non-default window frames")
5925    }
5926
5927    let frame = mz_expr::WindowFrame {
5928        units,
5929        start_bound,
5930        end_bound,
5931    };
5932    Ok(frame)
5933}
5934
5935fn window_frame_unit_ast_to_expr(
5936    unit: &WindowFrameUnits,
5937) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5938    match unit {
5939        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5940        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5941        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5942    }
5943}
5944
5945fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5946    match bound {
5947        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5948        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5949        WindowFrameBound::Preceding(Some(offset)) => {
5950            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5951        }
5952        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5953        WindowFrameBound::Following(Some(offset)) => {
5954            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5955        }
5956    }
5957}
5958
5959pub fn scalar_type_from_sql(
5960    scx: &StatementContext,
5961    data_type: &ResolvedDataType,
5962) -> Result<SqlScalarType, PlanError> {
5963    match data_type {
5964        ResolvedDataType::AnonymousList(elem_type) => {
5965            let elem_type = scalar_type_from_sql(scx, elem_type)?;
5966            if matches!(elem_type, SqlScalarType::Char { .. }) {
5967                bail_unsupported!("char list");
5968            }
5969            Ok(SqlScalarType::List {
5970                element_type: Box::new(elem_type),
5971                custom_id: None,
5972            })
5973        }
5974        ResolvedDataType::AnonymousMap {
5975            key_type,
5976            value_type,
5977        } => {
5978            match scalar_type_from_sql(scx, key_type)? {
5979                SqlScalarType::String => {}
5980                other => sql_bail!(
5981                    "map key type must be {}, got {}",
5982                    scx.humanize_scalar_type(&SqlScalarType::String, false),
5983                    scx.humanize_scalar_type(&other, false)
5984                ),
5985            }
5986            Ok(SqlScalarType::Map {
5987                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5988                custom_id: None,
5989            })
5990        }
5991        ResolvedDataType::Named { id, modifiers, .. } => {
5992            scalar_type_from_catalog(scx.catalog, *id, modifiers)
5993        }
5994        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
5995    }
5996}
5997
5998pub fn scalar_type_from_catalog(
5999    catalog: &dyn SessionCatalog,
6000    id: CatalogItemId,
6001    modifiers: &[i64],
6002) -> Result<SqlScalarType, PlanError> {
6003    let entry = catalog.get_item(&id);
6004    let type_details = match entry.type_details() {
6005        Some(type_details) => type_details,
6006        None => {
6007            // Resolution should never produce a `ResolvedDataType::Named` with
6008            // an ID of a non-type, but we error gracefully just in case.
6009            sql_bail!(
6010                "internal error: {} does not refer to a type",
6011                catalog.resolve_full_name(entry.name()).to_string().quoted()
6012            );
6013        }
6014    };
6015    match &type_details.typ {
6016        CatalogType::Numeric => {
6017            let mut modifiers = modifiers.iter().fuse();
6018            let precision = match modifiers.next() {
6019                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6020                    sql_bail!(
6021                        "precision for type numeric must be between 1 and {}",
6022                        NUMERIC_DATUM_MAX_PRECISION,
6023                    );
6024                }
6025                Some(p) => Some(*p),
6026                None => None,
6027            };
6028            let scale = match modifiers.next() {
6029                Some(scale) => {
6030                    if let Some(precision) = precision {
6031                        if *scale > precision {
6032                            sql_bail!(
6033                                "scale for type numeric must be between 0 and precision {}",
6034                                precision
6035                            );
6036                        }
6037                    }
6038                    Some(NumericMaxScale::try_from(*scale)?)
6039                }
6040                None => None,
6041            };
6042            if modifiers.next().is_some() {
6043                sql_bail!("type numeric supports at most two type modifiers");
6044            }
6045            Ok(SqlScalarType::Numeric { max_scale: scale })
6046        }
6047        CatalogType::Char => {
6048            let mut modifiers = modifiers.iter().fuse();
6049            let length = match modifiers.next() {
6050                Some(l) => Some(CharLength::try_from(*l)?),
6051                None => Some(CharLength::ONE),
6052            };
6053            if modifiers.next().is_some() {
6054                sql_bail!("type character supports at most one type modifier");
6055            }
6056            Ok(SqlScalarType::Char { length })
6057        }
6058        CatalogType::VarChar => {
6059            let mut modifiers = modifiers.iter().fuse();
6060            let length = match modifiers.next() {
6061                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6062                None => None,
6063            };
6064            if modifiers.next().is_some() {
6065                sql_bail!("type character varying supports at most one type modifier");
6066            }
6067            Ok(SqlScalarType::VarChar { max_length: length })
6068        }
6069        CatalogType::Timestamp => {
6070            let mut modifiers = modifiers.iter().fuse();
6071            let precision = match modifiers.next() {
6072                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6073                None => None,
6074            };
6075            if modifiers.next().is_some() {
6076                sql_bail!("type timestamp supports at most one type modifier");
6077            }
6078            Ok(SqlScalarType::Timestamp { precision })
6079        }
6080        CatalogType::TimestampTz => {
6081            let mut modifiers = modifiers.iter().fuse();
6082            let precision = match modifiers.next() {
6083                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6084                None => None,
6085            };
6086            if modifiers.next().is_some() {
6087                sql_bail!("type timestamp with time zone supports at most one type modifier");
6088            }
6089            Ok(SqlScalarType::TimestampTz { precision })
6090        }
6091        t => {
6092            if !modifiers.is_empty() {
6093                sql_bail!(
6094                    "{} does not support type modifiers",
6095                    catalog.resolve_full_name(entry.name()).to_string()
6096                );
6097            }
6098            match t {
6099                CatalogType::Array {
6100                    element_reference: element_id,
6101                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6102                    catalog,
6103                    *element_id,
6104                    modifiers,
6105                )?))),
6106                CatalogType::List {
6107                    element_reference: element_id,
6108                    element_modifiers,
6109                } => Ok(SqlScalarType::List {
6110                    element_type: Box::new(scalar_type_from_catalog(
6111                        catalog,
6112                        *element_id,
6113                        element_modifiers,
6114                    )?),
6115                    custom_id: Some(id),
6116                }),
6117                CatalogType::Map {
6118                    key_reference: _,
6119                    key_modifiers: _,
6120                    value_reference: value_id,
6121                    value_modifiers,
6122                } => Ok(SqlScalarType::Map {
6123                    value_type: Box::new(scalar_type_from_catalog(
6124                        catalog,
6125                        *value_id,
6126                        value_modifiers,
6127                    )?),
6128                    custom_id: Some(id),
6129                }),
6130                CatalogType::Range {
6131                    element_reference: element_id,
6132                } => Ok(SqlScalarType::Range {
6133                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6134                }),
6135                CatalogType::Record { fields } => {
6136                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6137                        .iter()
6138                        .map(|f| {
6139                            let scalar_type = scalar_type_from_catalog(
6140                                catalog,
6141                                f.type_reference,
6142                                &f.type_modifiers,
6143                            )?;
6144                            Ok((
6145                                f.name.clone(),
6146                                SqlColumnType {
6147                                    scalar_type,
6148                                    nullable: true,
6149                                },
6150                            ))
6151                        })
6152                        .collect::<Result<Box<_>, PlanError>>()?;
6153                    Ok(SqlScalarType::Record {
6154                        fields: scalars,
6155                        custom_id: Some(id),
6156                    })
6157                }
6158                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6159                CatalogType::Bool => Ok(SqlScalarType::Bool),
6160                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6161                CatalogType::Date => Ok(SqlScalarType::Date),
6162                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6163                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6164                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6165                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6166                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6167                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6168                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6169                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6170                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6171                CatalogType::Interval => Ok(SqlScalarType::Interval),
6172                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6173                CatalogType::Oid => Ok(SqlScalarType::Oid),
6174                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6175                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6176                CatalogType::Pseudo => {
6177                    sql_bail!(
6178                        "cannot reference pseudo type {}",
6179                        catalog.resolve_full_name(entry.name()).to_string()
6180                    )
6181                }
6182                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6183                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6184                CatalogType::RegType => Ok(SqlScalarType::RegType),
6185                CatalogType::String => Ok(SqlScalarType::String),
6186                CatalogType::Time => Ok(SqlScalarType::Time),
6187                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6188                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6189                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6190                CatalogType::Numeric => unreachable!("handled above"),
6191                CatalogType::Char => unreachable!("handled above"),
6192                CatalogType::VarChar => unreachable!("handled above"),
6193                CatalogType::Timestamp => unreachable!("handled above"),
6194                CatalogType::TimestampTz => unreachable!("handled above"),
6195            }
6196        }
6197    }
6198}
6199
6200/// This is used to collect aggregates and table functions from within an `Expr`.
6201/// See the explanation of aggregate handling at the top of the file for more details.
6202struct AggregateTableFuncVisitor<'a> {
6203    scx: &'a StatementContext<'a>,
6204    aggs: Vec<Function<Aug>>,
6205    within_aggregate: bool,
6206    tables: BTreeMap<Function<Aug>, String>,
6207    table_disallowed_context: Vec<&'static str>,
6208    in_select_item: bool,
6209    id_gen: IdGen,
6210    err: Option<PlanError>,
6211}
6212
6213impl<'a> AggregateTableFuncVisitor<'a> {
6214    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6215        AggregateTableFuncVisitor {
6216            scx,
6217            aggs: Vec::new(),
6218            within_aggregate: false,
6219            tables: BTreeMap::new(),
6220            table_disallowed_context: Vec::new(),
6221            in_select_item: false,
6222            id_gen: Default::default(),
6223            err: None,
6224        }
6225    }
6226
6227    fn into_result(
6228        self,
6229    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6230        match self.err {
6231            Some(err) => Err(err),
6232            None => {
6233                // Dedup while preserving the order. We don't care what the order is, but it
6234                // has to be reproducible so that EXPLAIN PLAN tests work.
6235                let mut seen = BTreeSet::new();
6236                let aggs = self
6237                    .aggs
6238                    .into_iter()
6239                    .filter(move |agg| seen.insert(agg.clone()))
6240                    .collect();
6241                Ok((aggs, self.tables))
6242            }
6243        }
6244    }
6245}
6246
/// Walks an AST fragment on behalf of `AggregateTableFuncVisitor`: non-window aggregate calls
/// are appended to `aggs`, and table function calls inside `SELECT` items are swapped for
/// generated identifiers that are recorded in `tables`. The visitor methods have no return
/// value, so failures are stored in `err` instead.
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Handles one function call. Non-window aggregates are recorded (nested ones are an
    /// error), and both aggregates and table functions push a "table functions are disallowed
    /// here" context while their arguments are visited.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // The filter expression is visited before `within_aggregate` is set and before
                // the disallow context is pushed, so neither applies to it.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                // Restore the state that was in effect before the arguments were visited.
                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions appearing anywhere below this call are rejected.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    /// Intentionally empty, so that the contents of subqueries are never visited.
    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    /// Visits one expression. `CASE` and `COALESCE` push a disallow context for their children.
    /// A table function call inside a `SELECT` item is either rejected (when a disallow context
    /// is active) or replaced by a generated identifier.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        // `disallowed_context` is the context to push while the children of `expr` are visited;
        // `func` is a clone of the table function call that `expr` should be replaced for.
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a generated identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            // Bail out without visiting the children of the offending call.
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated.
                // The id is allocated even when an identical call is already recorded, so the
                // numeric suffixes of the generated names are not necessarily contiguous.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                // We know this is okay because id is 11 characters + <=20 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        // NOTE(review): when a table function call was found above but not replaced, this
        // traverses the same call a second time; if it was replaced, only the new identifier is
        // visited here.
        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Marks the visitor as being inside a `SELECT` item for the duration of the visit, then
    /// restores the previous value of the flag.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6368
6369#[derive(Default)]
6370struct WindowFuncCollector {
6371    window_funcs: Vec<Expr<Aug>>,
6372}
6373
6374impl WindowFuncCollector {
6375    fn into_result(self) -> Vec<Expr<Aug>> {
6376        // Dedup while preserving the order.
6377        let mut seen = BTreeSet::new();
6378        let window_funcs_dedupped = self
6379            .window_funcs
6380            .into_iter()
6381            .filter(move |expr| seen.insert(expr.clone()))
6382            // Reverse the order, so that in case of a nested window function call, the
6383            // inner one is evaluated first.
6384            .rev()
6385            .collect();
6386        window_funcs_dedupped
6387    }
6388}
6389
6390impl Visit<'_, Aug> for WindowFuncCollector {
6391    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6392        match expr {
6393            Expr::Function(func) => {
6394                if func.over.is_some() {
6395                    self.window_funcs.push(expr.clone());
6396                }
6397            }
6398            _ => (),
6399        }
6400        visit::visit_expr(self, expr);
6401    }
6402
6403    fn visit_query(&mut self, _query: &Query<Aug>) {
6404        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6405    }
6406}
6407
/// Specifies how long a query will live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The result of the query (or expression) is computed once, at a single point in time.
    OneShot,
    /// The query (or expression) is part of a dataflow that maintains an index.
    Index,
    /// The query (or expression) is part of a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is part of a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) belongs to a (non-materialized) view.
    View,
    /// The expression belongs to a source definition.
    Source,
}

impl QueryLifetime {
    /// Returns whether this is [`QueryLifetime::OneShot`]. The answer is always the opposite of
    /// [`QueryLifetime::is_maintained`], which is asserted on every call.
    ///
    /// (This used to impact whether the query is allowed to reason about the time at which it is
    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
    /// mechanism, see `ExprPrepStyle`.)
    pub fn is_one_shot(&self) -> bool {
        let one_shot = match self {
            Self::OneShot => true,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => false,
        };
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Returns whether the query is used in a maintained dataflow, which is the case for every
    /// lifetime except `OneShot`.
    ///
    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
    /// turned into a `TopK`.
    pub fn is_maintained(&self) -> bool {
        match self {
            Self::OneShot => false,
            Self::Index
            | Self::MaterializedView
            | Self::Subscribe
            | Self::View
            | Self::Source => true,
        }
    }

    /// Returns whether SHOW commands are allowed. Most maintained dataflows don't allow them
    /// currently. However, SUBSCRIBE does.
    pub fn allow_show(&self) -> bool {
        match self {
            // SUBSCRIBE allows SHOW commands, even though it is maintained!
            Self::OneShot | Self::Subscribe => true,
            Self::Index | Self::MaterializedView | Self::View | Self::Source => false,
        }
    }
}
6467
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The name recorded for the CTE.
    pub name: String,
    /// The relation description of the CTE. `QueryContext::resolve_table_name` takes both the
    /// relation type and the column names of a reference to the CTE from it.
    pub desc: RelationDesc,
}
6474
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    ///
    /// `derived_context` puts the newly enclosing scope at the front, so the innermost outer
    /// scope comes first.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    ///
    /// `derived_context` extends this list in step with `outer_scopes`.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// A name manager, for interning column names that will be stored in HIR and MIR.
    ///
    /// Contexts created by `derived_context` share the name manager of their parent.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// The guard returned by the `CheckedRecursion` implementation. Contexts created by
    /// `derived_context` receive a clone of their parent's guard.
    pub recursion_guard: RecursionGuard,
}
6492
impl CheckedRecursion for QueryContext<'_> {
    /// Returns the guard stored in the `recursion_guard` field of this context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6498
6499impl<'a> QueryContext<'a> {
6500    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6501        QueryContext {
6502            scx,
6503            lifetime,
6504            outer_scopes: vec![],
6505            outer_relation_types: vec![],
6506            ctes: BTreeMap::new(),
6507            name_manager: Rc::new(RefCell::new(NameManager::new())),
6508            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6509        }
6510    }
6511
6512    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6513        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6514    }
6515
6516    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6517    /// `self`.
6518    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6519        let ctes = self.ctes.clone();
6520        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6521        let outer_relation_types = iter::once(relation_type)
6522            .chain(self.outer_relation_types.clone())
6523            .collect();
6524        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6525        let name_manager = Rc::clone(&self.name_manager);
6526
6527        QueryContext {
6528            scx: self.scx,
6529            lifetime: self.lifetime,
6530            outer_scopes,
6531            outer_relation_types,
6532            ctes,
6533            name_manager,
6534            recursion_guard: self.recursion_guard.clone(),
6535        }
6536    }
6537
6538    /// Derives a `QueryContext` for a scope that contains no columns.
6539    fn empty_derived_context(&self) -> QueryContext<'a> {
6540        let scope = Scope::empty();
6541        let ty = SqlRelationType::empty();
6542        self.derived_context(scope, ty)
6543    }
6544
6545    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6546    /// CTE.
6547    pub fn resolve_table_name(
6548        &self,
6549        object: ResolvedItemName,
6550    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6551        match object {
6552            ResolvedItemName::Item {
6553                id,
6554                full_name,
6555                version,
6556                ..
6557            } => {
6558                let name = full_name.into();
6559                let item = self.scx.get_item(&id).at_version(version);
6560                let desc = item
6561                    .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6562                    .clone();
6563                let expr = HirRelationExpr::Get {
6564                    id: Id::Global(item.global_id()),
6565                    typ: desc.typ().clone(),
6566                };
6567
6568                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6569
6570                Ok((expr, scope))
6571            }
6572            ResolvedItemName::Cte { id, name } => {
6573                let name = name.into();
6574                let cte = self.ctes.get(&id).unwrap();
6575                let expr = HirRelationExpr::Get {
6576                    id: Id::Local(id),
6577                    typ: cte.desc.typ().clone(),
6578                };
6579
6580                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6581
6582                Ok((expr, scope))
6583            }
6584            ResolvedItemName::ContinualTask { id, name } => {
6585                let cte = self.ctes.get(&id).unwrap();
6586                let expr = HirRelationExpr::Get {
6587                    id: Id::Local(id),
6588                    typ: cte.desc.typ().clone(),
6589                };
6590
6591                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6592
6593                Ok((expr, scope))
6594            }
6595            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6596        }
6597    }
6598
6599    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6600    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6601    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6602        self.scx.humanize_scalar_type(typ, postgres_compat)
6603    }
6604}
6605
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression, e.g. "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a SqlRelationType,
    /// Are aggregate functions allowed in this context?
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context?
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context?
    pub allow_parameters: bool,
    /// Are window functions allowed in this context?
    pub allow_windows: bool,
}
6627
impl CheckedRecursion for ExprContext<'_> {
    /// Returns the guard of the enclosing `QueryContext`; an `ExprContext` has no guard of its
    /// own.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6633
6634impl<'a> ExprContext<'a> {
6635    pub fn catalog(&self) -> &dyn SessionCatalog {
6636        self.qcx.scx.catalog
6637    }
6638
6639    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6640        let mut ecx = self.clone();
6641        ecx.name = name;
6642        ecx
6643    }
6644
6645    pub fn column_type<E>(&self, expr: &E) -> E::Type
6646    where
6647        E: AbstractExpr,
6648    {
6649        expr.typ(
6650            &self.qcx.outer_relation_types,
6651            self.relation_type,
6652            &self.qcx.scx.param_types.borrow(),
6653        )
6654    }
6655
6656    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6657    where
6658        E: AbstractExpr,
6659    {
6660        self.column_type(expr).scalar_type()
6661    }
6662
6663    fn derived_query_context(&self) -> QueryContext<'_> {
6664        let mut scope = self.scope.clone();
6665        scope.lateral_barrier = true;
6666        self.qcx.derived_context(scope, self.relation_type.clone())
6667    }
6668
6669    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6670        self.qcx.scx.require_feature_flag(flag)
6671    }
6672
6673    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6674        &self.qcx.scx.param_types
6675    }
6676
6677    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6678    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6679    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6680        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6681    }
6682
6683    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6684        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6685    }
6686}
6687
6688/// Manages column names, doing lightweight string internment.
6689///
6690/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
6691/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
6692/// to ensure maximal sharing.
6693#[derive(Debug, Clone)]
6694pub struct NameManager(BTreeSet<Arc<str>>);
6695
6696impl NameManager {
6697    /// Creates a new `NameManager`, with no interned names
6698    pub fn new() -> Self {
6699        Self(BTreeSet::new())
6700    }
6701
6702    /// Interns a string, returning a reference-counted pointer to the interned
6703    /// string.
6704    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6705        let s = s.as_ref();
6706        if let Some(interned) = self.0.get(s) {
6707            Arc::clone(interned)
6708        } else {
6709            let interned: Arc<str> = Arc::from(s);
6710            self.0.insert(Arc::clone(&interned));
6711            interned
6712        }
6713    }
6714
6715    /// Interns a string representing a reference to a `ScopeItem`, returning a
6716    /// reference-counted pointer to the interned string.
6717    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6718        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6719        //
6720        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6721        // name information. Note that as of 2025-04-09, we don't support column
6722        // renames.
6723        //
6724        // A few bad alternatives:
6725        //
6726        // (1) Store it but don't write it down. This fails because the expression
6727        //     cache will erase our names on restart.
6728        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6729        //     get the right names. But the world now and the world when we made
6730        //     those objects may be different.
6731        // (3) Just don't write down the table name. Nothing fails... for now.
6732
6733        self.intern(item.column_name.as_str())
6734    }
6735}
6736
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that `NameManager` hands out one shared allocation per distinct string.
    ///
    /// Strings with equal contents must be interned to the same `Arc`ed pointer even when they
    /// live at different addresses, while strings with different contents must not share one.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut names = NameManager::new();

        let literal_hi = "hi";
        let first_hi = names.intern(literal_hi);
        let hello = names.intern("hello");

        // Different contents, different allocations.
        assert_ne!(first_hi.as_ptr(), hello.as_ptr());

        // This static string is _likely_ the same as `literal_hi`.
        let second_hi = names.intern("hi");
        assert_eq!(first_hi.as_ptr(), second_hi.as_ptr());

        // Assemble a "hi" string at runtime, so that it doesn't get optimized to the same
        // static string.
        let mut rebuilt = String::new();
        rebuilt.push(first_hi.chars().next().unwrap());
        rebuilt.push(second_hi.chars().nth(1).unwrap());
        // Make sure that we're testing with a fresh string!
        assert_ne!(literal_hi.as_ptr(), rebuilt.as_ptr());

        let third_hi = names.intern(rebuilt);
        assert_eq!(first_hi.as_ptr(), third_hi.as_ptr());
    }
}