mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52    func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67    RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103    literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111    pub expr: E,
112    pub desc: RelationDesc,
113    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114    pub scope: Scope,
115}
116
117/// Plans a top-level query, returning the `HirRelationExpr` describing the query
118/// plan, the `RelationDesc` describing the shape of the result set, a
119/// `RowSetFinishing` describing post-processing that must occur before results
120/// are sent to the client, and the types of the parameters in the query, if any
121/// were present.
122///
123/// Note that the returned `RelationDesc` describes the expression after
124/// applying the returned `RowSetFinishing`.
125#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127    scx: &StatementContext,
128    mut query: Query<Aug>,
129    lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131    transform_ast::transform(scx, &mut query)?;
132    let mut qcx = QueryContext::root(scx, lifetime);
133    let PlannedQuery {
134        mut expr,
135        scope,
136        order_by,
137        limit,
138        offset,
139        project,
140        group_size_hints,
141    } = plan_query(&mut qcx, &query)?;
142
143    let mut finishing = RowSetFinishing {
144        limit,
145        offset,
146        project,
147        order_by,
148    };
149
150    // Attempt to push the finishing's ordering past its projection. This allows
151    // data to be projected down on the workers rather than the coordinator. It
152    // also improves the optimizer's demand analysis, as the optimizer can only
153    // reason about demand information in `expr` (i.e., it can't see
154    // `finishing.project`).
155    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157    if lifetime.is_maintained() {
158        expr.finish_maintained(&mut finishing, group_size_hints);
159    }
160
161    let typ = qcx.relation_type(&expr);
162    let typ = SqlRelationType::new(
163        finishing
164            .project
165            .iter()
166            .map(|i| typ.column_types[*i].clone())
167            .collect(),
168    );
169    let desc = RelationDesc::new(typ, scope.column_names());
170
171    Ok(PlannedRootQuery {
172        expr,
173        desc,
174        finishing,
175        scope,
176    })
177}
178
179/// TODO(ct2): Dedup this with [plan_root_query].
180#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182    qcx: &mut QueryContext,
183    mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185    transform_ast::transform(qcx.scx, &mut query)?;
186    let PlannedQuery {
187        mut expr,
188        scope,
189        order_by,
190        limit,
191        offset,
192        project,
193        group_size_hints,
194    } = plan_query(qcx, &query)?;
195
196    let mut finishing = RowSetFinishing {
197        limit,
198        offset,
199        project,
200        order_by,
201    };
202
203    // Attempt to push the finishing's ordering past its projection. This allows
204    // data to be projected down on the workers rather than the coordinator. It
205    // also improves the optimizer's demand analysis, as the optimizer can only
206    // reason about demand information in `expr` (i.e., it can't see
207    // `finishing.project`).
208    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210    expr.finish_maintained(&mut finishing, group_size_hints);
211
212    let typ = qcx.relation_type(&expr);
213    let typ = SqlRelationType::new(
214        finishing
215            .project
216            .iter()
217            .map(|i| typ.column_types[*i].clone())
218            .collect(),
219    );
220    let desc = RelationDesc::new(typ, scope.column_names());
221
222    Ok(PlannedRootQuery {
223        expr,
224        desc,
225        finishing,
226        scope,
227    })
228}
229
230/// Attempts to push a projection through an order by.
231///
232/// The returned bool indicates whether the pushdown was successful or not.
233/// Successful pushdown requires that all the columns referenced in `order_by`
234/// are included in `project`.
235///
236/// When successful, `expr` is wrapped in a projection node, `order_by` is
237/// rewritten to account for the pushed-down projection, and `project` is
238/// replaced with the trivial projection. When unsuccessful, no changes are made
239/// to any of the inputs.
240fn try_push_projection_order_by(
241    expr: &mut HirRelationExpr,
242    project: &mut Vec<usize>,
243    order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245    let mut unproject = vec![None; expr.arity()];
246    for (out_i, in_i) in project.iter().copied().enumerate() {
247        unproject[in_i] = Some(out_i);
248    }
249    if order_by
250        .iter()
251        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252    {
253        let trivial_project = (0..project.len()).collect();
254        *expr = expr.take().project(mem::replace(project, trivial_project));
255        for ob in order_by {
256            ob.column = unproject[ob.column].unwrap();
257        }
258        true
259    } else {
260        false
261    }
262}
263
264pub fn plan_insert_query(
265    scx: &StatementContext,
266    table_name: ResolvedItemName,
267    columns: Vec<Ident>,
268    source: InsertSource<Aug>,
269    returning: Vec<SelectItem<Aug>>,
270) -> Result<
271    (
272        CatalogItemId,
273        HirRelationExpr,
274        PlannedRootQuery<Vec<HirScalarExpr>>,
275    ),
276    PlanError,
277> {
278    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279    let table = scx.get_item_by_resolved_name(&table_name)?;
280
281    // Validate the target of the insert.
282    if table.item_type() != CatalogItemType::Table {
283        sql_bail!(
284            "cannot insert into {} '{}'",
285            table.item_type(),
286            table_name.full_name_str()
287        );
288    }
289    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
290    let mut defaults = table
291        .writable_table_details()
292        .ok_or_else(|| {
293            sql_err!(
294                "cannot insert into non-writeable table '{}'",
295                table_name.full_name_str()
296            )
297        })?
298        .to_vec();
299
300    for default in &mut defaults {
301        transform_ast::transform(scx, default)?;
302    }
303
304    if table.id().is_system() {
305        sql_bail!(
306            "cannot insert into system table '{}'",
307            table_name.full_name_str()
308        );
309    }
310
311    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313    // Validate target column order.
314    let mut source_types = Vec::with_capacity(columns.len());
315    let mut ordering = Vec::with_capacity(columns.len());
316
317    if columns.is_empty() {
318        // Columns in source query must be in order. Let's guess the full shape and truncate to the
319        // right size later after planning the source query
320        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321        ordering.extend(0..desc.arity());
322    } else {
323        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324            .iter()
325            .enumerate()
326            .map(|(idx, (name, typ))| (name, (idx, typ)))
327            .collect();
328
329        for c in &columns {
330            if let Some((idx, typ)) = column_by_name.get(c) {
331                ordering.push(*idx);
332                source_types.push(&typ.scalar_type);
333            } else {
334                sql_bail!(
335                    "column {} of relation {} does not exist",
336                    c.quoted(),
337                    table_name.full_name_str().quoted()
338                );
339            }
340        }
341        if let Some(dup) = columns.iter().duplicates().next() {
342            sql_bail!("column {} specified more than once", dup.quoted());
343        }
344    };
345
346    // Plan the source.
347    let expr = match source {
348        InsertSource::Query(mut query) => {
349            transform_ast::transform(scx, &mut query)?;
350
351            match query {
352                // Special-case simple VALUES clauses as PostgreSQL does.
353                Query {
354                    body: SetExpr::Values(Values(values)),
355                    ctes,
356                    order_by,
357                    limit: None,
358                    offset: None,
359                } if ctes.is_empty() && order_by.is_empty() => {
360                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361                    plan_values_insert(&qcx, &names, &source_types, &values)?
362                }
363                _ => {
364                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365                    expr
366                }
367            }
368        }
369        InsertSource::DefaultValues => {
370            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371        }
372    };
373
374    let expr_arity = expr.arity();
375
376    // Validate that the arity of the source query is at most the size of declared columns or the
377    // size of the table if none are declared
378    let max_columns = if columns.is_empty() {
379        desc.arity()
380    } else {
381        columns.len()
382    };
383    if expr_arity > max_columns {
384        sql_bail!("INSERT has more expressions than target columns");
385    }
386    // But it should never have less than the declared columns (or zero)
387    if expr_arity < columns.len() {
388        sql_bail!("INSERT has more target columns than expressions");
389    }
390
391    // Trim now that we know for sure the correct arity of the source query
392    source_types.truncate(expr_arity);
393    ordering.truncate(expr_arity);
394
395    // Ensure the types of the source query match the types of the target table,
396    // installing assignment casts where necessary and possible.
397    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398        sql_err!(
399            "column {} is of type {} but expression is of type {}",
400            desc.get_name(ordering[e.column]).quoted(),
401            qcx.humanize_scalar_type(&e.target_type, false),
402            qcx.humanize_scalar_type(&e.source_type, false),
403        )
404    })?;
405
406    // Fill in any omitted columns and rearrange into correct order
407    let mut map_exprs = vec![];
408    let mut project_key = Vec::with_capacity(desc.arity());
409
410    // Maps from table column index to position in the source query
411    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414    for (col_idx, (col_typ, default)) in column_details {
415        if let Some(src_idx) = col_to_source.get(&col_idx) {
416            project_key.push(*src_idx);
417        } else {
418            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419            project_key.push(expr_arity + map_exprs.len());
420            map_exprs.push(hir);
421        }
422    }
423
424    let returning = {
425        let (scope, typ) = if let ResolvedItemName::Item {
426            full_name,
427            version: _,
428            ..
429        } = table_name
430        {
431            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432            let typ = desc.typ().clone();
433            (scope, typ)
434        } else {
435            (Scope::empty(), SqlRelationType::empty())
436        };
437        let ecx = &ExprContext {
438            qcx: &qcx,
439            name: "RETURNING clause",
440            scope: &scope,
441            relation_type: &typ,
442            allow_aggregates: false,
443            allow_subqueries: false,
444            allow_parameters: true,
445            allow_windows: false,
446        };
447        let table_func_names = BTreeMap::new();
448        let mut output_columns = vec![];
449        let mut new_exprs = vec![];
450        let mut new_type = SqlRelationType::empty();
451        for mut si in returning {
452            transform_ast::transform(scx, &mut si)?;
453            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454                let expr = match &select_item {
455                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457                };
458                output_columns.push(column_name);
459                let typ = ecx.column_type(&expr);
460                new_type.column_types.push(typ);
461                new_exprs.push(expr);
462            }
463        }
464        let desc = RelationDesc::new(new_type, output_columns);
465        let desc_arity = desc.arity();
466        PlannedRootQuery {
467            expr: new_exprs,
468            desc,
469            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470            scope,
471        }
472    };
473
474    Ok((
475        table.id(),
476        expr.map(map_exprs).project(project_key),
477        returning,
478    ))
479}
480
481/// Determines the mapping between some external data and a Materialize relation.
482///
483/// Returns the following:
484/// * [`CatalogItemId`] for the destination table.
485/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
486/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
487///   since we now return a [`MapFilterProject`].
488/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
489///   destination table.
490///
491pub fn plan_copy_item(
492    scx: &StatementContext,
493    item_name: ResolvedItemName,
494    columns: Vec<Ident>,
495) -> Result<
496    (
497        CatalogItemId,
498        RelationDesc,
499        Vec<ColumnIndex>,
500        Option<MapFilterProject>,
501    ),
502    PlanError,
503> {
504    let item = scx.get_item_by_resolved_name(&item_name)?;
505    let fullname = scx.catalog.resolve_full_name(item.name());
506    let table_desc = item.desc(&fullname)?.into_owned();
507    let mut ordering = Vec::with_capacity(columns.len());
508
509    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
510    // should be simplified. The reason they are currently separate code paths is so we can roll
511    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.
512
513    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
514    // FROM SOURCE ...`), then we generate an MFP.
515    //
516    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
517    // so it's not always guaranteed that our `item` is a table.
518    let mfp = if let Some(table_defaults) = item.writable_table_details() {
519        let mut table_defaults = table_defaults.to_vec();
520
521        for default in &mut table_defaults {
522            transform_ast::transform(scx, default)?;
523        }
524
525        // Fill in any omitted columns and rearrange into correct order
526        let source_column_names: Vec<_> = columns
527            .iter()
528            .cloned()
529            .map(normalize::column_name)
530            .collect();
531
532        let mut default_exprs = Vec::new();
533        let mut project_keys = Vec::with_capacity(table_desc.arity());
534
535        // For each column in the destination table, either project it from the source data, or provide
536        // an expression to fill in a default value.
537        let column_details = table_desc.iter().zip_eq(table_defaults);
538        for ((col_name, col_type), col_default) in column_details {
539            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
540            if let Some(src_idx) = maybe_src_idx {
541                project_keys.push(src_idx);
542            } else {
543                // If one a column from the table does not exist in the source data, then a default
544                // value will get appended to the end of the input Row from the source data.
545                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
546                let mir = hir.lower_uncorrelated()?;
547                project_keys.push(source_column_names.len() + default_exprs.len());
548                default_exprs.push(mir);
549            }
550        }
551
552        let mfp = MapFilterProject::new(source_column_names.len())
553            .map(default_exprs)
554            .project(project_keys);
555        Some(mfp)
556    } else {
557        None
558    };
559
560    // Create a mapping from input data to the table we're copying into.
561    let source_desc = if columns.is_empty() {
562        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
563        ordering.extend(indexes);
564
565        // The source data should be in the same order as the table.
566        table_desc
567    } else {
568        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
569        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
570            .iter_all()
571            .map(|(idx, name, typ)| (name, (*idx, typ)))
572            .collect();
573
574        let mut names = Vec::with_capacity(columns.len());
575        let mut source_types = Vec::with_capacity(columns.len());
576
577        for c in &columns {
578            if let Some((idx, typ)) = column_by_name.get(c) {
579                ordering.push(*idx);
580                source_types.push((*typ).clone());
581                names.push(c.clone());
582            } else {
583                sql_bail!(
584                    "column {} of relation {} does not exist",
585                    c.quoted(),
586                    item_name.full_name_str().quoted()
587                );
588            }
589        }
590        if let Some(dup) = columns.iter().duplicates().next() {
591            sql_bail!("column {} specified more than once", dup.quoted());
592        }
593
594        // The source data is a different shape than the destination table.
595        RelationDesc::new(SqlRelationType::new(source_types), names)
596    };
597
598    Ok((item.id(), source_desc, ordering, mfp))
599}
600
601/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
602///
603/// TODO(cf3): Merge this method with [`plan_copy_item`].
604pub fn plan_copy_from(
605    scx: &StatementContext,
606    table_name: ResolvedItemName,
607    columns: Vec<Ident>,
608) -> Result<
609    (
610        CatalogItemId,
611        RelationDesc,
612        Vec<ColumnIndex>,
613        Option<MapFilterProject>,
614    ),
615    PlanError,
616> {
617    let table = scx.get_item_by_resolved_name(&table_name)?;
618
619    // Validate the target of the insert.
620    if table.item_type() != CatalogItemType::Table {
621        sql_bail!(
622            "cannot insert into {} '{}'",
623            table.item_type(),
624            table_name.full_name_str()
625        );
626    }
627
628    let _ = table.writable_table_details().ok_or_else(|| {
629        sql_err!(
630            "cannot insert into non-writeable table '{}'",
631            table_name.full_name_str()
632        )
633    })?;
634
635    if table.id().is_system() {
636        sql_bail!(
637            "cannot insert into system table '{}'",
638            table_name.full_name_str()
639        );
640    }
641    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643    Ok((id, desc, ordering, mfp))
644}
645
646/// Builds a plan that adds the default values for the missing columns and re-orders
647/// the datums in the given rows to match the order in the target table.
648pub fn plan_copy_from_rows(
649    pcx: &PlanContext,
650    catalog: &dyn SessionCatalog,
651    target_id: CatalogItemId,
652    target_name: String,
653    columns: Vec<ColumnIndex>,
654    rows: Vec<mz_repr::Row>,
655) -> Result<HirRelationExpr, PlanError> {
656    let scx = StatementContext::new(Some(pcx), catalog);
657
658    // Always copy at the latest version of the table.
659    let table = catalog
660        .try_get_item(&target_id)
661        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
662        .at_version(RelationVersionSelector::Latest);
663    let desc = table.desc(&catalog.resolve_full_name(table.name()))?;
664
665    let mut defaults = table
666        .writable_table_details()
667        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
668        .to_vec();
669
670    for default in &mut defaults {
671        transform_ast::transform(&scx, default)?;
672    }
673
674    let column_types = columns
675        .iter()
676        .map(|x| desc.get_type(x).clone())
677        .map(|mut x| {
678            // Null constraint is enforced later, when inserting the row in the table.
679            // Without this, an assert is hit during lowering.
680            x.nullable = true;
681            x
682        })
683        .collect();
684    let typ = SqlRelationType::new(column_types);
685    let expr = HirRelationExpr::Constant {
686        rows,
687        typ: typ.clone(),
688    };
689
690    // Exit early with just the raw constant if we know that all columns are present
691    // and in the correct order. This lets us bypass expensive downstream optimizations
692    // more easily, as at every stage we know this expression is nothing more than
693    // a constant (as opposed to e.g. a constant with with an identity map and identity
694    // projection).
695    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
696    if columns == default {
697        return Ok(expr);
698    }
699
700    // Fill in any omitted columns and rearrange into correct order
701    let mut map_exprs = vec![];
702    let mut project_key = Vec::with_capacity(desc.arity());
703
704    // Maps from table column index to position in the source query
705    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
706
707    let column_details = desc.iter_all().zip_eq(defaults);
708    for ((col_idx, _col_name, col_typ), default) in column_details {
709        if let Some(src_idx) = col_to_source.get(&col_idx) {
710            project_key.push(*src_idx);
711        } else {
712            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
713            project_key.push(typ.arity() + map_exprs.len());
714            map_exprs.push(hir);
715        }
716    }
717
718    Ok(expr.map(map_exprs).project(project_key))
719}
720
721/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
722pub struct ReadThenWritePlan {
723    pub id: CatalogItemId,
724    /// Read portion of query.
725    ///
726    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
727    /// retractions.
728    pub selection: HirRelationExpr,
729    /// Map from column index to SET expression. Empty for DELETE statements.
730    pub assignments: BTreeMap<usize, HirScalarExpr>,
731    pub finishing: RowSetFinishing,
732}
733
734pub fn plan_delete_query(
735    scx: &StatementContext,
736    mut delete_stmt: DeleteStatement<Aug>,
737) -> Result<ReadThenWritePlan, PlanError> {
738    transform_ast::transform(scx, &mut delete_stmt)?;
739
740    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
741    plan_mutation_query_inner(
742        qcx,
743        delete_stmt.table_name,
744        delete_stmt.alias,
745        delete_stmt.using,
746        vec![],
747        delete_stmt.selection,
748    )
749}
750
751pub fn plan_update_query(
752    scx: &StatementContext,
753    mut update_stmt: UpdateStatement<Aug>,
754) -> Result<ReadThenWritePlan, PlanError> {
755    transform_ast::transform(scx, &mut update_stmt)?;
756
757    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
758
759    plan_mutation_query_inner(
760        qcx,
761        update_stmt.table_name,
762        update_stmt.alias,
763        vec![],
764        update_stmt.assignments,
765        update_stmt.selection,
766    )
767}
768
769pub fn plan_mutation_query_inner(
770    qcx: QueryContext,
771    table_name: ResolvedItemName,
772    alias: Option<TableAlias>,
773    using: Vec<TableWithJoins<Aug>>,
774    assignments: Vec<Assignment<Aug>>,
775    selection: Option<Expr<Aug>>,
776) -> Result<ReadThenWritePlan, PlanError> {
777    // Get ID and version of the relation desc.
778    let (id, version) = match table_name {
779        ResolvedItemName::Item { id, version, .. } => (id, version),
780        _ => sql_bail!("cannot mutate non-user table"),
781    };
782
783    // Perform checks on item with given ID.
784    let item = qcx.scx.get_item(&id).at_version(version);
785    if item.item_type() != CatalogItemType::Table {
786        sql_bail!(
787            "cannot mutate {} '{}'",
788            item.item_type(),
789            table_name.full_name_str()
790        );
791    }
792    let _ = item.writable_table_details().ok_or_else(|| {
793        sql_err!(
794            "cannot mutate non-writeable table '{}'",
795            table_name.full_name_str()
796        )
797    })?;
798    if id.is_system() {
799        sql_bail!(
800            "cannot mutate system table '{}'",
801            table_name.full_name_str()
802        );
803    }
804
805    // Derive structs for operation from validated table
806    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
807    let scope = plan_table_alias(scope, alias.as_ref())?;
808    let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
809    let relation_type = qcx.relation_type(&get);
810
811    if using.is_empty() {
812        if let Some(expr) = selection {
813            let ecx = &ExprContext {
814                qcx: &qcx,
815                name: "WHERE clause",
816                scope: &scope,
817                relation_type: &relation_type,
818                allow_aggregates: false,
819                allow_subqueries: true,
820                allow_parameters: true,
821                allow_windows: false,
822            };
823            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
824            get = get.filter(vec![expr]);
825        }
826    } else {
827        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
828    }
829
830    let mut sets = BTreeMap::new();
831    for Assignment { id, value } in assignments {
832        // Get the index and type of the column.
833        let name = normalize::column_name(id);
834        match desc.get_by_name(&name) {
835            Some((idx, typ)) => {
836                let ecx = &ExprContext {
837                    qcx: &qcx,
838                    name: "SET clause",
839                    scope: &scope,
840                    relation_type: &relation_type,
841                    allow_aggregates: false,
842                    allow_subqueries: false,
843                    allow_parameters: true,
844                    allow_windows: false,
845                };
846                let expr = plan_expr(ecx, &value)?.cast_to(
847                    ecx,
848                    CastContext::Assignment,
849                    &typ.scalar_type,
850                )?;
851
852                if sets.insert(idx, expr).is_some() {
853                    sql_bail!("column {} set twice", name)
854                }
855            }
856            None => sql_bail!("unknown column {}", name),
857        };
858    }
859
860    let finishing = RowSetFinishing {
861        order_by: vec![],
862        limit: None,
863        offset: 0,
864        project: (0..desc.arity()).collect(),
865    };
866
867    Ok(ReadThenWritePlan {
868        id,
869        selection: get,
870        finishing,
871        assignments: sets,
872    })
873}
874
875// Adjust `get` to perform an existential subquery on `using` accounting for
876// `selection`.
877//
878// If `USING`, we essentially want to rewrite the query as a correlated
879// existential subquery, i.e.
880// ```
881// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
882// ```
883// However, we can't do that directly because of esoteric rules w/r/t `lateral`
884// subqueries.
885// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
886fn handle_mutation_using_clause(
887    qcx: &QueryContext,
888    selection: Option<Expr<Aug>>,
889    using: Vec<TableWithJoins<Aug>>,
890    get: HirRelationExpr,
891    outer_scope: Scope,
892) -> Result<HirRelationExpr, PlanError> {
893    // Plan `USING` as a cross-joined `FROM` without knowledge of the
894    // statement's `FROM` target. This prevents `lateral` subqueries from
895    // "seeing" the `FROM` target.
896    let (mut using_rel_expr, using_scope) =
897        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
898            let (left, left_scope) = l;
899            plan_join(
900                qcx,
901                left,
902                left_scope,
903                &Join {
904                    relation: TableFactor::NestedJoin {
905                        join: Box::new(twj),
906                        alias: None,
907                    },
908                    join_operator: JoinOperator::CrossJoin,
909                },
910            )
911        })?;
912
913    if let Some(expr) = selection {
914        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
915        // PG-like semantics e.g. expressing ambiguous column references. We put
916        // `USING...` first for no real reason, but making a different decision
917        // would require adjusting the column references on this relation
918        // differently.
919        let on = HirScalarExpr::literal_true();
920        let joined = using_rel_expr
921            .clone()
922            .join(get.clone(), on, JoinKind::Inner);
923        let joined_scope = using_scope.product(outer_scope)?;
924        let joined_relation_type = qcx.relation_type(&joined);
925
926        let ecx = &ExprContext {
927            qcx,
928            name: "WHERE clause",
929            scope: &joined_scope,
930            relation_type: &joined_relation_type,
931            allow_aggregates: false,
932            allow_subqueries: true,
933            allow_parameters: true,
934            allow_windows: false,
935        };
936
937        // Plan the filter expression on `FROM, USING...`.
938        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
939
940        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
941        // those to the right of `using_rel_expr`) to instead be correlated to
942        // the outer relation, i.e. `get`.
943        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
944        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
945        use mz_expr::visit::Visit;
946        expr.visit_mut_post(&mut |e| {
947            if let HirScalarExpr::Column(c, _name) = e {
948                if c.column >= using_rel_arity {
949                    c.level += 1;
950                    c.column -= using_rel_arity;
951                };
952            }
953        })?;
954
955        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
956        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
957        // relation.
958        using_rel_expr = using_rel_expr.filter(vec![expr]);
959    } else {
960        // Check that scopes are at compatible (i.e. do not double-reference
961        // same table), despite lack of selection
962        let _joined_scope = using_scope.product(outer_scope)?;
963    }
964    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
965    // whether any rows are returned, and not on the contents of those rows,
966    // the output list of the subquery is normally unimportant.
967    //
968    // This means we don't need to worry about projecting/mapping any
969    // additional expressions here.
970    //
971    // https://www.postgresql.org/docs/14/functions-subquery.html
972
973    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
974    Ok(get.filter(vec![using_rel_expr.exists()]))
975}
976
977#[derive(Debug)]
978pub(crate) struct CastRelationError {
979    pub(crate) column: usize,
980    pub(crate) source_type: SqlScalarType,
981    pub(crate) target_type: SqlScalarType,
982}
983
984/// Cast a relation from one type to another using the specified type of cast.
985///
986/// The length of `target_types` must match the arity of `expr`.
987pub(crate) fn cast_relation<'a, I>(
988    qcx: &QueryContext,
989    ccx: CastContext,
990    expr: HirRelationExpr,
991    target_types: I,
992) -> Result<HirRelationExpr, CastRelationError>
993where
994    I: IntoIterator<Item = &'a SqlScalarType>,
995{
996    let ecx = &ExprContext {
997        qcx,
998        name: "values",
999        scope: &Scope::empty(),
1000        relation_type: &qcx.relation_type(&expr),
1001        allow_aggregates: false,
1002        allow_subqueries: true,
1003        allow_parameters: true,
1004        allow_windows: false,
1005    };
1006    let mut map_exprs = vec![];
1007    let mut project_key = vec![];
1008    for (i, target_typ) in target_types.into_iter().enumerate() {
1009        let expr = HirScalarExpr::column(i);
1010        // We plan every cast and check the evaluated expressions rather than
1011        // checking the types directly because of some complex casting rules
1012        // between types not expressed in `SqlScalarType` equality.
1013        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1014            Ok(cast_expr) => {
1015                if expr == cast_expr {
1016                    // Cast between types was unnecessary
1017                    project_key.push(i);
1018                } else {
1019                    // Cast between types required
1020                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
1021                    map_exprs.push(cast_expr);
1022                }
1023            }
1024            Err(_) => {
1025                return Err(CastRelationError {
1026                    column: i,
1027                    source_type: ecx.scalar_type(&expr),
1028                    target_type: target_typ.clone(),
1029                });
1030            }
1031        }
1032    }
1033    Ok(expr.map(map_exprs).project(project_key))
1034}
1035
1036/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1037/// VIEW` statement.
1038pub fn plan_as_of(
1039    scx: &StatementContext,
1040    as_of: Option<AsOf<Aug>>,
1041) -> Result<QueryWhen, PlanError> {
1042    match as_of {
1043        None => Ok(QueryWhen::Immediately),
1044        Some(as_of) => match as_of {
1045            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1046            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1047        },
1048    }
1049}
1050
1051/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1052///
1053/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1054/// - not a constant,
1055/// - not castable to MzTimestamp,
1056/// - is null,
1057/// - contains an unmaterializable function,
1058/// - some other evaluation error occurs, e.g., a division by 0,
1059/// - contains aggregates, subqueries, parameters, or window function calls.
1060pub fn plan_as_of_or_up_to(
1061    scx: &StatementContext,
1062    mut expr: Expr<Aug>,
1063) -> Result<mz_repr::Timestamp, PlanError> {
1064    let scope = Scope::empty();
1065    let desc = RelationDesc::empty();
1066    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1067    // evaluated only once.)
1068    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1069    transform_ast::transform(scx, &mut expr)?;
1070    let ecx = &ExprContext {
1071        qcx: &qcx,
1072        name: "AS OF or UP TO",
1073        scope: &scope,
1074        relation_type: desc.typ(),
1075        allow_aggregates: false,
1076        allow_subqueries: false,
1077        allow_parameters: false,
1078        allow_windows: false,
1079    };
1080    let hir = plan_expr(ecx, &expr)?.cast_to(
1081        ecx,
1082        CastContext::Assignment,
1083        &SqlScalarType::MzTimestamp,
1084    )?;
1085    if hir.contains_unmaterializable() {
1086        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1087    }
1088    // At this point, we definitely have a constant expression:
1089    // - it can't contain any unmaterializable functions;
1090    // - it can't refer to any columns.
1091    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1092    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1093    // division by 0.
1094    let timestamp = hir
1095        .into_literal_mz_timestamp()
1096        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1097    Ok(timestamp)
1098}
1099
1100/// Plans an expression in the AS position of a `CREATE SECRET`.
1101pub fn plan_secret_as(
1102    scx: &StatementContext,
1103    mut expr: Expr<Aug>,
1104) -> Result<MirScalarExpr, PlanError> {
1105    let scope = Scope::empty();
1106    let desc = RelationDesc::empty();
1107    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1108
1109    transform_ast::transform(scx, &mut expr)?;
1110
1111    let ecx = &ExprContext {
1112        qcx: &qcx,
1113        name: "AS",
1114        scope: &scope,
1115        relation_type: desc.typ(),
1116        allow_aggregates: false,
1117        allow_subqueries: false,
1118        allow_parameters: false,
1119        allow_windows: false,
1120    };
1121    let expr = plan_expr(ecx, &expr)?
1122        .type_as(ecx, &SqlScalarType::Bytes)?
1123        .lower_uncorrelated()?;
1124    Ok(expr)
1125}
1126
1127/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1128pub fn plan_webhook_validate_using(
1129    scx: &StatementContext,
1130    validate_using: CreateWebhookSourceCheck<Aug>,
1131) -> Result<WebhookValidation, PlanError> {
1132    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1133
1134    let CreateWebhookSourceCheck {
1135        options,
1136        using: mut expr,
1137    } = validate_using;
1138
1139    let mut column_typs = vec![];
1140    let mut column_names = vec![];
1141
1142    let (bodies, headers, secrets) = options
1143        .map(|o| (o.bodies, o.headers, o.secrets))
1144        .unwrap_or_default();
1145
1146    // Append all of the bodies so they can be used in the expression.
1147    let mut body_tuples = vec![];
1148    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1149        let scalar_type = use_bytes
1150            .then_some(SqlScalarType::Bytes)
1151            .unwrap_or(SqlScalarType::String);
1152        let name = alias
1153            .map(|a| a.into_string())
1154            .unwrap_or_else(|| "body".to_string());
1155
1156        column_typs.push(SqlColumnType {
1157            scalar_type,
1158            nullable: false,
1159        });
1160        column_names.push(name);
1161
1162        // Store the column index so we can be sure to provide this body correctly.
1163        let column_idx = column_typs.len() - 1;
1164        // Double check we're consistent with column names.
1165        assert_eq!(
1166            column_idx,
1167            column_names.len() - 1,
1168            "body column names and types don't match"
1169        );
1170        body_tuples.push((column_idx, use_bytes));
1171    }
1172
1173    // Append all of the headers so they can be used in the expression.
1174    let mut header_tuples = vec![];
1175
1176    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1177        let value_type = use_bytes
1178            .then_some(SqlScalarType::Bytes)
1179            .unwrap_or(SqlScalarType::String);
1180        let name = alias
1181            .map(|a| a.into_string())
1182            .unwrap_or_else(|| "headers".to_string());
1183
1184        column_typs.push(SqlColumnType {
1185            scalar_type: SqlScalarType::Map {
1186                value_type: Box::new(value_type),
1187                custom_id: None,
1188            },
1189            nullable: false,
1190        });
1191        column_names.push(name);
1192
1193        // Store the column index so we can be sure to provide this body correctly.
1194        let column_idx = column_typs.len() - 1;
1195        // Double check we're consistent with column names.
1196        assert_eq!(
1197            column_idx,
1198            column_names.len() - 1,
1199            "header column names and types don't match"
1200        );
1201        header_tuples.push((column_idx, use_bytes));
1202    }
1203
1204    // Append all secrets so they can be used in the expression.
1205    let mut validation_secrets = vec![];
1206
1207    for CreateWebhookSourceSecret {
1208        secret,
1209        alias,
1210        use_bytes,
1211    } in secrets
1212    {
1213        // Either provide the secret to the validation expression as Bytes or a String.
1214        let scalar_type = use_bytes
1215            .then_some(SqlScalarType::Bytes)
1216            .unwrap_or(SqlScalarType::String);
1217
1218        column_typs.push(SqlColumnType {
1219            scalar_type,
1220            nullable: false,
1221        });
1222        let ResolvedItemName::Item {
1223            id,
1224            full_name: FullItemName { item, .. },
1225            ..
1226        } = secret
1227        else {
1228            return Err(PlanError::InvalidSecret(Box::new(secret)));
1229        };
1230
1231        // Plan the expression using the secret's alias, if one is provided.
1232        let name = if let Some(alias) = alias {
1233            alias.into_string()
1234        } else {
1235            item
1236        };
1237        column_names.push(name);
1238
1239        // Get the column index that corresponds for this secret, so we can make sure to provide the
1240        // secrets in the correct order during evaluation.
1241        let column_idx = column_typs.len() - 1;
1242        // Double check that our column names and types match.
1243        assert_eq!(
1244            column_idx,
1245            column_names.len() - 1,
1246            "column names and types don't match"
1247        );
1248
1249        validation_secrets.push(WebhookValidationSecret {
1250            id,
1251            column_idx,
1252            use_bytes,
1253        });
1254    }
1255
1256    let relation_typ = SqlRelationType::new(column_typs);
1257    let desc = RelationDesc::new(relation_typ, column_names.clone());
1258    let scope = Scope::from_source(None, column_names);
1259
1260    transform_ast::transform(scx, &mut expr)?;
1261
1262    let ecx = &ExprContext {
1263        qcx: &qcx,
1264        name: "CHECK",
1265        scope: &scope,
1266        relation_type: desc.typ(),
1267        allow_aggregates: false,
1268        allow_subqueries: false,
1269        allow_parameters: false,
1270        allow_windows: false,
1271    };
1272    let expr = plan_expr(ecx, &expr)?
1273        .type_as(ecx, &SqlScalarType::Bool)?
1274        .lower_uncorrelated()?;
1275    let validation = WebhookValidation {
1276        expression: expr,
1277        relation_desc: desc,
1278        bodies: body_tuples,
1279        headers: header_tuples,
1280        secrets: validation_secrets,
1281    };
1282    Ok(validation)
1283}
1284
1285pub fn plan_default_expr(
1286    scx: &StatementContext,
1287    expr: &Expr<Aug>,
1288    target_ty: &SqlScalarType,
1289) -> Result<HirScalarExpr, PlanError> {
1290    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1291    let ecx = &ExprContext {
1292        qcx: &qcx,
1293        name: "DEFAULT expression",
1294        scope: &Scope::empty(),
1295        relation_type: &SqlRelationType::empty(),
1296        allow_aggregates: false,
1297        allow_subqueries: false,
1298        allow_parameters: false,
1299        allow_windows: false,
1300    };
1301    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1302    Ok(hir)
1303}
1304
1305pub fn plan_params<'a>(
1306    scx: &'a StatementContext,
1307    params: Vec<Expr<Aug>>,
1308    desc: &StatementDesc,
1309) -> Result<Params, PlanError> {
1310    if params.len() != desc.param_types.len() {
1311        sql_bail!(
1312            "expected {} params, got {}",
1313            desc.param_types.len(),
1314            params.len()
1315        );
1316    }
1317
1318    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1319
1320    let mut datums = Row::default();
1321    let mut packer = datums.packer();
1322    let mut actual_types = Vec::new();
1323    let temp_storage = &RowArena::new();
1324    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1325        transform_ast::transform(scx, &mut expr)?;
1326
1327        let ecx = execute_expr_context(&qcx);
1328        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1329        let actual_ty = ecx.scalar_type(&ex);
1330        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1331            return Err(PlanError::WrongParameterType(
1332                i + 1,
1333                ecx.humanize_scalar_type(expected_ty, false),
1334                ecx.humanize_scalar_type(&actual_ty, false),
1335            ));
1336        }
1337        let ex = ex.lower_uncorrelated()?;
1338        let evaled = ex.eval(&[], temp_storage)?;
1339        packer.push(evaled);
1340        actual_types.push(actual_ty);
1341    }
1342    Ok(Params {
1343        datums,
1344        execute_types: actual_types,
1345        expected_types: desc.param_types.clone(),
1346    })
1347}
1348
1349static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1350static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1351
1352/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
1353pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1354    ExprContext {
1355        qcx,
1356        name: "EXECUTE",
1357        scope: &EXECUTE_CONTEXT_SCOPE,
1358        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1359        allow_aggregates: false,
1360        allow_subqueries: false,
1361        allow_parameters: false,
1362        allow_windows: false,
1363    }
1364}
1365
1366/// The CastContext used when matching up the types of parameters passed to EXECUTE.
1367///
1368/// This is an assignment cast also in Postgres, see
1369/// <https://github.com/MaterializeInc/database-issues/issues/9266>
1370pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1371    LazyLock::new(|| CastContext::Assignment);
1372
1373pub fn plan_index_exprs<'a>(
1374    scx: &'a StatementContext,
1375    on_desc: &RelationDesc,
1376    exprs: Vec<Expr<Aug>>,
1377) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1378    let scope = Scope::from_source(None, on_desc.iter_names());
1379    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1380
1381    let ecx = &ExprContext {
1382        qcx: &qcx,
1383        name: "CREATE INDEX",
1384        scope: &scope,
1385        relation_type: on_desc.typ(),
1386        allow_aggregates: false,
1387        allow_subqueries: false,
1388        allow_parameters: false,
1389        allow_windows: false,
1390    };
1391    let mut out = vec![];
1392    for mut expr in exprs {
1393        transform_ast::transform(scx, &mut expr)?;
1394        let expr = plan_expr_or_col_index(ecx, &expr)?;
1395        let mut expr = expr.lower_uncorrelated()?;
1396        expr.reduce(&on_desc.typ().column_types);
1397        out.push(expr);
1398    }
1399    Ok(out)
1400}
1401
1402fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1403    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1404        Some(column) => Ok(HirScalarExpr::column(column)),
1405        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1406    }
1407}
1408
1409fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1410    match e {
1411        Expr::Value(Value::Number(n)) => {
1412            let n = n.parse::<usize>().map_err(|e| {
1413                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1414            })?;
1415            if n < 1 || n > max {
1416                sql_bail!(
1417                    "column reference {} in {} is out of range (1 - {})",
1418                    n,
1419                    name,
1420                    max
1421                );
1422            }
1423            Ok(Some(n - 1))
1424        }
1425        _ => Ok(None),
1426    }
1427}
1428
1429struct PlannedQuery {
1430    expr: HirRelationExpr,
1431    scope: Scope,
1432    order_by: Vec<ColumnOrder>,
1433    limit: Option<HirScalarExpr>,
1434    /// `offset` is either
1435    /// - an Int64 literal
1436    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
1437    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
1438    ///   later.)
1439    offset: HirScalarExpr,
1440    project: Vec<usize>,
1441    group_size_hints: GroupSizeHints,
1442}
1443
1444fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1445    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1446}
1447
1448fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1449    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1450    // for the identifiers, so that they can be re-installed before returning.
1451    let cte_bindings = plan_ctes(qcx, q)?;
1452
1453    let limit = match &q.limit {
1454        None => None,
1455        Some(Limit {
1456            quantity,
1457            with_ties: false,
1458        }) => {
1459            let ecx = &ExprContext {
1460                qcx,
1461                name: "LIMIT",
1462                scope: &Scope::empty(),
1463                relation_type: &SqlRelationType::empty(),
1464                allow_aggregates: false,
1465                allow_subqueries: true,
1466                allow_parameters: true,
1467                allow_windows: false,
1468            };
1469            let limit = plan_expr(ecx, quantity)?;
1470            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1471
1472            let limit = if limit.is_constant() {
1473                let arena = RowArena::new();
1474                let limit = limit.lower_uncorrelated()?;
1475
1476                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1477                // that the error happened in a LIMIT clause, so that we have better error msg for
1478                // something like `SELECT 5 LIMIT 'aaa'`.
1479                match limit.eval(&[], &arena)? {
1480                    d @ Datum::Int64(v) if v >= 0 => {
1481                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1482                    }
1483                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1484                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1485                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1486                }
1487            } else {
1488                // Gate non-constant LIMIT expressions behind a feature flag
1489                qcx.scx
1490                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1491                limit
1492            };
1493
1494            Some(limit)
1495        }
1496        Some(Limit {
1497            quantity: _,
1498            with_ties: true,
1499        }) => bail_unsupported!("FETCH ... WITH TIES"),
1500    };
1501
1502    let offset = match &q.offset {
1503        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1504        Some(offset) => {
1505            let ecx = &ExprContext {
1506                qcx,
1507                name: "OFFSET",
1508                scope: &Scope::empty(),
1509                relation_type: &SqlRelationType::empty(),
1510                allow_aggregates: false,
1511                allow_subqueries: false,
1512                allow_parameters: true,
1513                allow_windows: false,
1514            };
1515            let offset = plan_expr(ecx, offset)?;
1516            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1517
1518            let offset = if offset.is_constant() {
1519                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1520                let offset_value = offset_into_value(offset)?;
1521                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1522            } else {
1523                // The only case when this is allowed to not be a constant is if it contains
1524                // parameters. (In which case, we'll later check that it's a constant after
1525                // parameter binding.)
1526                if !offset.contains_parameters() {
1527                    return Err(PlanError::InvalidOffset(format!(
1528                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1529                        offset
1530                    )));
1531                }
1532                offset
1533            };
1534            offset
1535        }
1536    };
1537
1538    let mut planned_query = match &q.body {
1539        SetExpr::Select(s) => {
1540            // Extract query options.
1541            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1542            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1543
1544            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1545            PlannedQuery {
1546                expr: plan.expr,
1547                scope: plan.scope,
1548                order_by: plan.order_by,
1549                project: plan.project,
1550                limit,
1551                offset,
1552                group_size_hints,
1553            }
1554        }
1555        _ => {
1556            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1557            let ecx = &ExprContext {
1558                qcx,
1559                name: "ORDER BY clause of a set expression",
1560                scope: &scope,
1561                relation_type: &qcx.relation_type(&expr),
1562                allow_aggregates: false,
1563                allow_subqueries: true,
1564                allow_parameters: true,
1565                allow_windows: false,
1566            };
1567            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1568            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1569            let project = (0..ecx.relation_type.arity()).collect();
1570            PlannedQuery {
1571                expr: expr.map(map_exprs),
1572                scope,
1573                order_by,
1574                limit,
1575                project,
1576                offset,
1577                group_size_hints: GroupSizeHints::default(),
1578            }
1579        }
1580    };
1581
1582    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1583    match &q.ctes {
1584        CteBlock::Simple(_) => {
1585            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1586                if let Some(cte) = qcx.ctes.remove(&id) {
1587                    planned_query.expr = HirRelationExpr::Let {
1588                        name: cte.name,
1589                        id: id.clone(),
1590                        value: Box::new(value),
1591                        body: Box::new(planned_query.expr),
1592                    };
1593                }
1594                if let Some(shadowed_val) = shadowed_val {
1595                    qcx.ctes.insert(id, shadowed_val);
1596                }
1597            }
1598        }
1599        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1600            let MutRecBlockOptionExtracted {
1601                recursion_limit,
1602                return_at_recursion_limit,
1603                error_at_recursion_limit,
1604                seen: _,
1605            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1606            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1607                (None, None, None) => None,
1608                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1609                (None, Some(max_iters), None) => Some((max_iters, true)),
1610                (None, None, Some(max_iters)) => Some((max_iters, false)),
1611                _ => {
1612                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1613                }
1614            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1615                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1616                return_at_limit: *return_at_limit,
1617            }))?;
1618
1619            let mut bindings = Vec::new();
1620            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1621                if let Some(cte) = qcx.ctes.remove(&id) {
1622                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1623                }
1624                if let Some(shadowed_val) = shadowed_val {
1625                    qcx.ctes.insert(id, shadowed_val);
1626                }
1627            }
1628            if !bindings.is_empty() {
1629                planned_query.expr = HirRelationExpr::LetRec {
1630                    limit,
1631                    bindings,
1632                    body: Box::new(planned_query.expr),
1633                }
1634            }
1635        }
1636    }
1637
1638    Ok(planned_query)
1639}
1640
1641/// Converts an OFFSET expression into a value.
1642pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1643    let offset = offset
1644        .try_into_literal_int64()
1645        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1646    if offset < 0 {
1647        return Err(PlanError::InvalidOffset(format!(
1648            "must not be negative, got {}",
1649            offset
1650        )));
1651    }
1652    Ok(offset)
1653}
1654
1655generate_extracted_config!(
1656    MutRecBlockOption,
1657    (RecursionLimit, u64),
1658    (ReturnAtRecursionLimit, u64),
1659    (ErrorAtRecursionLimit, u64)
1660);
1661
1662/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1663///
1664/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1665/// shadowed value that can be reinstalled once the planning has completed.
1666pub fn plan_ctes(
1667    qcx: &mut QueryContext,
1668    q: &Query<Aug>,
1669) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1670    // Accumulate planned expressions and shadowed descriptions.
1671    let mut result = Vec::new();
1672    // Retain the old descriptions of CTE bindings so that we can restore them
1673    // after we're done planning this SELECT.
1674    let mut shadowed_descs = BTreeMap::new();
1675
1676    // A reused identifier indicates a reused name.
1677    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1678        sql_bail!(
1679            "WITH query name {} specified more than once",
1680            normalize::ident_ref(ident).quoted()
1681        )
1682    }
1683
1684    match &q.ctes {
1685        CteBlock::Simple(ctes) => {
1686            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1687            for cte in ctes.iter() {
1688                let cte_name = normalize::ident(cte.alias.name.clone());
1689                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1690                let typ = qcx.relation_type(&val);
1691                let mut desc = RelationDesc::new(typ, scope.column_names());
1692                plan_utils::maybe_rename_columns(
1693                    format!("CTE {}", cte.alias.name),
1694                    &mut desc,
1695                    &cte.alias.columns,
1696                )?;
1697                // Capture the prior value if it exists, so that it can be re-installed.
1698                let shadowed = qcx.ctes.insert(
1699                    cte.id,
1700                    CteDesc {
1701                        name: cte_name,
1702                        desc,
1703                    },
1704                );
1705
1706                result.push((cte.id, val, shadowed));
1707            }
1708        }
1709        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1710            // Insert column types into `qcx.ctes` first for recursive bindings.
1711            for cte in ctes.iter() {
1712                let cte_name = normalize::ident(cte.name.clone());
1713                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1714                for column in cte.columns.iter() {
1715                    desc_columns.push((
1716                        normalize::column_name(column.name.clone()),
1717                        SqlColumnType {
1718                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1719                            nullable: true,
1720                        },
1721                    ));
1722                }
1723                let desc = RelationDesc::from_names_and_types(desc_columns);
1724                let shadowed = qcx.ctes.insert(
1725                    cte.id,
1726                    CteDesc {
1727                        name: cte_name,
1728                        desc,
1729                    },
1730                );
1731                // Capture the prior value if it exists, so that it can be re-installed.
1732                if let Some(shadowed) = shadowed {
1733                    shadowed_descs.insert(cte.id, shadowed);
1734                }
1735            }
1736
1737            // Plan all CTEs and validate the proposed types.
1738            for cte in ctes.iter() {
1739                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1740
1741                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1742
1743                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1744                    // Once WMR CTEs support NOT NULL constraints, check that
1745                    // nullability of derived column types are compatible.
1746                    sql_bail!(
1747                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1748                    );
1749                }
1750
1751                if !proposed_typ.keys.is_empty() {
1752                    // Once WMR CTEs support keys, check that keys exactly
1753                    // overlap.
1754                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1755                }
1756
1757                // Validate that the derived and proposed types are the same.
1758                let derived_typ = qcx.relation_type(&val);
1759
1760                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1761                    let cte_name = normalize::ident(cte.name.clone());
1762                    let proposed_typ = proposed_typ
1763                        .column_types
1764                        .iter()
1765                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1766                        .collect::<Vec<_>>();
1767                    let inferred_typ = derived_typ
1768                        .column_types
1769                        .iter()
1770                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1771                        .collect::<Vec<_>>();
1772                    Err(PlanError::RecursiveTypeMismatch(
1773                        cte_name,
1774                        proposed_typ,
1775                        inferred_typ,
1776                    ))
1777                };
1778
1779                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1780                    return type_err(proposed_typ, derived_typ);
1781                }
1782
1783                // Cast derived types to proposed types or error.
1784                let val = match cast_relation(
1785                    qcx,
1786                    // Choose `CastContext::Assignment`` because the user has
1787                    // been explicit about the types they expect. Choosing
1788                    // `CastContext::Implicit` is not "strong" enough to impose
1789                    // typmods from proposed types onto values.
1790                    CastContext::Assignment,
1791                    val,
1792                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1793                ) {
1794                    Ok(val) => val,
1795                    Err(_) => return type_err(proposed_typ, derived_typ),
1796                };
1797
1798                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1799            }
1800        }
1801    }
1802
1803    Ok(result)
1804}
1805
1806pub fn plan_nested_query(
1807    qcx: &mut QueryContext,
1808    q: &Query<Aug>,
1809) -> Result<(HirRelationExpr, Scope), PlanError> {
1810    let PlannedQuery {
1811        mut expr,
1812        scope,
1813        order_by,
1814        limit,
1815        offset,
1816        project,
1817        group_size_hints,
1818    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1819    if limit.is_some()
1820        || !offset
1821            .clone()
1822            .try_into_literal_int64()
1823            .is_ok_and(|offset| offset == 0)
1824    {
1825        expr = HirRelationExpr::top_k(
1826            expr,
1827            vec![],
1828            order_by,
1829            limit,
1830            offset,
1831            group_size_hints.limit_input_group_size,
1832        );
1833    }
1834    Ok((expr.project(project), scope))
1835}
1836
1837fn plan_set_expr(
1838    qcx: &mut QueryContext,
1839    q: &SetExpr<Aug>,
1840) -> Result<(HirRelationExpr, Scope), PlanError> {
1841    match q {
1842        SetExpr::Select(select) => {
1843            let order_by_exprs = Vec::new();
1844            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1845            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
1846            // should not have planned any ordering.
1847            assert!(plan.order_by.is_empty());
1848            Ok((plan.expr.project(plan.project), plan.scope))
1849        }
1850        SetExpr::SetOperation {
1851            op,
1852            all,
1853            left,
1854            right,
1855        } => {
1856            // Plan the LHS and RHS.
1857            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1858            let (right_expr, right_scope) =
1859                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1860
1861            // Validate that the LHS and RHS are the same width.
1862            let left_type = qcx.relation_type(&left_expr);
1863            let right_type = qcx.relation_type(&right_expr);
1864            if left_type.arity() != right_type.arity() {
1865                sql_bail!(
1866                    "each {} query must have the same number of columns: {} vs {}",
1867                    op,
1868                    left_type.arity(),
1869                    right_type.arity(),
1870                );
1871            }
1872
1873            // Match the types of the corresponding columns on the LHS and RHS
1874            // using the normal type coercion rules. This is equivalent to
1875            // `coerce_homogeneous_exprs`, but implemented in terms of
1876            // `HirRelationExpr` rather than `HirScalarExpr`.
1877            let left_ecx = &ExprContext {
1878                qcx,
1879                name: &op.to_string(),
1880                scope: &left_scope,
1881                relation_type: &left_type,
1882                allow_aggregates: false,
1883                allow_subqueries: false,
1884                allow_parameters: false,
1885                allow_windows: false,
1886            };
1887            let right_ecx = &ExprContext {
1888                qcx,
1889                name: &op.to_string(),
1890                scope: &right_scope,
1891                relation_type: &right_type,
1892                allow_aggregates: false,
1893                allow_subqueries: false,
1894                allow_parameters: false,
1895                allow_windows: false,
1896            };
1897            let mut left_casts = vec![];
1898            let mut right_casts = vec![];
1899            for (i, (left_type, right_type)) in left_type
1900                .column_types
1901                .iter()
1902                .zip_eq(right_type.column_types.iter())
1903                .enumerate()
1904            {
1905                let types = &[
1906                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1907                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1908                ];
1909                let target =
1910                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1911                match typeconv::plan_cast(
1912                    left_ecx,
1913                    CastContext::Implicit,
1914                    HirScalarExpr::column(i),
1915                    &target,
1916                ) {
1917                    Ok(expr) => left_casts.push(expr),
1918                    Err(_) => sql_bail!(
1919                        "{} types {} and {} cannot be matched",
1920                        op,
1921                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
1922                        qcx.humanize_scalar_type(&target, false),
1923                    ),
1924                }
1925                match typeconv::plan_cast(
1926                    right_ecx,
1927                    CastContext::Implicit,
1928                    HirScalarExpr::column(i),
1929                    &target,
1930                ) {
1931                    Ok(expr) => right_casts.push(expr),
1932                    Err(_) => sql_bail!(
1933                        "{} types {} and {} cannot be matched",
1934                        op,
1935                        qcx.humanize_scalar_type(&target, false),
1936                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
1937                    ),
1938                }
1939            }
1940            let lhs = if left_casts
1941                .iter()
1942                .enumerate()
1943                .any(|(i, e)| e != &HirScalarExpr::column(i))
1944            {
1945                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1946                left_expr.map(left_casts).project(project_key)
1947            } else {
1948                left_expr
1949            };
1950            let rhs = if right_casts
1951                .iter()
1952                .enumerate()
1953                .any(|(i, e)| e != &HirScalarExpr::column(i))
1954            {
1955                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1956                right_expr.map(right_casts).project(project_key)
1957            } else {
1958                right_expr
1959            };
1960
1961            let relation_expr = match op {
1962                SetOperator::Union => {
1963                    if *all {
1964                        lhs.union(rhs)
1965                    } else {
1966                        lhs.union(rhs).distinct()
1967                    }
1968                }
1969                SetOperator::Except => Hir::except(all, lhs, rhs),
1970                SetOperator::Intersect => {
1971                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
1972                    // Though we believe that render() does The Right Thing (TM)
1973                    // Also note that we do *not* need another threshold() at the end of the method chain
1974                    // because the right-hand side of the outer union only produces existing records,
1975                    // i.e., the record counts for differential data flow definitely remain non-negative.
1976                    let left_clone = lhs.clone();
1977                    if *all {
1978                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1979                    } else {
1980                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1981                            .distinct()
1982                    }
1983                }
1984            };
1985            let scope = Scope::from_source(
1986                None,
1987                // Column names are taken from the left, as in Postgres.
1988                left_scope.column_names(),
1989            );
1990
1991            Ok((relation_expr, scope))
1992        }
1993        SetExpr::Values(Values(values)) => plan_values(qcx, values),
1994        SetExpr::Table(name) => {
1995            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1996            Ok((expr, scope))
1997        }
1998        SetExpr::Query(query) => {
1999            let (expr, scope) = plan_nested_query(qcx, query)?;
2000            Ok((expr, scope))
2001        }
2002        SetExpr::Show(stmt) => {
2003            // The create SQL definition of involving this query, will have the explicit `SHOW`
2004            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
2005            // current schema of the executing user. When Materialize restarts and tries to re-plan
2006            // these queries, it will only have access to the raw `SHOW` command and have no idea
2007            // what schema to use. As a result Materialize will fail to boot.
2008            //
2009            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
2010            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
2011            // However, banning show commands in views gives us more flexibility to change their
2012            // output.
2013            //
2014            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
2015            // with all show commands expanded into their equivalent SELECT statements.
2016            if !qcx.lifetime.allow_show() {
2017                return Err(PlanError::ShowCommandInView);
2018            }
2019
2020            // Some SHOW statements are a SELECT query. Others produces Rows
2021            // directly. Convert both of these to the needed Hir and Scope.
2022            fn to_hirscope(
2023                plan: ShowCreatePlan,
2024                desc: StatementDesc,
2025            ) -> Result<(HirRelationExpr, Scope), PlanError> {
2026                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2027                let desc = desc.relation_desc.expect("must exist");
2028                let scope = Scope::from_source(None, desc.iter_names());
2029                let expr = HirRelationExpr::constant(rows, desc.into_typ());
2030                Ok((expr, scope))
2031            }
2032
2033            match stmt.clone() {
2034                ShowStatement::ShowColumns(stmt) => {
2035                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2036                }
2037                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2038                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2039                    show::describe_show_create_connection(qcx.scx, stmt)?,
2040                ),
2041                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2042                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2043                    show::describe_show_create_cluster(qcx.scx, stmt)?,
2044                ),
2045                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2046                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
2047                    show::describe_show_create_index(qcx.scx, stmt)?,
2048                ),
2049                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2050                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2051                    show::describe_show_create_sink(qcx.scx, stmt)?,
2052                ),
2053                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2054                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
2055                    show::describe_show_create_source(qcx.scx, stmt)?,
2056                ),
2057                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2058                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
2059                    show::describe_show_create_table(qcx.scx, stmt)?,
2060                ),
2061                ShowStatement::ShowCreateView(stmt) => to_hirscope(
2062                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
2063                    show::describe_show_create_view(qcx.scx, stmt)?,
2064                ),
2065                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2066                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2067                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2068                ),
2069                ShowStatement::ShowCreateType(stmt) => to_hirscope(
2070                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
2071                    show::describe_show_create_type(qcx.scx, stmt)?,
2072                ),
2073                ShowStatement::ShowObjects(stmt) => {
2074                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2075                }
2076                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2077                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2078            }
2079        }
2080    }
2081}
2082
2083/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2084fn plan_values(
2085    qcx: &QueryContext,
2086    values: &[Vec<Expr<Aug>>],
2087) -> Result<(HirRelationExpr, Scope), PlanError> {
2088    assert!(!values.is_empty());
2089
2090    let ecx = &ExprContext {
2091        qcx,
2092        name: "VALUES",
2093        scope: &Scope::empty(),
2094        relation_type: &SqlRelationType::empty(),
2095        allow_aggregates: false,
2096        allow_subqueries: true,
2097        allow_parameters: true,
2098        allow_windows: false,
2099    };
2100
2101    let ncols = values[0].len();
2102    let nrows = values.len();
2103
2104    // Arrange input expressions by columns, not rows, so that we can
2105    // call `coerce_homogeneous_exprs` on each column.
2106    let mut cols = vec![vec![]; ncols];
2107    for row in values {
2108        if row.len() != ncols {
2109            sql_bail!(
2110                "VALUES expression has varying number of columns: {} vs {}",
2111                row.len(),
2112                ncols
2113            );
2114        }
2115        for (i, v) in row.iter().enumerate() {
2116            cols[i].push(v);
2117        }
2118    }
2119
2120    // Plan each column.
2121    let mut col_iters = Vec::with_capacity(ncols);
2122    let mut col_types = Vec::with_capacity(ncols);
2123    for col in &cols {
2124        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2125        let mut col_type = ecx.column_type(&col[0]);
2126        for val in &col[1..] {
2127            col_type = col_type.union(&ecx.column_type(val))?;
2128        }
2129        col_types.push(col_type);
2130        col_iters.push(col.into_iter());
2131    }
2132
2133    // Build constant relation.
2134    let mut exprs = vec![];
2135    for _ in 0..nrows {
2136        for i in 0..ncols {
2137            exprs.push(col_iters[i].next().unwrap());
2138        }
2139    }
2140    let out = HirRelationExpr::CallTable {
2141        func: TableFunc::Wrap {
2142            width: ncols,
2143            types: col_types,
2144        },
2145        exprs,
2146    };
2147
2148    // Build column names.
2149    let mut scope = Scope::empty();
2150    for i in 0..ncols {
2151        let name = format!("column{}", i + 1);
2152        scope.items.push(ScopeItem::from_column_name(name));
2153    }
2154
2155    Ok((out, scope))
2156}
2157
2158/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2159/// statement.
2160///
2161/// This is special-cased in PostgreSQL and different enough from `plan_values`
2162/// that it is easier to use a separate function entirely. Unlike a normal
2163/// `VALUES` clause, each value is coerced to the type of the target table
2164/// via an assignment cast.
2165///
2166/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2167fn plan_values_insert(
2168    qcx: &QueryContext,
2169    target_names: &[&ColumnName],
2170    target_types: &[&SqlScalarType],
2171    values: &[Vec<Expr<Aug>>],
2172) -> Result<HirRelationExpr, PlanError> {
2173    assert!(!values.is_empty());
2174
2175    if !values.iter().map(|row| row.len()).all_equal() {
2176        sql_bail!("VALUES lists must all be the same length");
2177    }
2178
2179    let ecx = &ExprContext {
2180        qcx,
2181        name: "VALUES",
2182        scope: &Scope::empty(),
2183        relation_type: &SqlRelationType::empty(),
2184        allow_aggregates: false,
2185        allow_subqueries: true,
2186        allow_parameters: true,
2187        allow_windows: false,
2188    };
2189
2190    let mut exprs = vec![];
2191    let mut types = vec![];
2192    for row in values {
2193        if row.len() > target_names.len() {
2194            sql_bail!("INSERT has more expressions than target columns");
2195        }
2196        for (column, val) in row.into_iter().enumerate() {
2197            let target_type = &target_types[column];
2198            let val = plan_expr(ecx, val)?;
2199            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2200            let source_type = &ecx.scalar_type(&val);
2201            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2202                Ok(val) => val,
2203                Err(_) => sql_bail!(
2204                    "column {} is of type {} but expression is of type {}",
2205                    target_names[column].quoted(),
2206                    qcx.humanize_scalar_type(target_type, false),
2207                    qcx.humanize_scalar_type(source_type, false),
2208                ),
2209            };
2210            if column >= types.len() {
2211                types.push(ecx.column_type(&val));
2212            } else {
2213                types[column] = types[column].union(&ecx.column_type(&val))?;
2214            }
2215            exprs.push(val);
2216        }
2217    }
2218
2219    Ok(HirRelationExpr::CallTable {
2220        func: TableFunc::Wrap {
2221            width: values[0].len(),
2222            types,
2223        },
2224        exprs,
2225    })
2226}
2227
2228fn plan_join_identity() -> (HirRelationExpr, Scope) {
2229    let typ = SqlRelationType::new(vec![]);
2230    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2231    let scope = Scope::empty();
2232    (expr, scope)
2233}
2234
2235/// Describes how to execute a SELECT query.
2236///
2237/// `order_by` describes how to order the rows in `expr` *before* applying the
2238/// projection. The `scope` describes the columns in `expr` *after* the
2239/// projection has been applied.
2240#[derive(Debug)]
2241struct SelectPlan {
2242    expr: HirRelationExpr,
2243    scope: Scope,
2244    order_by: Vec<ColumnOrder>,
2245    project: Vec<usize>,
2246}
2247
2248generate_extracted_config!(
2249    SelectOption,
2250    (ExpectedGroupSize, u64),
2251    (AggregateInputGroupSize, u64),
2252    (DistinctOnInputGroupSize, u64),
2253    (LimitInputGroupSize, u64)
2254);
2255
2256/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2257///
2258/// Normally, the ORDER BY clause occurs after the columns specified in the
2259/// SELECT list have been projected. In a query like
2260///
2261///   CREATE TABLE (a int, b int)
2262///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2263///
2264/// it is valid to refer to `a`, because it is explicitly selected, but it would
2265/// not be valid to refer to unselected column `b`.
2266///
2267/// But PostgreSQL extends the standard to permit queries like
2268///
2269///   SELECT a FROM t ORDER BY b
2270///
2271/// where expressions in the ORDER BY clause can refer to *both* input columns
2272/// and output columns.
2273fn plan_select_from_where(
2274    qcx: &QueryContext,
2275    mut s: Select<Aug>,
2276    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2277) -> Result<SelectPlan, PlanError> {
2278    // TODO: Both `s` and `order_by_exprs` are not references because the
2279    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2280    // table function support (the UUID mapping). Attempt to change this so callers
2281    // don't need to clone the Select.
2282
2283    // Extract query options.
2284    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2285    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2286
2287    // Step 1. Handle FROM clause, including joins.
2288    let (mut relation_expr, mut from_scope) =
2289        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2290            let (left, left_scope) = l;
2291            plan_join(
2292                qcx,
2293                left,
2294                left_scope,
2295                &Join {
2296                    relation: TableFactor::NestedJoin {
2297                        join: Box::new(twj.clone()),
2298                        alias: None,
2299                    },
2300                    join_operator: JoinOperator::CrossJoin,
2301                },
2302            )
2303        })?;
2304
2305    // Step 2. Handle WHERE clause.
2306    if let Some(selection) = &s.selection {
2307        let ecx = &ExprContext {
2308            qcx,
2309            name: "WHERE clause",
2310            scope: &from_scope,
2311            relation_type: &qcx.relation_type(&relation_expr),
2312            allow_aggregates: false,
2313            allow_subqueries: true,
2314            allow_parameters: true,
2315            allow_windows: false,
2316        };
2317        let expr = plan_expr(ecx, selection)
2318            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2319            .type_as(ecx, &SqlScalarType::Bool)?;
2320        relation_expr = relation_expr.filter(vec![expr]);
2321    }
2322
2323    // Step 3. Gather aggregates and table functions.
2324    // (But skip window aggregates.)
2325    let (aggregates, table_funcs) = {
2326        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2327        visitor.visit_select_mut(&mut s);
2328        for o in order_by_exprs.iter_mut() {
2329            visitor.visit_order_by_expr_mut(o);
2330        }
2331        visitor.into_result()?
2332    };
2333    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2334    if !table_funcs.is_empty() {
2335        let (expr, scope) = plan_scalar_table_funcs(
2336            qcx,
2337            table_funcs,
2338            &mut table_func_names,
2339            &relation_expr,
2340            &from_scope,
2341        )?;
2342        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2343        from_scope = from_scope.product(scope)?;
2344    }
2345
2346    // Step 4. Expand SELECT clause.
2347    let projection = {
2348        let ecx = &ExprContext {
2349            qcx,
2350            name: "SELECT clause",
2351            scope: &from_scope,
2352            relation_type: &qcx.relation_type(&relation_expr),
2353            allow_aggregates: true,
2354            allow_subqueries: true,
2355            allow_parameters: true,
2356            allow_windows: true,
2357        };
2358        let mut out = vec![];
2359        for si in &s.projection {
2360            if *si == SelectItem::Wildcard && s.from.is_empty() {
2361                sql_bail!("SELECT * with no tables specified is not valid");
2362            }
2363            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2364        }
2365        out
2366    };
2367
2368    // Step 5. Handle GROUP BY clause.
2369    // This will also plan the aggregates gathered in Step 3.
2370    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2371    let (mut group_scope, select_all_mapping) = {
2372        // Compute GROUP BY expressions.
2373        let ecx = &ExprContext {
2374            qcx,
2375            name: "GROUP BY clause",
2376            scope: &from_scope,
2377            relation_type: &qcx.relation_type(&relation_expr),
2378            allow_aggregates: false,
2379            allow_subqueries: true,
2380            allow_parameters: true,
2381            allow_windows: false,
2382        };
2383        let mut group_key = vec![];
2384        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2385        let mut group_hir_exprs = vec![];
2386        let mut group_scope = Scope::empty();
2387        let mut select_all_mapping = BTreeMap::new();
2388
2389        for group_expr in &s.group_by {
2390            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2391            let new_column = group_key.len();
2392
2393            if let Some(group_expr) = group_expr {
2394                // Multiple AST expressions can map to the same HIR expression.
2395                // If we already have a ScopeItem for this HIR, we can add this
2396                // next AST expression to its set
2397                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2398                    existing_scope_item.exprs.insert(group_expr.clone());
2399                    continue;
2400                }
2401            }
2402
2403            let mut scope_item = if let HirScalarExpr::Column(
2404                ColumnRef {
2405                    level: 0,
2406                    column: old_column,
2407                },
2408                _name,
2409            ) = &expr
2410            {
2411                // If we later have `SELECT foo.*` then we have to find all
2412                // the `foo` items in `from_scope` and figure out where they
2413                // ended up in `group_scope`. This is really hard to do
2414                // right using SQL name resolution, so instead we just track
2415                // the movement here.
2416                select_all_mapping.insert(*old_column, new_column);
2417                let scope_item = ecx.scope.items[*old_column].clone();
2418                scope_item
2419            } else {
2420                ScopeItem::empty()
2421            };
2422
2423            if let Some(group_expr) = group_expr.cloned() {
2424                scope_item.exprs.insert(group_expr);
2425            }
2426
2427            group_key.push(from_scope.len() + group_exprs.len());
2428            group_hir_exprs.push(expr.clone());
2429            group_exprs.insert(expr, scope_item);
2430        }
2431
2432        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2433        for expr in &group_hir_exprs {
2434            if let Some(scope_item) = group_exprs.remove(expr) {
2435                group_scope.items.push(scope_item);
2436            }
2437        }
2438
2439        // Plan aggregates.
2440        let ecx = &ExprContext {
2441            qcx,
2442            name: "aggregate function",
2443            scope: &from_scope,
2444            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2445            allow_aggregates: false,
2446            allow_subqueries: true,
2447            allow_parameters: true,
2448            allow_windows: false,
2449        };
2450        let mut agg_exprs = vec![];
2451        for sql_function in aggregates {
2452            if sql_function.over.is_some() {
2453                unreachable!(
2454                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2455                );
2456            }
2457            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2458            group_scope
2459                .items
2460                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2461        }
2462        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2463            // apply GROUP BY / aggregates
2464            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2465                group_key,
2466                agg_exprs,
2467                group_size_hints.aggregate_input_group_size,
2468            );
2469
2470            // For every old column that wasn't a group key, add a scope item
2471            // that errors when referenced. We can't simply drop these items
2472            // from scope. These items need to *exist* because they might shadow
2473            // variables in outer scopes that would otherwise be valid to
2474            // reference, but accessing them needs to produce an error.
2475            for i in 0..from_scope.len() {
2476                if !select_all_mapping.contains_key(&i) {
2477                    let scope_item = &ecx.scope.items[i];
2478                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2479                        table_name: scope_item.table_name.clone(),
2480                        column_name: scope_item.column_name.clone(),
2481                        allow_unqualified_references: scope_item.allow_unqualified_references,
2482                    });
2483                }
2484            }
2485
2486            (group_scope, select_all_mapping)
2487        } else {
2488            // if no GROUP BY, aggregates or having then all columns remain in scope
2489            (
2490                from_scope.clone(),
2491                (0..from_scope.len()).map(|i| (i, i)).collect(),
2492            )
2493        }
2494    };
2495
2496    // Step 6. Handle HAVING clause.
2497    if let Some(ref having) = s.having {
2498        let ecx = &ExprContext {
2499            qcx,
2500            name: "HAVING clause",
2501            scope: &group_scope,
2502            relation_type: &qcx.relation_type(&relation_expr),
2503            allow_aggregates: true,
2504            allow_subqueries: true,
2505            allow_parameters: true,
2506            allow_windows: false,
2507        };
2508        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2509        relation_expr = relation_expr.filter(vec![expr]);
2510    }
2511
2512    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2513    // (This includes window aggregations.)
2514    //
2515    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2516    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2517    //
2518    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2519    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2520    // and can't be part of a bigger expression.
2521    // See https://www.postgresql.org/docs/current/queries-order.html:
2522    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2523    // expression"
2524    let window_funcs = {
2525        let mut visitor = WindowFuncCollector::default();
2526        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2527        // window functions are excluded from other things by `allow_windows` being false when
2528        // planning those before this code).
2529        visitor.visit_select(&s);
2530        for o in order_by_exprs.iter() {
2531            visitor.visit_order_by_expr(o);
2532        }
2533        visitor.into_result()
2534    };
2535    for window_func in window_funcs {
2536        let ecx = &ExprContext {
2537            qcx,
2538            name: "window function",
2539            scope: &group_scope,
2540            relation_type: &qcx.relation_type(&relation_expr),
2541            allow_aggregates: true,
2542            allow_subqueries: true,
2543            allow_parameters: true,
2544            allow_windows: true,
2545        };
2546        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2547        group_scope.items.push(ScopeItem::from_expr(window_func));
2548    }
2549    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2550    // been already planned now. However, we should still set `allow_windows: true` for the
2551    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2552    // msg if an OVER clause is missing from a window function.
2553
2554    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2555    if let Some(ref qualify) = s.qualify {
2556        let ecx = &ExprContext {
2557            qcx,
2558            name: "QUALIFY clause",
2559            scope: &group_scope,
2560            relation_type: &qcx.relation_type(&relation_expr),
2561            allow_aggregates: true,
2562            allow_subqueries: true,
2563            allow_parameters: true,
2564            allow_windows: true,
2565        };
2566        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2567        relation_expr = relation_expr.filter(vec![expr]);
2568    }
2569
2570    // Step 9. Handle SELECT clause.
2571    let output_columns = {
2572        let mut new_exprs = vec![];
2573        let mut new_type = qcx.relation_type(&relation_expr);
2574        let mut output_columns = vec![];
2575        for (select_item, column_name) in &projection {
2576            let ecx = &ExprContext {
2577                qcx,
2578                name: "SELECT clause",
2579                scope: &group_scope,
2580                relation_type: &new_type,
2581                allow_aggregates: true,
2582                allow_subqueries: true,
2583                allow_parameters: true,
2584                allow_windows: true,
2585            };
2586            let expr = match select_item {
2587                ExpandedSelectItem::InputOrdinal(i) => {
2588                    if let Some(column) = select_all_mapping.get(i).copied() {
2589                        HirScalarExpr::column(column)
2590                    } else {
2591                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2592                    }
2593                }
2594                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2595            };
2596            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2597                // Simple column reference; no need to map on a new expression.
2598                output_columns.push((column, column_name));
2599            } else {
2600                // Complicated expression that requires a map expression. We
2601                // update `group_scope` as we go so that future expressions that
2602                // are textually identical to this one can reuse it. This
2603                // duplicate detection is required for proper determination of
2604                // ambiguous column references with SQL92-style `ORDER BY`
2605                // items. See `plan_order_by_or_distinct_expr` for more.
2606                let typ = ecx.column_type(&expr);
2607                new_type.column_types.push(typ);
2608                new_exprs.push(expr);
2609                output_columns.push((group_scope.len(), column_name));
2610                group_scope
2611                    .items
2612                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2613            }
2614        }
2615        relation_expr = relation_expr.map(new_exprs);
2616        output_columns
2617    };
2618    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2619
2620    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2621    let order_by = {
2622        let relation_type = qcx.relation_type(&relation_expr);
2623        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2624            &ExprContext {
2625                qcx,
2626                name: "ORDER BY clause",
2627                scope: &group_scope,
2628                relation_type: &relation_type,
2629                allow_aggregates: true,
2630                allow_subqueries: true,
2631                allow_parameters: true,
2632                allow_windows: true,
2633            },
2634            &order_by_exprs,
2635            &output_columns,
2636        )?;
2637
2638        match s.distinct {
2639            None => relation_expr = relation_expr.map(map_exprs),
2640            Some(Distinct::EntireRow) => {
2641                if relation_type.arity() == 0 {
2642                    sql_bail!("SELECT DISTINCT must have at least one column");
2643                }
2644                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2645                // list, so we can't proceed if `ORDER BY` has introduced any
2646                // columns for arbitrary expressions. This matches PostgreSQL.
2647                if !try_push_projection_order_by(
2648                    &mut relation_expr,
2649                    &mut project_key,
2650                    &mut order_by,
2651                ) {
2652                    sql_bail!(
2653                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2654                    );
2655                }
2656                assert!(map_exprs.is_empty());
2657                relation_expr = relation_expr.distinct();
2658            }
2659            Some(Distinct::On(exprs)) => {
2660                let ecx = &ExprContext {
2661                    qcx,
2662                    name: "DISTINCT ON clause",
2663                    scope: &group_scope,
2664                    relation_type: &qcx.relation_type(&relation_expr),
2665                    allow_aggregates: true,
2666                    allow_subqueries: true,
2667                    allow_parameters: true,
2668                    allow_windows: true,
2669                };
2670
2671                let mut distinct_exprs = vec![];
2672                for expr in &exprs {
2673                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2674                    distinct_exprs.push(expr);
2675                }
2676
2677                let mut distinct_key = vec![];
2678
2679                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2680                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2681                // expressions, though the order of `DISTINCT ON` expressions
2682                // does not matter. This matches PostgreSQL and leaves the door
2683                // open to a future optimization where the `DISTINCT ON` and
2684                // `ORDER BY` operations happen in one pass.
2685                //
2686                // On the bright side, any columns that have already been
2687                // computed by `ORDER BY` can be reused in the distinct key.
2688                let arity = relation_type.arity();
2689                for ord in order_by.iter().take(distinct_exprs.len()) {
2690                    // The unusual construction of `expr` here is to ensure the
2691                    // temporary column expression lives long enough.
2692                    let mut expr = &HirScalarExpr::column(ord.column);
2693                    if ord.column >= arity {
2694                        expr = &map_exprs[ord.column - arity];
2695                    };
2696                    match distinct_exprs.iter().position(move |e| e == expr) {
2697                        None => sql_bail!(
2698                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2699                        ),
2700                        Some(pos) => {
2701                            distinct_exprs.remove(pos);
2702                        }
2703                    }
2704                    distinct_key.push(ord.column);
2705                }
2706
2707                // Add any remaining `DISTINCT ON` expressions to the key.
2708                for expr in distinct_exprs {
2709                    // If the expression is a reference to an existing column,
2710                    // do not introduce a new column to support it.
2711                    let column = match expr {
2712                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2713                        _ => {
2714                            map_exprs.push(expr);
2715                            arity + map_exprs.len() - 1
2716                        }
2717                    };
2718                    distinct_key.push(column);
2719                }
2720
2721                // `DISTINCT ON` is semantically a TopK with limit 1. The
2722                // columns in `ORDER BY` that are not part of the distinct key,
2723                // if there are any, determine the ordering within each group,
2724                // per PostgreSQL semantics.
2725                let distinct_len = distinct_key.len();
2726                relation_expr = HirRelationExpr::top_k(
2727                    relation_expr.map(map_exprs),
2728                    distinct_key,
2729                    order_by.iter().skip(distinct_len).cloned().collect(),
2730                    Some(HirScalarExpr::literal(
2731                        Datum::Int64(1),
2732                        SqlScalarType::Int64,
2733                    )),
2734                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2735                    group_size_hints.distinct_on_input_group_size,
2736                );
2737            }
2738        }
2739
2740        order_by
2741    };
2742
2743    // Construct a clean scope to expose outwards, where all of the state that
2744    // accumulated in the scope during planning of this SELECT is erased. The
2745    // clean scope has at most one name for each column, and the names are not
2746    // associated with any table.
2747    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2748
2749    Ok(SelectPlan {
2750        expr: relation_expr,
2751        scope,
2752        order_by,
2753        project: project_key,
2754    })
2755}
2756
2757fn plan_scalar_table_funcs(
2758    qcx: &QueryContext,
2759    table_funcs: BTreeMap<Function<Aug>, String>,
2760    table_func_names: &mut BTreeMap<String, Ident>,
2761    relation_expr: &HirRelationExpr,
2762    from_scope: &Scope,
2763) -> Result<(HirRelationExpr, Scope), PlanError> {
2764    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2765
2766    for (table_func, id) in table_funcs.iter() {
2767        table_func_names.insert(
2768            id.clone(),
2769            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
2770            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2771        );
2772    }
2773    // If there's only a single table function, we can skip generating
2774    // ordinality columns.
2775    if table_funcs.len() == 1 {
2776        let (table_func, id) = table_funcs.iter().next().unwrap();
2777        let (expr, mut scope) =
2778            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2779
2780        // A single table-function might return several columns as a record
2781        let num_cols = scope.len();
2782        for i in 0..scope.len() {
2783            scope.items[i].table_name = Some(PartialItemName {
2784                database: None,
2785                schema: None,
2786                item: id.clone(),
2787            });
2788            scope.items[i].from_single_column_function = num_cols == 1;
2789            scope.items[i].allow_unqualified_references = false;
2790        }
2791        return Ok((expr, scope));
2792    }
2793    // Otherwise, plan as usual, emulating the ROWS FROM behavior
2794    let (expr, mut scope, num_cols) =
2795        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2796
2797    // Munge the scope so table names match with the generated ids.
2798    let mut i = 0;
2799    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2800        for _ in 0..num_cols {
2801            scope.items[i].table_name = Some(PartialItemName {
2802                database: None,
2803                schema: None,
2804                item: id.clone(),
2805            });
2806            scope.items[i].from_single_column_function = num_cols == 1;
2807            scope.items[i].allow_unqualified_references = false;
2808            i += 1;
2809        }
2810        // Ordinality column. This doubles as the
2811        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
2812        // because it only needs to be NULL or not.
2813        scope.items[i].table_name = Some(PartialItemName {
2814            database: None,
2815            schema: None,
2816            item: id.clone(),
2817        });
2818        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2819        scope.items[i].allow_unqualified_references = false;
2820        i += 1;
2821    }
2822    // Coalesced ordinality column.
2823    scope.items[i].allow_unqualified_references = false;
2824    Ok((expr, scope))
2825}
2826
2827/// Plans an expression in a `GROUP BY` clause.
2828///
2829/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2830/// names/expressions defined in the `SELECT` clause. These special cases are
2831/// handled by this function; see comments within the implementation for
2832/// details.
2833fn plan_group_by_expr<'a>(
2834    ecx: &ExprContext,
2835    group_expr: &'a Expr<Aug>,
2836    projection: &'a [(ExpandedSelectItem, ColumnName)],
2837) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2838    let plan_projection = |column: usize| match &projection[column].0 {
2839        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2840        ExpandedSelectItem::Expr(expr) => {
2841            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2842        }
2843    };
2844
2845    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2846    // a special case that means to use the ith item in the SELECT clause.
2847    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2848        return plan_projection(column);
2849    }
2850
2851    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2852    // The `foo` can refer to *either* an input column or an output column. If
2853    // both exist, the input column is preferred.
2854    match group_expr {
2855        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2856            Err(PlanError::UnknownColumn {
2857                table: None,
2858                column,
2859                similar,
2860            }) => {
2861                // The expression was a simple identifier that did not match an
2862                // input column. See if it matches an output column.
2863                let mut iter = projection.iter().map(|(_expr, name)| name);
2864                if let Some(i) = iter.position(|n| *n == column) {
2865                    if iter.any(|n| *n == column) {
2866                        Err(PlanError::AmbiguousColumn(column))
2867                    } else {
2868                        plan_projection(i)
2869                    }
2870                } else {
2871                    // The name didn't match an output column either. Return the
2872                    // "unknown column" error.
2873                    Err(PlanError::UnknownColumn {
2874                        table: None,
2875                        column,
2876                        similar,
2877                    })
2878                }
2879            }
2880            res => Ok((Some(group_expr), res?)),
2881        },
2882        _ => Ok((
2883            Some(group_expr),
2884            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2885        )),
2886    }
2887}
2888
2889/// Plans a slice of `ORDER BY` expressions.
2890///
2891/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2892/// parameter.
2893///
2894/// Returns the determined column orderings and a list of scalar expressions
2895/// that must be mapped onto the underlying relation expression.
2896pub(crate) fn plan_order_by_exprs(
2897    ecx: &ExprContext,
2898    order_by_exprs: &[OrderByExpr<Aug>],
2899    output_columns: &[(usize, &ColumnName)],
2900) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2901    let mut order_by = vec![];
2902    let mut map_exprs = vec![];
2903    for obe in order_by_exprs {
2904        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2905        // If the expression is a reference to an existing column,
2906        // do not introduce a new column to support it.
2907        let column = match expr {
2908            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2909            _ => {
2910                map_exprs.push(expr);
2911                ecx.relation_type.arity() + map_exprs.len() - 1
2912            }
2913        };
2914        order_by.push(resolve_desc_and_nulls_last(obe, column));
2915    }
2916    Ok((order_by, map_exprs))
2917}
2918
2919/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2920///
2921/// The `output_columns` parameter describes, in order, the physical index and
2922/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2923/// corresponds to a `SELECT` list with a single entry named "a" that can be
2924/// found at index 3 in the underlying relation expression.
2925///
2926/// There are three cases to handle.
2927///
2928///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2929///       reference to the specified output column.
2930///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2931///       an output column, if it exists; otherwise it is a reference to an
2932///       input column.
2933///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2934///       arbitrary expressions exclusively refer to input columns, never output
2935///       columns.
2936fn plan_order_by_or_distinct_expr(
2937    ecx: &ExprContext,
2938    expr: &Expr<Aug>,
2939    output_columns: &[(usize, &ColumnName)],
2940) -> Result<HirScalarExpr, PlanError> {
2941    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2942        return Ok(HirScalarExpr::column(output_columns[i].0));
2943    }
2944
2945    if let Expr::Identifier(names) = expr {
2946        if let [name] = &names[..] {
2947            let name = normalize::column_name(name.clone());
2948            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2949            if let Some((i, _)) = iter.next() {
2950                match iter.next() {
2951                    // Per SQL92, names are not considered ambiguous if they
2952                    // refer to identical target list expressions, as in
2953                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2954                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2955                    _ => return Ok(HirScalarExpr::column(*i)),
2956                }
2957            }
2958        }
2959    }
2960
2961    plan_expr(ecx, expr)?.type_as_any(ecx)
2962}
2963
2964fn plan_table_with_joins(
2965    qcx: &QueryContext,
2966    table_with_joins: &TableWithJoins<Aug>,
2967) -> Result<(HirRelationExpr, Scope), PlanError> {
2968    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2969    for join in &table_with_joins.joins {
2970        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2971        expr = new_expr;
2972        scope = new_scope;
2973    }
2974    Ok((expr, scope))
2975}
2976
2977fn plan_table_factor(
2978    qcx: &QueryContext,
2979    table_factor: &TableFactor<Aug>,
2980) -> Result<(HirRelationExpr, Scope), PlanError> {
2981    match table_factor {
2982        TableFactor::Table { name, alias } => {
2983            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2984            let scope = plan_table_alias(scope, alias.as_ref())?;
2985            Ok((expr, scope))
2986        }
2987
2988        TableFactor::Function {
2989            function,
2990            alias,
2991            with_ordinality,
2992        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2993
2994        TableFactor::RowsFrom {
2995            functions,
2996            alias,
2997            with_ordinality,
2998        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2999
3000        TableFactor::Derived {
3001            lateral,
3002            subquery,
3003            alias,
3004        } => {
3005            let mut qcx = (*qcx).clone();
3006            if !lateral {
3007                // Since this derived table was not marked as `LATERAL`,
3008                // make elements in outer scopes invisible until we reach the
3009                // next lateral barrier.
3010                for scope in &mut qcx.outer_scopes {
3011                    if scope.lateral_barrier {
3012                        break;
3013                    }
3014                    scope.items.clear();
3015                }
3016            }
3017            qcx.outer_scopes[0].lateral_barrier = true;
3018            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3019            let scope = plan_table_alias(scope, alias.as_ref())?;
3020            Ok((expr, scope))
3021        }
3022
3023        TableFactor::NestedJoin { join, alias } => {
3024            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3025            let scope = plan_table_alias(scope, alias.as_ref())?;
3026            Ok((expr, scope))
3027        }
3028    }
3029}
3030
3031/// Plans a `ROWS FROM` expression.
3032///
3033/// `ROWS FROM` concatenates table functions into a single table, filling in
3034/// `NULL`s in places where one table function has fewer rows than another. We
3035/// can achieve this by augmenting each table function with a row number, doing
3036/// a `FULL JOIN` between each table function on the row number and eventually
3037/// projecting away the row number columns. Concretely, the following query
3038/// using `ROWS FROM`
3039///
3040/// ```sql
3041/// SELECT
3042///     *
3043/// FROM
3044///     ROWS FROM (
3045///         generate_series(1, 2),
3046///         information_schema._pg_expandarray(ARRAY[9]),
3047///         generate_series(3, 6)
3048///     );
3049/// ```
3050///
3051/// is equivalent to the following query that does not use `ROWS FROM`:
3052///
3053/// ```sql
3054/// SELECT
3055///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3056/// FROM
3057///     generate_series(1, 2) WITH ORDINALITY AS gs1
3058///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3059///         ON gs1.ordinality = expand.ordinality
3060///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3061///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3062/// ```
3063///
3064/// Note the call to `coalesce` in the last join condition, which ensures that
3065/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3066///
3067/// This function creates a HirRelationExpr that follows the structure of the
3068/// latter query.
3069///
3070/// `with_ordinality` can be used to have the output expression contain a
3071/// single coalesced ordinality column at the end of the entire expression.
3072fn plan_rows_from(
3073    qcx: &QueryContext,
3074    functions: &[Function<Aug>],
3075    alias: Option<&TableAlias>,
3076    with_ordinality: bool,
3077) -> Result<(HirRelationExpr, Scope), PlanError> {
3078    // If there's only a single table function, planning proceeds as if `ROWS
3079    // FROM` hadn't been written at all.
3080    if let [function] = functions {
3081        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3082    }
3083
3084    // Per PostgreSQL, all scope items take the name of the first function
3085    // (unless aliased).
3086    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3087    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3088        qcx,
3089        functions,
3090        Some(functions[0].name.full_item_name().clone()),
3091    )?;
3092
3093    // Columns tracks the set of columns we will keep in the projection.
3094    let mut columns = Vec::new();
3095    let mut offset = 0;
3096    // Retain table function's non-ordinality columns.
3097    for (idx, cols) in num_cols.into_iter().enumerate() {
3098        for i in 0..cols {
3099            columns.push(offset + i);
3100        }
3101        offset += cols + 1;
3102
3103        // Remove the ordinality column from the scope, accounting for previous scope
3104        // changes from this loop.
3105        scope.items.remove(offset - idx - 1);
3106    }
3107
3108    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3109    // column. Otherwise remove it from the scope.
3110    if with_ordinality {
3111        columns.push(offset);
3112    } else {
3113        scope.items.pop();
3114    }
3115
3116    let expr = expr.project(columns);
3117
3118    let scope = plan_table_alias(scope, alias)?;
3119    Ok((expr, scope))
3120}
3121
3122/// Plans an expression coalescing multiple table functions. Each table
3123/// function is followed by its row ordinality. The entire expression is
3124/// followed by the coalesced row ordinality.
3125///
3126/// The returned Scope will set all item's table_name's to the `table_name`
3127/// parameter if it is `Some`. If `None`, they will be the name of each table
3128/// function.
3129///
3130/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3131/// each table function.
3132///
3133/// For example, with table functions tf1 returning 1 column (a) and tf2
3134/// returning 2 columns (b, c), this function will return an expr 6 columns:
3135///
3136/// - tf1.a
3137/// - tf1.ordinality
3138/// - tf2.b
3139/// - tf2.c
3140/// - tf2.ordinality
3141/// - coalesced_ordinality
3142///
3143/// And a `Vec<usize>` of `[1, 2]`.
3144fn plan_rows_from_internal<'a>(
3145    qcx: &QueryContext,
3146    functions: impl IntoIterator<Item = &'a Function<Aug>>,
3147    table_name: Option<FullItemName>,
3148) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3149    let mut functions = functions.into_iter();
3150    let mut num_cols = Vec::new();
3151
3152    // Join together each of the table functions in turn. The last column is
3153    // always the column to join against and is maintained to be the coalescence
3154    // of the row number column for all prior functions.
3155    let (mut left_expr, mut left_scope) =
3156        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3157    num_cols.push(left_scope.len() - 1);
3158    // Create the coalesced ordinality column.
3159    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3160    left_scope
3161        .items
3162        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3163
3164    for function in functions {
3165        // The right hand side of a join must be planned in a new scope.
3166        let qcx = qcx.empty_derived_context();
3167        let (right_expr, mut right_scope) =
3168            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3169        num_cols.push(right_scope.len() - 1);
3170        let left_col = left_scope.len() - 1;
3171        let right_col = left_scope.len() + right_scope.len() - 1;
3172        let on = HirScalarExpr::call_binary(
3173            HirScalarExpr::column(left_col),
3174            HirScalarExpr::column(right_col),
3175            expr_func::Eq,
3176        );
3177        left_expr = left_expr
3178            .join(right_expr, on, JoinKind::FullOuter)
3179            .map(vec![HirScalarExpr::call_variadic(
3180                VariadicFunc::Coalesce,
3181                vec![
3182                    HirScalarExpr::column(left_col),
3183                    HirScalarExpr::column(right_col),
3184                ],
3185            )]);
3186
3187        // Project off the previous iteration's coalesced column, but keep both of this
3188        // iteration's ordinality columns.
3189        left_expr = left_expr.project(
3190            (0..left_col) // non-coalesced ordinality columns from left function
3191                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
3192                .collect(),
3193        );
3194        // Move the coalesced ordinality column.
3195        right_scope.items.push(left_scope.items.pop().unwrap());
3196
3197        left_scope.items.extend(right_scope.items);
3198    }
3199
3200    Ok((left_expr, left_scope, num_cols))
3201}
3202
3203/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3204/// FROM` clause that contains other table functions. Special aliasing rules
3205/// apply.
3206fn plan_solitary_table_function(
3207    qcx: &QueryContext,
3208    function: &Function<Aug>,
3209    alias: Option<&TableAlias>,
3210    with_ordinality: bool,
3211) -> Result<(HirRelationExpr, Scope), PlanError> {
3212    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3213
3214    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3215    if single_column_function {
3216        let item = &mut scope.items[0];
3217
3218        // Mark that the function only produced a single column. This impacts
3219        // whole-row references.
3220        item.from_single_column_function = true;
3221
3222        // Strange special case for solitary table functions that output one
3223        // column whose name matches the name of the table function. If a table
3224        // alias is provided, the column name is changed to the table alias's
3225        // name. Concretely, the following query returns a column named `x`
3226        // rather than a column named `generate_series`:
3227        //
3228        //     SELECT * FROM generate_series(1, 5) AS x
3229        //
3230        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3231        // since its output column is explicitly named `value`, not
3232        // `jsonb_array_elements`.
3233        //
3234        // Note also that we may (correctly) change the column name again when
3235        // we plan the table alias below if the `alias.columns` is non-empty.
3236        if let Some(alias) = alias {
3237            if let ScopeItem {
3238                table_name: Some(table_name),
3239                column_name,
3240                ..
3241            } = item
3242            {
3243                if table_name.item.as_str() == column_name.as_str() {
3244                    *column_name = normalize::column_name(alias.name.clone());
3245                }
3246            }
3247        }
3248    }
3249
3250    let scope = plan_table_alias(scope, alias)?;
3251    Ok((expr, scope))
3252}
3253
3254/// Plans a table function.
3255///
3256/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3257/// instead to get the appropriate aliasing behavior.
3258fn plan_table_function_internal(
3259    qcx: &QueryContext,
3260    Function {
3261        name,
3262        args,
3263        filter,
3264        over,
3265        distinct,
3266    }: &Function<Aug>,
3267    with_ordinality: bool,
3268    table_name: Option<FullItemName>,
3269) -> Result<(HirRelationExpr, Scope), PlanError> {
3270    assert_none!(filter, "cannot parse table function with FILTER");
3271    assert_none!(over, "cannot parse table function with OVER");
3272    assert!(!*distinct, "cannot parse table function with DISTINCT");
3273
3274    let ecx = &ExprContext {
3275        qcx,
3276        name: "table function arguments",
3277        scope: &Scope::empty(),
3278        relation_type: &SqlRelationType::empty(),
3279        allow_aggregates: false,
3280        allow_subqueries: true,
3281        allow_parameters: true,
3282        allow_windows: false,
3283    };
3284
3285    let scalar_args = match args {
3286        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3287        FunctionArgs::Args { args, order_by } => {
3288            if !order_by.is_empty() {
3289                sql_bail!(
3290                    "ORDER BY specified, but {} is not an aggregate function",
3291                    name
3292                );
3293            }
3294            plan_exprs(ecx, args)?
3295        }
3296    };
3297
3298    let table_name = match table_name {
3299        Some(table_name) => table_name.item,
3300        None => name.full_item_name().item.clone(),
3301    };
3302
3303    let scope_name = Some(PartialItemName {
3304        database: None,
3305        schema: None,
3306        item: table_name,
3307    });
3308
3309    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3310        Func::Table(impls) => {
3311            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3312            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3313            let expr = match tf.imp {
3314                TableFuncImpl::CallTable { mut func, exprs } => {
3315                    if with_ordinality {
3316                        func = TableFunc::with_ordinality(func.clone()).ok_or(
3317                            PlanError::Unsupported {
3318                                feature: format!("WITH ORDINALITY on {}", func),
3319                                discussion_no: None,
3320                            },
3321                        )?;
3322                    }
3323                    HirRelationExpr::CallTable { func, exprs }
3324                }
3325                TableFuncImpl::Expr(expr) => {
3326                    if !with_ordinality {
3327                        expr
3328                    } else {
3329                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
3330                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
3331                        // back to the legacy implementation or error out the query.
3332                        if qcx
3333                            .scx
3334                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3335                        {
3336                            // Note that this can give an incorrect ordering, and also has an extreme
3337                            // performance problem in some cases. See the doc comment of
3338                            // `TableFuncImpl`.
3339                            tracing::error!(
3340                                %name,
3341                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3342                            );
3343                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3344                                func: WindowExprType::Scalar(ScalarWindowExpr {
3345                                    func: ScalarWindowFunc::RowNumber,
3346                                    order_by: vec![],
3347                                }),
3348                                partition_by: vec![],
3349                                order_by: vec![],
3350                            })])
3351                        } else {
3352                            bail_unsupported!(format!(
3353                                "WITH ORDINALITY or ROWS FROM with {}",
3354                                name
3355                            ));
3356                        }
3357                    }
3358                }
3359            };
3360            (expr, scope)
3361        }
3362        Func::Scalar(impls) => {
3363            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3364            let output = expr.typ(
3365                &qcx.outer_relation_types,
3366                &SqlRelationType::new(vec![]),
3367                &qcx.scx.param_types.borrow(),
3368            );
3369
3370            let relation = SqlRelationType::new(vec![output]);
3371
3372            let function_ident = Ident::new(name.full_item_name().item.clone())?;
3373            let column_name = normalize::column_name(function_ident);
3374            let name = column_name.to_string();
3375
3376            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3377
3378            let mut func = TableFunc::TabletizedScalar { relation, name };
3379            if with_ordinality {
3380                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3381                    feature: format!("WITH ORDINALITY on {}", func),
3382                    discussion_no: None,
3383                })?;
3384            }
3385            (
3386                HirRelationExpr::CallTable {
3387                    func,
3388                    exprs: vec![expr],
3389                },
3390                scope,
3391            )
3392        }
3393        o => sql_bail!(
3394            "{} functions are not supported in functions in FROM",
3395            o.class()
3396        ),
3397    };
3398
3399    if with_ordinality {
3400        scope
3401            .items
3402            .push(ScopeItem::from_name(scope_name, "ordinality"));
3403    }
3404
3405    Ok((expr, scope))
3406}
3407
3408fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3409    if let Some(TableAlias {
3410        name,
3411        columns,
3412        strict,
3413    }) = alias
3414    {
3415        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3416            sql_bail!(
3417                "{} has {} columns available but {} columns specified",
3418                name,
3419                scope.items.len(),
3420                columns.len()
3421            );
3422        }
3423
3424        let table_name = normalize::ident(name.to_owned());
3425        for (i, item) in scope.items.iter_mut().enumerate() {
3426            item.table_name = if item.allow_unqualified_references {
3427                Some(PartialItemName {
3428                    database: None,
3429                    schema: None,
3430                    item: table_name.clone(),
3431                })
3432            } else {
3433                // Columns that prohibit unqualified references are special
3434                // columns from the output of a NATURAL or USING join that can
3435                // only be referenced by their full, pre-join name. Applying an
3436                // alias to the output of that join renders those columns
3437                // inaccessible, which we accomplish here by setting the
3438                // table name to `None`.
3439                //
3440                // Concretely, consider:
3441                //
3442                //      CREATE TABLE t1 (a int);
3443                //      CREATE TABLE t2 (a int);
3444                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3445                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3446                //
3447                // In (1), the join has no alias. The underlying columns from
3448                // either side of the join can be referenced as `t1.a` and
3449                // `t2.a`, respectively, and the unqualified name `a` refers to
3450                // a column whose value is `coalesce(t1.a, t2.a)`.
3451                //
3452                // In (2), the join is aliased as `t`. The columns from either
3453                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3454                // the coalesced column can be named as either `a` or `t.a`.
3455                //
3456                // We previously had a bug [0] that mishandled this subtle
3457                // logic.
3458                //
3459                // NOTE(benesch): We could in theory choose to project away
3460                // those inaccessible columns and drop them from the scope
3461                // entirely, but that would require that this function also
3462                // take and return the `HirRelationExpr` that is being aliased,
3463                // which is a rather large refactor.
3464                //
3465                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3466                None
3467            };
3468            item.column_name = columns
3469                .get(i)
3470                .map(|a| normalize::column_name(a.clone()))
3471                .unwrap_or_else(|| item.column_name.clone());
3472        }
3473    }
3474    Ok(scope)
3475}
3476
3477// `table_func_names` is a mapping from a UUID to the original function
3478// name. The UUIDs are identifiers that have been rewritten from some table
3479// function expression, and this mapping restores the original names.
3480fn invent_column_name(
3481    ecx: &ExprContext,
3482    expr: &Expr<Aug>,
3483    table_func_names: &BTreeMap<String, Ident>,
3484) -> Option<ColumnName> {
3485    // We follow PostgreSQL exactly here, which has some complicated rules
3486    // around "high" and "low" quality names. Low quality names override other
3487    // low quality names but not high quality names.
3488    //
3489    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3490
3491    #[derive(Debug)]
3492    enum NameQuality {
3493        Low,
3494        High,
3495    }
3496
3497    fn invent(
3498        ecx: &ExprContext,
3499        expr: &Expr<Aug>,
3500        table_func_names: &BTreeMap<String, Ident>,
3501    ) -> Option<(ColumnName, NameQuality)> {
3502        match expr {
3503            Expr::Identifier(names) => {
3504                if let [name] = names.as_slice() {
3505                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3506                        return Some((
3507                            normalize::column_name(table_func_name.clone()),
3508                            NameQuality::High,
3509                        ));
3510                    }
3511                }
3512                names
3513                    .last()
3514                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3515            }
3516            Expr::Value(v) => match v {
3517                // Per PostgreSQL, `bool` and `interval` literals take on the name
3518                // of their type, but not other literal types.
3519                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3520                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3521                _ => None,
3522            },
3523            Expr::Function(func) => {
3524                let (schema, item) = match &func.name {
3525                    ResolvedItemName::Item {
3526                        qualifiers,
3527                        full_name,
3528                        ..
3529                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3530                    _ => unreachable!(),
3531                };
3532
3533                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3534                    || schema
3535                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3536                {
3537                    None
3538                } else {
3539                    Some((item.into(), NameQuality::High))
3540                }
3541            }
3542            Expr::HomogenizingFunction { function, .. } => Some((
3543                function.to_string().to_lowercase().into(),
3544                NameQuality::High,
3545            )),
3546            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3547            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3548            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3549            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3550            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3551                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3552                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3553            },
3554            Expr::Case { else_result, .. } => {
3555                match else_result
3556                    .as_ref()
3557                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
3558                {
3559                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3560                    _ => Some(("case".into(), NameQuality::Low)),
3561                }
3562            }
3563            Expr::FieldAccess { field, .. } => {
3564                Some((normalize::column_name(field.clone()), NameQuality::High))
3565            }
3566            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3567            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3568            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3569                // A bit silly to have to plan the query here just to get its column
3570                // name, since we throw away the planned expression, but fixing this
3571                // requires a separate semantic analysis phase.
3572                let (_expr, scope) =
3573                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3574                scope
3575                    .items
3576                    .first()
3577                    .map(|name| (name.column_name.clone(), NameQuality::High))
3578            }
3579            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3580            _ => None,
3581        }
3582    }
3583
3584    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3585}
3586
3587#[derive(Debug)]
3588enum ExpandedSelectItem<'a> {
3589    InputOrdinal(usize),
3590    Expr(Cow<'a, Expr<Aug>>),
3591}
3592
3593impl ExpandedSelectItem<'_> {
3594    fn as_expr(&self) -> Option<&Expr<Aug>> {
3595        match self {
3596            ExpandedSelectItem::InputOrdinal(_) => None,
3597            ExpandedSelectItem::Expr(expr) => Some(expr),
3598        }
3599    }
3600}
3601
3602fn expand_select_item<'a>(
3603    ecx: &ExprContext,
3604    s: &'a SelectItem<Aug>,
3605    table_func_names: &BTreeMap<String, Ident>,
3606) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3607    match s {
3608        SelectItem::Expr {
3609            expr: Expr::QualifiedWildcard(table_name),
3610            alias: _,
3611        } => {
3612            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3613            let table_name =
3614                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3615            let out: Vec<_> = ecx
3616                .scope
3617                .items
3618                .iter()
3619                .enumerate()
3620                .filter(|(_i, item)| item.is_from_table(&table_name))
3621                .map(|(i, item)| {
3622                    let name = item.column_name.clone();
3623                    (ExpandedSelectItem::InputOrdinal(i), name)
3624                })
3625                .collect();
3626            if out.is_empty() {
3627                sql_bail!("no table named '{}' in scope", table_name);
3628            }
3629            Ok(out)
3630        }
3631        SelectItem::Expr {
3632            expr: Expr::WildcardAccess(sql_expr),
3633            alias: _,
3634        } => {
3635            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3636            // A bit silly to have to plan the expression here just to get its
3637            // type, since we throw away the planned expression, but fixing this
3638            // requires a separate semantic analysis phase. Luckily this is an
3639            // uncommon operation and the PostgreSQL docs have a warning that
3640            // this operation is slow in Postgres too.
3641            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3642            let fields = match ecx.scalar_type(&expr) {
3643                SqlScalarType::Record { fields, .. } => fields,
3644                ty => sql_bail!(
3645                    "type {} is not composite",
3646                    ecx.humanize_scalar_type(&ty, false)
3647                ),
3648            };
3649            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3650            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3651                if let [name] = ident.as_slice() {
3652                    if let Ok(items) = ecx.scope.items_from_table(
3653                        &[],
3654                        &PartialItemName {
3655                            database: None,
3656                            schema: None,
3657                            item: name.as_str().to_string(),
3658                        },
3659                    ) {
3660                        for (_, item) in items {
3661                            if item
3662                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3663                            {
3664                                skip_cols.insert(item.column_name.clone());
3665                            }
3666                        }
3667                    }
3668                }
3669            }
3670            let items = fields
3671                .iter()
3672                .filter_map(|(name, _ty)| {
3673                    if skip_cols.contains(name) {
3674                        None
3675                    } else {
3676                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3677                            expr: sql_expr.clone(),
3678                            field: name.clone().into(),
3679                        }));
3680                        Some((item, name.clone()))
3681                    }
3682                })
3683                .collect();
3684            Ok(items)
3685        }
3686        SelectItem::Wildcard => {
3687            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3688            let items: Vec<_> = ecx
3689                .scope
3690                .items
3691                .iter()
3692                .enumerate()
3693                .filter(|(_i, item)| item.allow_unqualified_references)
3694                .map(|(i, item)| {
3695                    let name = item.column_name.clone();
3696                    (ExpandedSelectItem::InputOrdinal(i), name)
3697                })
3698                .collect();
3699
3700            Ok(items)
3701        }
3702        SelectItem::Expr { expr, alias } => {
3703            let name = alias
3704                .clone()
3705                .map(normalize::column_name)
3706                .or_else(|| invent_column_name(ecx, expr, table_func_names))
3707                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3708            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3709        }
3710    }
3711}
3712
3713fn plan_join(
3714    left_qcx: &QueryContext,
3715    left: HirRelationExpr,
3716    left_scope: Scope,
3717    join: &Join<Aug>,
3718) -> Result<(HirRelationExpr, Scope), PlanError> {
3719    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3720    let (kind, constraint) = match &join.join_operator {
3721        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3722        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3723        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3724        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3725        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3726    };
3727
3728    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3729    if !kind.can_be_correlated() {
3730        for item in &mut right_qcx.outer_scopes[0].items {
3731            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3732            // these items from scope. These items need to *exist* because they
3733            // might shadow variables in outer scopes that would otherwise be
3734            // valid to reference, but accessing them needs to produce an error.
3735            item.error_if_referenced =
3736                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3737                    table: table.cloned(),
3738                    column: column.clone(),
3739                });
3740        }
3741    }
3742    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3743
3744    let (expr, scope) = match constraint {
3745        JoinConstraint::On(expr) => {
3746            let product_scope = left_scope.product(right_scope)?;
3747            let ecx = &ExprContext {
3748                qcx: left_qcx,
3749                name: "ON clause",
3750                scope: &product_scope,
3751                relation_type: &SqlRelationType::new(
3752                    left_qcx
3753                        .relation_type(&left)
3754                        .column_types
3755                        .into_iter()
3756                        .chain(right_qcx.relation_type(&right).column_types)
3757                        .collect(),
3758                ),
3759                allow_aggregates: false,
3760                allow_subqueries: true,
3761                allow_parameters: true,
3762                allow_windows: false,
3763            };
3764            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3765            let joined = left.join(right, on, kind);
3766            (joined, product_scope)
3767        }
3768        JoinConstraint::Using { columns, alias } => {
3769            let column_names = columns
3770                .iter()
3771                .map(|ident| normalize::column_name(ident.clone()))
3772                .collect::<Vec<_>>();
3773
3774            plan_using_constraint(
3775                &column_names,
3776                left_qcx,
3777                left,
3778                left_scope,
3779                &right_qcx,
3780                right,
3781                right_scope,
3782                kind,
3783                alias.as_ref(),
3784            )?
3785        }
3786        JoinConstraint::Natural => {
3787            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3788            // have the same scx. However, it doesn't hurt to be safe.
3789            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3790            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3791            let left_column_names = left_scope.column_names();
3792            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3793            let column_names: Vec<_> = left_column_names
3794                .filter(|col| right_column_names.contains(col))
3795                .cloned()
3796                .collect();
3797            plan_using_constraint(
3798                &column_names,
3799                left_qcx,
3800                left,
3801                left_scope,
3802                &right_qcx,
3803                right,
3804                right_scope,
3805                kind,
3806                None,
3807            )?
3808        }
3809    };
3810    Ok((expr, scope))
3811}
3812
3813// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3814#[allow(clippy::too_many_arguments)]
3815fn plan_using_constraint(
3816    column_names: &[ColumnName],
3817    left_qcx: &QueryContext,
3818    left: HirRelationExpr,
3819    left_scope: Scope,
3820    right_qcx: &QueryContext,
3821    right: HirRelationExpr,
3822    right_scope: Scope,
3823    kind: JoinKind,
3824    alias: Option<&Ident>,
3825) -> Result<(HirRelationExpr, Scope), PlanError> {
3826    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3827
3828    // Cargo culting PG here; no discernable reason this must fail, but PG does
3829    // so we do, as well.
3830    let mut unique_column_names = BTreeSet::new();
3831    for c in column_names {
3832        if !unique_column_names.insert(c) {
3833            return Err(PlanError::Unsupported {
3834                feature: format!(
3835                    "column name {} appears more than once in USING clause",
3836                    c.quoted()
3837                ),
3838                discussion_no: None,
3839            });
3840        }
3841    }
3842
3843    let alias_item_name = alias.map(|alias| PartialItemName {
3844        database: None,
3845        schema: None,
3846        item: alias.clone().to_string(),
3847    });
3848
3849    if let Some(alias_item_name) = &alias_item_name {
3850        for partial_item_name in both_scope.table_names() {
3851            if partial_item_name.matches(alias_item_name) {
3852                sql_bail!(
3853                    "table name \"{}\" specified more than once",
3854                    alias_item_name
3855                )
3856            }
3857        }
3858    }
3859
3860    let ecx = &ExprContext {
3861        qcx: right_qcx,
3862        name: "USING clause",
3863        scope: &both_scope,
3864        relation_type: &SqlRelationType::new(
3865            left_qcx
3866                .relation_type(&left)
3867                .column_types
3868                .into_iter()
3869                .chain(right_qcx.relation_type(&right).column_types)
3870                .collect(),
3871        ),
3872        allow_aggregates: false,
3873        allow_subqueries: false,
3874        allow_parameters: false,
3875        allow_windows: false,
3876    };
3877
3878    let mut join_exprs = vec![];
3879    let mut map_exprs = vec![];
3880    let mut new_items = vec![];
3881    let mut join_cols = vec![];
3882    let mut hidden_cols = vec![];
3883
3884    for column_name in column_names {
3885        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3886        let (lhs, lhs_name) = left_scope.resolve_using_column(
3887            column_name,
3888            JoinSide::Left,
3889            &mut left_qcx.name_manager.borrow_mut(),
3890        )?;
3891        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3892            column_name,
3893            JoinSide::Right,
3894            &mut right_qcx.name_manager.borrow_mut(),
3895        )?;
3896
3897        // Adjust the RHS reference to its post-join location.
3898        rhs.column += left_scope.len();
3899
3900        // Join keys must be resolved to same type.
3901        let mut exprs = coerce_homogeneous_exprs(
3902            &ecx.with_name(&format!(
3903                "NATURAL/USING join column {}",
3904                column_name.quoted()
3905            )),
3906            vec![
3907                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3908                    lhs,
3909                    Arc::clone(&lhs_name),
3910                )),
3911                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3912                    rhs,
3913                    Arc::clone(&rhs_name),
3914                )),
3915            ],
3916            None,
3917        )?;
3918        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3919
3920        match kind {
3921            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3922                join_cols.push(lhs.column);
3923                hidden_cols.push(rhs.column);
3924            }
3925            JoinKind::RightOuter => {
3926                join_cols.push(rhs.column);
3927                hidden_cols.push(lhs.column);
3928            }
3929            JoinKind::FullOuter => {
3930                // Create a new column that will be the coalesced value of left
3931                // and right.
3932                join_cols.push(both_scope.items.len() + map_exprs.len());
3933                hidden_cols.push(lhs.column);
3934                hidden_cols.push(rhs.column);
3935                map_exprs.push(HirScalarExpr::call_variadic(
3936                    VariadicFunc::Coalesce,
3937                    vec![expr1.clone(), expr2.clone()],
3938                ));
3939                new_items.push(ScopeItem::from_column_name(column_name));
3940            }
3941        }
3942
3943        // If a `join_using_alias` is present, add a new scope item that accepts
3944        // only table-qualified references for each specified join column.
3945        // Unlike regular table aliases, a `join_using_alias` should not hide the
3946        // names of the joined relations.
3947        if alias_item_name.is_some() {
3948            let new_item_col = both_scope.items.len() + new_items.len();
3949            join_cols.push(new_item_col);
3950            hidden_cols.push(new_item_col);
3951
3952            new_items.push(ScopeItem::from_name(
3953                alias_item_name.clone(),
3954                column_name.clone().to_string(),
3955            ));
3956
3957            // Should be safe to use either `lhs` or `rhs` here since the column
3958            // is available in both scopes and must have the same type of the new item.
3959            // We (arbitrarily) choose the left name.
3960            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3961        }
3962
3963        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3964    }
3965    both_scope.items.extend(new_items);
3966
3967    // The columns from the secondary side of the join remain accessible by
3968    // their table-qualified name, but not by their column name alone. They are
3969    // also excluded from `SELECT *`.
3970    for c in hidden_cols {
3971        both_scope.items[c].allow_unqualified_references = false;
3972    }
3973
3974    // Reproject all returned elements to the front of the list.
3975    let project_key = join_cols
3976        .into_iter()
3977        .chain(0..both_scope.items.len())
3978        .unique()
3979        .collect::<Vec<_>>();
3980
3981    both_scope = both_scope.project(&project_key);
3982
3983    let on = HirScalarExpr::variadic_and(join_exprs);
3984
3985    let both = left
3986        .join(right, on, kind)
3987        .map(map_exprs)
3988        .project(project_key);
3989    Ok((both, both_scope))
3990}
3991
3992pub fn plan_expr<'a>(
3993    ecx: &'a ExprContext,
3994    e: &Expr<Aug>,
3995) -> Result<CoercibleScalarExpr, PlanError> {
3996    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3997}
3998
3999fn plan_expr_inner<'a>(
4000    ecx: &'a ExprContext,
4001    e: &Expr<Aug>,
4002) -> Result<CoercibleScalarExpr, PlanError> {
4003    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4004        // We've already calculated this expression.
4005        return Ok(HirScalarExpr::named_column(
4006            i,
4007            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4008        )
4009        .into());
4010    }
4011
4012    match e {
4013        // Names.
4014        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4015            Ok(plan_identifier(ecx, names)?.into())
4016        }
4017
4018        // Literals.
4019        Expr::Value(val) => plan_literal(val),
4020        Expr::Parameter(n) => plan_parameter(ecx, *n),
4021        Expr::Array(exprs) => plan_array(ecx, exprs, None),
4022        Expr::List(exprs) => plan_list(ecx, exprs, None),
4023        Expr::Map(exprs) => plan_map(ecx, exprs, None),
4024        Expr::Row { exprs } => plan_row(ecx, exprs),
4025
4026        // Generalized functions, operators, and casts.
4027        Expr::Op { op, expr1, expr2 } => {
4028            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4029        }
4030        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4031        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4032
4033        // Special functions and operators.
4034        Expr::Not { expr } => plan_not(ecx, expr),
4035        Expr::And { left, right } => plan_and(ecx, left, right),
4036        Expr::Or { left, right } => plan_or(ecx, left, right),
4037        Expr::IsExpr {
4038            expr,
4039            construct,
4040            negated,
4041        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4042        Expr::Case {
4043            operand,
4044            conditions,
4045            results,
4046            else_result,
4047        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4048        Expr::HomogenizingFunction { function, exprs } => {
4049            plan_homogenizing_function(ecx, function, exprs)
4050        }
4051        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4052            ecx,
4053            &None,
4054            &[l_expr.clone().equals(*r_expr.clone())],
4055            &[Expr::null()],
4056            &Some(Box::new(*l_expr.clone())),
4057        )?
4058        .into()),
4059        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4060        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4061        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4062        Expr::Like {
4063            expr,
4064            pattern,
4065            escape,
4066            case_insensitive,
4067            negated,
4068        } => Ok(plan_like(
4069            ecx,
4070            expr,
4071            pattern,
4072            escape.as_deref(),
4073            *case_insensitive,
4074            *negated,
4075        )?
4076        .into()),
4077
4078        Expr::InList {
4079            expr,
4080            list,
4081            negated,
4082        } => plan_in_list(ecx, expr, list, negated),
4083
4084        // Subqueries.
4085        Expr::Exists(query) => plan_exists(ecx, query),
4086        Expr::Subquery(query) => plan_subquery(ecx, query),
4087        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4088        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4089        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4090        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4091        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4092        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4093        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4094        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4095        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4096        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4097        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4098    }
4099}
4100
4101fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4102    if !ecx.allow_parameters {
4103        // It might be clearer to return an error like "cannot use parameter
4104        // here", but this is how PostgreSQL does it, and so for now we follow
4105        // PostgreSQL.
4106        return Err(PlanError::UnknownParameter(n));
4107    }
4108    if n == 0 || n > 65536 {
4109        return Err(PlanError::UnknownParameter(n));
4110    }
4111    if ecx.param_types().borrow().contains_key(&n) {
4112        Ok(HirScalarExpr::parameter(n).into())
4113    } else {
4114        Ok(CoercibleScalarExpr::Parameter(n))
4115    }
4116}
4117
4118fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4119    let mut out = vec![];
4120    for e in exprs {
4121        out.push(plan_expr(ecx, e)?);
4122    }
4123    Ok(CoercibleScalarExpr::LiteralRecord(out))
4124}
4125
4126fn plan_cast(
4127    ecx: &ExprContext,
4128    expr: &Expr<Aug>,
4129    data_type: &ResolvedDataType,
4130) -> Result<CoercibleScalarExpr, PlanError> {
4131    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4132    let expr = match expr {
4133        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4134        // we can pass in the target type as a type hint. This is
4135        // a limited form of the coercion that we do for string literals
4136        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4137        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4138        // PostgreSQL compatibility trouble.
4139        //
4140        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4141        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4142        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4143        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4144        _ => plan_expr(ecx, expr)?,
4145    };
4146    let ecx = &ecx.with_name("CAST");
4147    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4148    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4149    Ok(expr.into())
4150}
4151
4152fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4153    let ecx = ecx.with_name("NOT argument");
4154    Ok(plan_expr(&ecx, expr)?
4155        .type_as(&ecx, &SqlScalarType::Bool)?
4156        .call_unary(UnaryFunc::Not(expr_func::Not))
4157        .into())
4158}
4159
4160fn plan_and(
4161    ecx: &ExprContext,
4162    left: &Expr<Aug>,
4163    right: &Expr<Aug>,
4164) -> Result<CoercibleScalarExpr, PlanError> {
4165    let ecx = ecx.with_name("AND argument");
4166    Ok(HirScalarExpr::variadic_and(vec![
4167        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4168        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4169    ])
4170    .into())
4171}
4172
4173fn plan_or(
4174    ecx: &ExprContext,
4175    left: &Expr<Aug>,
4176    right: &Expr<Aug>,
4177) -> Result<CoercibleScalarExpr, PlanError> {
4178    let ecx = ecx.with_name("OR argument");
4179    Ok(HirScalarExpr::variadic_or(vec![
4180        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4181        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4182    ])
4183    .into())
4184}
4185
4186fn plan_in_list(
4187    ecx: &ExprContext,
4188    lhs: &Expr<Aug>,
4189    list: &Vec<Expr<Aug>>,
4190    negated: &bool,
4191) -> Result<CoercibleScalarExpr, PlanError> {
4192    let ecx = ecx.with_name("IN list");
4193    let or = HirScalarExpr::variadic_or(
4194        list.into_iter()
4195            .map(|e| {
4196                let eq = lhs.clone().equals(e.clone());
4197                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4198            })
4199            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4200    );
4201    Ok(if *negated {
4202        or.call_unary(UnaryFunc::Not(expr_func::Not))
4203    } else {
4204        or
4205    }
4206    .into())
4207}
4208
4209fn plan_homogenizing_function(
4210    ecx: &ExprContext,
4211    function: &HomogenizingFunction,
4212    exprs: &[Expr<Aug>],
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4215    let expr = HirScalarExpr::call_variadic(
4216        match function {
4217            HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4218            HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4219            HomogenizingFunction::Least => VariadicFunc::Least,
4220        },
4221        coerce_homogeneous_exprs(
4222            &ecx.with_name(&function.to_string().to_lowercase()),
4223            plan_exprs(ecx, exprs)?,
4224            None,
4225        )?,
4226    );
4227    Ok(expr.into())
4228}
4229
4230fn plan_field_access(
4231    ecx: &ExprContext,
4232    expr: &Expr<Aug>,
4233    field: &Ident,
4234) -> Result<CoercibleScalarExpr, PlanError> {
4235    let field = normalize::column_name(field.clone());
4236    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4237    let ty = ecx.scalar_type(&expr);
4238    let i = match &ty {
4239        SqlScalarType::Record { fields, .. } => {
4240            fields.iter().position(|(name, _ty)| *name == field)
4241        }
4242        ty => sql_bail!(
4243            "column notation applied to type {}, which is not a composite type",
4244            ecx.humanize_scalar_type(ty, false)
4245        ),
4246    };
4247    match i {
4248        None => sql_bail!(
4249            "field {} not found in data type {}",
4250            field,
4251            ecx.humanize_scalar_type(&ty, false)
4252        ),
4253        Some(i) => Ok(expr
4254            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4255            .into()),
4256    }
4257}
4258
4259fn plan_subscript(
4260    ecx: &ExprContext,
4261    expr: &Expr<Aug>,
4262    positions: &[SubscriptPosition<Aug>],
4263) -> Result<CoercibleScalarExpr, PlanError> {
4264    assert!(
4265        !positions.is_empty(),
4266        "subscript expression must contain at least one position"
4267    );
4268
4269    let ecx = &ecx.with_name("subscripting");
4270    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4271    let ty = ecx.scalar_type(&expr);
4272    match &ty {
4273        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4274            ecx,
4275            expr,
4276            positions,
4277            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4278            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4279            // track in its backing data).
4280            if ty == SqlScalarType::Int2Vector {
4281                1
4282            } else {
4283                0
4284            },
4285        ),
4286        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4287        SqlScalarType::List { element_type, .. } => {
4288            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4289            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4290            let n_layers = ty.unwrap_list_n_layers();
4291            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4292        }
4293        ty => sql_bail!(
4294            "cannot subscript type {}",
4295            ecx.humanize_scalar_type(ty, false)
4296        ),
4297    }
4298}
4299
4300// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4301// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4302// any were slices (i.e. included colon).
4303fn extract_scalar_subscript_from_positions<'a>(
4304    positions: &'a [SubscriptPosition<Aug>],
4305    expr_type_name: &str,
4306) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4307    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4308    for p in positions {
4309        if p.explicit_slice {
4310            sql_bail!("{} subscript does not support slices", expr_type_name);
4311        }
4312        assert!(
4313            p.end.is_none(),
4314            "index-appearing subscripts cannot have end value"
4315        );
4316        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4317    }
4318    Ok(scalar_subscripts)
4319}
4320
4321fn plan_subscript_array(
4322    ecx: &ExprContext,
4323    expr: HirScalarExpr,
4324    positions: &[SubscriptPosition<Aug>],
4325    offset: i64,
4326) -> Result<CoercibleScalarExpr, PlanError> {
4327    let mut exprs = Vec::with_capacity(positions.len() + 1);
4328    exprs.push(expr);
4329
4330    // Subscripting arrays doesn't yet support slicing, so we always want to
4331    // extract scalars or error.
4332    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4333
4334    for i in indexes {
4335        exprs.push(plan_expr(ecx, i)?.cast_to(
4336            ecx,
4337            CastContext::Explicit,
4338            &SqlScalarType::Int64,
4339        )?);
4340    }
4341
4342    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4343}
4344
4345fn plan_subscript_list(
4346    ecx: &ExprContext,
4347    mut expr: HirScalarExpr,
4348    positions: &[SubscriptPosition<Aug>],
4349    mut remaining_layers: usize,
4350    elem_type_name: &str,
4351) -> Result<CoercibleScalarExpr, PlanError> {
4352    let mut i = 0;
4353
4354    while i < positions.len() {
4355        // Take all contiguous index operations, i.e. find next slice operation.
4356        let j = positions[i..]
4357            .iter()
4358            .position(|p| p.explicit_slice)
4359            .unwrap_or(positions.len() - i);
4360        if j != 0 {
4361            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4362            let (n, e) = plan_index_list(
4363                ecx,
4364                expr,
4365                indexes.as_slice(),
4366                remaining_layers,
4367                elem_type_name,
4368            )?;
4369            remaining_layers = n;
4370            expr = e;
4371            i += j;
4372        }
4373
4374        // Take all contiguous slice operations, i.e. find next index operation.
4375        let j = positions[i..]
4376            .iter()
4377            .position(|p| !p.explicit_slice)
4378            .unwrap_or(positions.len() - i);
4379        if j != 0 {
4380            expr = plan_slice_list(
4381                ecx,
4382                expr,
4383                &positions[i..i + j],
4384                remaining_layers,
4385                elem_type_name,
4386            )?;
4387            i += j;
4388        }
4389    }
4390
4391    Ok(expr.into())
4392}
4393
4394fn plan_index_list(
4395    ecx: &ExprContext,
4396    expr: HirScalarExpr,
4397    indexes: &[&Expr<Aug>],
4398    n_layers: usize,
4399    elem_type_name: &str,
4400) -> Result<(usize, HirScalarExpr), PlanError> {
4401    let depth = indexes.len();
4402
4403    if depth > n_layers {
4404        if n_layers == 0 {
4405            sql_bail!("cannot subscript type {}", elem_type_name)
4406        } else {
4407            sql_bail!(
4408                "cannot index into {} layers; list only has {} layer{}",
4409                depth,
4410                n_layers,
4411                if n_layers == 1 { "" } else { "s" }
4412            )
4413        }
4414    }
4415
4416    let mut exprs = Vec::with_capacity(depth + 1);
4417    exprs.push(expr);
4418
4419    for i in indexes {
4420        exprs.push(plan_expr(ecx, i)?.cast_to(
4421            ecx,
4422            CastContext::Explicit,
4423            &SqlScalarType::Int64,
4424        )?);
4425    }
4426
4427    Ok((
4428        n_layers - depth,
4429        HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4430    ))
4431}
4432
4433fn plan_slice_list(
4434    ecx: &ExprContext,
4435    expr: HirScalarExpr,
4436    slices: &[SubscriptPosition<Aug>],
4437    n_layers: usize,
4438    elem_type_name: &str,
4439) -> Result<HirScalarExpr, PlanError> {
4440    if n_layers == 0 {
4441        sql_bail!("cannot subscript type {}", elem_type_name)
4442    }
4443
4444    // first arg will be list
4445    let mut exprs = Vec::with_capacity(slices.len() + 1);
4446    exprs.push(expr);
4447    // extract (start, end) parts from collected slices
4448    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4449        Ok(match position {
4450            Some(p) => {
4451                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4452            }
4453            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4454        })
4455    };
4456    for p in slices {
4457        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4458        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4459        exprs.push(start);
4460        exprs.push(end);
4461    }
4462
4463    Ok(HirScalarExpr::call_variadic(
4464        VariadicFunc::ListSliceLinear,
4465        exprs,
4466    ))
4467}
4468
4469fn plan_like(
4470    ecx: &ExprContext,
4471    expr: &Expr<Aug>,
4472    pattern: &Expr<Aug>,
4473    escape: Option<&Expr<Aug>>,
4474    case_insensitive: bool,
4475    not: bool,
4476) -> Result<HirScalarExpr, PlanError> {
4477    use CastContext::Implicit;
4478    let ecx = ecx.with_name("LIKE argument");
4479    let expr = plan_expr(&ecx, expr)?;
4480    let haystack = match ecx.scalar_type(&expr) {
4481        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4482            .type_as(&ecx, ty)?
4483            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4484        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4485    };
4486    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4487    if let Some(escape) = escape {
4488        pattern = pattern.call_binary(
4489            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4490            expr_func::LikeEscape,
4491        );
4492    }
4493    let func: BinaryFunc = if case_insensitive {
4494        expr_func::IsLikeMatchCaseInsensitive.into()
4495    } else {
4496        expr_func::IsLikeMatchCaseSensitive.into()
4497    };
4498    let like = haystack.call_binary(pattern, func);
4499    if not {
4500        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4501    } else {
4502        Ok(like)
4503    }
4504}
4505
4506fn plan_subscript_jsonb(
4507    ecx: &ExprContext,
4508    expr: HirScalarExpr,
4509    positions: &[SubscriptPosition<Aug>],
4510) -> Result<CoercibleScalarExpr, PlanError> {
4511    use CastContext::Implicit;
4512    use SqlScalarType::{Int64, String};
4513
4514    // JSONB doesn't support the slicing syntax, so simply error if you
4515    // encounter any explicit slices.
4516    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4517
4518    let mut exprs = Vec::with_capacity(subscripts.len());
4519    for s in subscripts {
4520        let subscript = plan_expr(ecx, s)?;
4521        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4522            subscript
4523        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4524            // Integers are converted to a string here and then re-parsed as an
4525            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4526            // do it.
4527            typeconv::to_string(ecx, subscript)
4528        } else {
4529            sql_bail!("jsonb subscript type must be coercible to integer or text");
4530        };
4531        exprs.push(subscript);
4532    }
4533
4534    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4535    // `expr->subscript` as you might expect.
4536    let expr = expr.call_binary(
4537        HirScalarExpr::call_variadic(
4538            VariadicFunc::ArrayCreate {
4539                elem_type: SqlScalarType::String,
4540            },
4541            exprs,
4542        ),
4543        BinaryFunc::JsonbGetPath,
4544    );
4545    Ok(expr.into())
4546}
4547
4548fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4549    if !ecx.allow_subqueries {
4550        sql_bail!("{} does not allow subqueries", ecx.name)
4551    }
4552    let mut qcx = ecx.derived_query_context();
4553    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4554    Ok(expr.exists().into())
4555}
4556
4557fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4558    if !ecx.allow_subqueries {
4559        sql_bail!("{} does not allow subqueries", ecx.name)
4560    }
4561    let mut qcx = ecx.derived_query_context();
4562    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4563    let column_types = qcx.relation_type(&expr).column_types;
4564    if column_types.len() != 1 {
4565        sql_bail!(
4566            "Expected subselect to return 1 column, got {} columns",
4567            column_types.len()
4568        );
4569    }
4570    Ok(expr.select().into())
4571}
4572
4573fn plan_list_subquery(
4574    ecx: &ExprContext,
4575    query: &Query<Aug>,
4576) -> Result<CoercibleScalarExpr, PlanError> {
4577    plan_vector_like_subquery(
4578        ecx,
4579        query,
4580        |_| false,
4581        |elem_type| VariadicFunc::ListCreate { elem_type },
4582        |order_by| AggregateFunc::ListConcat { order_by },
4583        expr_func::ListListConcat.into(),
4584        |elem_type| {
4585            HirScalarExpr::literal(
4586                Datum::empty_list(),
4587                SqlScalarType::List {
4588                    element_type: Box::new(elem_type),
4589                    custom_id: None,
4590                },
4591            )
4592        },
4593        "list",
4594    )
4595}
4596
4597fn plan_array_subquery(
4598    ecx: &ExprContext,
4599    query: &Query<Aug>,
4600) -> Result<CoercibleScalarExpr, PlanError> {
4601    plan_vector_like_subquery(
4602        ecx,
4603        query,
4604        |elem_type| {
4605            matches!(
4606                elem_type,
4607                SqlScalarType::Char { .. }
4608                    | SqlScalarType::Array { .. }
4609                    | SqlScalarType::List { .. }
4610                    | SqlScalarType::Map { .. }
4611            )
4612        },
4613        |elem_type| VariadicFunc::ArrayCreate { elem_type },
4614        |order_by| AggregateFunc::ArrayConcat { order_by },
4615        expr_func::ArrayArrayConcat.into(),
4616        |elem_type| {
4617            HirScalarExpr::literal(
4618                Datum::empty_array(),
4619                SqlScalarType::Array(Box::new(elem_type)),
4620            )
4621        },
4622        "[]",
4623    )
4624}
4625
4626/// Generic function used to plan both array subqueries and list subqueries
4627fn plan_vector_like_subquery<F1, F2, F3, F4>(
4628    ecx: &ExprContext,
4629    query: &Query<Aug>,
4630    is_unsupported_type: F1,
4631    vector_create: F2,
4632    aggregate_concat: F3,
4633    binary_concat: BinaryFunc,
4634    empty_literal: F4,
4635    vector_type_string: &str,
4636) -> Result<CoercibleScalarExpr, PlanError>
4637where
4638    F1: Fn(&SqlScalarType) -> bool,
4639    F2: Fn(SqlScalarType) -> VariadicFunc,
4640    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4641    F4: Fn(SqlScalarType) -> HirScalarExpr,
4642{
4643    if !ecx.allow_subqueries {
4644        sql_bail!("{} does not allow subqueries", ecx.name)
4645    }
4646
4647    let mut qcx = ecx.derived_query_context();
4648    let mut planned_query = plan_query(&mut qcx, query)?;
4649    if planned_query.limit.is_some()
4650        || !planned_query
4651            .offset
4652            .clone()
4653            .try_into_literal_int64()
4654            .is_ok_and(|offset| offset == 0)
4655    {
4656        planned_query.expr = HirRelationExpr::top_k(
4657            planned_query.expr,
4658            vec![],
4659            planned_query.order_by.clone(),
4660            planned_query.limit,
4661            planned_query.offset,
4662            planned_query.group_size_hints.limit_input_group_size,
4663        );
4664    }
4665
4666    if planned_query.project.len() != 1 {
4667        sql_bail!(
4668            "Expected subselect to return 1 column, got {} columns",
4669            planned_query.project.len()
4670        );
4671    }
4672
4673    let project_column = *planned_query.project.get(0).unwrap();
4674    let elem_type = qcx
4675        .relation_type(&planned_query.expr)
4676        .column_types
4677        .get(project_column)
4678        .cloned()
4679        .unwrap()
4680        .scalar_type();
4681
4682    if is_unsupported_type(&elem_type) {
4683        bail_unsupported!(format!(
4684            "cannot build array from subquery because return type {}{}",
4685            ecx.humanize_scalar_type(&elem_type, false),
4686            vector_type_string
4687        ));
4688    }
4689
4690    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4691    // subquery above.
4692    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4693        vector_create(elem_type.clone()),
4694        vec![HirScalarExpr::column(project_column)],
4695    ))
4696    .chain(
4697        planned_query
4698            .order_by
4699            .iter()
4700            .map(|co| HirScalarExpr::column(co.column)),
4701    )
4702    .collect();
4703
4704    // However, column references for `aggregation_projection` and `aggregation_order_by`
4705    // are with reference to the `exprs` of the aggregation expression.  Here that is
4706    // `aggregation_exprs`.
4707    let aggregation_projection = vec![0];
4708    let aggregation_order_by = planned_query
4709        .order_by
4710        .into_iter()
4711        .enumerate()
4712        .map(|(i, order)| ColumnOrder { column: i, ..order })
4713        .collect();
4714
4715    let reduced_expr = planned_query
4716        .expr
4717        .reduce(
4718            vec![],
4719            vec![AggregateExpr {
4720                func: aggregate_concat(aggregation_order_by),
4721                expr: Box::new(HirScalarExpr::call_variadic(
4722                    VariadicFunc::RecordCreate {
4723                        field_names: iter::repeat(ColumnName::from(""))
4724                            .take(aggregation_exprs.len())
4725                            .collect(),
4726                    },
4727                    aggregation_exprs,
4728                )),
4729                distinct: false,
4730            }],
4731            None,
4732        )
4733        .project(aggregation_projection);
4734
4735    // If `expr` has no rows, return an empty array/list rather than NULL.
4736    Ok(reduced_expr
4737        .select()
4738        .call_binary(empty_literal(elem_type), binary_concat)
4739        .into())
4740}
4741
4742fn plan_map_subquery(
4743    ecx: &ExprContext,
4744    query: &Query<Aug>,
4745) -> Result<CoercibleScalarExpr, PlanError> {
4746    if !ecx.allow_subqueries {
4747        sql_bail!("{} does not allow subqueries", ecx.name)
4748    }
4749
4750    let mut qcx = ecx.derived_query_context();
4751    let mut query = plan_query(&mut qcx, query)?;
4752    if query.limit.is_some()
4753        || !query
4754            .offset
4755            .clone()
4756            .try_into_literal_int64()
4757            .is_ok_and(|offset| offset == 0)
4758    {
4759        query.expr = HirRelationExpr::top_k(
4760            query.expr,
4761            vec![],
4762            query.order_by.clone(),
4763            query.limit,
4764            query.offset,
4765            query.group_size_hints.limit_input_group_size,
4766        );
4767    }
4768    if query.project.len() != 2 {
4769        sql_bail!(
4770            "expected map subquery to return 2 columns, got {} columns",
4771            query.project.len()
4772        );
4773    }
4774
4775    let query_types = qcx.relation_type(&query.expr).column_types;
4776    let key_column = query.project[0];
4777    let key_type = query_types[key_column].clone().scalar_type();
4778    let value_column = query.project[1];
4779    let value_type = query_types[value_column].clone().scalar_type();
4780
4781    if key_type != SqlScalarType::String {
4782        sql_bail!("cannot build map from subquery because first column is not of type text");
4783    }
4784
4785    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4786        VariadicFunc::RecordCreate {
4787            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4788        },
4789        vec![
4790            HirScalarExpr::column(key_column),
4791            HirScalarExpr::column(value_column),
4792        ],
4793    ))
4794    .chain(
4795        query
4796            .order_by
4797            .iter()
4798            .map(|co| HirScalarExpr::column(co.column)),
4799    )
4800    .collect();
4801
4802    let expr = query
4803        .expr
4804        .reduce(
4805            vec![],
4806            vec![AggregateExpr {
4807                func: AggregateFunc::MapAgg {
4808                    order_by: query
4809                        .order_by
4810                        .into_iter()
4811                        .enumerate()
4812                        .map(|(i, order)| ColumnOrder { column: i, ..order })
4813                        .collect(),
4814                    value_type: value_type.clone(),
4815                },
4816                expr: Box::new(HirScalarExpr::call_variadic(
4817                    VariadicFunc::RecordCreate {
4818                        field_names: iter::repeat(ColumnName::from(""))
4819                            .take(aggregation_exprs.len())
4820                            .collect(),
4821                    },
4822                    aggregation_exprs,
4823                )),
4824                distinct: false,
4825            }],
4826            None,
4827        )
4828        .project(vec![0]);
4829
4830    // If `expr` has no rows, return an empty map rather than NULL.
4831    let expr = HirScalarExpr::call_variadic(
4832        VariadicFunc::Coalesce,
4833        vec![
4834            expr.select(),
4835            HirScalarExpr::literal(
4836                Datum::empty_map(),
4837                SqlScalarType::Map {
4838                    value_type: Box::new(value_type),
4839                    custom_id: None,
4840                },
4841            ),
4842        ],
4843    );
4844
4845    Ok(expr.into())
4846}
4847
4848fn plan_collate(
4849    ecx: &ExprContext,
4850    expr: &Expr<Aug>,
4851    collation: &UnresolvedItemName,
4852) -> Result<CoercibleScalarExpr, PlanError> {
4853    if collation.0.len() == 2
4854        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4855        && collation.0[1] == ident!("default")
4856    {
4857        plan_expr(ecx, expr)
4858    } else {
4859        bail_unsupported!("COLLATE");
4860    }
4861}
4862
4863/// Plans a slice of expressions.
4864///
4865/// This function is a simple convenience function for mapping [`plan_expr`]
4866/// over a slice of expressions. The planned expressions are returned in the
4867/// same order as the input. If any of the expressions fail to plan, returns an
4868/// error instead.
4869fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4870where
4871    E: std::borrow::Borrow<Expr<Aug>>,
4872{
4873    let mut out = vec![];
4874    for expr in exprs {
4875        out.push(plan_expr(ecx, expr.borrow())?);
4876    }
4877    Ok(out)
4878}
4879
4880/// Plans an `ARRAY` expression.
4881fn plan_array(
4882    ecx: &ExprContext,
4883    exprs: &[Expr<Aug>],
4884    type_hint: Option<&SqlScalarType>,
4885) -> Result<CoercibleScalarExpr, PlanError> {
4886    // Plan each element expression.
4887    let mut out = vec![];
4888    for expr in exprs {
4889        out.push(match expr {
4890            // Special case nested ARRAY expressions so we can plumb
4891            // the type hint through.
4892            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4893            _ => plan_expr(ecx, expr)?,
4894        });
4895    }
4896
4897    // Attempt to make use of the type hint.
4898    let type_hint = match type_hint {
4899        // The user has provided an explicit cast to an array type. We know the
4900        // element type to coerce to. Need to be careful, though: if there's
4901        // evidence that any of the array elements are themselves arrays, we
4902        // want to coerce to the array type, not the element type.
4903        Some(SqlScalarType::Array(elem_type)) => {
4904            let multidimensional = out.iter().any(|e| {
4905                matches!(
4906                    ecx.scalar_type(e),
4907                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4908                )
4909            });
4910            if multidimensional {
4911                type_hint
4912            } else {
4913                Some(&**elem_type)
4914            }
4915        }
4916        // The user provided an explicit cast to a non-array type. We'll have to
4917        // guess what the correct type for the array. Our caller will then
4918        // handle converting that array type to the desired non-array type.
4919        Some(_) => None,
4920        // No type hint. We'll have to guess the correct type for the array.
4921        None => None,
4922    };
4923
4924    // Coerce all elements to the same type.
4925    let (elem_type, exprs) = if exprs.is_empty() {
4926        if let Some(elem_type) = type_hint {
4927            (elem_type.clone(), vec![])
4928        } else {
4929            sql_bail!("cannot determine type of empty array");
4930        }
4931    } else {
4932        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4933        (ecx.scalar_type(&out[0]), out)
4934    };
4935
4936    // Arrays of `char` type are disallowed due to a known limitation:
4937    // https://github.com/MaterializeInc/database-issues/issues/2360.
4938    //
4939    // Arrays of `list` and `map` types are disallowed due to mind-bending
4940    // semantics.
4941    if matches!(
4942        elem_type,
4943        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4944    ) {
4945        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4946    }
4947
4948    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4949}
4950
4951fn plan_list(
4952    ecx: &ExprContext,
4953    exprs: &[Expr<Aug>],
4954    type_hint: Option<&SqlScalarType>,
4955) -> Result<CoercibleScalarExpr, PlanError> {
4956    let (elem_type, exprs) = if exprs.is_empty() {
4957        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4958            (element_type.without_modifiers(), vec![])
4959        } else {
4960            sql_bail!("cannot determine type of empty list");
4961        }
4962    } else {
4963        let type_hint = match type_hint {
4964            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4965            _ => None,
4966        };
4967
4968        let mut out = vec![];
4969        for expr in exprs {
4970            out.push(match expr {
4971                // Special case nested LIST expressions so we can plumb
4972                // the type hint through.
4973                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4974                _ => plan_expr(ecx, expr)?,
4975            });
4976        }
4977        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4978        (ecx.scalar_type(&out[0]).without_modifiers(), out)
4979    };
4980
4981    if matches!(elem_type, SqlScalarType::Char { .. }) {
4982        bail_unsupported!("char list");
4983    }
4984
4985    Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4986}
4987
4988fn plan_map(
4989    ecx: &ExprContext,
4990    entries: &[MapEntry<Aug>],
4991    type_hint: Option<&SqlScalarType>,
4992) -> Result<CoercibleScalarExpr, PlanError> {
4993    let (value_type, exprs) = if entries.is_empty() {
4994        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
4995            (value_type.without_modifiers(), vec![])
4996        } else {
4997            sql_bail!("cannot determine type of empty map");
4998        }
4999    } else {
5000        let type_hint = match type_hint {
5001            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5002            _ => None,
5003        };
5004
5005        let mut keys = vec![];
5006        let mut values = vec![];
5007        for MapEntry { key, value } in entries {
5008            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5009            let value = match value {
5010                // Special case nested MAP expressions so we can plumb
5011                // the type hint through.
5012                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5013                _ => plan_expr(ecx, value)?,
5014            };
5015            keys.push(key);
5016            values.push(value);
5017        }
5018        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5019        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5020        let out = itertools::interleave(keys, values).collect();
5021        (value_type, out)
5022    };
5023
5024    if matches!(value_type, SqlScalarType::Char { .. }) {
5025        bail_unsupported!("char map");
5026    }
5027
5028    let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5029    Ok(expr.into())
5030}
5031
5032/// Coerces a list of expressions such that all input expressions will be cast
5033/// to the same type. If successful, returns a new list of expressions in the
5034/// same order as the input, where each expression has the appropriate casts to
5035/// make them all of a uniform type.
5036///
5037/// If `force_type` is `Some`, the expressions are forced to the specified type
5038/// via an explicit cast. Otherwise the best common type is guessed via
5039/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5040/// implicit casts
5041///
5042/// Note that this is our implementation of Postgres' type conversion for
5043/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5044/// isn't yet used in all of those cases.
5045///
5046/// [union-type-conv]:
5047/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5048pub fn coerce_homogeneous_exprs(
5049    ecx: &ExprContext,
5050    exprs: Vec<CoercibleScalarExpr>,
5051    force_type: Option<&SqlScalarType>,
5052) -> Result<Vec<HirScalarExpr>, PlanError> {
5053    assert!(!exprs.is_empty());
5054
5055    let target_holder;
5056    let target = match force_type {
5057        Some(t) => t,
5058        None => {
5059            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5060            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5061            &target_holder
5062        }
5063    };
5064
5065    // Try to cast all expressions to `target`.
5066    let mut out = Vec::new();
5067    for expr in exprs {
5068        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5069        let ccx = match force_type {
5070            None => CastContext::Implicit,
5071            Some(_) => CastContext::Explicit,
5072        };
5073        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5074            Ok(expr) => out.push(expr),
5075            Err(_) => sql_bail!(
5076                "{} could not convert type {} to {}",
5077                ecx.name,
5078                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5079                ecx.humanize_scalar_type(target, false),
5080            ),
5081        }
5082    }
5083    Ok(out)
5084}
5085
5086/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5087/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5088pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5089    obe: &OrderByExpr<T>,
5090    column: usize,
5091) -> ColumnOrder {
5092    let desc = !obe.asc.unwrap_or(true);
5093    ColumnOrder {
5094        column,
5095        desc,
5096        // https://www.postgresql.org/docs/14/queries-order.html
5097        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5098        nulls_last: obe.nulls_last.unwrap_or(!desc),
5099    }
5100}
5101
5102/// Plans the ORDER BY clause of a window function.
5103///
5104/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5105/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5106/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5107/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5108/// `Vec<HirScalarExpr>` that we return.
5109fn plan_function_order_by(
5110    ecx: &ExprContext,
5111    order_by: &[OrderByExpr<Aug>],
5112) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5113    let mut order_by_exprs = vec![];
5114    let mut col_orders = vec![];
5115    {
5116        for (i, obe) in order_by.iter().enumerate() {
5117            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5118            // do not support ordinal references in PostgreSQL. So we use
5119            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5120            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5121            order_by_exprs.push(expr);
5122            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5123        }
5124    }
5125    Ok((order_by_exprs, col_orders))
5126}
5127
5128/// Common part of the planning of windowed and non-windowed aggregation functions.
5129fn plan_aggregate_common(
5130    ecx: &ExprContext,
5131    Function::<Aug> {
5132        name,
5133        args,
5134        filter,
5135        over: _,
5136        distinct,
5137    }: &Function<Aug>,
5138) -> Result<AggregateExpr, PlanError> {
5139    // Normal aggregate functions, like `sum`, expect as input a single expression
5140    // which yields the datum to aggregate. Order sensitive aggregate functions,
5141    // like `jsonb_agg`, are special, and instead expect a Record whose first
5142    // element yields the datum to aggregate and whose successive elements yield
5143    // keys to order by. This expectation is hard coded within the implementation
5144    // of each of the order-sensitive aggregates. The specification of how many
5145    // order by keys to consider, and in what order, is passed via the `order_by`
5146    // field on the `AggregateFunc` variant.
5147
5148    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5149    // most, so explicitly drop it if the function doesn't care about order. This
5150    // prevents the projection into Record below from triggering on unsupported
5151    // functions.
5152
5153    let impls = match resolve_func(ecx, name, args)? {
5154        Func::Aggregate(impls) => impls,
5155        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5156    };
5157
5158    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5159    // generalized function selection framework. The rule is simple: the user
5160    // must type `count(*)`, but the function selection framework sees an empty
5161    // parameter list, as if the user had typed `count()`. But if the user types
5162    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5163    // rules to all aggregates, not just `count`, since we may one day support
5164    // user-defined aggregates, including user-defined aggregates that take no
5165    // parameters.
5166    let (args, order_by) = match &args {
5167        FunctionArgs::Star => (vec![], vec![]),
5168        FunctionArgs::Args { args, order_by } => {
5169            if args.is_empty() {
5170                sql_bail!(
5171                    "{}(*) must be used to call a parameterless aggregate function",
5172                    ecx.qcx
5173                        .scx
5174                        .humanize_resolved_name(name)
5175                        .expect("name actually resolved")
5176                );
5177            }
5178            let args = plan_exprs(ecx, args)?;
5179            (args, order_by.clone())
5180        }
5181    };
5182
5183    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5184
5185    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5186    if let Some(filter) = &filter {
5187        // If a filter is present, as in
5188        //
5189        //     <agg>(<expr>) FILTER (WHERE <cond>)
5190        //
5191        // we plan it by essentially rewriting the expression to
5192        //
5193        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5194        //
5195        // where <identity> is the identity input for <agg>.
5196        let cond =
5197            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5198        let expr_typ = ecx.scalar_type(&expr);
5199        expr = HirScalarExpr::if_then_else(
5200            cond,
5201            expr,
5202            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5203        );
5204    }
5205
5206    let mut seen_outer = false;
5207    let mut seen_inner = false;
5208    #[allow(deprecated)]
5209    expr.visit_columns(0, &mut |depth, col| {
5210        if depth == 0 && col.level == 0 {
5211            seen_inner = true;
5212        } else if col.level > depth {
5213            seen_outer = true;
5214        }
5215    });
5216    if seen_outer && !seen_inner {
5217        bail_unsupported!(
5218            3720,
5219            "aggregate functions that refer exclusively to outer columns"
5220        );
5221    }
5222
5223    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5224    // map the needed expressions into the aggregate datum.
5225    if func.is_order_sensitive() {
5226        let field_names = iter::repeat(ColumnName::from(""))
5227            .take(1 + order_by_exprs.len())
5228            .collect();
5229        let mut exprs = vec![expr];
5230        exprs.extend(order_by_exprs);
5231        expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5232    }
5233
5234    Ok(AggregateExpr {
5235        func,
5236        expr: Box::new(expr),
5237        distinct: *distinct,
5238    })
5239}
5240
5241fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5242    let mut names = names.to_vec();
5243    let col_name = normalize::column_name(names.pop().unwrap());
5244
5245    // If the name is qualified, it must refer to a column in a table.
5246    if !names.is_empty() {
5247        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5248        let (i, i_name) = ecx.scope.resolve_table_column(
5249            &ecx.qcx.outer_scopes,
5250            &table_name,
5251            &col_name,
5252            &mut ecx.qcx.name_manager.borrow_mut(),
5253        )?;
5254        return Ok(HirScalarExpr::named_column(i, i_name));
5255    }
5256
5257    // If the name is unqualified, first check if it refers to a column. Track any similar names
5258    // that might exist for a better error message.
5259    let similar_names = match ecx.scope.resolve_column(
5260        &ecx.qcx.outer_scopes,
5261        &col_name,
5262        &mut ecx.qcx.name_manager.borrow_mut(),
5263    ) {
5264        Ok((i, i_name)) => {
5265            return Ok(HirScalarExpr::named_column(i, i_name));
5266        }
5267        Err(PlanError::UnknownColumn { similar, .. }) => similar,
5268        Err(e) => return Err(e),
5269    };
5270
5271    // The name doesn't refer to a column. Check if it is a whole-row reference
5272    // to a table.
5273    let items = ecx.scope.items_from_table(
5274        &ecx.qcx.outer_scopes,
5275        &PartialItemName {
5276            database: None,
5277            schema: None,
5278            item: col_name.as_str().to_owned(),
5279        },
5280    )?;
5281    match items.as_slice() {
5282        // The name doesn't refer to a table either. Return an error.
5283        [] => Err(PlanError::UnknownColumn {
5284            table: None,
5285            column: col_name,
5286            similar: similar_names,
5287        }),
5288        // The name refers to a table that is the result of a function that
5289        // returned a single column. Per PostgreSQL, this is a special case
5290        // that returns the value directly.
5291        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
5292        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5293            *column,
5294            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5295        )),
5296        // The name refers to a normal table. Return a record containing all the
5297        // columns of the table.
5298        _ => {
5299            let mut has_exists_column = None;
5300            let (exprs, field_names): (Vec<_>, Vec<_>) = items
5301                .into_iter()
5302                .filter_map(|(column, item)| {
5303                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5304                        has_exists_column = Some(column);
5305                        None
5306                    } else {
5307                        let expr = HirScalarExpr::named_column(
5308                            column,
5309                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5310                        );
5311                        let name = item.column_name.clone();
5312                        Some((expr, name))
5313                    }
5314                })
5315                .unzip();
5316            // For the special case of a table function with a single column, the single column is instead not wrapped.
5317            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5318                exprs.into_element()
5319            } else {
5320                HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5321            };
5322            if let Some(has_exists_column) = has_exists_column {
5323                Ok(HirScalarExpr::if_then_else(
5324                    HirScalarExpr::unnamed_column(has_exists_column)
5325                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5326                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5327                    expr,
5328                ))
5329            } else {
5330                Ok(expr)
5331            }
5332        }
5333    }
5334}
5335
5336fn plan_op(
5337    ecx: &ExprContext,
5338    op: &str,
5339    expr1: &Expr<Aug>,
5340    expr2: Option<&Expr<Aug>>,
5341) -> Result<HirScalarExpr, PlanError> {
5342    let impls = func::resolve_op(op)?;
5343    let args = match expr2 {
5344        None => plan_exprs(ecx, &[expr1])?,
5345        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5346    };
5347    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5348}
5349
5350fn plan_function<'a>(
5351    ecx: &ExprContext,
5352    f @ Function {
5353        name,
5354        args,
5355        filter,
5356        over,
5357        distinct,
5358    }: &'a Function<Aug>,
5359) -> Result<HirScalarExpr, PlanError> {
5360    let impls = match resolve_func(ecx, name, args)? {
5361        Func::Table(_) => {
5362            sql_bail!(
5363                "table functions are not allowed in {} (function {})",
5364                ecx.name,
5365                name
5366            );
5367        }
5368        Func::Scalar(impls) => {
5369            if over.is_some() {
5370                sql_bail!(
5371                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5372                );
5373            }
5374            impls
5375        }
5376        Func::ScalarWindow(impls) => {
5377            let (
5378                ignore_nulls,
5379                order_by_exprs,
5380                col_orders,
5381                _window_frame,
5382                partition_by,
5383                scalar_args,
5384            ) = plan_window_function_non_aggr(ecx, f)?;
5385
5386            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5387            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5388            // msg there is less informative.)
5389            if !scalar_args.is_empty() {
5390                if let ResolvedItemName::Item {
5391                    full_name: FullItemName { item, .. },
5392                    ..
5393                } = name
5394                {
5395                    sql_bail!(
5396                        "function {} has 0 parameters, but was called with {}",
5397                        item,
5398                        scalar_args.len()
5399                    );
5400                }
5401            }
5402
5403            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5404            // accept a window frame here without an error msg. (Postgres also does this.)
5405            // TODO: maybe we should give a notice
5406
5407            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5408
5409            if ignore_nulls {
5410                // If we ever add a scalar window function that supports ignore, then don't forget
5411                // to also update HIR EXPLAIN.
5412                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5413            }
5414
5415            return Ok(HirScalarExpr::windowing(WindowExpr {
5416                func: WindowExprType::Scalar(ScalarWindowExpr {
5417                    func,
5418                    order_by: col_orders,
5419                }),
5420                partition_by,
5421                order_by: order_by_exprs,
5422            }));
5423        }
5424        Func::ValueWindow(impls) => {
5425            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5426                plan_window_function_non_aggr(ecx, f)?;
5427
5428            let (args_encoded, func) =
5429                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5430
5431            if ignore_nulls {
5432                match func {
5433                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5434                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5435                }
5436            }
5437
5438            return Ok(HirScalarExpr::windowing(WindowExpr {
5439                func: WindowExprType::Value(ValueWindowExpr {
5440                    func,
5441                    args: Box::new(args_encoded),
5442                    order_by: col_orders,
5443                    window_frame,
5444                    ignore_nulls, // (RESPECT NULLS is the default)
5445                }),
5446                partition_by,
5447                order_by: order_by_exprs,
5448            }));
5449        }
5450        Func::Aggregate(_) => {
5451            if f.over.is_none() {
5452                // Not a window aggregate. Something is wrong.
5453                if ecx.allow_aggregates {
5454                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5455                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5456                    sql_bail!(
5457                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5458                        name,
5459                    );
5460                } else {
5461                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5462                    // because it was in an unsupported context.
5463                    sql_bail!(
5464                        "aggregate functions are not allowed in {} (function {})",
5465                        ecx.name,
5466                        name
5467                    );
5468                }
5469            } else {
5470                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5471                    plan_window_function_common(ecx, &f.name, &f.over)?;
5472
5473                // https://github.com/MaterializeInc/database-issues/issues/6720
5474                match (&window_frame.start_bound, &window_frame.end_bound) {
5475                    (
5476                        mz_expr::WindowFrameBound::UnboundedPreceding,
5477                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5478                    )
5479                    | (
5480                        mz_expr::WindowFrameBound::UnboundedPreceding,
5481                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5482                    )
5483                    | (
5484                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5485                        mz_expr::WindowFrameBound::UnboundedFollowing,
5486                    )
5487                    | (
5488                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5489                        mz_expr::WindowFrameBound::UnboundedFollowing,
5490                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5491                    (_, _) => {} // other cases are ok
5492                }
5493
5494                if ignore_nulls {
5495                    // https://github.com/MaterializeInc/database-issues/issues/6722
5496                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5497                    // forget to also update HIR EXPLAIN.
5498                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5499                }
5500
5501                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5502
5503                if aggregate_expr.distinct {
5504                    // https://github.com/MaterializeInc/database-issues/issues/6626
5505                    bail_unsupported!("DISTINCT in window aggregates");
5506                }
5507
5508                return Ok(HirScalarExpr::windowing(WindowExpr {
5509                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5510                        aggregate_expr,
5511                        order_by: col_orders,
5512                        window_frame,
5513                    }),
5514                    partition_by,
5515                    order_by: order_by_exprs,
5516                }));
5517            }
5518        }
5519    };
5520
5521    if over.is_some() {
5522        unreachable!("If there is an OVER clause, we should have returned already above.");
5523    }
5524
5525    if *distinct {
5526        sql_bail!(
5527            "DISTINCT specified, but {} is not an aggregate function",
5528            ecx.qcx
5529                .scx
5530                .humanize_resolved_name(name)
5531                .expect("already resolved")
5532        );
5533    }
5534    if filter.is_some() {
5535        sql_bail!(
5536            "FILTER specified, but {} is not an aggregate function",
5537            ecx.qcx
5538                .scx
5539                .humanize_resolved_name(name)
5540                .expect("already resolved")
5541        );
5542    }
5543
5544    let scalar_args = match &args {
5545        FunctionArgs::Star => {
5546            sql_bail!(
5547                "* argument is invalid with non-aggregate function {}",
5548                ecx.qcx
5549                    .scx
5550                    .humanize_resolved_name(name)
5551                    .expect("already resolved")
5552            )
5553        }
5554        FunctionArgs::Args { args, order_by } => {
5555            if !order_by.is_empty() {
5556                sql_bail!(
5557                    "ORDER BY specified, but {} is not an aggregate function",
5558                    ecx.qcx
5559                        .scx
5560                        .humanize_resolved_name(name)
5561                        .expect("already resolved")
5562                );
5563            }
5564            plan_exprs(ecx, args)?
5565        }
5566    };
5567
5568    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5569}
5570
5571pub const IGNORE_NULLS_ERROR_MSG: &str =
5572    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5573
5574/// Resolves the name to a set of function implementations.
5575///
5576/// If the name does not specify a known built-in function, returns an error.
5577pub fn resolve_func(
5578    ecx: &ExprContext,
5579    name: &ResolvedItemName,
5580    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5581) -> Result<&'static Func, PlanError> {
5582    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5583        if let Ok(f) = i.func() {
5584            return Ok(f);
5585        }
5586    }
5587
5588    // Couldn't resolve function with this name, so generate verbose error
5589    // message.
5590    let cexprs = match args {
5591        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5592        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5593            if !order_by.is_empty() {
5594                sql_bail!(
5595                    "ORDER BY specified, but {} is not an aggregate function",
5596                    name
5597                );
5598            }
5599            plan_exprs(ecx, args)?
5600        }
5601    };
5602
5603    let arg_types: Vec<_> = cexprs
5604        .into_iter()
5605        .map(|ty| match ecx.scalar_type(&ty) {
5606            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5607            CoercibleScalarType::Record(_) => "record".to_string(),
5608            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5609        })
5610        .collect();
5611
5612    Err(PlanError::UnknownFunction {
5613        name: name.to_string(),
5614        arg_types,
5615    })
5616}
5617
5618fn plan_is_expr<'a>(
5619    ecx: &ExprContext,
5620    expr: &'a Expr<Aug>,
5621    construct: &IsExprConstruct<Aug>,
5622    not: bool,
5623) -> Result<HirScalarExpr, PlanError> {
5624    let expr = plan_expr(ecx, expr)?;
5625    let mut expr = match construct {
5626        IsExprConstruct::Null => {
5627            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5628            // at odds with our type coercion rules, which treat `NULL` literals
5629            // and unconstrained parameters identically. Providing a type hint
5630            // of string means we wind up supporting both.
5631            let expr = expr.type_as_any(ecx)?;
5632            expr.call_is_null()
5633        }
5634        IsExprConstruct::Unknown => {
5635            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5636            expr.call_is_null()
5637        }
5638        IsExprConstruct::True => {
5639            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5640            expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5641        }
5642        IsExprConstruct::False => {
5643            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5644            expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5645        }
5646        IsExprConstruct::DistinctFrom(expr2) => {
5647            let expr1 = expr.type_as_any(ecx)?;
5648            let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5649            // There are three cases:
5650            // 1. Both terms are non-null, in which case the result should be `a != b`.
5651            // 2. Exactly one term is null, in which case the result should be true.
5652            // 3. Both terms are null, in which case the result should be false.
5653            //
5654            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5655            let term1 = HirScalarExpr::variadic_or(vec![
5656                expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5657                expr1.clone().call_is_null(),
5658                expr2.clone().call_is_null(),
5659            ]);
5660            let term2 = HirScalarExpr::variadic_or(vec![
5661                expr1.call_is_null().not(),
5662                expr2.call_is_null().not(),
5663            ]);
5664            term1.and(term2)
5665        }
5666    };
5667    if not {
5668        expr = expr.not();
5669    }
5670    Ok(expr)
5671}
5672
5673fn plan_case<'a>(
5674    ecx: &ExprContext,
5675    operand: &'a Option<Box<Expr<Aug>>>,
5676    conditions: &'a [Expr<Aug>],
5677    results: &'a [Expr<Aug>],
5678    else_result: &'a Option<Box<Expr<Aug>>>,
5679) -> Result<HirScalarExpr, PlanError> {
5680    let mut cond_exprs = Vec::new();
5681    let mut result_exprs = Vec::new();
5682    for (c, r) in conditions.iter().zip_eq(results) {
5683        let c = match operand {
5684            Some(operand) => operand.clone().equals(c.clone()),
5685            None => c.clone(),
5686        };
5687        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5688        cond_exprs.push(cexpr);
5689        result_exprs.push(r);
5690    }
5691    result_exprs.push(match else_result {
5692        Some(else_result) => else_result,
5693        None => &Expr::Value(Value::Null),
5694    });
5695    let mut result_exprs = coerce_homogeneous_exprs(
5696        &ecx.with_name("CASE"),
5697        plan_exprs(ecx, &result_exprs)?,
5698        None,
5699    )?;
5700    let mut expr = result_exprs.pop().unwrap();
5701    assert_eq!(cond_exprs.len(), result_exprs.len());
5702    for (cexpr, rexpr) in cond_exprs
5703        .into_iter()
5704        .rev()
5705        .zip_eq(result_exprs.into_iter().rev())
5706    {
5707        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5708    }
5709    Ok(expr)
5710}
5711
5712fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5713    let (datum, scalar_type) = match l {
5714        Value::Number(s) => {
5715            let d = strconv::parse_numeric(s.as_str())?;
5716            if !s.contains(&['E', '.'][..]) {
5717                // Maybe representable as an int?
5718                if let Ok(n) = d.0.try_into() {
5719                    (Datum::Int32(n), SqlScalarType::Int32)
5720                } else if let Ok(n) = d.0.try_into() {
5721                    (Datum::Int64(n), SqlScalarType::Int64)
5722                } else {
5723                    (
5724                        Datum::Numeric(d),
5725                        SqlScalarType::Numeric { max_scale: None },
5726                    )
5727                }
5728            } else {
5729                (
5730                    Datum::Numeric(d),
5731                    SqlScalarType::Numeric { max_scale: None },
5732                )
5733            }
5734        }
5735        Value::HexString(_) => bail_unsupported!("hex string literals"),
5736        Value::Boolean(b) => match b {
5737            false => (Datum::False, SqlScalarType::Bool),
5738            true => (Datum::True, SqlScalarType::Bool),
5739        },
5740        Value::Interval(i) => {
5741            let i = literal::plan_interval(i)?;
5742            (Datum::Interval(i), SqlScalarType::Interval)
5743        }
5744        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5745        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5746    };
5747    let expr = HirScalarExpr::literal(datum, scalar_type);
5748    Ok(expr.into())
5749}
5750
5751/// The common part of the planning of non-aggregate window functions, i.e.,
5752/// scalar window functions and value window functions.
5753fn plan_window_function_non_aggr<'a>(
5754    ecx: &ExprContext,
5755    Function {
5756        name,
5757        args,
5758        filter,
5759        over,
5760        distinct,
5761    }: &'a Function<Aug>,
5762) -> Result<
5763    (
5764        bool,
5765        Vec<HirScalarExpr>,
5766        Vec<ColumnOrder>,
5767        mz_expr::WindowFrame,
5768        Vec<HirScalarExpr>,
5769        Vec<CoercibleScalarExpr>,
5770    ),
5771    PlanError,
5772> {
5773    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5774        plan_window_function_common(ecx, name, over)?;
5775
5776    if *distinct {
5777        sql_bail!(
5778            "DISTINCT specified, but {} is not an aggregate function",
5779            name
5780        );
5781    }
5782
5783    if filter.is_some() {
5784        bail_unsupported!("FILTER in non-aggregate window functions");
5785    }
5786
5787    let scalar_args = match &args {
5788        FunctionArgs::Star => {
5789            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5790        }
5791        FunctionArgs::Args { args, order_by } => {
5792            if !order_by.is_empty() {
5793                sql_bail!(
5794                    "ORDER BY specified, but {} is not an aggregate function",
5795                    name
5796                );
5797            }
5798            plan_exprs(ecx, args)?
5799        }
5800    };
5801
5802    Ok((
5803        ignore_nulls,
5804        order_by_exprs,
5805        col_orders,
5806        window_frame,
5807        partition,
5808        scalar_args,
5809    ))
5810}
5811
5812/// The common part of the planning of all window functions.
5813fn plan_window_function_common(
5814    ecx: &ExprContext,
5815    name: &<Aug as AstInfo>::ItemName,
5816    over: &Option<WindowSpec<Aug>>,
5817) -> Result<
5818    (
5819        bool,
5820        Vec<HirScalarExpr>,
5821        Vec<ColumnOrder>,
5822        mz_expr::WindowFrame,
5823        Vec<HirScalarExpr>,
5824    ),
5825    PlanError,
5826> {
5827    if !ecx.allow_windows {
5828        sql_bail!(
5829            "window functions are not allowed in {} (function {})",
5830            ecx.name,
5831            name
5832        );
5833    }
5834
5835    let window_spec = match over.as_ref() {
5836        Some(over) => over,
5837        None => sql_bail!("window function {} requires an OVER clause", name),
5838    };
5839    if window_spec.ignore_nulls && window_spec.respect_nulls {
5840        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5841    }
5842    let window_frame = match window_spec.window_frame.as_ref() {
5843        Some(frame) => plan_window_frame(frame)?,
5844        None => mz_expr::WindowFrame::default(),
5845    };
5846    let mut partition = Vec::new();
5847    for expr in &window_spec.partition_by {
5848        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5849    }
5850
5851    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5852
5853    Ok((
5854        window_spec.ignore_nulls,
5855        order_by_exprs,
5856        col_orders,
5857        window_frame,
5858        partition,
5859    ))
5860}
5861
5862fn plan_window_frame(
5863    WindowFrame {
5864        units,
5865        start_bound,
5866        end_bound,
5867    }: &WindowFrame,
5868) -> Result<mz_expr::WindowFrame, PlanError> {
5869    use mz_expr::WindowFrameBound::*;
5870    let units = window_frame_unit_ast_to_expr(units)?;
5871    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5872    let end_bound = end_bound
5873        .as_ref()
5874        .map(window_frame_bound_ast_to_expr)
5875        .unwrap_or(CurrentRow);
5876
5877    // Validate bounds according to Postgres rules
5878    match (&start_bound, &end_bound) {
5879        // Start bound can't be UNBOUNDED FOLLOWING
5880        (UnboundedFollowing, _) => {
5881            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5882        }
5883        // End bound can't be UNBOUNDED PRECEDING
5884        (_, UnboundedPreceding) => {
5885            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5886        }
5887        // Start bound should come before end bound in the list of bound definitions
5888        (CurrentRow, OffsetPreceding(_)) => {
5889            sql_bail!("frame starting from current row cannot have preceding rows")
5890        }
5891        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5892            sql_bail!("frame starting from following row cannot have preceding rows")
5893        }
5894        // The above rules are adopted from Postgres.
5895        // The following rules are Materialize-specific.
5896        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5897            // Note that the only hard limit is that partition size + offset should fit in i64, so
5898            // in theory, we could support much larger offsets than this. But for our current
5899            // performance, even 1000000 is quite big.
5900            if *o1 > 1000000 || *o2 > 1000000 {
5901                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5902            }
5903        }
5904        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5905            if *o1 > 1000000 || *o2 > 1000000 {
5906                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5907            }
5908        }
5909        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5910            if *o1 > 1000000 || *o2 > 1000000 {
5911                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5912            }
5913        }
5914        (OffsetPreceding(o), CurrentRow) => {
5915            if *o > 1000000 {
5916                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5917            }
5918        }
5919        (CurrentRow, OffsetFollowing(o)) => {
5920            if *o > 1000000 {
5921                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5922            }
5923        }
5924        // Other bounds are valid
5925        (_, _) => (),
5926    }
5927
5928    // RANGE is only supported in the default frame
5929    // https://github.com/MaterializeInc/database-issues/issues/6585
5930    if units == mz_expr::WindowFrameUnits::Range
5931        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5932    {
5933        bail_unsupported!("RANGE in non-default window frames")
5934    }
5935
5936    let frame = mz_expr::WindowFrame {
5937        units,
5938        start_bound,
5939        end_bound,
5940    };
5941    Ok(frame)
5942}
5943
5944fn window_frame_unit_ast_to_expr(
5945    unit: &WindowFrameUnits,
5946) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5947    match unit {
5948        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5949        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5950        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5951    }
5952}
5953
5954fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5955    match bound {
5956        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5957        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5958        WindowFrameBound::Preceding(Some(offset)) => {
5959            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5960        }
5961        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5962        WindowFrameBound::Following(Some(offset)) => {
5963            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5964        }
5965    }
5966}
5967
5968pub fn scalar_type_from_sql(
5969    scx: &StatementContext,
5970    data_type: &ResolvedDataType,
5971) -> Result<SqlScalarType, PlanError> {
5972    match data_type {
5973        ResolvedDataType::AnonymousList(elem_type) => {
5974            let elem_type = scalar_type_from_sql(scx, elem_type)?;
5975            if matches!(elem_type, SqlScalarType::Char { .. }) {
5976                bail_unsupported!("char list");
5977            }
5978            Ok(SqlScalarType::List {
5979                element_type: Box::new(elem_type),
5980                custom_id: None,
5981            })
5982        }
5983        ResolvedDataType::AnonymousMap {
5984            key_type,
5985            value_type,
5986        } => {
5987            match scalar_type_from_sql(scx, key_type)? {
5988                SqlScalarType::String => {}
5989                other => sql_bail!(
5990                    "map key type must be {}, got {}",
5991                    scx.humanize_scalar_type(&SqlScalarType::String, false),
5992                    scx.humanize_scalar_type(&other, false)
5993                ),
5994            }
5995            Ok(SqlScalarType::Map {
5996                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5997                custom_id: None,
5998            })
5999        }
6000        ResolvedDataType::Named { id, modifiers, .. } => {
6001            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6002        }
6003        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6004    }
6005}
6006
6007pub fn scalar_type_from_catalog(
6008    catalog: &dyn SessionCatalog,
6009    id: CatalogItemId,
6010    modifiers: &[i64],
6011) -> Result<SqlScalarType, PlanError> {
6012    let entry = catalog.get_item(&id);
6013    let type_details = match entry.type_details() {
6014        Some(type_details) => type_details,
6015        None => {
6016            // Resolution should never produce a `ResolvedDataType::Named` with
6017            // an ID of a non-type, but we error gracefully just in case.
6018            sql_bail!(
6019                "internal error: {} does not refer to a type",
6020                catalog.resolve_full_name(entry.name()).to_string().quoted()
6021            );
6022        }
6023    };
6024    match &type_details.typ {
6025        CatalogType::Numeric => {
6026            let mut modifiers = modifiers.iter().fuse();
6027            let precision = match modifiers.next() {
6028                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6029                    sql_bail!(
6030                        "precision for type numeric must be between 1 and {}",
6031                        NUMERIC_DATUM_MAX_PRECISION,
6032                    );
6033                }
6034                Some(p) => Some(*p),
6035                None => None,
6036            };
6037            let scale = match modifiers.next() {
6038                Some(scale) => {
6039                    if let Some(precision) = precision {
6040                        if *scale > precision {
6041                            sql_bail!(
6042                                "scale for type numeric must be between 0 and precision {}",
6043                                precision
6044                            );
6045                        }
6046                    }
6047                    Some(NumericMaxScale::try_from(*scale)?)
6048                }
6049                None => None,
6050            };
6051            if modifiers.next().is_some() {
6052                sql_bail!("type numeric supports at most two type modifiers");
6053            }
6054            Ok(SqlScalarType::Numeric { max_scale: scale })
6055        }
6056        CatalogType::Char => {
6057            let mut modifiers = modifiers.iter().fuse();
6058            let length = match modifiers.next() {
6059                Some(l) => Some(CharLength::try_from(*l)?),
6060                None => Some(CharLength::ONE),
6061            };
6062            if modifiers.next().is_some() {
6063                sql_bail!("type character supports at most one type modifier");
6064            }
6065            Ok(SqlScalarType::Char { length })
6066        }
6067        CatalogType::VarChar => {
6068            let mut modifiers = modifiers.iter().fuse();
6069            let length = match modifiers.next() {
6070                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6071                None => None,
6072            };
6073            if modifiers.next().is_some() {
6074                sql_bail!("type character varying supports at most one type modifier");
6075            }
6076            Ok(SqlScalarType::VarChar { max_length: length })
6077        }
6078        CatalogType::Timestamp => {
6079            let mut modifiers = modifiers.iter().fuse();
6080            let precision = match modifiers.next() {
6081                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6082                None => None,
6083            };
6084            if modifiers.next().is_some() {
6085                sql_bail!("type timestamp supports at most one type modifier");
6086            }
6087            Ok(SqlScalarType::Timestamp { precision })
6088        }
6089        CatalogType::TimestampTz => {
6090            let mut modifiers = modifiers.iter().fuse();
6091            let precision = match modifiers.next() {
6092                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6093                None => None,
6094            };
6095            if modifiers.next().is_some() {
6096                sql_bail!("type timestamp with time zone supports at most one type modifier");
6097            }
6098            Ok(SqlScalarType::TimestampTz { precision })
6099        }
6100        t => {
6101            if !modifiers.is_empty() {
6102                sql_bail!(
6103                    "{} does not support type modifiers",
6104                    catalog.resolve_full_name(entry.name()).to_string()
6105                );
6106            }
6107            match t {
6108                CatalogType::Array {
6109                    element_reference: element_id,
6110                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6111                    catalog,
6112                    *element_id,
6113                    modifiers,
6114                )?))),
6115                CatalogType::List {
6116                    element_reference: element_id,
6117                    element_modifiers,
6118                } => Ok(SqlScalarType::List {
6119                    element_type: Box::new(scalar_type_from_catalog(
6120                        catalog,
6121                        *element_id,
6122                        element_modifiers,
6123                    )?),
6124                    custom_id: Some(id),
6125                }),
6126                CatalogType::Map {
6127                    key_reference: _,
6128                    key_modifiers: _,
6129                    value_reference: value_id,
6130                    value_modifiers,
6131                } => Ok(SqlScalarType::Map {
6132                    value_type: Box::new(scalar_type_from_catalog(
6133                        catalog,
6134                        *value_id,
6135                        value_modifiers,
6136                    )?),
6137                    custom_id: Some(id),
6138                }),
6139                CatalogType::Range {
6140                    element_reference: element_id,
6141                } => Ok(SqlScalarType::Range {
6142                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6143                }),
6144                CatalogType::Record { fields } => {
6145                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6146                        .iter()
6147                        .map(|f| {
6148                            let scalar_type = scalar_type_from_catalog(
6149                                catalog,
6150                                f.type_reference,
6151                                &f.type_modifiers,
6152                            )?;
6153                            Ok((
6154                                f.name.clone(),
6155                                SqlColumnType {
6156                                    scalar_type,
6157                                    nullable: true,
6158                                },
6159                            ))
6160                        })
6161                        .collect::<Result<Box<_>, PlanError>>()?;
6162                    Ok(SqlScalarType::Record {
6163                        fields: scalars,
6164                        custom_id: Some(id),
6165                    })
6166                }
6167                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6168                CatalogType::Bool => Ok(SqlScalarType::Bool),
6169                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6170                CatalogType::Date => Ok(SqlScalarType::Date),
6171                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6172                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6173                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6174                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6175                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6176                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6177                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6178                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6179                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6180                CatalogType::Interval => Ok(SqlScalarType::Interval),
6181                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6182                CatalogType::Oid => Ok(SqlScalarType::Oid),
6183                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6184                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6185                CatalogType::Pseudo => {
6186                    sql_bail!(
6187                        "cannot reference pseudo type {}",
6188                        catalog.resolve_full_name(entry.name()).to_string()
6189                    )
6190                }
6191                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6192                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6193                CatalogType::RegType => Ok(SqlScalarType::RegType),
6194                CatalogType::String => Ok(SqlScalarType::String),
6195                CatalogType::Time => Ok(SqlScalarType::Time),
6196                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6197                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6198                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6199                CatalogType::Numeric => unreachable!("handled above"),
6200                CatalogType::Char => unreachable!("handled above"),
6201                CatalogType::VarChar => unreachable!("handled above"),
6202                CatalogType::Timestamp => unreachable!("handled above"),
6203                CatalogType::TimestampTz => unreachable!("handled above"),
6204            }
6205        }
6206    }
6207}
6208
6209/// This is used to collect aggregates and table functions from within an `Expr`.
6210/// See the explanation of aggregate handling at the top of the file for more details.
6211struct AggregateTableFuncVisitor<'a> {
6212    scx: &'a StatementContext<'a>,
6213    aggs: Vec<Function<Aug>>,
6214    within_aggregate: bool,
6215    tables: BTreeMap<Function<Aug>, String>,
6216    table_disallowed_context: Vec<&'static str>,
6217    in_select_item: bool,
6218    id_gen: IdGen,
6219    err: Option<PlanError>,
6220}
6221
6222impl<'a> AggregateTableFuncVisitor<'a> {
6223    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6224        AggregateTableFuncVisitor {
6225            scx,
6226            aggs: Vec::new(),
6227            within_aggregate: false,
6228            tables: BTreeMap::new(),
6229            table_disallowed_context: Vec::new(),
6230            in_select_item: false,
6231            id_gen: Default::default(),
6232            err: None,
6233        }
6234    }
6235
6236    fn into_result(
6237        self,
6238    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6239        match self.err {
6240            Some(err) => Err(err),
6241            None => {
6242                // Dedup while preserving the order. We don't care what the order is, but it
6243                // has to be reproducible so that EXPLAIN PLAN tests work.
6244                let mut seen = BTreeSet::new();
6245                let aggs = self
6246                    .aggs
6247                    .into_iter()
6248                    .filter(move |agg| seen.insert(agg.clone()))
6249                    .collect();
6250                Ok((aggs, self.tables))
6251            }
6252        }
6253    }
6254}
6255
6256impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6257    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6258        let item = match self.scx.get_item_by_resolved_name(&func.name) {
6259            Ok(i) => i,
6260            // Catching missing functions later in planning improves error messages.
6261            Err(_) => return,
6262        };
6263
6264        match item.func() {
6265            // We don't want to collect window aggregations, because these will be handled not by
6266            // plan_aggregate, but by plan_function.
6267            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6268                if self.within_aggregate {
6269                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6270                    return;
6271                }
6272                self.aggs.push(func.clone());
6273                let Function {
6274                    name: _,
6275                    args,
6276                    filter,
6277                    over: _,
6278                    distinct: _,
6279                } = func;
6280                if let Some(filter) = filter {
6281                    self.visit_expr_mut(filter);
6282                }
6283                let old_within_aggregate = self.within_aggregate;
6284                self.within_aggregate = true;
6285                self.table_disallowed_context
6286                    .push("aggregate function calls");
6287
6288                self.visit_function_args_mut(args);
6289
6290                self.within_aggregate = old_within_aggregate;
6291                self.table_disallowed_context.pop();
6292            }
6293            Ok(Func::Table { .. }) => {
6294                self.table_disallowed_context.push("other table functions");
6295                visit_mut::visit_function_mut(self, func);
6296                self.table_disallowed_context.pop();
6297            }
6298            _ => visit_mut::visit_function_mut(self, func),
6299        }
6300    }
6301
6302    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6303        // Don't go into subqueries.
6304    }
6305
6306    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6307        let (disallowed_context, func) = match expr {
6308            Expr::Case { .. } => (Some("CASE"), None),
6309            Expr::HomogenizingFunction {
6310                function: HomogenizingFunction::Coalesce,
6311                ..
6312            } => (Some("COALESCE"), None),
6313            Expr::Function(func) if self.in_select_item => {
6314                // If we're in a SELECT list, replace table functions with a uuid identifier
6315                // and save the table func so it can be planned elsewhere.
6316                let mut table_func = None;
6317                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6318                    if let Ok(Func::Table { .. }) = item.func() {
6319                        if let Some(context) = self.table_disallowed_context.last() {
6320                            self.err = Some(sql_err!(
6321                                "table functions are not allowed in {} (function {})",
6322                                context,
6323                                func.name
6324                            ));
6325                            return;
6326                        }
6327                        table_func = Some(func.clone());
6328                    }
6329                }
6330                // Since we will descend into the table func below, don't add its own disallow
6331                // context here, instead use visit_function to set that.
6332                (None, table_func)
6333            }
6334            _ => (None, None),
6335        };
6336        if let Some(func) = func {
6337            // Since we are trading out expr, we need to visit the table func here.
6338            visit_mut::visit_expr_mut(self, expr);
6339            // Don't attempt to replace table functions with unsupported syntax.
6340            if let Function {
6341                name: _,
6342                args: _,
6343                filter: None,
6344                over: None,
6345                distinct: false,
6346            } = &func
6347            {
6348                // Identical table functions can be de-duplicated.
6349                let unique_id = self.id_gen.allocate_id();
6350                let id = self
6351                    .tables
6352                    .entry(func)
6353                    .or_insert_with(|| format!("table_func_{unique_id}"));
6354                // We know this is okay because id is is 11 characters + <=20 characters, which is
6355                // less than our max length.
6356                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6357            }
6358        }
6359        if let Some(context) = disallowed_context {
6360            self.table_disallowed_context.push(context);
6361        }
6362
6363        visit_mut::visit_expr_mut(self, expr);
6364
6365        if disallowed_context.is_some() {
6366            self.table_disallowed_context.pop();
6367        }
6368    }
6369
6370    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6371        let old = self.in_select_item;
6372        self.in_select_item = true;
6373        visit_mut::visit_select_item_mut(self, si);
6374        self.in_select_item = old;
6375    }
6376}
6377
6378#[derive(Default)]
6379struct WindowFuncCollector {
6380    window_funcs: Vec<Expr<Aug>>,
6381}
6382
6383impl WindowFuncCollector {
6384    fn into_result(self) -> Vec<Expr<Aug>> {
6385        // Dedup while preserving the order.
6386        let mut seen = BTreeSet::new();
6387        let window_funcs_dedupped = self
6388            .window_funcs
6389            .into_iter()
6390            .filter(move |expr| seen.insert(expr.clone()))
6391            // Reverse the order, so that in case of a nested window function call, the
6392            // inner one is evaluated first.
6393            .rev()
6394            .collect();
6395        window_funcs_dedupped
6396    }
6397}
6398
6399impl Visit<'_, Aug> for WindowFuncCollector {
6400    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6401        match expr {
6402            Expr::Function(func) => {
6403                if func.over.is_some() {
6404                    self.window_funcs.push(expr.clone());
6405                }
6406            }
6407            _ => (),
6408        }
6409        visit::visit_expr(self, expr);
6410    }
6411
6412    fn visit_query(&mut self, _query: &Query<Aug>) {
6413        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6414    }
6415}
6416
6417/// Specifies how long a query will live.
6418#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6419pub enum QueryLifetime {
6420    /// The query's (or the expression's) result will be computed at one point in time.
6421    OneShot,
6422    /// The query (or expression) is used in a dataflow that maintains an index.
6423    Index,
6424    /// The query (or expression) is used in a dataflow that maintains a materialized view.
6425    MaterializedView,
6426    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
6427    Subscribe,
6428    /// The query (or expression) is part of a (non-materialized) view.
6429    View,
6430    /// The expression is part of a source definition.
6431    Source,
6432}
6433
6434impl QueryLifetime {
6435    /// (This used to impact whether the query is allowed to reason about the time at which it is
6436    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
6437    /// mechanism, see `ExprPrepStyle`.)
6438    pub fn is_one_shot(&self) -> bool {
6439        let result = match self {
6440            QueryLifetime::OneShot => true,
6441            QueryLifetime::Index => false,
6442            QueryLifetime::MaterializedView => false,
6443            QueryLifetime::Subscribe => false,
6444            QueryLifetime::View => false,
6445            QueryLifetime::Source => false,
6446        };
6447        assert_eq!(!result, self.is_maintained());
6448        result
6449    }
6450
6451    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
6452    /// turned into a `TopK`.
6453    pub fn is_maintained(&self) -> bool {
6454        match self {
6455            QueryLifetime::OneShot => false,
6456            QueryLifetime::Index => true,
6457            QueryLifetime::MaterializedView => true,
6458            QueryLifetime::Subscribe => true,
6459            QueryLifetime::View => true,
6460            QueryLifetime::Source => true,
6461        }
6462    }
6463
6464    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
6465    pub fn allow_show(&self) -> bool {
6466        match self {
6467            QueryLifetime::OneShot => true,
6468            QueryLifetime::Index => false,
6469            QueryLifetime::MaterializedView => false,
6470            QueryLifetime::Subscribe => true, // SUBSCRIBE allows SHOW commands!
6471            QueryLifetime::View => false,
6472            QueryLifetime::Source => false,
6473        }
6474    }
6475}
6476
6477/// Description of a CTE sufficient for query planning.
6478#[derive(Debug, Clone)]
6479pub struct CteDesc {
6480    pub name: String,
6481    pub desc: RelationDesc,
6482}
6483
6484/// The state required when planning a `Query`.
6485#[derive(Debug, Clone)]
6486pub struct QueryContext<'a> {
6487    /// The context for the containing `Statement`.
6488    pub scx: &'a StatementContext<'a>,
6489    /// The lifetime that the planned query will have.
6490    pub lifetime: QueryLifetime,
6491    /// The scopes of the outer relation expression.
6492    pub outer_scopes: Vec<Scope>,
6493    /// The type of the outer relation expressions.
6494    pub outer_relation_types: Vec<SqlRelationType>,
6495    /// CTEs for this query, mapping their assigned LocalIds to their definition.
6496    pub ctes: BTreeMap<LocalId, CteDesc>,
6497    /// A name manager, for interning column names that will be stored in HIR and MIR.
6498    pub name_manager: Rc<RefCell<NameManager>>,
6499    pub recursion_guard: RecursionGuard,
6500}
6501
6502impl CheckedRecursion for QueryContext<'_> {
6503    fn recursion_guard(&self) -> &RecursionGuard {
6504        &self.recursion_guard
6505    }
6506}
6507
6508impl<'a> QueryContext<'a> {
6509    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6510        QueryContext {
6511            scx,
6512            lifetime,
6513            outer_scopes: vec![],
6514            outer_relation_types: vec![],
6515            ctes: BTreeMap::new(),
6516            name_manager: Rc::new(RefCell::new(NameManager::new())),
6517            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6518        }
6519    }
6520
6521    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6522        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6523    }
6524
6525    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6526    /// `self`.
6527    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6528        let ctes = self.ctes.clone();
6529        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6530        let outer_relation_types = iter::once(relation_type)
6531            .chain(self.outer_relation_types.clone())
6532            .collect();
6533        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6534        let name_manager = Rc::clone(&self.name_manager);
6535
6536        QueryContext {
6537            scx: self.scx,
6538            lifetime: self.lifetime,
6539            outer_scopes,
6540            outer_relation_types,
6541            ctes,
6542            name_manager,
6543            recursion_guard: self.recursion_guard.clone(),
6544        }
6545    }
6546
6547    /// Derives a `QueryContext` for a scope that contains no columns.
6548    fn empty_derived_context(&self) -> QueryContext<'a> {
6549        let scope = Scope::empty();
6550        let ty = SqlRelationType::empty();
6551        self.derived_context(scope, ty)
6552    }
6553
6554    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6555    /// CTE.
6556    pub fn resolve_table_name(
6557        &self,
6558        object: ResolvedItemName,
6559    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6560        match object {
6561            ResolvedItemName::Item {
6562                id,
6563                full_name,
6564                version,
6565                ..
6566            } => {
6567                let name = full_name.into();
6568                let item = self.scx.get_item(&id).at_version(version);
6569                let desc = item
6570                    .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6571                    .clone();
6572                let expr = HirRelationExpr::Get {
6573                    id: Id::Global(item.global_id()),
6574                    typ: desc.typ().clone(),
6575                };
6576
6577                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6578
6579                Ok((expr, scope))
6580            }
6581            ResolvedItemName::Cte { id, name } => {
6582                let name = name.into();
6583                let cte = self.ctes.get(&id).unwrap();
6584                let expr = HirRelationExpr::Get {
6585                    id: Id::Local(id),
6586                    typ: cte.desc.typ().clone(),
6587                };
6588
6589                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6590
6591                Ok((expr, scope))
6592            }
6593            ResolvedItemName::ContinualTask { id, name } => {
6594                let cte = self.ctes.get(&id).unwrap();
6595                let expr = HirRelationExpr::Get {
6596                    id: Id::Local(id),
6597                    typ: cte.desc.typ().clone(),
6598                };
6599
6600                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6601
6602                Ok((expr, scope))
6603            }
6604            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6605        }
6606    }
6607
6608    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6609    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6610    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6611        self.scx.humanize_scalar_type(typ, postgres_compat)
6612    }
6613}
6614
6615/// A bundle of unrelated things that we need for planning `Expr`s.
6616#[derive(Debug, Clone)]
6617pub struct ExprContext<'a> {
6618    pub qcx: &'a QueryContext<'a>,
6619    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
6620    pub name: &'a str,
6621    /// The context for the `Query` that contains this `Expr`.
6622    /// The current scope.
6623    pub scope: &'a Scope,
6624    /// The type of the current relation expression upon which this scalar
6625    /// expression will be evaluated.
6626    pub relation_type: &'a SqlRelationType,
6627    /// Are aggregate functions allowed in this context
6628    pub allow_aggregates: bool,
6629    /// Are subqueries allowed in this context
6630    pub allow_subqueries: bool,
6631    /// Are parameters allowed in this context.
6632    pub allow_parameters: bool,
6633    /// Are window functions allowed in this context
6634    pub allow_windows: bool,
6635}
6636
6637impl CheckedRecursion for ExprContext<'_> {
6638    fn recursion_guard(&self) -> &RecursionGuard {
6639        &self.qcx.recursion_guard
6640    }
6641}
6642
6643impl<'a> ExprContext<'a> {
6644    pub fn catalog(&self) -> &dyn SessionCatalog {
6645        self.qcx.scx.catalog
6646    }
6647
6648    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6649        let mut ecx = self.clone();
6650        ecx.name = name;
6651        ecx
6652    }
6653
6654    pub fn column_type<E>(&self, expr: &E) -> E::Type
6655    where
6656        E: AbstractExpr,
6657    {
6658        expr.typ(
6659            &self.qcx.outer_relation_types,
6660            self.relation_type,
6661            &self.qcx.scx.param_types.borrow(),
6662        )
6663    }
6664
6665    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6666    where
6667        E: AbstractExpr,
6668    {
6669        self.column_type(expr).scalar_type()
6670    }
6671
6672    fn derived_query_context(&self) -> QueryContext<'_> {
6673        let mut scope = self.scope.clone();
6674        scope.lateral_barrier = true;
6675        self.qcx.derived_context(scope, self.relation_type.clone())
6676    }
6677
6678    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6679        self.qcx.scx.require_feature_flag(flag)
6680    }
6681
6682    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6683        &self.qcx.scx.param_types
6684    }
6685
6686    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6687    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6688    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6689        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6690    }
6691
6692    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6693        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6694    }
6695}
6696
6697/// Manages column names, doing lightweight string internment.
6698///
6699/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
6700/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
6701/// to ensure maximal sharing.
6702#[derive(Debug, Clone)]
6703pub struct NameManager(BTreeSet<Arc<str>>);
6704
6705impl NameManager {
6706    /// Creates a new `NameManager`, with no interned names
6707    pub fn new() -> Self {
6708        Self(BTreeSet::new())
6709    }
6710
6711    /// Interns a string, returning a reference-counted pointer to the interned
6712    /// string.
6713    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6714        let s = s.as_ref();
6715        if let Some(interned) = self.0.get(s) {
6716            Arc::clone(interned)
6717        } else {
6718            let interned: Arc<str> = Arc::from(s);
6719            self.0.insert(Arc::clone(&interned));
6720            interned
6721        }
6722    }
6723
6724    /// Interns a string representing a reference to a `ScopeItem`, returning a
6725    /// reference-counted pointer to the interned string.
6726    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6727        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6728        //
6729        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6730        // name information. Note that as of 2025-04-09, we don't support column
6731        // renames.
6732        //
6733        // A few bad alternatives:
6734        //
6735        // (1) Store it but don't write it down. This fails because the expression
6736        //     cache will erase our names on restart.
6737        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6738        //     get the right names. But the world now and the world when we made
6739        //     those objects may be different.
6740        // (3) Just don't write down the table name. Nothing fails... for now.
6741
6742        self.intern(item.column_name.as_str())
6743    }
6744}
6745
6746#[cfg(test)]
6747mod test {
6748    use super::*;
6749
6750    /// Ensure that `NameManager`'s string interning works as expected.
6751    ///
6752    /// In particular, structurally but not referentially identical strings should
6753    /// be interned to the same `Arc`ed pointer.
6754    #[mz_ore::test]
6755    pub fn test_name_manager_string_interning() {
6756        let mut nm = NameManager::new();
6757
6758        let orig_hi = "hi";
6759        let hi = nm.intern(orig_hi);
6760        let hello = nm.intern("hello");
6761
6762        assert_ne!(hi.as_ptr(), hello.as_ptr());
6763
6764        // this static string is _likely_ the same as `orig_hi``
6765        let hi2 = nm.intern("hi");
6766        assert_eq!(hi.as_ptr(), hi2.as_ptr());
6767
6768        // generate a "hi" string that doesn't get optimized to the same static string
6769        let s = format!(
6770            "{}{}",
6771            hi.chars().nth(0).unwrap(),
6772            hi2.chars().nth(1).unwrap()
6773        );
6774        // make sure that we're testing with a fresh string!
6775        assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6776
6777        let hi3 = nm.intern(s);
6778        assert_eq!(hi.as_ptr(), hi3.as_ptr());
6779    }
6780}