Skip to main content

mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50    ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51    MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
56    func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::{
70    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
71    ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
72    UNKNOWN_COLUMN_NAME, strconv,
73};
74use mz_sql_parser::ast::display::AstDisplay;
75use mz_sql_parser::ast::visit::Visit;
76use mz_sql_parser::ast::visit_mut::{self, VisitMut};
77use mz_sql_parser::ast::{
78    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
79    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
80    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
81    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
82    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
83    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
84    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
85    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
86};
87use mz_sql_parser::ident;
88
89use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
90use crate::func::{self, Func, FuncSpec, TableFuncImpl};
91use crate::names::{
92    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
93};
94use crate::plan::PlanError::InvalidWmrRecursionLimit;
95use crate::plan::error::PlanError;
96use crate::plan::hir::{
97    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
98    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
99    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
100    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
101};
102use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
103use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
104use crate::plan::statement::{StatementContext, StatementDesc, show};
105use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
106use crate::plan::{
107    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
108    literal, transform_ast,
109};
110use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
111use crate::session::vars::{self, FeatureFlag};
112use crate::{ORDINALITY_COL_NAME, normalize};
113
/// The result of planning a root (top-level) query: the planned expression
/// plus everything a caller needs to interpret its output.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    // The planned expression. Generic so this can carry an `HirRelationExpr`
    // for a full query or a `Vec<HirScalarExpr>` for a RETURNING clause.
    pub expr: E,
    // Shape of the result set. Note: callers such as `plan_root_query`
    // construct this to describe the expression *after* `finishing` has been
    // applied (i.e. after its projection).
    pub desc: RelationDesc,
    // Post-processing (ordering, limit, offset, projection) to apply before
    // results are sent to the client.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    // Name-resolution scope describing the output columns.
    pub scope: Scope,
}
121
/// Plans a top-level query, returning the `HirRelationExpr` describing the query
/// plan, the `RelationDesc` describing the shape of the result set, a
/// `RowSetFinishing` describing post-processing that must occur before results
/// are sent to the client, and the types of the parameters in the query, if any
/// were present.
///
/// Note that the returned `RelationDesc` describes the expression after
/// applying the returned `RowSetFinishing`.
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_root_query(
    scx: &StatementContext,
    mut query: Query<Aug>,
    lifetime: QueryLifetime,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    // Rewrite/normalize the AST in place before planning.
    transform_ast::transform(scx, &mut query)?;
    // Root context: this is a top-level query, not a correlated subquery.
    let mut qcx = QueryContext::root(scx, lifetime);
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(&mut qcx, &query)?;

    // Package the query's ORDER BY / LIMIT / OFFSET / projection as a
    // finishing to be applied after the dataflow proper.
    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Attempt to push the finishing's ordering past its projection. This allows
    // data to be projected down on the workers rather than the coordinator. It
    // also improves the optimizer's demand analysis, as the optimizer can only
    // reason about demand information in `expr` (i.e., it can't see
    // `finishing.project`).
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    // For maintained (long-lived) dataflows, absorb the finishing into the
    // expression itself where possible.
    if lifetime.is_maintained() {
        expr.finish_maintained(&mut finishing, group_size_hints);
    }

    // Describe only the columns that survive the finishing's projection, in
    // projection order, so `desc` matches what the client will actually see.
    let typ = qcx.relation_type(&expr);
    let typ = SqlRelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
183
/// Plans a continual task query. Largely mirrors [`plan_root_query`], except
/// that the caller supplies the `QueryContext` and the finishing is always
/// absorbed into the expression (no `lifetime.is_maintained()` check).
///
/// TODO(ct2): Dedup this with [plan_root_query].
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_ct_query(
    qcx: &mut QueryContext,
    mut query: Query<Aug>,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    // Rewrite/normalize the AST in place before planning.
    transform_ast::transform(qcx.scx, &mut query)?;
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(qcx, &query)?;

    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Attempt to push the finishing's ordering past its projection. This allows
    // data to be projected down on the workers rather than the coordinator. It
    // also improves the optimizer's demand analysis, as the optimizer can only
    // reason about demand information in `expr` (i.e., it can't see
    // `finishing.project`).
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    // Unlike `plan_root_query`, always absorb the finishing into the
    // expression: a continual task is maintained by construction.
    expr.finish_maintained(&mut finishing, group_size_hints);

    // Describe only the columns that survive the finishing's projection.
    let typ = qcx.relation_type(&expr);
    let typ = SqlRelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
234
235/// Attempts to push a projection through an order by.
236///
237/// The returned bool indicates whether the pushdown was successful or not.
238/// Successful pushdown requires that all the columns referenced in `order_by`
239/// are included in `project`.
240///
241/// When successful, `expr` is wrapped in a projection node, `order_by` is
242/// rewritten to account for the pushed-down projection, and `project` is
243/// replaced with the trivial projection. When unsuccessful, no changes are made
244/// to any of the inputs.
245fn try_push_projection_order_by(
246    expr: &mut HirRelationExpr,
247    project: &mut Vec<usize>,
248    order_by: &mut Vec<ColumnOrder>,
249) -> bool {
250    let mut unproject = vec![None; expr.arity()];
251    for (out_i, in_i) in project.iter().copied().enumerate() {
252        unproject[in_i] = Some(out_i);
253    }
254    if order_by
255        .iter()
256        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
257    {
258        let trivial_project = (0..project.len()).collect();
259        *expr = expr.take().project(mem::replace(project, trivial_project));
260        for ob in order_by {
261            ob.column = unproject[ob.column].unwrap();
262        }
263        true
264    } else {
265        false
266    }
267}
268
/// Plans an `INSERT` statement.
///
/// Returns the target table's `CatalogItemId`, the `HirRelationExpr` producing
/// the rows to insert (with assignment casts installed, omitted columns filled
/// from their defaults, and columns rearranged into table order), and the
/// planned `RETURNING` projection as a `PlannedRootQuery` over scalar
/// expressions (empty when no `RETURNING` items were supplied).
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert: it must be a writable, non-system
    // table.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.relation_desc().expect("table has desc");
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    // Default expressions are stored as AST; normalize them just like any
    // user-written expression before planning.
    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    // Normalize the user-specified target column names.
    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order.
    // `ordering[i]` is the table column that source column `i` targets, and
    // `source_types[i]` is the type that source column `i` must be cast to.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        InsertSource::DefaultValues => {
            // `INSERT ... DEFAULT VALUES`: a single zero-column row; every
            // table column is filled from its default below.
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_sql_scalar_type(&e.target_type, false),
            qcx.humanize_sql_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Present in the source: project it into place.
            project_key.push(*src_idx);
        } else {
            // Omitted: append the planned default expression and project the
            // appended column into place.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against a scope exposing the target table's
    // columns (so items like `RETURNING *` expand to the table's columns).
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
485
/// Determines the mapping between some external data and a Materialize relation.
///
/// Returns the following:
/// * [`CatalogItemId`] for the destination table.
/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
///   since we now return a [`MapFilterProject`].
/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
///   destination table.
///
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    // Items without a relation desc (e.g. non-relational items) cannot be a
    // COPY target/source.
    let table_desc = match item.relation_desc() {
        Some(desc) => desc.into_owned(),
        None => {
            return Err(PlanError::InvalidDependency {
                name: fullname.to_string(),
                item_type: item.item_type().to_string(),
            });
        }
    };
    let mut ordering = Vec::with_capacity(columns.len());

    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
    // should be simplified. The reason they are currently separate code paths is so we can roll
    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.

    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
    // FROM SOURCE ...`), then we generate an MFP.
    //
    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
    // so it's not always guaranteed that our `item` is a table.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        // Default expressions are stored as AST; normalize them before
        // planning, like any other expression.
        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Fill in any omitted columns and rearrange into correct order
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each column in the destination table, either project it from the source data, or provide
        // an expression to fill in a default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // If one a column from the table does not exist in the source data, then a default
                // value will get appended to the end of the input Row from the source data.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                // Lower to MIR now: the MFP operates on MIR scalar expressions.
                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    // Create a mapping from input data to the table we're copying into.
    let source_desc = if columns.is_empty() {
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        // The source data should be in the same order as the table.
        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }

        // The source data is a different shape than the destination table.
        RelationDesc::new(SqlRelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
613
614/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
615///
616/// TODO(cf3): Merge this method with [`plan_copy_item`].
617pub fn plan_copy_from(
618    scx: &StatementContext,
619    table_name: ResolvedItemName,
620    columns: Vec<Ident>,
621) -> Result<
622    (
623        CatalogItemId,
624        RelationDesc,
625        Vec<ColumnIndex>,
626        Option<MapFilterProject>,
627    ),
628    PlanError,
629> {
630    let table = scx.get_item_by_resolved_name(&table_name)?;
631
632    // Validate the target of the insert.
633    if table.item_type() != CatalogItemType::Table {
634        sql_bail!(
635            "cannot insert into {} '{}'",
636            table.item_type(),
637            table_name.full_name_str()
638        );
639    }
640
641    let _ = table.writable_table_details().ok_or_else(|| {
642        sql_err!(
643            "cannot insert into non-writeable table '{}'",
644            table_name.full_name_str()
645        )
646    })?;
647
648    if table.id().is_system() {
649        sql_bail!(
650            "cannot insert into system table '{}'",
651            table_name.full_name_str()
652        );
653    }
654    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
655
656    Ok((id, desc, ordering, mfp))
657}
658
/// Builds a plan that adds the default values for the missing columns and re-orders
/// the datums in the given rows to match the order in the target table.
///
/// `columns[i]` names the table column that position `i` of each `Row` in
/// `rows` corresponds to. Returns an error if the target table was dropped or
/// is not writable.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .try_get_item(&target_id)
        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
        .at_version(RelationVersionSelector::Latest);

    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    // Default expressions are stored as AST; normalize them before planning.
    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    let desc = table.relation_desc().expect("table has desc");
    let column_types = columns
        .iter()
        .map(|x| desc.get_type(x).clone())
        .map(|mut x| {
            // Null constraint is enforced later, when inserting the row in the table.
            // Without this, an assert is hit during lowering.
            x.nullable = true;
            x
        })
        .collect();
    let typ = SqlRelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Exit early with just the raw constant if we know that all columns are present
    // and in the correct order. This lets us bypass expensive downstream optimizations
    // more easily, as at every stage we know this expression is nothing more than
    // a constant (as opposed to e.g. a constant with with an identity map and identity
    // projection).
    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_all().zip_eq(defaults);
    for ((col_idx, _col_name, col_typ), default) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Present in the input rows: project it into place.
            project_key.push(*src_idx);
        } else {
            // Omitted: append the planned default expression and project the
            // appended column into place.
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
733
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    // The ID of the table being read from and written to.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    // Post-processing to apply to the rows produced by `selection`.
    pub finishing: RowSetFinishing,
}
746
747pub fn plan_delete_query(
748    scx: &StatementContext,
749    mut delete_stmt: DeleteStatement<Aug>,
750) -> Result<ReadThenWritePlan, PlanError> {
751    transform_ast::transform(scx, &mut delete_stmt)?;
752
753    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
754    plan_mutation_query_inner(
755        qcx,
756        delete_stmt.table_name,
757        delete_stmt.alias,
758        delete_stmt.using,
759        vec![],
760        delete_stmt.selection,
761    )
762}
763
764pub fn plan_update_query(
765    scx: &StatementContext,
766    mut update_stmt: UpdateStatement<Aug>,
767) -> Result<ReadThenWritePlan, PlanError> {
768    transform_ast::transform(scx, &mut update_stmt)?;
769
770    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
771
772    plan_mutation_query_inner(
773        qcx,
774        update_stmt.table_name,
775        update_stmt.alias,
776        vec![],
777        update_stmt.assignments,
778        update_stmt.selection,
779    )
780}
781
/// Plans the shared core of `DELETE` and `UPDATE` statements into a
/// [`ReadThenWritePlan`].
///
/// `using` is non-empty only for `DELETE ... USING`; `assignments` is
/// non-empty only for `UPDATE ... SET`.
///
/// Errors if the named item is not a writable, non-system user table, if the
/// `WHERE`/`SET` expressions fail to plan, or if a column is assigned twice.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get ID and version of the relation desc.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Perform checks on item with given ID.
    // Only tables may be mutated — not sources, views, etc.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    // The details themselves are unused; this is purely a writability check.
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Derive structs for operation from validated table
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.relation_desc().expect("table has desc");
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // No `USING`: plan the optional `WHERE` clause as a simple filter
        // over the target table.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // `USING` requires rewriting the selection as a correlated
        // existential subquery; see `handle_mutation_using_clause`.
        get = handle_mutation_query_inner_comment_placeholder(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each `SET <column> = <expr>` assignment, keyed by column index.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        // Get the index and type of the column.
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                // Assignment cast: the value must be coercible to the
                // column's declared type.
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // No ordering or limiting; project all of the table's columns as-is.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
887
// Adjust `get` to perform an existential subquery on `using` accounting for
// `selection`.
//
// If `USING`, we essentially want to rewrite the query as a correlated
// existential subquery, i.e.
// ```
// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
// ```
// However, we can't do that directly because of esoteric rules w/r/t `lateral`
// subqueries.
// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
//
// Arguments:
// - `get`: the relation for the mutation's target table.
// - `outer_scope`: the scope of `get` (including any table alias).
// - `using`/`selection`: the statement's `USING` list and `WHERE` clause.
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan `USING` as a cross-joined `FROM` without knowledge of the
    // statement's `FROM` target. This prevents `lateral` subqueries from
    // "seeing" the `FROM` target.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
        // PG-like semantics e.g. expressing ambiguous column references. We put
        // `USING...` first for no real reason, but making a different decision
        // would require adjusting the column references on this relation
        // differently.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression on `FROM, USING...`.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
        // those to the right of `using_rel_expr`) to instead be correlated to
        // the outer relation, i.e. `get`. Bumping `level` makes the reference
        // point one query level out; the column index is re-based accordingly.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
        // relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Check that scopes are at compatible (i.e. do not double-reference
        // same table), despite lack of selection
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
    // whether any rows are returned, and not on the contents of those rows,
    // the output list of the subquery is normally unimportant.
    //
    // This means we don't need to worry about projecting/mapping any
    // additional expressions here.
    //
    // https://www.postgresql.org/docs/14/functions-subquery.html

    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
989
/// Error returned by [`cast_relation`] when a column cannot be cast to the
/// requested target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Zero-based index of the column whose cast failed.
    pub(crate) column: usize,
    /// The column's actual type.
    pub(crate) source_type: SqlScalarType,
    /// The type the cast was attempted to.
    pub(crate) target_type: SqlScalarType,
}
996
/// Cast a relation from one type to another using the specified type of cast.
///
/// The length of `target_types` must match the arity of `expr`.
///
/// Columns that already have the target type are passed through unchanged;
/// only columns that actually need a cast incur a `map` expression.
///
/// # Errors
///
/// Returns a [`CastRelationError`] identifying the first column for which no
/// valid cast exists.
pub(crate) fn cast_relation<'a, I>(
    qcx: &QueryContext,
    ccx: CastContext,
    expr: HirRelationExpr,
    target_types: I,
) -> Result<HirRelationExpr, CastRelationError>
where
    I: IntoIterator<Item = &'a SqlScalarType>,
{
    let ecx = &ExprContext {
        qcx,
        name: "values",
        scope: &Scope::empty(),
        relation_type: &qcx.relation_type(&expr),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };
    // `map_exprs` holds the casts to append; `project_key` selects, for each
    // output column, either the original column or its appended cast.
    let mut map_exprs = vec![];
    let mut project_key = vec![];
    for (i, target_typ) in target_types.into_iter().enumerate() {
        let expr = HirScalarExpr::column(i);
        // We plan every cast and check the evaluated expressions rather than
        // checking the types directly because of some complex casting rules
        // between types not expressed in `SqlScalarType` equality.
        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
            Ok(cast_expr) => {
                if expr == cast_expr {
                    // Cast between types was unnecessary
                    project_key.push(i);
                } else {
                    // Cast between types required
                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
                    map_exprs.push(cast_expr);
                }
            }
            Err(_) => {
                return Err(CastRelationError {
                    column: i,
                    source_type: ecx.scalar_type(&expr),
                    target_type: target_typ.clone(),
                });
            }
        }
    }
    Ok(expr.map(map_exprs).project(project_key))
}
1048
1049/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1050/// VIEW` statement.
1051pub fn plan_as_of(
1052    scx: &StatementContext,
1053    as_of: Option<AsOf<Aug>>,
1054) -> Result<QueryWhen, PlanError> {
1055    match as_of {
1056        None => Ok(QueryWhen::Immediately),
1057        Some(as_of) => match as_of {
1058            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1059            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1060        },
1061    }
1062}
1063
/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
///
/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
/// - not a constant,
/// - not castable to MzTimestamp,
/// - is null,
/// - contains an unmaterializable function,
/// - some other evaluation error occurs, e.g., a division by 0,
/// - contains aggregates, subqueries, parameters, or window function calls.
pub fn plan_as_of_or_up_to(
    scx: &StatementContext,
    mut expr: Expr<Aug>,
) -> Result<mz_repr::Timestamp, PlanError> {
    // AS OF / UP TO expressions cannot reference any columns, so plan against
    // an empty scope and relation type.
    let scope = Scope::empty();
    let desc = RelationDesc::empty();
    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
    // evaluated only once.)
    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    transform_ast::transform(scx, &mut expr)?;
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "AS OF or UP TO",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    // Coerce the expression to MzTimestamp with assignment-cast semantics.
    let hir = plan_expr(ecx, &expr)?.cast_to(
        ecx,
        CastContext::Assignment,
        &SqlScalarType::MzTimestamp,
    )?;
    if hir.contains_unmaterializable() {
        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
    }
    // At this point, we definitely have a constant expression:
    // - it can't contain any unmaterializable functions;
    // - it can't refer to any columns.
    // But the following can still fail due to a variety of reasons: most commonly, the cast can
    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
    // division by 0.
    let timestamp = hir
        .into_literal_mz_timestamp()
        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
    Ok(timestamp)
}
1112
1113/// Plans an expression in the AS position of a `CREATE SECRET`.
1114pub fn plan_secret_as(
1115    scx: &StatementContext,
1116    mut expr: Expr<Aug>,
1117) -> Result<MirScalarExpr, PlanError> {
1118    let scope = Scope::empty();
1119    let desc = RelationDesc::empty();
1120    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1121
1122    transform_ast::transform(scx, &mut expr)?;
1123
1124    let ecx = &ExprContext {
1125        qcx: &qcx,
1126        name: "AS",
1127        scope: &scope,
1128        relation_type: desc.typ(),
1129        allow_aggregates: false,
1130        allow_subqueries: false,
1131        allow_parameters: false,
1132        allow_windows: false,
1133    };
1134    let expr = plan_expr(ecx, &expr)?
1135        .type_as(ecx, &SqlScalarType::Bytes)?
1136        .lower_uncorrelated(scx.catalog.system_vars())?;
1137    Ok(expr)
1138}
1139
/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
///
/// Builds a relation whose columns are the configured bodies, headers, and
/// secrets (in that order), plans the CHECK expression against it as a
/// `boolean`, and records the column index of each body/header/secret so the
/// evaluator can supply values in the right positions.
pub fn plan_webhook_validate_using(
    scx: &StatementContext,
    validate_using: CreateWebhookSourceCheck<Aug>,
) -> Result<WebhookValidation, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::Source);

    let CreateWebhookSourceCheck {
        options,
        using: mut expr,
    } = validate_using;

    // Column types and names are pushed in lockstep; their indices must stay
    // aligned (asserted below after each push).
    let mut column_typs = vec![];
    let mut column_names = vec![];

    let (bodies, headers, secrets) = options
        .map(|o| (o.bodies, o.headers, o.secrets))
        .unwrap_or_default();

    // Append all of the bodies so they can be used in the expression.
    let mut body_tuples = vec![];
    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
        // The body is exposed as either `bytea` or `text`.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "body".to_string());

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this body correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "body column names and types don't match"
        );
        body_tuples.push((column_idx, use_bytes));
    }

    // Append all of the headers so they can be used in the expression.
    let mut header_tuples = vec![];

    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
        // Headers are exposed as a map from header name to `bytea` or `text`.
        let value_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "headers".to_string());

        column_typs.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(value_type),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this header correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        header_tuples.push((column_idx, use_bytes));
    }

    // Append all secrets so they can be used in the expression.
    let mut validation_secrets = vec![];

    for CreateWebhookSourceSecret {
        secret,
        alias,
        use_bytes,
    } in secrets
    {
        // Either provide the secret to the validation expression as Bytes or a String.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        let ResolvedItemName::Item {
            id,
            full_name: FullItemName { item, .. },
            ..
        } = secret
        else {
            return Err(PlanError::InvalidSecret(Box::new(secret)));
        };

        // Plan the expression using the secret's alias, if one is provided.
        let name = if let Some(alias) = alias {
            alias.into_string()
        } else {
            item
        };
        column_names.push(name);

        // Get the column index that corresponds for this secret, so we can make sure to provide the
        // secrets in the correct order during evaluation.
        let column_idx = column_typs.len() - 1;
        // Double check that our column names and types match.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "column names and types don't match"
        );

        validation_secrets.push(WebhookValidationSecret {
            id,
            column_idx,
            use_bytes,
        });
    }

    // Plan the CHECK expression against the body/header/secret columns.
    let relation_typ = SqlRelationType::new(column_typs);
    let desc = RelationDesc::new(relation_typ, column_names.clone());
    let scope = Scope::from_source(None, column_names);

    transform_ast::transform(scx, &mut expr)?;

    let ecx = &ExprContext {
        qcx: &qcx,
        name: "CHECK",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    // The expression must be a boolean and lowerable to an uncorrelated MIR
    // expression (it can only reference the columns constructed above).
    let expr = plan_expr(ecx, &expr)?
        .type_as(ecx, &SqlScalarType::Bool)?
        .lower_uncorrelated(scx.catalog.system_vars())?;
    let validation = WebhookValidation {
        expression: expr,
        relation_desc: desc,
        bodies: body_tuples,
        headers: header_tuples,
        secrets: validation_secrets,
    };
    Ok(validation)
}
1297
1298pub fn plan_default_expr(
1299    scx: &StatementContext,
1300    expr: &Expr<Aug>,
1301    target_ty: &SqlScalarType,
1302) -> Result<HirScalarExpr, PlanError> {
1303    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1304    let ecx = &ExprContext {
1305        qcx: &qcx,
1306        name: "DEFAULT expression",
1307        scope: &Scope::empty(),
1308        relation_type: &SqlRelationType::empty(),
1309        allow_aggregates: false,
1310        allow_subqueries: false,
1311        allow_parameters: false,
1312        allow_windows: false,
1313    };
1314    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1315    Ok(hir)
1316}
1317
/// Plans and evaluates the parameter expressions of an `EXECUTE` statement.
///
/// Each expression is planned, checked to be assignment-castable to the
/// corresponding declared parameter type, and eagerly evaluated to a `Datum`.
/// Returns the evaluated datums alongside both the actual and expected types.
///
/// Errors if the number of parameters does not match the prepared statement's
/// declaration, if a parameter's type is incompatible, or if evaluation fails.
pub fn plan_params<'a>(
    scx: &'a StatementContext,
    params: Vec<Expr<Aug>>,
    desc: &StatementDesc,
) -> Result<Params, PlanError> {
    if params.len() != desc.param_types.len() {
        sql_bail!(
            "expected {} params, got {}",
            desc.param_types.len(),
            params.len()
        );
    }

    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);

    let mut datums = Row::default();
    let mut packer = datums.packer();
    let mut actual_types = Vec::new();
    let temp_storage = &RowArena::new();
    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
        transform_ast::transform(scx, &mut expr)?;

        let ecx = execute_expr_context(&qcx);
        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
        let actual_ty = ecx.scalar_type(&ex);
        // Only verify that a cast to the expected type exists; the value is
        // stored with its actual type, not the expected one.
        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
            return Err(PlanError::WrongParameterType(
                i + 1,
                ecx.humanize_sql_scalar_type(expected_ty, false),
                ecx.humanize_sql_scalar_type(&actual_ty, false),
            ));
        }
        // Evaluate the (necessarily uncorrelated) expression to a constant.
        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
        let evaled = ex.eval(&[], temp_storage)?;
        packer.push(evaled);
        actual_types.push(actual_ty);
    }
    Ok(Params {
        datums,
        execute_types: actual_types,
        expected_types: desc.param_types.clone(),
    })
}
1361
// Empty scope and relation type shared by all `EXECUTE` expression contexts
// (see `execute_expr_context`); parameters may not reference any columns.
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1364
/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
///
/// All `allow_*` flags are false: EXECUTE parameters may not contain
/// aggregates, subqueries, further parameters, or window function calls.
pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
    ExprContext {
        qcx,
        name: "EXECUTE",
        scope: &EXECUTE_CONTEXT_SCOPE,
        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    }
}
1378
/// The CastContext used when matching up the types of parameters passed to EXECUTE.
///
/// This is an assignment cast also in Postgres, see
/// <https://github.com/MaterializeInc/database-issues/issues/9266>
///
/// Dereferenced (`*EXECUTE_CAST_CONTEXT`) at its use site in [`plan_params`].
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1385
1386pub fn plan_index_exprs<'a>(
1387    scx: &'a StatementContext,
1388    on_desc: &RelationDesc,
1389    exprs: Vec<Expr<Aug>>,
1390) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1391    let scope = Scope::from_source(None, on_desc.iter_names());
1392    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1393
1394    let ecx = &ExprContext {
1395        qcx: &qcx,
1396        name: "CREATE INDEX",
1397        scope: &scope,
1398        relation_type: on_desc.typ(),
1399        allow_aggregates: false,
1400        allow_subqueries: false,
1401        allow_parameters: false,
1402        allow_windows: false,
1403    };
1404    let repr_col_types: Vec<ReprColumnType> = on_desc
1405        .typ()
1406        .column_types
1407        .iter()
1408        .map(ReprColumnType::from)
1409        .collect();
1410    let mut out = vec![];
1411    for mut expr in exprs {
1412        transform_ast::transform(scx, &mut expr)?;
1413        let expr = plan_expr_or_col_index(ecx, &expr)?;
1414        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1415        expr.reduce(&repr_col_types);
1416        out.push(expr);
1417    }
1418    Ok(out)
1419}
1420
1421fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1422    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1423        Some(column) => Ok(HirScalarExpr::column(column)),
1424        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1425    }
1426}
1427
1428fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1429    match e {
1430        Expr::Value(Value::Number(n)) => {
1431            let n = n.parse::<usize>().map_err(|e| {
1432                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1433            })?;
1434            if n < 1 || n > max {
1435                sql_bail!(
1436                    "column reference {} in {} is out of range (1 - {})",
1437                    n,
1438                    name,
1439                    max
1440                );
1441            }
1442            Ok(Some(n - 1))
1443        }
1444        _ => Ok(None),
1445    }
1446}
1447
/// The result of planning a `Query`: a relation expression together with the
/// finishing pieces (ORDER BY / LIMIT / OFFSET / projection) that apply on
/// top of it.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// The scope of `expr`'s output columns.
    scope: Scope,
    /// The planned `ORDER BY` columns.
    order_by: Vec<ColumnOrder>,
    /// The planned `LIMIT`, if any.
    limit: Option<HirScalarExpr>,
    /// `offset` is either
    /// - an Int64 literal
    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
    ///   later.)
    offset: HirScalarExpr,
    /// The output projection (column indices into `expr`).
    project: Vec<usize>,
    /// Query hints controlling group sizes, extracted from the statement's options.
    group_size_hints: GroupSizeHints,
}
1462
/// Plans a `Query`, wrapping the work in `checked_recur_mut` so that deeply
/// nested queries fail gracefully instead of overflowing the stack.
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1466
1467fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1468    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1469    // for the identifiers, so that they can be re-installed before returning.
1470    let cte_bindings = plan_ctes(qcx, q)?;
1471
1472    let limit = match &q.limit {
1473        None => None,
1474        Some(Limit {
1475            quantity,
1476            with_ties: false,
1477        }) => {
1478            let ecx = &ExprContext {
1479                qcx,
1480                name: "LIMIT",
1481                scope: &Scope::empty(),
1482                relation_type: &SqlRelationType::empty(),
1483                allow_aggregates: false,
1484                allow_subqueries: true,
1485                allow_parameters: true,
1486                allow_windows: false,
1487            };
1488            let limit = plan_expr(ecx, quantity)?;
1489            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1490
1491            let limit = if limit.is_constant() {
1492                let arena = RowArena::new();
1493                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1494
1495                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1496                // that the error happened in a LIMIT clause, so that we have better error msg for
1497                // something like `SELECT 5 LIMIT 'aaa'`.
1498                match limit.eval(&[], &arena)? {
1499                    d @ Datum::Int64(v) if v >= 0 => {
1500                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1501                    }
1502                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1503                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1504                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1505                }
1506            } else {
1507                // Gate non-constant LIMIT expressions behind a feature flag
1508                qcx.scx
1509                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1510                limit
1511            };
1512
1513            Some(limit)
1514        }
1515        Some(Limit {
1516            quantity: _,
1517            with_ties: true,
1518        }) => bail_unsupported!("FETCH ... WITH TIES"),
1519    };
1520
1521    let offset = match &q.offset {
1522        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1523        Some(offset) => {
1524            let ecx = &ExprContext {
1525                qcx,
1526                name: "OFFSET",
1527                scope: &Scope::empty(),
1528                relation_type: &SqlRelationType::empty(),
1529                allow_aggregates: false,
1530                allow_subqueries: false,
1531                allow_parameters: true,
1532                allow_windows: false,
1533            };
1534            let offset = plan_expr(ecx, offset)?;
1535            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1536
1537            let offset = if offset.is_constant() {
1538                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1539                let offset_value = offset_into_value(offset)?;
1540                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1541            } else {
1542                // The only case when this is allowed to not be a constant is if it contains
1543                // parameters. (In which case, we'll later check that it's a constant after
1544                // parameter binding.)
1545                if !offset.contains_parameters() {
1546                    return Err(PlanError::InvalidOffset(format!(
1547                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1548                        offset
1549                    )));
1550                }
1551                offset
1552            };
1553            offset
1554        }
1555    };
1556
1557    let mut planned_query = match &q.body {
1558        SetExpr::Select(s) => {
1559            // Extract query options.
1560            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1561            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1562
1563            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1564            PlannedQuery {
1565                expr: plan.expr,
1566                scope: plan.scope,
1567                order_by: plan.order_by,
1568                project: plan.project,
1569                limit,
1570                offset,
1571                group_size_hints,
1572            }
1573        }
1574        _ => {
1575            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1576            let ecx = &ExprContext {
1577                qcx,
1578                name: "ORDER BY clause of a set expression",
1579                scope: &scope,
1580                relation_type: &qcx.relation_type(&expr),
1581                allow_aggregates: false,
1582                allow_subqueries: true,
1583                allow_parameters: true,
1584                allow_windows: false,
1585            };
1586            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1587            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1588            let project = (0..ecx.relation_type.arity()).collect();
1589            PlannedQuery {
1590                expr: expr.map(map_exprs),
1591                scope,
1592                order_by,
1593                limit,
1594                project,
1595                offset,
1596                group_size_hints: GroupSizeHints::default(),
1597            }
1598        }
1599    };
1600
1601    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1602    match &q.ctes {
1603        CteBlock::Simple(_) => {
1604            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1605                if let Some(cte) = qcx.ctes.remove(&id) {
1606                    planned_query.expr = HirRelationExpr::Let {
1607                        name: cte.name,
1608                        id: id.clone(),
1609                        value: Box::new(value),
1610                        body: Box::new(planned_query.expr),
1611                    };
1612                }
1613                if let Some(shadowed_val) = shadowed_val {
1614                    qcx.ctes.insert(id, shadowed_val);
1615                }
1616            }
1617        }
1618        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1619            let MutRecBlockOptionExtracted {
1620                recursion_limit,
1621                return_at_recursion_limit,
1622                error_at_recursion_limit,
1623                seen: _,
1624            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1625            let limit = match (
1626                recursion_limit,
1627                return_at_recursion_limit,
1628                error_at_recursion_limit,
1629            ) {
1630                (None, None, None) => None,
1631                (Some(max_iters), None, None) => {
1632                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1633                }
1634                (None, Some(max_iters), None) => Some((max_iters, true)),
1635                (None, None, Some(max_iters)) => Some((max_iters, false)),
1636                _ => {
1637                    return Err(InvalidWmrRecursionLimit(
1638                        "More than one recursion limit given. \
1639                         Please give at most one of RECURSION LIMIT, \
1640                         ERROR AT RECURSION LIMIT, \
1641                         RETURN AT RECURSION LIMIT."
1642                            .to_owned(),
1643                    ));
1644                }
1645            }
1646            .try_map(|(max_iters, return_at_limit)| {
1647                Ok::<LetRecLimit, PlanError>(LetRecLimit {
1648                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1649                        "Recursion limit has to be greater than 0.".to_owned(),
1650                    ))?,
1651                    return_at_limit: *return_at_limit,
1652                })
1653            })?;
1654
1655            let mut bindings = Vec::new();
1656            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1657                if let Some(cte) = qcx.ctes.remove(&id) {
1658                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1659                }
1660                if let Some(shadowed_val) = shadowed_val {
1661                    qcx.ctes.insert(id, shadowed_val);
1662                }
1663            }
1664            if !bindings.is_empty() {
1665                planned_query.expr = HirRelationExpr::LetRec {
1666                    limit,
1667                    bindings,
1668                    body: Box::new(planned_query.expr),
1669                }
1670            }
1671        }
1672    }
1673
1674    Ok(planned_query)
1675}
1676
1677/// Converts an OFFSET expression into a value.
1678pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1679    let offset = offset
1680        .try_into_literal_int64()
1681        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1682    if offset < 0 {
1683        return Err(PlanError::InvalidOffset(format!(
1684            "must not be negative, got {}",
1685            offset
1686        )));
1687    }
1688    Ok(offset)
1689}
1690
// Generates `MutRecBlockOptionExtracted`, used above when planning a
// `CteBlock::MutuallyRecursive` to pull the (mutually exclusive) recursion
// limit options out of the `WITH MUTUALLY RECURSIVE` option list. Each option
// is an optional `u64`.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1697
1698/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1699///
1700/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1701/// shadowed value that can be reinstalled once the planning has completed.
1702pub fn plan_ctes(
1703    qcx: &mut QueryContext,
1704    q: &Query<Aug>,
1705) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1706    // Accumulate planned expressions and shadowed descriptions.
1707    let mut result = Vec::new();
1708    // Retain the old descriptions of CTE bindings so that we can restore them
1709    // after we're done planning this SELECT.
1710    let mut shadowed_descs = BTreeMap::new();
1711
1712    // A reused identifier indicates a reused name.
1713    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1714        sql_bail!(
1715            "WITH query name {} specified more than once",
1716            normalize::ident_ref(ident).quoted()
1717        )
1718    }
1719
1720    match &q.ctes {
1721        CteBlock::Simple(ctes) => {
1722            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1723            for cte in ctes.iter() {
1724                let cte_name = normalize::ident(cte.alias.name.clone());
1725                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1726                let typ = qcx.relation_type(&val);
1727                let mut desc = RelationDesc::new(typ, scope.column_names());
1728                plan_utils::maybe_rename_columns(
1729                    format!("CTE {}", cte.alias.name),
1730                    &mut desc,
1731                    &cte.alias.columns,
1732                )?;
1733                // Capture the prior value if it exists, so that it can be re-installed.
1734                let shadowed = qcx.ctes.insert(
1735                    cte.id,
1736                    CteDesc {
1737                        name: cte_name,
1738                        desc,
1739                    },
1740                );
1741
1742                result.push((cte.id, val, shadowed));
1743            }
1744        }
1745        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1746            // Insert column types into `qcx.ctes` first for recursive bindings.
1747            for cte in ctes.iter() {
1748                let cte_name = normalize::ident(cte.name.clone());
1749                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1750                for column in cte.columns.iter() {
1751                    desc_columns.push((
1752                        normalize::column_name(column.name.clone()),
1753                        SqlColumnType {
1754                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1755                            nullable: true,
1756                        },
1757                    ));
1758                }
1759                let desc = RelationDesc::from_names_and_types(desc_columns);
1760                let shadowed = qcx.ctes.insert(
1761                    cte.id,
1762                    CteDesc {
1763                        name: cte_name,
1764                        desc,
1765                    },
1766                );
1767                // Capture the prior value if it exists, so that it can be re-installed.
1768                if let Some(shadowed) = shadowed {
1769                    shadowed_descs.insert(cte.id, shadowed);
1770                }
1771            }
1772
1773            // Plan all CTEs and validate the proposed types.
1774            for cte in ctes.iter() {
1775                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1776
1777                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1778
1779                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1780                    // Once WMR CTEs support NOT NULL constraints, check that
1781                    // nullability of derived column types are compatible.
1782                    sql_bail!(
1783                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1784                    );
1785                }
1786
1787                if !proposed_typ.keys.is_empty() {
1788                    // Once WMR CTEs support keys, check that keys exactly
1789                    // overlap.
1790                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1791                }
1792
1793                // Validate that the derived and proposed types are the same.
1794                let derived_typ = qcx.relation_type(&val);
1795
1796                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1797                    let cte_name = normalize::ident(cte.name.clone());
1798                    let proposed_typ = proposed_typ
1799                        .column_types
1800                        .iter()
1801                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1802                        .collect::<Vec<_>>();
1803                    let inferred_typ = derived_typ
1804                        .column_types
1805                        .iter()
1806                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1807                        .collect::<Vec<_>>();
1808                    Err(PlanError::RecursiveTypeMismatch(
1809                        cte_name,
1810                        proposed_typ,
1811                        inferred_typ,
1812                    ))
1813                };
1814
1815                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1816                    return type_err(proposed_typ, derived_typ);
1817                }
1818
1819                // Cast derived types to proposed types or error.
1820                let val = match cast_relation(
1821                    qcx,
1822                    // Choose `CastContext::Assignment`` because the user has
1823                    // been explicit about the types they expect. Choosing
1824                    // `CastContext::Implicit` is not "strong" enough to impose
1825                    // typmods from proposed types onto values.
1826                    CastContext::Assignment,
1827                    val,
1828                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1829                ) {
1830                    Ok(val) => val,
1831                    Err(_) => return type_err(proposed_typ, derived_typ),
1832                };
1833
1834                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1835            }
1836        }
1837    }
1838
1839    Ok(result)
1840}
1841
1842pub fn plan_nested_query(
1843    qcx: &mut QueryContext,
1844    q: &Query<Aug>,
1845) -> Result<(HirRelationExpr, Scope), PlanError> {
1846    let PlannedQuery {
1847        mut expr,
1848        scope,
1849        order_by,
1850        limit,
1851        offset,
1852        project,
1853        group_size_hints,
1854    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1855    if limit.is_some()
1856        || !offset
1857            .clone()
1858            .try_into_literal_int64()
1859            .is_ok_and(|offset| offset == 0)
1860    {
1861        expr = HirRelationExpr::top_k(
1862            expr,
1863            vec![],
1864            order_by,
1865            limit,
1866            offset,
1867            group_size_hints.limit_input_group_size,
1868        );
1869    }
1870    Ok((expr.project(project), scope))
1871}
1872
/// Plans a `SetExpr`: a `SELECT`, a set operation (`UNION`/`EXCEPT`/
/// `INTERSECT`), a `VALUES` list, a `TABLE` reference, a parenthesized
/// subquery, or a `SHOW` statement.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
            // should not have planned any ordering.
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan the LHS and RHS.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Validate that the LHS and RHS are the same width.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Match the types of the corresponding columns on the LHS and RHS
            // using the normal type coercion rules. This is equivalent to
            // `coerce_homogeneous_exprs`, but implemented in terms of
            // `HirRelationExpr` rather than `HirScalarExpr`.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column pair, pick a common target type and plan an
            // implicit cast of both sides to it; failure on either side is a
            // user-facing type-mismatch error.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_sql_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&target, false),
                        qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Install the casts with a Map/Project only when at least one
            // cast is not the identity column reference; otherwise keep the
            // input expression unchanged.
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            // Build the set operation itself; `all` distinguishes e.g.
            // `UNION ALL` from plain `UNION` (which deduplicates).
            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
                    // Though we believe that render() does The Right Thing (TM)
                    // Also note that we do *not* need another threshold() at the end of the method chain
                    // because the right-hand side of the outer union only produces existing records,
                    // i.e., the record counts for differential data flow definitely remain non-negative.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            let scope = Scope::from_source(
                None,
                // Column names are taken from the left, as in Postgres.
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // The create SQL definition of involving this query, will have the explicit `SHOW`
            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
            // current schema of the executing user. When Materialize restarts and tries to re-plan
            // these queries, it will only have access to the raw `SHOW` command and have no idea
            // what schema to use. As a result Materialize will fail to boot.
            //
            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
            // However, banning show commands in views gives us more flexibility to change their
            // output.
            //
            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
            // with all show commands expanded into their equivalent SELECT statements.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Some SHOW statements are a SELECT query. Others produces Rows
            // directly. Convert both of these to the needed Hir and Scope.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let scope = Scope::from_source(None, desc.iter_names());
                let expr = HirRelationExpr::constant(rows, desc.into_typ());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateType(stmt) => to_hirscope(
                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
                    show::describe_show_create_type(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2118
2119/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2120fn plan_values(
2121    qcx: &QueryContext,
2122    values: &[Vec<Expr<Aug>>],
2123) -> Result<(HirRelationExpr, Scope), PlanError> {
2124    assert!(!values.is_empty());
2125
2126    let ecx = &ExprContext {
2127        qcx,
2128        name: "VALUES",
2129        scope: &Scope::empty(),
2130        relation_type: &SqlRelationType::empty(),
2131        allow_aggregates: false,
2132        allow_subqueries: true,
2133        allow_parameters: true,
2134        allow_windows: false,
2135    };
2136
2137    let ncols = values[0].len();
2138    let nrows = values.len();
2139
2140    // Arrange input expressions by columns, not rows, so that we can
2141    // call `coerce_homogeneous_exprs` on each column.
2142    let mut cols = vec![vec![]; ncols];
2143    for row in values {
2144        if row.len() != ncols {
2145            sql_bail!(
2146                "VALUES expression has varying number of columns: {} vs {}",
2147                row.len(),
2148                ncols
2149            );
2150        }
2151        for (i, v) in row.iter().enumerate() {
2152            cols[i].push(v);
2153        }
2154    }
2155
2156    // Plan each column.
2157    let mut col_iters = Vec::with_capacity(ncols);
2158    let mut col_types = Vec::with_capacity(ncols);
2159    for col in &cols {
2160        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2161        let mut col_type = ecx.column_type(&col[0]);
2162        for val in &col[1..] {
2163            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2164        }
2165        col_types.push(col_type);
2166        col_iters.push(col.into_iter());
2167    }
2168
2169    // Build constant relation.
2170    let mut exprs = vec![];
2171    for _ in 0..nrows {
2172        for i in 0..ncols {
2173            exprs.push(col_iters[i].next().unwrap());
2174        }
2175    }
2176    let out = HirRelationExpr::CallTable {
2177        func: TableFunc::Wrap {
2178            width: ncols,
2179            types: col_types,
2180        },
2181        exprs,
2182    };
2183
2184    // Build column names.
2185    let mut scope = Scope::empty();
2186    for i in 0..ncols {
2187        let name = format!("column{}", i + 1);
2188        scope.items.push(ScopeItem::from_column_name(name));
2189    }
2190
2191    Ok((out, scope))
2192}
2193
2194/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2195/// statement.
2196///
2197/// This is special-cased in PostgreSQL and different enough from `plan_values`
2198/// that it is easier to use a separate function entirely. Unlike a normal
2199/// `VALUES` clause, each value is coerced to the type of the target table
2200/// via an assignment cast.
2201///
2202/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2203fn plan_values_insert(
2204    qcx: &QueryContext,
2205    target_names: &[&ColumnName],
2206    target_types: &[&SqlScalarType],
2207    values: &[Vec<Expr<Aug>>],
2208) -> Result<HirRelationExpr, PlanError> {
2209    assert!(!values.is_empty());
2210
2211    if !values.iter().map(|row| row.len()).all_equal() {
2212        sql_bail!("VALUES lists must all be the same length");
2213    }
2214
2215    let ecx = &ExprContext {
2216        qcx,
2217        name: "VALUES",
2218        scope: &Scope::empty(),
2219        relation_type: &SqlRelationType::empty(),
2220        allow_aggregates: false,
2221        allow_subqueries: true,
2222        allow_parameters: true,
2223        allow_windows: false,
2224    };
2225
2226    let mut exprs = vec![];
2227    let mut types = vec![];
2228    for row in values {
2229        if row.len() > target_names.len() {
2230            sql_bail!("INSERT has more expressions than target columns");
2231        }
2232        for (column, val) in row.into_iter().enumerate() {
2233            let target_type = &target_types[column];
2234            let val = plan_expr(ecx, val)?;
2235            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2236            let source_type = &ecx.scalar_type(&val);
2237            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2238                Ok(val) => val,
2239                Err(_) => sql_bail!(
2240                    "column {} is of type {} but expression is of type {}",
2241                    target_names[column].quoted(),
2242                    qcx.humanize_sql_scalar_type(target_type, false),
2243                    qcx.humanize_sql_scalar_type(source_type, false),
2244                ),
2245            };
2246            if column >= types.len() {
2247                types.push(ecx.column_type(&val));
2248            } else {
2249                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2250            }
2251            exprs.push(val);
2252        }
2253    }
2254
2255    Ok(HirRelationExpr::CallTable {
2256        func: TableFunc::Wrap {
2257            width: values[0].len(),
2258            types,
2259        },
2260        exprs,
2261    })
2262}
2263
2264fn plan_join_identity() -> (HirRelationExpr, Scope) {
2265    let typ = SqlRelationType::new(vec![]);
2266    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2267    let scope = Scope::empty();
2268    (expr, scope)
2269}
2270
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The relation expression computing the query's rows, pre-projection.
    expr: HirRelationExpr,
    /// Column metadata for the post-projection output columns.
    scope: Scope,
    /// Ordering applied to `expr`'s rows before the projection.
    order_by: Vec<ColumnOrder>,
    /// Indices of the columns of `expr` to retain, in output order.
    project: Vec<usize>,
}
2283
// Generates `SelectOptionExtracted`, which collects the per-SELECT group size
// hint options (each an optional `u64`) from a statement's option list; it is
// converted into `GroupSizeHints` when planning the SELECT.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2291
2292/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2293///
2294/// Normally, the ORDER BY clause occurs after the columns specified in the
2295/// SELECT list have been projected. In a query like
2296///
2297///   CREATE TABLE (a int, b int)
2298///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2299///
2300/// it is valid to refer to `a`, because it is explicitly selected, but it would
2301/// not be valid to refer to unselected column `b`.
2302///
2303/// But PostgreSQL extends the standard to permit queries like
2304///
2305///   SELECT a FROM t ORDER BY b
2306///
2307/// where expressions in the ORDER BY clause can refer to *both* input columns
2308/// and output columns.
2309fn plan_select_from_where(
2310    qcx: &QueryContext,
2311    mut s: Select<Aug>,
2312    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2313) -> Result<SelectPlan, PlanError> {
2314    // TODO: Both `s` and `order_by_exprs` are not references because the
2315    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2316    // table function support (the UUID mapping). Attempt to change this so callers
2317    // don't need to clone the Select.
2318
2319    // Extract query options.
2320    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2321    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2322
2323    // Step 1. Handle FROM clause, including joins.
2324    let (mut relation_expr, mut from_scope) =
2325        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2326            let (left, left_scope) = l;
2327            plan_join(
2328                qcx,
2329                left,
2330                left_scope,
2331                &Join {
2332                    relation: TableFactor::NestedJoin {
2333                        join: Box::new(twj.clone()),
2334                        alias: None,
2335                    },
2336                    join_operator: JoinOperator::CrossJoin,
2337                },
2338            )
2339        })?;
2340
2341    // Step 2. Handle WHERE clause.
2342    if let Some(selection) = &s.selection {
2343        let ecx = &ExprContext {
2344            qcx,
2345            name: "WHERE clause",
2346            scope: &from_scope,
2347            relation_type: &qcx.relation_type(&relation_expr),
2348            allow_aggregates: false,
2349            allow_subqueries: true,
2350            allow_parameters: true,
2351            allow_windows: false,
2352        };
2353        let expr = plan_expr(ecx, selection)
2354            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2355            .type_as(ecx, &SqlScalarType::Bool)?;
2356        relation_expr = relation_expr.filter(vec![expr]);
2357    }
2358
2359    // Step 3. Gather aggregates and table functions.
2360    // (But skip window aggregates.)
2361    let (aggregates, table_funcs) = {
2362        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2363        visitor.visit_select_mut(&mut s);
2364        for o in order_by_exprs.iter_mut() {
2365            visitor.visit_order_by_expr_mut(o);
2366        }
2367        visitor.into_result()?
2368    };
2369    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2370    if !table_funcs.is_empty() {
2371        let (expr, scope) = plan_scalar_table_funcs(
2372            qcx,
2373            table_funcs,
2374            &mut table_func_names,
2375            &relation_expr,
2376            &from_scope,
2377        )?;
2378        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2379        from_scope = from_scope.product(scope)?;
2380    }
2381
2382    // Step 4. Expand SELECT clause.
2383    let projection = {
2384        let ecx = &ExprContext {
2385            qcx,
2386            name: "SELECT clause",
2387            scope: &from_scope,
2388            relation_type: &qcx.relation_type(&relation_expr),
2389            allow_aggregates: true,
2390            allow_subqueries: true,
2391            allow_parameters: true,
2392            allow_windows: true,
2393        };
2394        let mut out = vec![];
2395        for si in &s.projection {
2396            if *si == SelectItem::Wildcard && s.from.is_empty() {
2397                sql_bail!("SELECT * with no tables specified is not valid");
2398            }
2399            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2400        }
2401        out
2402    };
2403
2404    // Step 5. Handle GROUP BY clause.
2405    // This will also plan the aggregates gathered in Step 3.
2406    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2407    let (mut group_scope, select_all_mapping) = {
2408        // Compute GROUP BY expressions.
2409        let ecx = &ExprContext {
2410            qcx,
2411            name: "GROUP BY clause",
2412            scope: &from_scope,
2413            relation_type: &qcx.relation_type(&relation_expr),
2414            allow_aggregates: false,
2415            allow_subqueries: true,
2416            allow_parameters: true,
2417            allow_windows: false,
2418        };
2419        let mut group_key = vec![];
2420        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2421        let mut group_hir_exprs = vec![];
2422        let mut group_scope = Scope::empty();
2423        let mut select_all_mapping = BTreeMap::new();
2424
2425        for group_expr in &s.group_by {
2426            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2427            let new_column = group_key.len();
2428
2429            if let Some(group_expr) = group_expr {
2430                // Multiple AST expressions can map to the same HIR expression.
2431                // If we already have a ScopeItem for this HIR, we can add this
2432                // next AST expression to its set
2433                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2434                    existing_scope_item.exprs.insert(group_expr.clone());
2435                    continue;
2436                }
2437            }
2438
2439            let mut scope_item = if let HirScalarExpr::Column(
2440                ColumnRef {
2441                    level: 0,
2442                    column: old_column,
2443                },
2444                _name,
2445            ) = &expr
2446            {
2447                // If we later have `SELECT foo.*` then we have to find all
2448                // the `foo` items in `from_scope` and figure out where they
2449                // ended up in `group_scope`. This is really hard to do
2450                // right using SQL name resolution, so instead we just track
2451                // the movement here.
2452                select_all_mapping.insert(*old_column, new_column);
2453                let scope_item = ecx.scope.items[*old_column].clone();
2454                scope_item
2455            } else {
2456                ScopeItem::empty()
2457            };
2458
2459            if let Some(group_expr) = group_expr.cloned() {
2460                scope_item.exprs.insert(group_expr);
2461            }
2462
2463            group_key.push(from_scope.len() + group_exprs.len());
2464            group_hir_exprs.push(expr.clone());
2465            group_exprs.insert(expr, scope_item);
2466        }
2467
2468        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2469        for expr in &group_hir_exprs {
2470            if let Some(scope_item) = group_exprs.remove(expr) {
2471                group_scope.items.push(scope_item);
2472            }
2473        }
2474
2475        // Plan aggregates.
2476        let ecx = &ExprContext {
2477            qcx,
2478            name: "aggregate function",
2479            scope: &from_scope,
2480            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2481            allow_aggregates: false,
2482            allow_subqueries: true,
2483            allow_parameters: true,
2484            allow_windows: false,
2485        };
2486        let mut agg_exprs = vec![];
2487        for sql_function in aggregates {
2488            if sql_function.over.is_some() {
2489                unreachable!(
2490                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2491                );
2492            }
2493            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2494            group_scope
2495                .items
2496                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2497        }
2498        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2499            // apply GROUP BY / aggregates
2500            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2501                group_key,
2502                agg_exprs,
2503                group_size_hints.aggregate_input_group_size,
2504            );
2505
2506            // For every old column that wasn't a group key, add a scope item
2507            // that errors when referenced. We can't simply drop these items
2508            // from scope. These items need to *exist* because they might shadow
2509            // variables in outer scopes that would otherwise be valid to
2510            // reference, but accessing them needs to produce an error.
2511            for i in 0..from_scope.len() {
2512                if !select_all_mapping.contains_key(&i) {
2513                    let scope_item = &ecx.scope.items[i];
2514                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2515                        table_name: scope_item.table_name.clone(),
2516                        column_name: scope_item.column_name.clone(),
2517                        allow_unqualified_references: scope_item.allow_unqualified_references,
2518                    });
2519                }
2520            }
2521
2522            (group_scope, select_all_mapping)
2523        } else {
2524            // if no GROUP BY, aggregates or having then all columns remain in scope
2525            (
2526                from_scope.clone(),
2527                (0..from_scope.len()).map(|i| (i, i)).collect(),
2528            )
2529        }
2530    };
2531
2532    // Step 6. Handle HAVING clause.
2533    if let Some(ref having) = s.having {
2534        let ecx = &ExprContext {
2535            qcx,
2536            name: "HAVING clause",
2537            scope: &group_scope,
2538            relation_type: &qcx.relation_type(&relation_expr),
2539            allow_aggregates: true,
2540            allow_subqueries: true,
2541            allow_parameters: true,
2542            allow_windows: false,
2543        };
2544        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2545        relation_expr = relation_expr.filter(vec![expr]);
2546    }
2547
2548    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2549    // (This includes window aggregations.)
2550    //
2551    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2552    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2553    //
2554    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2555    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2556    // and can't be part of a bigger expression.
2557    // See https://www.postgresql.org/docs/current/queries-order.html:
2558    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2559    // expression"
2560    let window_funcs = {
2561        let mut visitor = WindowFuncCollector::default();
2562        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2563        // window functions are excluded from other things by `allow_windows` being false when
2564        // planning those before this code).
2565        visitor.visit_select(&s);
2566        for o in order_by_exprs.iter() {
2567            visitor.visit_order_by_expr(o);
2568        }
2569        visitor.into_result()
2570    };
2571    for window_func in window_funcs {
2572        let ecx = &ExprContext {
2573            qcx,
2574            name: "window function",
2575            scope: &group_scope,
2576            relation_type: &qcx.relation_type(&relation_expr),
2577            allow_aggregates: true,
2578            allow_subqueries: true,
2579            allow_parameters: true,
2580            allow_windows: true,
2581        };
2582        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2583        group_scope.items.push(ScopeItem::from_expr(window_func));
2584    }
2585    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2586    // been already planned now. However, we should still set `allow_windows: true` for the
2587    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2588    // msg if an OVER clause is missing from a window function.
2589
2590    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2591    if let Some(ref qualify) = s.qualify {
2592        let ecx = &ExprContext {
2593            qcx,
2594            name: "QUALIFY clause",
2595            scope: &group_scope,
2596            relation_type: &qcx.relation_type(&relation_expr),
2597            allow_aggregates: true,
2598            allow_subqueries: true,
2599            allow_parameters: true,
2600            allow_windows: true,
2601        };
2602        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2603        relation_expr = relation_expr.filter(vec![expr]);
2604    }
2605
2606    // Step 9. Handle SELECT clause.
2607    let output_columns = {
2608        let mut new_exprs = vec![];
2609        let mut new_type = qcx.relation_type(&relation_expr);
2610        let mut output_columns = vec![];
2611        for (select_item, column_name) in &projection {
2612            let ecx = &ExprContext {
2613                qcx,
2614                name: "SELECT clause",
2615                scope: &group_scope,
2616                relation_type: &new_type,
2617                allow_aggregates: true,
2618                allow_subqueries: true,
2619                allow_parameters: true,
2620                allow_windows: true,
2621            };
2622            let expr = match select_item {
2623                ExpandedSelectItem::InputOrdinal(i) => {
2624                    if let Some(column) = select_all_mapping.get(i).copied() {
2625                        HirScalarExpr::column(column)
2626                    } else {
2627                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2628                    }
2629                }
2630                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2631            };
2632            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2633                // Simple column reference; no need to map on a new expression.
2634                output_columns.push((column, column_name));
2635            } else {
2636                // Complicated expression that requires a map expression. We
2637                // update `group_scope` as we go so that future expressions that
2638                // are textually identical to this one can reuse it. This
2639                // duplicate detection is required for proper determination of
2640                // ambiguous column references with SQL92-style `ORDER BY`
2641                // items. See `plan_order_by_or_distinct_expr` for more.
2642                let typ = ecx.column_type(&expr);
2643                new_type.column_types.push(typ);
2644                new_exprs.push(expr);
2645                output_columns.push((group_scope.len(), column_name));
2646                group_scope
2647                    .items
2648                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2649            }
2650        }
2651        relation_expr = relation_expr.map(new_exprs);
2652        output_columns
2653    };
2654    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2655
2656    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2657    let order_by = {
2658        let relation_type = qcx.relation_type(&relation_expr);
2659        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2660            &ExprContext {
2661                qcx,
2662                name: "ORDER BY clause",
2663                scope: &group_scope,
2664                relation_type: &relation_type,
2665                allow_aggregates: true,
2666                allow_subqueries: true,
2667                allow_parameters: true,
2668                allow_windows: true,
2669            },
2670            &order_by_exprs,
2671            &output_columns,
2672        )?;
2673
2674        match s.distinct {
2675            None => relation_expr = relation_expr.map(map_exprs),
2676            Some(Distinct::EntireRow) => {
2677                if relation_type.arity() == 0 {
2678                    sql_bail!("SELECT DISTINCT must have at least one column");
2679                }
2680                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2681                // list, so we can't proceed if `ORDER BY` has introduced any
2682                // columns for arbitrary expressions. This matches PostgreSQL.
2683                if !try_push_projection_order_by(
2684                    &mut relation_expr,
2685                    &mut project_key,
2686                    &mut order_by,
2687                ) {
2688                    sql_bail!(
2689                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2690                    );
2691                }
2692                assert!(map_exprs.is_empty());
2693                relation_expr = relation_expr.distinct();
2694            }
2695            Some(Distinct::On(exprs)) => {
2696                let ecx = &ExprContext {
2697                    qcx,
2698                    name: "DISTINCT ON clause",
2699                    scope: &group_scope,
2700                    relation_type: &qcx.relation_type(&relation_expr),
2701                    allow_aggregates: true,
2702                    allow_subqueries: true,
2703                    allow_parameters: true,
2704                    allow_windows: true,
2705                };
2706
2707                let mut distinct_exprs = vec![];
2708                for expr in &exprs {
2709                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2710                    distinct_exprs.push(expr);
2711                }
2712
2713                let mut distinct_key = vec![];
2714
2715                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2716                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2717                // expressions, though the order of `DISTINCT ON` expressions
2718                // does not matter. This matches PostgreSQL and leaves the door
2719                // open to a future optimization where the `DISTINCT ON` and
2720                // `ORDER BY` operations happen in one pass.
2721                //
2722                // On the bright side, any columns that have already been
2723                // computed by `ORDER BY` can be reused in the distinct key.
2724                let arity = relation_type.arity();
2725                for ord in order_by.iter().take(distinct_exprs.len()) {
2726                    // The unusual construction of `expr` here is to ensure the
2727                    // temporary column expression lives long enough.
2728                    let mut expr = &HirScalarExpr::column(ord.column);
2729                    if ord.column >= arity {
2730                        expr = &map_exprs[ord.column - arity];
2731                    };
2732                    match distinct_exprs.iter().position(move |e| e == expr) {
2733                        None => sql_bail!(
2734                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2735                        ),
2736                        Some(pos) => {
2737                            distinct_exprs.remove(pos);
2738                        }
2739                    }
2740                    distinct_key.push(ord.column);
2741                }
2742
2743                // Add any remaining `DISTINCT ON` expressions to the key.
2744                for expr in distinct_exprs {
2745                    // If the expression is a reference to an existing column,
2746                    // do not introduce a new column to support it.
2747                    let column = match expr {
2748                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2749                        _ => {
2750                            map_exprs.push(expr);
2751                            arity + map_exprs.len() - 1
2752                        }
2753                    };
2754                    distinct_key.push(column);
2755                }
2756
2757                // `DISTINCT ON` is semantically a TopK with limit 1. The
2758                // columns in `ORDER BY` that are not part of the distinct key,
2759                // if there are any, determine the ordering within each group,
2760                // per PostgreSQL semantics.
2761                let distinct_len = distinct_key.len();
2762                relation_expr = HirRelationExpr::top_k(
2763                    relation_expr.map(map_exprs),
2764                    distinct_key,
2765                    order_by.iter().skip(distinct_len).cloned().collect(),
2766                    Some(HirScalarExpr::literal(
2767                        Datum::Int64(1),
2768                        SqlScalarType::Int64,
2769                    )),
2770                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2771                    group_size_hints.distinct_on_input_group_size,
2772                );
2773            }
2774        }
2775
2776        order_by
2777    };
2778
2779    // Construct a clean scope to expose outwards, where all of the state that
2780    // accumulated in the scope during planning of this SELECT is erased. The
2781    // clean scope has at most one name for each column, and the names are not
2782    // associated with any table.
2783    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2784
2785    Ok(SelectPlan {
2786        expr: relation_expr,
2787        scope,
2788        order_by,
2789        project: project_key,
2790    })
2791}
2792
/// Plans table functions that appeared in a `SELECT` list (gathered earlier by
/// `AggregateTableFuncVisitor`), returning a relation expression that the
/// caller joins against the `FROM` relation, together with the scope of the
/// newly introduced columns.
///
/// `table_funcs` maps each table function call to the synthetic identifier
/// that replaced it in the rewritten `SELECT` list; `table_func_names` is
/// filled in so later name resolution can map those identifiers back to the
/// original function names.
fn plan_scalar_table_funcs(
    qcx: &QueryContext,
    table_funcs: BTreeMap<Function<Aug>, String>,
    table_func_names: &mut BTreeMap<String, Ident>,
    relation_expr: &HirRelationExpr,
    from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Plan the table functions in a derived context that can see the columns
    // of the FROM relation, since their arguments may reference them.
    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));

    // Record the original function name for each synthetic identifier.
    for (table_func, id) in table_funcs.iter() {
        table_func_names.insert(
            id.clone(),
            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
        );
    }
    // If there's only a single table function, we can skip generating
    // ordinality columns.
    if table_funcs.len() == 1 {
        let (table_func, id) = table_funcs.iter().next().unwrap();
        let (expr, mut scope) =
            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;

        // A single table-function might return several columns as a record
        let num_cols = scope.len();
        for i in 0..scope.len() {
            // Qualify every output column with the synthetic identifier so
            // the rewritten SELECT list can resolve it, and forbid
            // unqualified references to avoid capturing unrelated names.
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
        }
        return Ok((expr, scope));
    }
    // Otherwise, plan as usual, emulating the ROWS FROM behavior
    let (expr, mut scope, num_cols) =
        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;

    // Munge the scope so table names match with the generated ids.
    // `i` walks the scope items in lockstep with the functions: for each
    // function there are `num_cols` output columns followed by one
    // ordinality column.
    let mut i = 0;
    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
        for _ in 0..num_cols {
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
            i += 1;
        }
        // Ordinality column. This doubles as the
        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
        // because it only needs to be NULL or not.
        scope.items[i].table_name = Some(PartialItemName {
            database: None,
            schema: None,
            item: id.clone(),
        });
        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
        scope.items[i].allow_unqualified_references = false;
        i += 1;
    }
    // Coalesced ordinality column (trailing item produced by
    // `plan_rows_from_internal`).
    scope.items[i].allow_unqualified_references = false;
    Ok((expr, scope))
}
2862
2863/// Plans an expression in a `GROUP BY` clause.
2864///
2865/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2866/// names/expressions defined in the `SELECT` clause. These special cases are
2867/// handled by this function; see comments within the implementation for
2868/// details.
2869fn plan_group_by_expr<'a>(
2870    ecx: &ExprContext,
2871    group_expr: &'a Expr<Aug>,
2872    projection: &'a [(ExpandedSelectItem, ColumnName)],
2873) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2874    let plan_projection = |column: usize| match &projection[column].0 {
2875        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2876        ExpandedSelectItem::Expr(expr) => {
2877            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2878        }
2879    };
2880
2881    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2882    // a special case that means to use the ith item in the SELECT clause.
2883    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2884        return plan_projection(column);
2885    }
2886
2887    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2888    // The `foo` can refer to *either* an input column or an output column. If
2889    // both exist, the input column is preferred.
2890    match group_expr {
2891        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2892            Err(PlanError::UnknownColumn {
2893                table: None,
2894                column,
2895                similar,
2896            }) => {
2897                // The expression was a simple identifier that did not match an
2898                // input column. See if it matches an output column.
2899                let mut iter = projection.iter().map(|(_expr, name)| name);
2900                if let Some(i) = iter.position(|n| *n == column) {
2901                    if iter.any(|n| *n == column) {
2902                        Err(PlanError::AmbiguousColumn(column))
2903                    } else {
2904                        plan_projection(i)
2905                    }
2906                } else {
2907                    // The name didn't match an output column either. Return the
2908                    // "unknown column" error.
2909                    Err(PlanError::UnknownColumn {
2910                        table: None,
2911                        column,
2912                        similar,
2913                    })
2914                }
2915            }
2916            res => Ok((Some(group_expr), res?)),
2917        },
2918        _ => Ok((
2919            Some(group_expr),
2920            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2921        )),
2922    }
2923}
2924
2925/// Plans a slice of `ORDER BY` expressions.
2926///
2927/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2928/// parameter.
2929///
2930/// Returns the determined column orderings and a list of scalar expressions
2931/// that must be mapped onto the underlying relation expression.
2932pub(crate) fn plan_order_by_exprs(
2933    ecx: &ExprContext,
2934    order_by_exprs: &[OrderByExpr<Aug>],
2935    output_columns: &[(usize, &ColumnName)],
2936) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2937    let mut order_by = vec![];
2938    let mut map_exprs = vec![];
2939    for obe in order_by_exprs {
2940        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2941        // If the expression is a reference to an existing column,
2942        // do not introduce a new column to support it.
2943        let column = match expr {
2944            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2945            _ => {
2946                map_exprs.push(expr);
2947                ecx.relation_type.arity() + map_exprs.len() - 1
2948            }
2949        };
2950        order_by.push(resolve_desc_and_nulls_last(obe, column));
2951    }
2952    Ok((order_by, map_exprs))
2953}
2954
2955/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2956///
2957/// The `output_columns` parameter describes, in order, the physical index and
2958/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2959/// corresponds to a `SELECT` list with a single entry named "a" that can be
2960/// found at index 3 in the underlying relation expression.
2961///
2962/// There are three cases to handle.
2963///
2964///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2965///       reference to the specified output column.
2966///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2967///       an output column, if it exists; otherwise it is a reference to an
2968///       input column.
2969///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2970///       arbitrary expressions exclusively refer to input columns, never output
2971///       columns.
2972fn plan_order_by_or_distinct_expr(
2973    ecx: &ExprContext,
2974    expr: &Expr<Aug>,
2975    output_columns: &[(usize, &ColumnName)],
2976) -> Result<HirScalarExpr, PlanError> {
2977    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2978        return Ok(HirScalarExpr::column(output_columns[i].0));
2979    }
2980
2981    if let Expr::Identifier(names) = expr {
2982        if let [name] = &names[..] {
2983            let name = normalize::column_name(name.clone());
2984            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2985            if let Some((i, _)) = iter.next() {
2986                match iter.next() {
2987                    // Per SQL92, names are not considered ambiguous if they
2988                    // refer to identical target list expressions, as in
2989                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2990                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2991                    _ => return Ok(HirScalarExpr::column(*i)),
2992                }
2993            }
2994        }
2995    }
2996
2997    plan_expr(ecx, expr)?.type_as_any(ecx)
2998}
2999
3000fn plan_table_with_joins(
3001    qcx: &QueryContext,
3002    table_with_joins: &TableWithJoins<Aug>,
3003) -> Result<(HirRelationExpr, Scope), PlanError> {
3004    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
3005    for join in &table_with_joins.joins {
3006        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
3007        expr = new_expr;
3008        scope = new_scope;
3009    }
3010    Ok((expr, scope))
3011}
3012
3013fn plan_table_factor(
3014    qcx: &QueryContext,
3015    table_factor: &TableFactor<Aug>,
3016) -> Result<(HirRelationExpr, Scope), PlanError> {
3017    match table_factor {
3018        TableFactor::Table { name, alias } => {
3019            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
3020            let scope = plan_table_alias(scope, alias.as_ref())?;
3021            Ok((expr, scope))
3022        }
3023
3024        TableFactor::Function {
3025            function,
3026            alias,
3027            with_ordinality,
3028        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3029
3030        TableFactor::RowsFrom {
3031            functions,
3032            alias,
3033            with_ordinality,
3034        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3035
3036        TableFactor::Derived {
3037            lateral,
3038            subquery,
3039            alias,
3040        } => {
3041            let mut qcx = (*qcx).clone();
3042            if !lateral {
3043                // Since this derived table was not marked as `LATERAL`,
3044                // make elements in outer scopes invisible until we reach the
3045                // next lateral barrier.
3046                for scope in &mut qcx.outer_scopes {
3047                    if scope.lateral_barrier {
3048                        break;
3049                    }
3050                    scope.items.clear();
3051                }
3052            }
3053            qcx.outer_scopes[0].lateral_barrier = true;
3054            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3055            let scope = plan_table_alias(scope, alias.as_ref())?;
3056            Ok((expr, scope))
3057        }
3058
3059        TableFactor::NestedJoin { join, alias } => {
3060            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3061            let scope = plan_table_alias(scope, alias.as_ref())?;
3062            Ok((expr, scope))
3063        }
3064    }
3065}
3066
/// Plans a `ROWS FROM` expression.
///
/// `ROWS FROM` concatenates table functions into a single table, filling in
/// `NULL`s in places where one table function has fewer rows than another. We
/// can achieve this by augmenting each table function with a row number, doing
/// a `FULL JOIN` between each table function on the row number and eventually
/// projecting away the row number columns. Concretely, the following query
/// using `ROWS FROM`
///
/// ```sql
/// SELECT
///     *
/// FROM
///     ROWS FROM (
///         generate_series(1, 2),
///         information_schema._pg_expandarray(ARRAY[9]),
///         generate_series(3, 6)
///     );
/// ```
///
/// is equivalent to the following query that does not use `ROWS FROM`:
///
/// ```sql
/// SELECT
///     gs1.generate_series, expand.x, expand.n, gs3.generate_series
/// FROM
///     generate_series(1, 2) WITH ORDINALITY AS gs1
///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
///         ON gs1.ordinality = expand.ordinality
///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
/// ```
///
/// Note the call to `coalesce` in the last join condition, which ensures that
/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
///
/// This function creates a HirRelationExpr that follows the structure of the
/// latter query.
///
/// `with_ordinality` can be used to have the output expression contain a
/// single coalesced ordinality column at the end of the entire expression.
fn plan_rows_from(
    qcx: &QueryContext,
    functions: &[Function<Aug>],
    alias: Option<&TableAlias>,
    with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // If there's only a single table function, planning proceeds as if `ROWS
    // FROM` hadn't been written at all.
    if let [function] = functions {
        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
    }

    // Per PostgreSQL, all scope items take the name of the first function
    // (unless aliased).
    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
    let (expr, mut scope, num_cols) = plan_rows_from_internal(
        qcx,
        functions,
        Some(functions[0].name.full_item_name().clone()),
    )?;

    // `plan_rows_from_internal` lays out columns as:
    //   [f0 cols..., f0 ordinality, f1 cols..., f1 ordinality, ..., coalesced ordinality]
    // Columns tracks the set of columns we will keep in the projection.
    let mut columns = Vec::new();
    let mut offset = 0;
    // Retain table function's non-ordinality columns.
    for (idx, cols) in num_cols.into_iter().enumerate() {
        for i in 0..cols {
            columns.push(offset + i);
        }
        // Advance past this function's data columns plus its ordinality column.
        offset += cols + 1;

        // Remove the ordinality column from the scope, accounting for previous scope
        // changes from this loop. (`idx` ordinality items have already been removed,
        // shifting everything left, so this function's ordinality item now lives at
        // `offset - idx - 1` rather than `offset - 1`.)
        scope.items.remove(offset - idx - 1);
    }

    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
    // column (the final column of the expression). Otherwise remove it from
    // the scope.
    if with_ordinality {
        columns.push(offset);
    } else {
        scope.items.pop();
    }

    let expr = expr.project(columns);

    let scope = plan_table_alias(scope, alias)?;
    Ok((expr, scope))
}
3157
/// Plans an expression coalescing multiple table functions. Each table
/// function is followed by its row ordinality. The entire expression is
/// followed by the coalesced row ordinality.
///
/// The returned Scope will set all item's table_name's to the `table_name`
/// parameter if it is `Some`. If `None`, they will be the name of each table
/// function.
///
/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
/// each table function.
///
/// For example, with table functions tf1 returning 1 column (a) and tf2
/// returning 2 columns (b, c), this function will return an expr 6 columns:
///
/// - tf1.a
/// - tf1.ordinality
/// - tf2.b
/// - tf2.c
/// - tf2.ordinality
/// - coalesced_ordinality
///
/// And a `Vec<usize>` of `[1, 2]`.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Join together each of the table functions in turn. The last column is
    // always the column to join against and is maintained to be the coalescence
    // of the row number column for all prior functions.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    // The scope holds the function's data columns plus its ordinality column,
    // so the non-ordinality column count is one less than the scope length.
    num_cols.push(left_scope.len() - 1);
    // Create the coalesced ordinality column. For the first function it is
    // simply a copy of that function's own ordinality column (currently the
    // last column).
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));

    for function in functions {
        // The right hand side of a join must be planned in a new scope.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // Join the running coalesced ordinality (last column of the left side)
        // against the new function's ordinality (last column of the right
        // side, offset by the width of the left side).
        let left_col = left_scope.len() - 1;
        let right_col = left_scope.len() + right_scope.len() - 1;
        let on = HirScalarExpr::call_binary(
            HirScalarExpr::column(left_col),
            HirScalarExpr::column(right_col),
            expr_func::Eq,
        );
        // FULL OUTER so rows from neither side are lost, then append a fresh
        // coalesced ordinality column spanning both sides.
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::call_variadic(
                Coalesce,
                vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            )]);

        // Project off the previous iteration's coalesced column, but keep both of this
        // iteration's ordinality columns.
        left_expr = left_expr.project(
            (0..left_col) // non-coalesced ordinality columns from left function
                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
                .collect(),
        );
        // Move the coalesced ordinality column.
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3238
3239/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3240/// FROM` clause that contains other table functions. Special aliasing rules
3241/// apply.
3242fn plan_solitary_table_function(
3243    qcx: &QueryContext,
3244    function: &Function<Aug>,
3245    alias: Option<&TableAlias>,
3246    with_ordinality: bool,
3247) -> Result<(HirRelationExpr, Scope), PlanError> {
3248    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3249
3250    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3251    if single_column_function {
3252        let item = &mut scope.items[0];
3253
3254        // Mark that the function only produced a single column. This impacts
3255        // whole-row references.
3256        item.from_single_column_function = true;
3257
3258        // Strange special case for solitary table functions that output one
3259        // column whose name matches the name of the table function. If a table
3260        // alias is provided, the column name is changed to the table alias's
3261        // name. Concretely, the following query returns a column named `x`
3262        // rather than a column named `generate_series`:
3263        //
3264        //     SELECT * FROM generate_series(1, 5) AS x
3265        //
3266        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3267        // since its output column is explicitly named `value`, not
3268        // `jsonb_array_elements`.
3269        //
3270        // Note also that we may (correctly) change the column name again when
3271        // we plan the table alias below if the `alias.columns` is non-empty.
3272        if let Some(alias) = alias {
3273            if let ScopeItem {
3274                table_name: Some(table_name),
3275                column_name,
3276                ..
3277            } = item
3278            {
3279                if table_name.item.as_str() == column_name.as_str() {
3280                    *column_name = normalize::column_name(alias.name.clone());
3281                }
3282            }
3283        }
3284    }
3285
3286    let scope = plan_table_alias(scope, alias)?;
3287    Ok((expr, scope))
3288}
3289
/// Plans a table function.
///
/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
/// instead to get the appropriate aliasing behavior.
///
/// When `with_ordinality` is true, an `ordinality` item is appended to the
/// returned scope (and the planned function is asked to emit an ordinality
/// column). When `table_name` is `Some`, it overrides the name under which
/// the output columns are registered in the scope; otherwise the function's
/// own name is used.
fn plan_table_function_internal(
    qcx: &QueryContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &Function<Aug>,
    with_ordinality: bool,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // These modifiers are never valid on table functions; if present here the
    // parser let something through that it shouldn't have.
    assert_none!(filter, "cannot parse table function with FILTER");
    assert_none!(over, "cannot parse table function with OVER");
    assert!(!*distinct, "cannot parse table function with DISTINCT");

    // Table function arguments are planned in an empty scope: subqueries and
    // parameters are permitted, but aggregates, window functions, and
    // references to columns of the enclosing query are not.
    let ecx = &ExprContext {
        qcx,
        name: "table function arguments",
        scope: &Scope::empty(),
        relation_type: &SqlRelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let scalar_args = match args {
        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    // Scope items are named after the function itself unless the caller
    // supplied an override (e.g. `ROWS FROM` names every function's columns
    // after the first function).
    let table_name = match table_name {
        Some(table_name) => table_name.item,
        None => name.full_item_name().item.clone(),
    };

    let scope_name = Some(PartialItemName {
        database: None,
        schema: None,
        item: table_name,
    });

    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
        // A true table function: select the matching implementation for the
        // provided argument types.
        Func::Table(impls) => {
            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
            let expr = match tf.imp {
                // The implementation is a built-in `TableFunc`; ordinality
                // support comes from `TableFunc::with_ordinality`, which may
                // not exist for every function.
                TableFuncImpl::CallTable { mut func, exprs } => {
                    if with_ordinality {
                        func = TableFunc::with_ordinality(func.clone()).ok_or(
                            PlanError::Unsupported {
                                feature: format!("WITH ORDINALITY on {}", func),
                                discussion_no: None,
                            },
                        )?;
                    }
                    HirRelationExpr::CallTable { func, exprs }
                }
                TableFuncImpl::Expr(expr) => {
                    if !with_ordinality {
                        expr
                    } else {
                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
                        // back to the legacy implementation or error out the query.
                        if qcx
                            .scx
                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
                        {
                            // Note that this can give an incorrect ordering, and also has an extreme
                            // performance problem in some cases. See the doc comment of
                            // `TableFuncImpl`.
                            tracing::error!(
                                %name,
                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
                            );
                            // Legacy path: synthesize the ordinality column
                            // with a `row_number` scalar window function over
                            // an empty ordering.
                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
                                func: WindowExprType::Scalar(ScalarWindowExpr {
                                    func: ScalarWindowFunc::RowNumber,
                                    order_by: vec![],
                                }),
                                partition_by: vec![],
                                order_by: vec![],
                            })])
                        } else {
                            bail_unsupported!(format!(
                                "WITH ORDINALITY or ROWS FROM with {}",
                                name
                            ));
                        }
                    }
                }
            };
            (expr, scope)
        }
        // A scalar function used in table position: wrap its single output
        // value into a one-row, one-column table via `TabletizedScalar`.
        Func::Scalar(impls) => {
            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let output = expr.typ(
                &qcx.outer_relation_types,
                &SqlRelationType::new(vec![]),
                &qcx.scx.param_types.borrow(),
            );

            let relation = SqlRelationType::new(vec![output]);

            let function_ident = Ident::new(name.full_item_name().item.clone())?;
            let column_name = normalize::column_name(function_ident);
            let name = column_name.to_string();

            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);

            let mut func = TableFunc::TabletizedScalar { relation, name };
            if with_ordinality {
                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
                    feature: format!("WITH ORDINALITY on {}", func),
                    discussion_no: None,
                })?;
            }
            (
                HirRelationExpr::CallTable {
                    func,
                    exprs: vec![expr],
                },
                scope,
            )
        }
        // Aggregate and other function classes are not usable in FROM.
        o => sql_bail!(
            "{} functions are not supported in functions in FROM",
            o.class()
        ),
    };

    // Expose the ordinality column (emitted above) in the scope so it can be
    // referenced by name.
    if with_ordinality {
        scope
            .items
            .push(ScopeItem::from_name(scope_name, "ordinality"));
    }

    Ok((expr, scope))
}
3443
3444fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3445    if let Some(TableAlias {
3446        name,
3447        columns,
3448        strict,
3449    }) = alias
3450    {
3451        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3452            sql_bail!(
3453                "{} has {} columns available but {} columns specified",
3454                name,
3455                scope.items.len(),
3456                columns.len()
3457            );
3458        }
3459
3460        let table_name = normalize::ident(name.to_owned());
3461        for (i, item) in scope.items.iter_mut().enumerate() {
3462            item.table_name = if item.allow_unqualified_references {
3463                Some(PartialItemName {
3464                    database: None,
3465                    schema: None,
3466                    item: table_name.clone(),
3467                })
3468            } else {
3469                // Columns that prohibit unqualified references are special
3470                // columns from the output of a NATURAL or USING join that can
3471                // only be referenced by their full, pre-join name. Applying an
3472                // alias to the output of that join renders those columns
3473                // inaccessible, which we accomplish here by setting the
3474                // table name to `None`.
3475                //
3476                // Concretely, consider:
3477                //
3478                //      CREATE TABLE t1 (a int);
3479                //      CREATE TABLE t2 (a int);
3480                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3481                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3482                //
3483                // In (1), the join has no alias. The underlying columns from
3484                // either side of the join can be referenced as `t1.a` and
3485                // `t2.a`, respectively, and the unqualified name `a` refers to
3486                // a column whose value is `coalesce(t1.a, t2.a)`.
3487                //
3488                // In (2), the join is aliased as `t`. The columns from either
3489                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3490                // the coalesced column can be named as either `a` or `t.a`.
3491                //
3492                // We previously had a bug [0] that mishandled this subtle
3493                // logic.
3494                //
3495                // NOTE(benesch): We could in theory choose to project away
3496                // those inaccessible columns and drop them from the scope
3497                // entirely, but that would require that this function also
3498                // take and return the `HirRelationExpr` that is being aliased,
3499                // which is a rather large refactor.
3500                //
3501                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3502                None
3503            };
3504            item.column_name = columns
3505                .get(i)
3506                .map(|a| normalize::column_name(a.clone()))
3507                .unwrap_or_else(|| item.column_name.clone());
3508        }
3509    }
3510    Ok(scope)
3511}
3512
/// Attempts to invent an output column name for the select-list expression
/// `expr`, mirroring PostgreSQL's rules. Returns `None` if no name can be
/// invented.
///
/// `table_func_names` is a mapping from a UUID to the original function
/// name. The UUIDs are identifiers that have been rewritten from some table
/// function expression, and this mapping restores the original names.
fn invent_column_name(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Option<ColumnName> {
    // We follow PostgreSQL exactly here, which has some complicated rules
    // around "high" and "low" quality names. Low quality names override other
    // low quality names but not high quality names.
    //
    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728

    #[derive(Debug)]
    enum NameQuality {
        Low,
        High,
    }

    // Recursive worker: returns the invented name along with its quality so
    // that wrapping expressions (casts, CASE) can decide whether an inner
    // name wins over their own fallback.
    fn invent(
        ecx: &ExprContext,
        expr: &Expr<Aug>,
        table_func_names: &BTreeMap<String, Ident>,
    ) -> Option<(ColumnName, NameQuality)> {
        match expr {
            Expr::Identifier(names) => {
                // A bare identifier that is a rewritten table-function UUID
                // takes the original function's name.
                if let [name] = names.as_slice() {
                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
                        return Some((
                            normalize::column_name(table_func_name.clone()),
                            NameQuality::High,
                        ));
                    }
                }
                // Otherwise use the last path component (e.g. `t.a` -> `a`).
                names
                    .last()
                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
            }
            Expr::Value(v) => match v {
                // Per PostgreSQL, `bool` and `interval` literals take on the name
                // of their type, but not other literal types.
                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
                _ => None,
            },
            Expr::Function(func) => {
                let (schema, item) = match &func.name {
                    ResolvedItemName::Item {
                        qualifiers,
                        full_name,
                        ..
                    } => (&qualifiers.schema_spec, full_name.item.clone()),
                    _ => unreachable!(),
                };

                // Functions in the internal/unsafe schemas do not lend their
                // names to output columns.
                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
                    || schema
                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
                {
                    None
                } else {
                    Some((item.into(), NameQuality::High))
                }
            }
            Expr::HomogenizingFunction { function, .. } => Some((
                function.to_string().to_lowercase().into(),
                NameQuality::High,
            )),
            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
            // A cast prefers the inner expression's name when it is high
            // quality; the target type name is only a low-quality fallback.
            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
            },
            // CASE prefers a high-quality name from its ELSE branch, falling
            // back to the low-quality name "case".
            Expr::Case { else_result, .. } => {
                match else_result
                    .as_ref()
                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
                {
                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                    _ => Some(("case".into(), NameQuality::Low)),
                }
            }
            Expr::FieldAccess { field, .. } => {
                Some((normalize::column_name(field.clone()), NameQuality::High))
            }
            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
                // A bit silly to have to plan the query here just to get its column
                // name, since we throw away the planned expression, but fixing this
                // requires a separate semantic analysis phase.
                let (_expr, scope) =
                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
                scope
                    .items
                    .first()
                    .map(|name| (name.column_name.clone(), NameQuality::High))
            }
            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
            _ => None,
        }
    }

    // Callers only care about the name itself, not its quality.
    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
}
3622
/// A select-list item after wildcard expansion (see `expand_select_item`).
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// A column of the input relation, referenced by its ordinal position in
    /// the scope.
    InputOrdinal(usize),
    /// An expression (possibly synthesized during expansion, hence the `Cow`).
    Expr(Cow<'a, Expr<Aug>>),
}
3628
3629impl ExpandedSelectItem<'_> {
3630    fn as_expr(&self) -> Option<&Expr<Aug>> {
3631        match self {
3632            ExpandedSelectItem::InputOrdinal(_) => None,
3633            ExpandedSelectItem::Expr(expr) => Some(expr),
3634        }
3635    }
3636}
3637
/// Expands a single select item into the list of output columns it produces,
/// paired with each column's name.
///
/// Wildcard items (`t.*`, `expr.*`, `*`) expand to zero or more columns;
/// a plain expression expands to exactly one.
fn expand_select_item<'a>(
    ecx: &ExprContext,
    s: &'a SelectItem<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
    match s {
        // `SELECT t.*`: expand to every scope item belonging to table `t`.
        SelectItem::Expr {
            expr: Expr::QualifiedWildcard(table_name),
            alias: _,
        } => {
            // NOTE(review): presumably records that this statement's output
            // depends on the current column set — confirm against the
            // definition of `ambiguous_columns` on the statement context.
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let table_name =
                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
            let out: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.is_from_table(&table_name))
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            if out.is_empty() {
                sql_bail!("no table named '{}' in scope", table_name);
            }
            Ok(out)
        }
        // `SELECT (expr).*`: expand to one field access per field of the
        // expression's record type.
        SelectItem::Expr {
            expr: Expr::WildcardAccess(sql_expr),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            // A bit silly to have to plan the expression here just to get its
            // type, since we throw away the planned expression, but fixing this
            // requires a separate semantic analysis phase. Luckily this is an
            // uncommon operation and the PostgreSQL docs have a warning that
            // this operation is slow in Postgres too.
            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
            let fields = match ecx.scalar_type(&expr) {
                SqlScalarType::Record { fields, .. } => fields,
                ty => sql_bail!(
                    "type {} is not composite",
                    ecx.humanize_sql_scalar_type(&ty, false)
                ),
            };
            // Columns flagged as "exists" columns for table functions that
            // appeared in the target list are excluded from the expansion.
            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
            if let Expr::Identifier(ident) = sql_expr.as_ref() {
                if let [name] = ident.as_slice() {
                    if let Ok(items) = ecx.scope.items_from_table(
                        &[],
                        &PartialItemName {
                            database: None,
                            schema: None,
                            item: name.as_str().to_string(),
                        },
                    ) {
                        for (_, item) in items {
                            if item
                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
                            {
                                skip_cols.insert(item.column_name.clone());
                            }
                        }
                    }
                }
            }
            // Each remaining record field becomes a synthesized
            // `FieldAccess` expression named after the field.
            let items = fields
                .iter()
                .filter_map(|(name, _ty)| {
                    if skip_cols.contains(name) {
                        None
                    } else {
                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
                            expr: sql_expr.clone(),
                            field: name.clone().into(),
                        }));
                        Some((item, name.clone()))
                    }
                })
                .collect();
            Ok(items)
        }
        // `SELECT *`: expand to every scope item that permits unqualified
        // references.
        SelectItem::Wildcard => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let items: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.allow_unqualified_references)
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();

            Ok(items)
        }
        // A plain expression: name it from the alias, else invent a name,
        // else fall back to the unknown-column placeholder.
        SelectItem::Expr { expr, alias } => {
            let name = alias
                .clone()
                .map(normalize::column_name)
                .or_else(|| invent_column_name(ecx, expr, table_func_names))
                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
        }
    }
}
3748
3749fn plan_join(
3750    left_qcx: &QueryContext,
3751    left: HirRelationExpr,
3752    left_scope: Scope,
3753    join: &Join<Aug>,
3754) -> Result<(HirRelationExpr, Scope), PlanError> {
3755    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3756    let (kind, constraint) = match &join.join_operator {
3757        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3758        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3759        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3760        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3761        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3762    };
3763
3764    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3765    if !kind.can_be_correlated() {
3766        for item in &mut right_qcx.outer_scopes[0].items {
3767            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3768            // these items from scope. These items need to *exist* because they
3769            // might shadow variables in outer scopes that would otherwise be
3770            // valid to reference, but accessing them needs to produce an error.
3771            item.error_if_referenced =
3772                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3773                    table: table.cloned(),
3774                    column: column.clone(),
3775                });
3776        }
3777    }
3778    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3779
3780    let (expr, scope) = match constraint {
3781        JoinConstraint::On(expr) => {
3782            let product_scope = left_scope.product(right_scope)?;
3783            let ecx = &ExprContext {
3784                qcx: left_qcx,
3785                name: "ON clause",
3786                scope: &product_scope,
3787                relation_type: &SqlRelationType::new(
3788                    left_qcx
3789                        .relation_type(&left)
3790                        .column_types
3791                        .into_iter()
3792                        .chain(right_qcx.relation_type(&right).column_types)
3793                        .collect(),
3794                ),
3795                allow_aggregates: false,
3796                allow_subqueries: true,
3797                allow_parameters: true,
3798                allow_windows: false,
3799            };
3800            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3801            let joined = left.join(right, on, kind);
3802            (joined, product_scope)
3803        }
3804        JoinConstraint::Using { columns, alias } => {
3805            let column_names = columns
3806                .iter()
3807                .map(|ident| normalize::column_name(ident.clone()))
3808                .collect::<Vec<_>>();
3809
3810            plan_using_constraint(
3811                &column_names,
3812                left_qcx,
3813                left,
3814                left_scope,
3815                &right_qcx,
3816                right,
3817                right_scope,
3818                kind,
3819                alias.as_ref(),
3820            )?
3821        }
3822        JoinConstraint::Natural => {
3823            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3824            // have the same scx. However, it doesn't hurt to be safe.
3825            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3826            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3827            let left_column_names = left_scope.column_names();
3828            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3829            let column_names: Vec<_> = left_column_names
3830                .filter(|col| right_column_names.contains(col))
3831                .cloned()
3832                .collect();
3833            plan_using_constraint(
3834                &column_names,
3835                left_qcx,
3836                left,
3837                left_scope,
3838                &right_qcx,
3839                right,
3840                right_scope,
3841                kind,
3842                None,
3843            )?
3844        }
3845    };
3846    Ok((expr, scope))
3847}
3848
// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
/// Plans the `USING (a, b, ...)` constraint of a join. Also used for
/// `NATURAL` joins, whose shared column names are computed by the caller
/// (`plan_join`), which then calls this with `alias: None`.
///
/// For each named column, the corresponding column on each side is resolved,
/// the pair is coerced to a common type, and an equality between the two is
/// added to the join's `ON` condition. The column from the secondary side of
/// the join (and, for `FULL OUTER`, both original columns, replaced by a new
/// `coalesce(lhs, rhs)` column) is hidden from unqualified reference and from
/// `SELECT *`. All join columns are reprojected to the front of the output.
///
/// Returns the planned join expression together with its output scope.
#[allow(clippy::too_many_arguments)]
fn plan_using_constraint(
    column_names: &[ColumnName],
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    right_qcx: &QueryContext,
    right: HirRelationExpr,
    right_scope: Scope,
    kind: JoinKind,
    alias: Option<&Ident>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let mut both_scope = left_scope.clone().product(right_scope.clone())?;

    // Cargo culting PG here; no discernable reason this must fail, but PG does
    // so we do, as well.
    let mut unique_column_names = BTreeSet::new();
    for c in column_names {
        if !unique_column_names.insert(c) {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "column name {} appears more than once in USING clause",
                    c.quoted()
                ),
                discussion_no: None,
            });
        }
    }

    // `JOIN ... USING (...) AS alias` introduces a table-like name for the
    // join columns.
    let alias_item_name = alias.map(|alias| PartialItemName {
        database: None,
        schema: None,
        item: alias.clone().to_string(),
    });

    // The USING alias must not collide with any table name already in scope.
    if let Some(alias_item_name) = &alias_item_name {
        for partial_item_name in both_scope.table_names() {
            if partial_item_name.matches(alias_item_name) {
                sql_bail!(
                    "table name \"{}\" specified more than once",
                    alias_item_name
                )
            }
        }
    }

    // Context used to coerce the two sides of each join column to a common
    // type; its relation type is the left columns followed by the right ones.
    let ecx = &ExprContext {
        qcx: right_qcx,
        name: "USING clause",
        scope: &both_scope,
        relation_type: &SqlRelationType::new(
            left_qcx
                .relation_type(&left)
                .column_types
                .into_iter()
                .chain(right_qcx.relation_type(&right).column_types)
                .collect(),
        ),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    // `join_exprs`: equality predicates forming the ON condition.
    // `map_exprs`: new columns appended to the join output (coalesced FULL
    //   OUTER columns and alias columns).
    // `new_items`: scope items matching `map_exprs`, appended to `both_scope`.
    // `join_cols`/`hidden_cols`: output column positions to surface at the
    //   front of the projection / to hide from unqualified reference.
    let mut join_exprs = vec![];
    let mut map_exprs = vec![];
    let mut new_items = vec![];
    let mut join_cols = vec![];
    let mut hidden_cols = vec![];

    for column_name in column_names {
        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
        let (lhs, lhs_name) = left_scope.resolve_using_column(
            column_name,
            JoinSide::Left,
            &mut left_qcx.name_manager.borrow_mut(),
        )?;
        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
            column_name,
            JoinSide::Right,
            &mut right_qcx.name_manager.borrow_mut(),
        )?;

        // Adjust the RHS reference to its post-join location.
        rhs.column += left_scope.len();

        // Join keys must be resolved to same type.
        let mut exprs = coerce_homogeneous_exprs(
            &ecx.with_name(&format!(
                "NATURAL/USING join column {}",
                column_name.quoted()
            )),
            vec![
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    lhs,
                    Arc::clone(&lhs_name),
                )),
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    rhs,
                    Arc::clone(&rhs_name),
                )),
            ],
            None,
        )?;
        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));

        // The primary side's column is the visible join column; the secondary
        // side's is hidden. For FULL OUTER neither side is primary, so a new
        // coalesced column is created instead.
        match kind {
            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
                join_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
            }
            JoinKind::RightOuter => {
                join_cols.push(rhs.column);
                hidden_cols.push(lhs.column);
            }
            JoinKind::FullOuter => {
                // Create a new column that will be the coalesced value of left
                // and right.
                join_cols.push(both_scope.items.len() + map_exprs.len());
                hidden_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
                map_exprs.push(HirScalarExpr::call_variadic(
                    Coalesce,
                    vec![expr1.clone(), expr2.clone()],
                ));
                new_items.push(ScopeItem::from_column_name(column_name));
            }
        }

        // If a `join_using_alias` is present, add a new scope item that accepts
        // only table-qualified references for each specified join column.
        // Unlike regular table aliases, a `join_using_alias` should not hide the
        // names of the joined relations.
        if alias_item_name.is_some() {
            let new_item_col = both_scope.items.len() + new_items.len();
            join_cols.push(new_item_col);
            hidden_cols.push(new_item_col);

            new_items.push(ScopeItem::from_name(
                alias_item_name.clone(),
                column_name.clone().to_string(),
            ));

            // Should be safe to use either `lhs` or `rhs` here since the column
            // is available in both scopes and must have the same type of the new item.
            // We (arbitrarily) choose the left name.
            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
        }

        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
    }
    both_scope.items.extend(new_items);

    // The columns from the secondary side of the join remain accessible by
    // their table-qualified name, but not by their column name alone. They are
    // also excluded from `SELECT *`.
    for c in hidden_cols {
        both_scope.items[c].allow_unqualified_references = false;
    }

    // Reproject all returned elements to the front of the list.
    let project_key = join_cols
        .into_iter()
        .chain(0..both_scope.items.len())
        .unique()
        .collect::<Vec<_>>();

    both_scope = both_scope.project(&project_key);

    let on = HirScalarExpr::variadic_and(join_exprs);

    let both = left
        .join(right, on, kind)
        .map(map_exprs)
        .project(project_key);
    Ok((both, both_scope))
}
4027
4028pub fn plan_expr<'a>(
4029    ecx: &'a ExprContext,
4030    e: &Expr<Aug>,
4031) -> Result<CoercibleScalarExpr, PlanError> {
4032    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4033}
4034
/// Workhorse for [`plan_expr`]: plans a single SQL AST expression node,
/// dispatching on its syntactic form.
///
/// If the expression has already been planned and is available in the current
/// scope (e.g. as a precomputed GROUP BY expression), a column reference to
/// the existing value is returned instead of re-planning it.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
        // We've already calculated this expression.
        return Ok(HirScalarExpr::named_column(
            i,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )
        .into());
    }

    // Otherwise, plan the node according to its syntactic form.
    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Generalized functions, operators, and casts.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Special functions and operators.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // `NULLIF(l, r)` is planned as `CASE WHEN l = r THEN NULL ELSE l END`.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        // The following variants are rewritten away by desugaring before
        // planning, so encountering them here is a bug.
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
4136
4137fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4138    if !ecx.allow_parameters {
4139        // It might be clearer to return an error like "cannot use parameter
4140        // here", but this is how PostgreSQL does it, and so for now we follow
4141        // PostgreSQL.
4142        return Err(PlanError::UnknownParameter(n));
4143    }
4144    if n == 0 || n > 65536 {
4145        return Err(PlanError::UnknownParameter(n));
4146    }
4147    if ecx.param_types().borrow().contains_key(&n) {
4148        Ok(HirScalarExpr::parameter(n).into())
4149    } else {
4150        Ok(CoercibleScalarExpr::Parameter(n))
4151    }
4152}
4153
4154fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4155    let mut out = vec![];
4156    for e in exprs {
4157        out.push(plan_expr(ecx, e)?);
4158    }
4159    Ok(CoercibleScalarExpr::LiteralRecord(out))
4160}
4161
4162fn plan_cast(
4163    ecx: &ExprContext,
4164    expr: &Expr<Aug>,
4165    data_type: &ResolvedDataType,
4166) -> Result<CoercibleScalarExpr, PlanError> {
4167    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4168    let expr = match expr {
4169        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4170        // we can pass in the target type as a type hint. This is
4171        // a limited form of the coercion that we do for string literals
4172        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4173        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4174        // PostgreSQL compatibility trouble.
4175        //
4176        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4177        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4178        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4179        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4180        _ => plan_expr(ecx, expr)?,
4181    };
4182    let ecx = &ecx.with_name("CAST");
4183    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4184    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4185    Ok(expr.into())
4186}
4187
4188fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4189    let ecx = ecx.with_name("NOT argument");
4190    Ok(plan_expr(&ecx, expr)?
4191        .type_as(&ecx, &SqlScalarType::Bool)?
4192        .call_unary(UnaryFunc::Not(expr_func::Not))
4193        .into())
4194}
4195
4196fn plan_and(
4197    ecx: &ExprContext,
4198    left: &Expr<Aug>,
4199    right: &Expr<Aug>,
4200) -> Result<CoercibleScalarExpr, PlanError> {
4201    let ecx = ecx.with_name("AND argument");
4202    Ok(HirScalarExpr::variadic_and(vec![
4203        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4204        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4205    ])
4206    .into())
4207}
4208
4209fn plan_or(
4210    ecx: &ExprContext,
4211    left: &Expr<Aug>,
4212    right: &Expr<Aug>,
4213) -> Result<CoercibleScalarExpr, PlanError> {
4214    let ecx = ecx.with_name("OR argument");
4215    Ok(HirScalarExpr::variadic_or(vec![
4216        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4218    ])
4219    .into())
4220}
4221
4222fn plan_in_list(
4223    ecx: &ExprContext,
4224    lhs: &Expr<Aug>,
4225    list: &Vec<Expr<Aug>>,
4226    negated: &bool,
4227) -> Result<CoercibleScalarExpr, PlanError> {
4228    let ecx = ecx.with_name("IN list");
4229    let or = HirScalarExpr::variadic_or(
4230        list.into_iter()
4231            .map(|e| {
4232                let eq = lhs.clone().equals(e.clone());
4233                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4234            })
4235            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4236    );
4237    Ok(if *negated {
4238        or.call_unary(UnaryFunc::Not(expr_func::Not))
4239    } else {
4240        or
4241    }
4242    .into())
4243}
4244
4245fn plan_homogenizing_function(
4246    ecx: &ExprContext,
4247    function: &HomogenizingFunction,
4248    exprs: &[Expr<Aug>],
4249) -> Result<CoercibleScalarExpr, PlanError> {
4250    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4251    let expr = HirScalarExpr::call_variadic(
4252        match function {
4253            HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4254            HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4255            HomogenizingFunction::Least => VariadicFunc::from(Least),
4256        },
4257        coerce_homogeneous_exprs(
4258            &ecx.with_name(&function.to_string().to_lowercase()),
4259            plan_exprs(ecx, exprs)?,
4260            None,
4261        )?,
4262    );
4263    Ok(expr.into())
4264}
4265
4266fn plan_field_access(
4267    ecx: &ExprContext,
4268    expr: &Expr<Aug>,
4269    field: &Ident,
4270) -> Result<CoercibleScalarExpr, PlanError> {
4271    let field = normalize::column_name(field.clone());
4272    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4273    let ty = ecx.scalar_type(&expr);
4274    let i = match &ty {
4275        SqlScalarType::Record { fields, .. } => {
4276            fields.iter().position(|(name, _ty)| *name == field)
4277        }
4278        ty => sql_bail!(
4279            "column notation applied to type {}, which is not a composite type",
4280            ecx.humanize_sql_scalar_type(ty, false)
4281        ),
4282    };
4283    match i {
4284        None => sql_bail!(
4285            "field {} not found in data type {}",
4286            field,
4287            ecx.humanize_sql_scalar_type(&ty, false)
4288        ),
4289        Some(i) => Ok(expr
4290            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4291            .into()),
4292    }
4293}
4294
4295fn plan_subscript(
4296    ecx: &ExprContext,
4297    expr: &Expr<Aug>,
4298    positions: &[SubscriptPosition<Aug>],
4299) -> Result<CoercibleScalarExpr, PlanError> {
4300    assert!(
4301        !positions.is_empty(),
4302        "subscript expression must contain at least one position"
4303    );
4304
4305    let ecx = &ecx.with_name("subscripting");
4306    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4307    let ty = ecx.scalar_type(&expr);
4308    match &ty {
4309        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4310            ecx,
4311            expr,
4312            positions,
4313            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4314            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4315            // track in its backing data).
4316            if ty == SqlScalarType::Int2Vector {
4317                1
4318            } else {
4319                0
4320            },
4321        ),
4322        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4323        SqlScalarType::List { element_type, .. } => {
4324            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4325            let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4326            let n_layers = ty.unwrap_list_n_layers();
4327            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4328        }
4329        ty => sql_bail!(
4330            "cannot subscript type {}",
4331            ecx.humanize_sql_scalar_type(ty, false)
4332        ),
4333    }
4334}
4335
4336// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4337// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4338// any were slices (i.e. included colon).
4339fn extract_scalar_subscript_from_positions<'a>(
4340    positions: &'a [SubscriptPosition<Aug>],
4341    expr_type_name: &str,
4342) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4343    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4344    for p in positions {
4345        if p.explicit_slice {
4346            sql_bail!("{} subscript does not support slices", expr_type_name);
4347        }
4348        assert!(
4349            p.end.is_none(),
4350            "index-appearing subscripts cannot have end value"
4351        );
4352        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4353    }
4354    Ok(scalar_subscripts)
4355}
4356
4357fn plan_subscript_array(
4358    ecx: &ExprContext,
4359    expr: HirScalarExpr,
4360    positions: &[SubscriptPosition<Aug>],
4361    offset: i64,
4362) -> Result<CoercibleScalarExpr, PlanError> {
4363    let mut exprs = Vec::with_capacity(positions.len() + 1);
4364    exprs.push(expr);
4365
4366    // Subscripting arrays doesn't yet support slicing, so we always want to
4367    // extract scalars or error.
4368    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4369
4370    for i in indexes {
4371        exprs.push(plan_expr(ecx, i)?.cast_to(
4372            ecx,
4373            CastContext::Explicit,
4374            &SqlScalarType::Int64,
4375        )?);
4376    }
4377
4378    Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4379}
4380
/// Plans list subscripting (`l[1]`, `l[1:2]`, and mixtures thereof).
///
/// Positions are consumed left to right in alternating runs: a maximal run of
/// index operations is planned via `plan_index_list` (each index descends one
/// list layer, so `remaining_layers` shrinks), then a maximal run of slice
/// operations is planned via `plan_slice_list` (which leaves
/// `remaining_layers` unchanged).
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    // `i` is the index of the first position not yet planned.
    let mut i = 0;

    while i < positions.len() {
        // Take all contiguous index operations, i.e. find next slice operation.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Take all contiguous slice operations, i.e. find next index operation.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4429
4430fn plan_index_list(
4431    ecx: &ExprContext,
4432    expr: HirScalarExpr,
4433    indexes: &[&Expr<Aug>],
4434    n_layers: usize,
4435    elem_type_name: &str,
4436) -> Result<(usize, HirScalarExpr), PlanError> {
4437    let depth = indexes.len();
4438
4439    if depth > n_layers {
4440        if n_layers == 0 {
4441            sql_bail!("cannot subscript type {}", elem_type_name)
4442        } else {
4443            sql_bail!(
4444                "cannot index into {} layers; list only has {} layer{}",
4445                depth,
4446                n_layers,
4447                if n_layers == 1 { "" } else { "s" }
4448            )
4449        }
4450    }
4451
4452    let mut exprs = Vec::with_capacity(depth + 1);
4453    exprs.push(expr);
4454
4455    for i in indexes {
4456        exprs.push(plan_expr(ecx, i)?.cast_to(
4457            ecx,
4458            CastContext::Explicit,
4459            &SqlScalarType::Int64,
4460        )?);
4461    }
4462
4463    Ok((
4464        n_layers - depth,
4465        HirScalarExpr::call_variadic(ListIndex, exprs),
4466    ))
4467}
4468
4469fn plan_slice_list(
4470    ecx: &ExprContext,
4471    expr: HirScalarExpr,
4472    slices: &[SubscriptPosition<Aug>],
4473    n_layers: usize,
4474    elem_type_name: &str,
4475) -> Result<HirScalarExpr, PlanError> {
4476    if n_layers == 0 {
4477        sql_bail!("cannot subscript type {}", elem_type_name)
4478    }
4479
4480    // first arg will be list
4481    let mut exprs = Vec::with_capacity(slices.len() + 1);
4482    exprs.push(expr);
4483    // extract (start, end) parts from collected slices
4484    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4485        Ok(match position {
4486            Some(p) => {
4487                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4488            }
4489            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4490        })
4491    };
4492    for p in slices {
4493        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4494        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4495        exprs.push(start);
4496        exprs.push(end);
4497    }
4498
4499    Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4500}
4501
4502fn plan_like(
4503    ecx: &ExprContext,
4504    expr: &Expr<Aug>,
4505    pattern: &Expr<Aug>,
4506    escape: Option<&Expr<Aug>>,
4507    case_insensitive: bool,
4508    not: bool,
4509) -> Result<HirScalarExpr, PlanError> {
4510    use CastContext::Implicit;
4511    let ecx = ecx.with_name("LIKE argument");
4512    let expr = plan_expr(&ecx, expr)?;
4513    let haystack = match ecx.scalar_type(&expr) {
4514        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4515            .type_as(&ecx, ty)?
4516            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4517        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4518    };
4519    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4520    if let Some(escape) = escape {
4521        pattern = pattern.call_binary(
4522            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4523            expr_func::LikeEscape,
4524        );
4525    }
4526    let func: BinaryFunc = if case_insensitive {
4527        expr_func::IsLikeMatchCaseInsensitive.into()
4528    } else {
4529        expr_func::IsLikeMatchCaseSensitive.into()
4530    };
4531    let like = haystack.call_binary(pattern, func);
4532    if not {
4533        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4534    } else {
4535        Ok(like)
4536    }
4537}
4538
4539fn plan_subscript_jsonb(
4540    ecx: &ExprContext,
4541    expr: HirScalarExpr,
4542    positions: &[SubscriptPosition<Aug>],
4543) -> Result<CoercibleScalarExpr, PlanError> {
4544    use CastContext::Implicit;
4545    use SqlScalarType::{Int64, String};
4546
4547    // JSONB doesn't support the slicing syntax, so simply error if you
4548    // encounter any explicit slices.
4549    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4550
4551    let mut exprs = Vec::with_capacity(subscripts.len());
4552    for s in subscripts {
4553        let subscript = plan_expr(ecx, s)?;
4554        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4555            subscript
4556        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4557            // Integers are converted to a string here and then re-parsed as an
4558            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4559            // do it.
4560            typeconv::to_string(ecx, subscript)
4561        } else {
4562            sql_bail!("jsonb subscript type must be coercible to integer or text");
4563        };
4564        exprs.push(subscript);
4565    }
4566
4567    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4568    // `expr->subscript` as you might expect.
4569    let expr = expr.call_binary(
4570        HirScalarExpr::call_variadic(
4571            ArrayCreate {
4572                elem_type: SqlScalarType::String,
4573            },
4574            exprs,
4575        ),
4576        expr_func::JsonbGetPath,
4577    );
4578    Ok(expr.into())
4579}
4580
4581fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4582    if !ecx.allow_subqueries {
4583        sql_bail!("{} does not allow subqueries", ecx.name)
4584    }
4585    let mut qcx = ecx.derived_query_context();
4586    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4587    Ok(expr.exists().into())
4588}
4589
4590fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4591    if !ecx.allow_subqueries {
4592        sql_bail!("{} does not allow subqueries", ecx.name)
4593    }
4594    let mut qcx = ecx.derived_query_context();
4595    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4596    let column_types = qcx.relation_type(&expr).column_types;
4597    if column_types.len() != 1 {
4598        sql_bail!(
4599            "Expected subselect to return 1 column, got {} columns",
4600            column_types.len()
4601        );
4602    }
4603    Ok(expr.select().into())
4604}
4605
4606fn plan_list_subquery(
4607    ecx: &ExprContext,
4608    query: &Query<Aug>,
4609) -> Result<CoercibleScalarExpr, PlanError> {
4610    plan_vector_like_subquery(
4611        ecx,
4612        query,
4613        |_| false,
4614        |elem_type| ListCreate { elem_type }.into(),
4615        |order_by| AggregateFunc::ListConcat { order_by },
4616        expr_func::ListListConcat.into(),
4617        |elem_type| {
4618            HirScalarExpr::literal(
4619                Datum::empty_list(),
4620                SqlScalarType::List {
4621                    element_type: Box::new(elem_type),
4622                    custom_id: None,
4623                },
4624            )
4625        },
4626        "list",
4627    )
4628}
4629
4630fn plan_array_subquery(
4631    ecx: &ExprContext,
4632    query: &Query<Aug>,
4633) -> Result<CoercibleScalarExpr, PlanError> {
4634    plan_vector_like_subquery(
4635        ecx,
4636        query,
4637        |elem_type| {
4638            matches!(
4639                elem_type,
4640                SqlScalarType::Char { .. }
4641                    | SqlScalarType::Array { .. }
4642                    | SqlScalarType::List { .. }
4643                    | SqlScalarType::Map { .. }
4644            )
4645        },
4646        |elem_type| ArrayCreate { elem_type }.into(),
4647        |order_by| AggregateFunc::ArrayConcat { order_by },
4648        expr_func::ArrayArrayConcat.into(),
4649        |elem_type| {
4650            HirScalarExpr::literal(
4651                Datum::empty_array(),
4652                SqlScalarType::Array(Box::new(elem_type)),
4653            )
4654        },
4655        "[]",
4656    )
4657}
4658
/// Generic function used to plan both array subqueries and list subqueries.
///
/// The callbacks supply the array/list-specific pieces:
/// * `is_unsupported_type` rejects element types that may not appear in the
///   resulting vector (triggers the "cannot build ..." error below).
/// * `vector_create` builds the variadic function that wraps one element into
///   a single-element vector.
/// * `aggregate_concat` builds the order-sensitive aggregate that concatenates
///   the per-row single-element vectors.
/// * `binary_concat` concatenates two vectors; used to combine the aggregated
///   result with `empty_literal` so that a subquery producing no rows yields
///   an empty vector rather than NULL.
/// * `empty_literal` builds an empty vector literal for a given element type.
/// * `vector_type_string` is the type suffix used in the unsupported-type
///   error message (e.g. "[]" or "list").
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&SqlScalarType) -> bool,
    F2: Fn(SqlScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(SqlScalarType) -> HirScalarExpr,
{
    // Some contexts forbid subqueries entirely.
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // Materialize LIMIT/OFFSET with a TopK when a limit is present, or when
    // the offset is anything other than a literal 0. This must happen before
    // the Reduce below collapses the rows.
    if planned_query.limit.is_some()
        || !planned_query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }

    // The subquery must produce exactly one (projected) column.
    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }

    // Determine the element type from the subquery's single output column.
    let project_column = *planned_query.project.get(0).unwrap();
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();

    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_sql_scalar_type(&elem_type, false),
            vector_type_string
        ));
    }

    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
    // subquery above. The first expression wraps the projected value into a
    // single-element vector; the rest carry the ORDER BY key columns.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        vector_create(elem_type.clone()),
        vec![HirScalarExpr::column(project_column)],
    ))
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // However, column references for `aggregation_projection` and `aggregation_order_by`
    // are with reference to the `exprs` of the aggregation expression.  Here that is
    // `aggregation_exprs`.
    let aggregation_projection = vec![0];
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();

    // Aggregate all rows into a single vector. The order-sensitive aggregate
    // receives a record of (element-vector, order-by keys...) per row.
    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);

    // If `expr` has no rows, return an empty array/list rather than NULL.
    Ok(reduced_expr
        .select()
        .call_binary(empty_literal(elem_type), binary_concat)
        .into())
}
4774
/// Plans a map-valued subquery: a two-column (key, value) subquery is
/// aggregated into a single map value via `MapAgg`.
///
/// The first column must have type `text` (map keys are text); the second
/// column supplies the values. ORDER BY, LIMIT, and OFFSET of the subquery
/// are honored, and a subquery producing no rows yields an empty map rather
/// than NULL.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    // Some contexts forbid subqueries entirely.
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // Materialize LIMIT/OFFSET with a TopK when a limit is present, or when
    // the offset is anything other than a literal 0. This must happen before
    // the Reduce below collapses the rows.
    if query.limit.is_some()
        || !query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    // Exactly two projected columns: key and value.
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    // Map keys are always text.
    if key_type != SqlScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // Per row, build a record of ((key, value), order-by keys...). Column
    // references here point at the columns produced by the planned subquery.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    ))
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // Aggregate all rows into a single map. The ORDER BY column references
    // are remapped to index into `aggregation_exprs` (hence `column: i`).
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // If `expr` has no rows, return an empty map rather than NULL.
    let expr = HirScalarExpr::call_variadic(
        Coalesce,
        vec![
            expr.select(),
            HirScalarExpr::literal(
                Datum::empty_map(),
                SqlScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    );

    Ok(expr.into())
}
4880
4881fn plan_collate(
4882    ecx: &ExprContext,
4883    expr: &Expr<Aug>,
4884    collation: &UnresolvedItemName,
4885) -> Result<CoercibleScalarExpr, PlanError> {
4886    if collation.0.len() == 2
4887        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4888        && collation.0[1] == ident!("default")
4889    {
4890        plan_expr(ecx, expr)
4891    } else {
4892        bail_unsupported!("COLLATE");
4893    }
4894}
4895
4896/// Plans a slice of expressions.
4897///
4898/// This function is a simple convenience function for mapping [`plan_expr`]
4899/// over a slice of expressions. The planned expressions are returned in the
4900/// same order as the input. If any of the expressions fail to plan, returns an
4901/// error instead.
4902fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4903where
4904    E: std::borrow::Borrow<Expr<Aug>>,
4905{
4906    let mut out = vec![];
4907    for expr in exprs {
4908        out.push(plan_expr(ecx, expr.borrow())?);
4909    }
4910    Ok(out)
4911}
4912
4913/// Plans an `ARRAY` expression.
4914fn plan_array(
4915    ecx: &ExprContext,
4916    exprs: &[Expr<Aug>],
4917    type_hint: Option<&SqlScalarType>,
4918) -> Result<CoercibleScalarExpr, PlanError> {
4919    // Plan each element expression.
4920    let mut out = vec![];
4921    for expr in exprs {
4922        out.push(match expr {
4923            // Special case nested ARRAY expressions so we can plumb
4924            // the type hint through.
4925            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4926            _ => plan_expr(ecx, expr)?,
4927        });
4928    }
4929
4930    // Attempt to make use of the type hint.
4931    let type_hint = match type_hint {
4932        // The user has provided an explicit cast to an array type. We know the
4933        // element type to coerce to. Need to be careful, though: if there's
4934        // evidence that any of the array elements are themselves arrays, we
4935        // want to coerce to the array type, not the element type.
4936        Some(SqlScalarType::Array(elem_type)) => {
4937            let multidimensional = out.iter().any(|e| {
4938                matches!(
4939                    ecx.scalar_type(e),
4940                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4941                )
4942            });
4943            if multidimensional {
4944                type_hint
4945            } else {
4946                Some(&**elem_type)
4947            }
4948        }
4949        // The user provided an explicit cast to a non-array type. We'll have to
4950        // guess what the correct type for the array. Our caller will then
4951        // handle converting that array type to the desired non-array type.
4952        Some(_) => None,
4953        // No type hint. We'll have to guess the correct type for the array.
4954        None => None,
4955    };
4956
4957    // Coerce all elements to the same type.
4958    let (elem_type, exprs) = if exprs.is_empty() {
4959        if let Some(elem_type) = type_hint {
4960            (elem_type.clone(), vec![])
4961        } else {
4962            sql_bail!("cannot determine type of empty array");
4963        }
4964    } else {
4965        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4966        (ecx.scalar_type(&out[0]), out)
4967    };
4968
4969    // Arrays of `char` type are disallowed due to a known limitation:
4970    // https://github.com/MaterializeInc/database-issues/issues/2360.
4971    //
4972    // Arrays of `list` and `map` types are disallowed due to mind-bending
4973    // semantics.
4974    if matches!(
4975        elem_type,
4976        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4977    ) {
4978        bail_unsupported!(format!(
4979            "{}[]",
4980            ecx.humanize_sql_scalar_type(&elem_type, false)
4981        ));
4982    }
4983
4984    Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4985}
4986
4987fn plan_list(
4988    ecx: &ExprContext,
4989    exprs: &[Expr<Aug>],
4990    type_hint: Option<&SqlScalarType>,
4991) -> Result<CoercibleScalarExpr, PlanError> {
4992    let (elem_type, exprs) = if exprs.is_empty() {
4993        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4994            (element_type.without_modifiers(), vec![])
4995        } else {
4996            sql_bail!("cannot determine type of empty list");
4997        }
4998    } else {
4999        let type_hint = match type_hint {
5000            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5001            _ => None,
5002        };
5003
5004        let mut out = vec![];
5005        for expr in exprs {
5006            out.push(match expr {
5007                // Special case nested LIST expressions so we can plumb
5008                // the type hint through.
5009                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5010                _ => plan_expr(ecx, expr)?,
5011            });
5012        }
5013        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5014        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5015    };
5016
5017    if matches!(elem_type, SqlScalarType::Char { .. }) {
5018        bail_unsupported!("char list");
5019    }
5020
5021    Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5022}
5023
5024fn plan_map(
5025    ecx: &ExprContext,
5026    entries: &[MapEntry<Aug>],
5027    type_hint: Option<&SqlScalarType>,
5028) -> Result<CoercibleScalarExpr, PlanError> {
5029    let (value_type, exprs) = if entries.is_empty() {
5030        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5031            (value_type.without_modifiers(), vec![])
5032        } else {
5033            sql_bail!("cannot determine type of empty map");
5034        }
5035    } else {
5036        let type_hint = match type_hint {
5037            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5038            _ => None,
5039        };
5040
5041        let mut keys = vec![];
5042        let mut values = vec![];
5043        for MapEntry { key, value } in entries {
5044            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5045            let value = match value {
5046                // Special case nested MAP expressions so we can plumb
5047                // the type hint through.
5048                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5049                _ => plan_expr(ecx, value)?,
5050            };
5051            keys.push(key);
5052            values.push(value);
5053        }
5054        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5055        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5056        let out = itertools::interleave(keys, values).collect();
5057        (value_type, out)
5058    };
5059
5060    if matches!(value_type, SqlScalarType::Char { .. }) {
5061        bail_unsupported!("char map");
5062    }
5063
5064    let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5065    Ok(expr.into())
5066}
5067
5068/// Coerces a list of expressions such that all input expressions will be cast
5069/// to the same type. If successful, returns a new list of expressions in the
5070/// same order as the input, where each expression has the appropriate casts to
5071/// make them all of a uniform type.
5072///
5073/// If `force_type` is `Some`, the expressions are forced to the specified type
5074/// via an explicit cast. Otherwise the best common type is guessed via
5075/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5076/// implicit casts
5077///
5078/// Note that this is our implementation of Postgres' type conversion for
5079/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5080/// isn't yet used in all of those cases.
5081///
5082/// [union-type-conv]:
5083/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5084pub fn coerce_homogeneous_exprs(
5085    ecx: &ExprContext,
5086    exprs: Vec<CoercibleScalarExpr>,
5087    force_type: Option<&SqlScalarType>,
5088) -> Result<Vec<HirScalarExpr>, PlanError> {
5089    assert!(!exprs.is_empty());
5090
5091    let target_holder;
5092    let target = match force_type {
5093        Some(t) => t,
5094        None => {
5095            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5096            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5097            &target_holder
5098        }
5099    };
5100
5101    // Try to cast all expressions to `target`.
5102    let mut out = Vec::new();
5103    for expr in exprs {
5104        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5105        let ccx = match force_type {
5106            None => CastContext::Implicit,
5107            Some(_) => CastContext::Explicit,
5108        };
5109        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5110            Ok(expr) => out.push(expr),
5111            Err(_) => sql_bail!(
5112                "{} could not convert type {} to {}",
5113                ecx.name,
5114                ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5115                ecx.humanize_sql_scalar_type(target, false),
5116            ),
5117        }
5118    }
5119    Ok(out)
5120}
5121
5122/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5123/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5124pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5125    obe: &OrderByExpr<T>,
5126    column: usize,
5127) -> ColumnOrder {
5128    let desc = !obe.asc.unwrap_or(true);
5129    ColumnOrder {
5130        column,
5131        desc,
5132        // https://www.postgresql.org/docs/14/queries-order.html
5133        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5134        nulls_last: obe.nulls_last.unwrap_or(!desc),
5135    }
5136}
5137
5138/// Plans the ORDER BY clause of a window function.
5139///
5140/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5141/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5142/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5143/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5144/// `Vec<HirScalarExpr>` that we return.
5145fn plan_function_order_by(
5146    ecx: &ExprContext,
5147    order_by: &[OrderByExpr<Aug>],
5148) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5149    let mut order_by_exprs = vec![];
5150    let mut col_orders = vec![];
5151    {
5152        for (i, obe) in order_by.iter().enumerate() {
5153            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5154            // do not support ordinal references in PostgreSQL. So we use
5155            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5156            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5157            order_by_exprs.push(expr);
5158            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5159        }
5160    }
5161    Ok((order_by_exprs, col_orders))
5162}
5163
5164/// Common part of the planning of windowed and non-windowed aggregation functions.
5165fn plan_aggregate_common(
5166    ecx: &ExprContext,
5167    Function::<Aug> {
5168        name,
5169        args,
5170        filter,
5171        over: _,
5172        distinct,
5173    }: &Function<Aug>,
5174) -> Result<AggregateExpr, PlanError> {
5175    // Normal aggregate functions, like `sum`, expect as input a single expression
5176    // which yields the datum to aggregate. Order sensitive aggregate functions,
5177    // like `jsonb_agg`, are special, and instead expect a Record whose first
5178    // element yields the datum to aggregate and whose successive elements yield
5179    // keys to order by. This expectation is hard coded within the implementation
5180    // of each of the order-sensitive aggregates. The specification of how many
5181    // order by keys to consider, and in what order, is passed via the `order_by`
5182    // field on the `AggregateFunc` variant.
5183
5184    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5185    // most, so explicitly drop it if the function doesn't care about order. This
5186    // prevents the projection into Record below from triggering on unsupported
5187    // functions.
5188
5189    let impls = match resolve_func(ecx, name, args)? {
5190        Func::Aggregate(impls) => impls,
5191        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5192    };
5193
5194    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5195    // generalized function selection framework. The rule is simple: the user
5196    // must type `count(*)`, but the function selection framework sees an empty
5197    // parameter list, as if the user had typed `count()`. But if the user types
5198    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5199    // rules to all aggregates, not just `count`, since we may one day support
5200    // user-defined aggregates, including user-defined aggregates that take no
5201    // parameters.
5202    let (args, order_by) = match &args {
5203        FunctionArgs::Star => (vec![], vec![]),
5204        FunctionArgs::Args { args, order_by } => {
5205            if args.is_empty() {
5206                sql_bail!(
5207                    "{}(*) must be used to call a parameterless aggregate function",
5208                    ecx.qcx
5209                        .scx
5210                        .humanize_resolved_name(name)
5211                        .expect("name actually resolved")
5212                );
5213            }
5214            let args = plan_exprs(ecx, args)?;
5215            (args, order_by.clone())
5216        }
5217    };
5218
5219    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5220
5221    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5222    if let Some(filter) = &filter {
5223        // If a filter is present, as in
5224        //
5225        //     <agg>(<expr>) FILTER (WHERE <cond>)
5226        //
5227        // we plan it by essentially rewriting the expression to
5228        //
5229        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5230        //
5231        // where <identity> is the identity input for <agg>.
5232        let cond =
5233            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5234        let expr_typ = ecx.scalar_type(&expr);
5235        expr = HirScalarExpr::if_then_else(
5236            cond,
5237            expr,
5238            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5239        );
5240    }
5241
5242    let mut seen_outer = false;
5243    let mut seen_inner = false;
5244    #[allow(deprecated)]
5245    expr.visit_columns(0, &mut |depth, col| {
5246        if depth == 0 && col.level == 0 {
5247            seen_inner = true;
5248        } else if col.level > depth {
5249            seen_outer = true;
5250        }
5251    });
5252    if seen_outer && !seen_inner {
5253        bail_unsupported!(
5254            3720,
5255            "aggregate functions that refer exclusively to outer columns"
5256        );
5257    }
5258
5259    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5260    // map the needed expressions into the aggregate datum.
5261    if func.is_order_sensitive() {
5262        let field_names = iter::repeat(ColumnName::from(""))
5263            .take(1 + order_by_exprs.len())
5264            .collect();
5265        let mut exprs = vec![expr];
5266        exprs.extend(order_by_exprs);
5267        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5268    }
5269
5270    Ok(AggregateExpr {
5271        func,
5272        expr: Box::new(expr),
5273        distinct: *distinct,
5274    })
5275}
5276
/// Plans a (possibly qualified) identifier as a scalar expression.
///
/// Resolution proceeds in order:
/// 1. A qualified name (`t.c`) must resolve to a column of table `t`.
/// 2. An unqualified name is first tried as a column reference.
/// 3. Failing that, it is tried as a whole-row reference to a table, which
///    yields either the value directly (for a single-column table function)
///    or a record of all the table's columns.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    // The final component names the column; any preceding components qualify it.
    let col_name = normalize::column_name(names.pop().unwrap());

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let (i, i_name) = ecx.scope.resolve_table_column(
            &ecx.qcx.outer_scopes,
            &table_name,
            &col_name,
            &mut ecx.qcx.name_manager.borrow_mut(),
        )?;
        return Ok(HirScalarExpr::named_column(i, i_name));
    }

    // If the name is unqualified, first check if it refers to a column. Track any similar names
    // that might exist for a better error message.
    let similar_names = match ecx.scope.resolve_column(
        &ecx.qcx.outer_scopes,
        &col_name,
        &mut ecx.qcx.name_manager.borrow_mut(),
    ) {
        Ok((i, i_name)) => {
            return Ok(HirScalarExpr::named_column(i, i_name));
        }
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name doesn't refer to a column. Check if it is a whole-row reference
    // to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name doesn't refer to a table either. Return an error.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The name refers to a table that is the result of a function that
        // returned a single column. Per PostgreSQL, this is a special case
        // that returns the value directly.
        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
            *column,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )),
        // The name refers to a normal table. Return a record containing all the
        // columns of the table.
        _ => {
            // A synthetic "exists" column (from a table function in the target
            // list) is excluded from the record but remembered, so NULL can be
            // returned when the table function produced no row.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::named_column(
                            column,
                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
                        );
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // For the special case of a table function with a single column, the single column is instead not wrapped.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
            };
            if let Some(has_exists_column) = has_exists_column {
                // NULL when the exists column is NULL; otherwise the record/value.
                Ok(HirScalarExpr::if_then_else(
                    HirScalarExpr::unnamed_column(has_exists_column)
                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
                    expr,
                ))
            } else {
                Ok(expr)
            }
        }
    }
}
5371
5372fn plan_op(
5373    ecx: &ExprContext,
5374    op: &str,
5375    expr1: &Expr<Aug>,
5376    expr2: Option<&Expr<Aug>>,
5377) -> Result<HirScalarExpr, PlanError> {
5378    let impls = func::resolve_op(op)?;
5379    let args = match expr2 {
5380        None => plan_exprs(ecx, &[expr1])?,
5381        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5382    };
5383    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5384}
5385
5386fn plan_function<'a>(
5387    ecx: &ExprContext,
5388    f @ Function {
5389        name,
5390        args,
5391        filter,
5392        over,
5393        distinct,
5394    }: &'a Function<Aug>,
5395) -> Result<HirScalarExpr, PlanError> {
5396    let impls = match resolve_func(ecx, name, args)? {
5397        Func::Table(_) => {
5398            sql_bail!(
5399                "table functions are not allowed in {} (function {})",
5400                ecx.name,
5401                name
5402            );
5403        }
5404        Func::Scalar(impls) => {
5405            if over.is_some() {
5406                sql_bail!(
5407                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5408                );
5409            }
5410            impls
5411        }
5412        Func::ScalarWindow(impls) => {
5413            let (
5414                ignore_nulls,
5415                order_by_exprs,
5416                col_orders,
5417                _window_frame,
5418                partition_by,
5419                scalar_args,
5420            ) = plan_window_function_non_aggr(ecx, f)?;
5421
5422            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5423            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5424            // msg there is less informative.)
5425            if !scalar_args.is_empty() {
5426                if let ResolvedItemName::Item {
5427                    full_name: FullItemName { item, .. },
5428                    ..
5429                } = name
5430                {
5431                    sql_bail!(
5432                        "function {} has 0 parameters, but was called with {}",
5433                        item,
5434                        scalar_args.len()
5435                    );
5436                }
5437            }
5438
5439            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5440            // accept a window frame here without an error msg. (Postgres also does this.)
5441            // TODO: maybe we should give a notice
5442
5443            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5444
5445            if ignore_nulls {
5446                // If we ever add a scalar window function that supports ignore, then don't forget
5447                // to also update HIR EXPLAIN.
5448                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5449            }
5450
5451            return Ok(HirScalarExpr::windowing(WindowExpr {
5452                func: WindowExprType::Scalar(ScalarWindowExpr {
5453                    func,
5454                    order_by: col_orders,
5455                }),
5456                partition_by,
5457                order_by: order_by_exprs,
5458            }));
5459        }
5460        Func::ValueWindow(impls) => {
5461            let window_plan = plan_window_function_non_aggr(ecx, f)?;
5462            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5463                window_plan;
5464
5465            let (args_encoded, func) =
5466                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5467
5468            if ignore_nulls {
5469                match func {
5470                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5471                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5472                }
5473            }
5474
5475            return Ok(HirScalarExpr::windowing(WindowExpr {
5476                func: WindowExprType::Value(ValueWindowExpr {
5477                    func,
5478                    args: Box::new(args_encoded),
5479                    order_by: col_orders,
5480                    window_frame,
5481                    ignore_nulls, // (RESPECT NULLS is the default)
5482                }),
5483                partition_by,
5484                order_by: order_by_exprs,
5485            }));
5486        }
5487        Func::Aggregate(_) => {
5488            if f.over.is_none() {
5489                // Not a window aggregate. Something is wrong.
5490                if ecx.allow_aggregates {
5491                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5492                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5493                    sql_bail!(
5494                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5495                        name,
5496                    );
5497                } else {
5498                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5499                    // because it was in an unsupported context.
5500                    sql_bail!(
5501                        "aggregate functions are not allowed in {} (function {})",
5502                        ecx.name,
5503                        name
5504                    );
5505                }
5506            } else {
5507                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5508                    plan_window_function_common(ecx, &f.name, &f.over)?;
5509
5510                // https://github.com/MaterializeInc/database-issues/issues/6720
5511                match (&window_frame.start_bound, &window_frame.end_bound) {
5512                    (
5513                        mz_expr::WindowFrameBound::UnboundedPreceding,
5514                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5515                    )
5516                    | (
5517                        mz_expr::WindowFrameBound::UnboundedPreceding,
5518                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5519                    )
5520                    | (
5521                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5522                        mz_expr::WindowFrameBound::UnboundedFollowing,
5523                    )
5524                    | (
5525                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5526                        mz_expr::WindowFrameBound::UnboundedFollowing,
5527                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5528                    (_, _) => {} // other cases are ok
5529                }
5530
5531                if ignore_nulls {
5532                    // https://github.com/MaterializeInc/database-issues/issues/6722
5533                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5534                    // forget to also update HIR EXPLAIN.
5535                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5536                }
5537
5538                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5539
5540                if aggregate_expr.distinct {
5541                    // https://github.com/MaterializeInc/database-issues/issues/6626
5542                    bail_unsupported!("DISTINCT in window aggregates");
5543                }
5544
5545                return Ok(HirScalarExpr::windowing(WindowExpr {
5546                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5547                        aggregate_expr,
5548                        order_by: col_orders,
5549                        window_frame,
5550                    }),
5551                    partition_by,
5552                    order_by: order_by_exprs,
5553                }));
5554            }
5555        }
5556    };
5557
5558    if over.is_some() {
5559        unreachable!("If there is an OVER clause, we should have returned already above.");
5560    }
5561
5562    if *distinct {
5563        sql_bail!(
5564            "DISTINCT specified, but {} is not an aggregate function",
5565            ecx.qcx
5566                .scx
5567                .humanize_resolved_name(name)
5568                .expect("already resolved")
5569        );
5570    }
5571    if filter.is_some() {
5572        sql_bail!(
5573            "FILTER specified, but {} is not an aggregate function",
5574            ecx.qcx
5575                .scx
5576                .humanize_resolved_name(name)
5577                .expect("already resolved")
5578        );
5579    }
5580
5581    let scalar_args = match &args {
5582        FunctionArgs::Star => {
5583            sql_bail!(
5584                "* argument is invalid with non-aggregate function {}",
5585                ecx.qcx
5586                    .scx
5587                    .humanize_resolved_name(name)
5588                    .expect("already resolved")
5589            )
5590        }
5591        FunctionArgs::Args { args, order_by } => {
5592            if !order_by.is_empty() {
5593                sql_bail!(
5594                    "ORDER BY specified, but {} is not an aggregate function",
5595                    ecx.qcx
5596                        .scx
5597                        .humanize_resolved_name(name)
5598                        .expect("already resolved")
5599                );
5600            }
5601            plan_exprs(ecx, args)?
5602        }
5603    };
5604
5605    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5606}
5607
/// Error message fragment used wherever IGNORE NULLS / RESPECT NULLS is rejected.
/// Only LAG and LEAD currently support these options (see the value window
/// function planning above, which special-cases `Lag`/`Lead`).
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5610
5611/// Resolves the name to a set of function implementations.
5612///
5613/// If the name does not specify a known built-in function, returns an error.
5614pub fn resolve_func(
5615    ecx: &ExprContext,
5616    name: &ResolvedItemName,
5617    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5618) -> Result<&'static Func, PlanError> {
5619    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5620        if let Ok(f) = i.func() {
5621            return Ok(f);
5622        }
5623    }
5624
5625    // Couldn't resolve function with this name, so generate verbose error
5626    // message.
5627    let cexprs = match args {
5628        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5629        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5630            if !order_by.is_empty() {
5631                sql_bail!(
5632                    "ORDER BY specified, but {} is not an aggregate function",
5633                    name
5634                );
5635            }
5636            plan_exprs(ecx, args)?
5637        }
5638    };
5639
5640    let arg_types: Vec<_> = cexprs
5641        .into_iter()
5642        .map(|ty| match ecx.scalar_type(&ty) {
5643            CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5644            CoercibleScalarType::Record(_) => "record".to_string(),
5645            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5646        })
5647        .collect();
5648
5649    Err(PlanError::UnknownFunction {
5650        name: name.to_string(),
5651        arg_types,
5652    })
5653}
5654
5655fn plan_is_expr<'a>(
5656    ecx: &ExprContext,
5657    expr: &'a Expr<Aug>,
5658    construct: &IsExprConstruct<Aug>,
5659    not: bool,
5660) -> Result<HirScalarExpr, PlanError> {
5661    let expr_hir = plan_expr(ecx, expr)?;
5662
5663    let mut result = match construct {
5664        IsExprConstruct::Null => {
5665            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5666            // at odds with our type coercion rules, which treat `NULL` literals
5667            // and unconstrained parameters identically. Providing a type hint
5668            // of string means we wind up supporting both.
5669            expr_hir.type_as_any(ecx)?.call_is_null()
5670        }
5671        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5672        IsExprConstruct::True => expr_hir
5673            .type_as(ecx, &SqlScalarType::Bool)?
5674            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5675        IsExprConstruct::False => expr_hir
5676            .type_as(ecx, &SqlScalarType::Bool)?
5677            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5678        IsExprConstruct::DistinctFrom(expr2) => {
5679            // There are three cases:
5680            // 1. Both terms are non-null, in which case the result should be `a != b`.
5681            // 2. Exactly one term is null, in which case the result should be true.
5682            // 3. Both terms are null, in which case the result should be false.
5683            //
5684            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5685
5686            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
5687            // construct an AST expression for `expr != expr2` and plan it to get proper type
5688            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
5689            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5690            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5691
5692            let expr1_hir = expr_hir.type_as_any(ecx)?;
5693            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5694
5695            let term1 = HirScalarExpr::variadic_or(vec![
5696                ne_hir,
5697                expr1_hir.clone().call_is_null(),
5698                expr2_hir.clone().call_is_null(),
5699            ]);
5700            let term2 = HirScalarExpr::variadic_or(vec![
5701                expr1_hir.call_is_null().not(),
5702                expr2_hir.call_is_null().not(),
5703            ]);
5704            term1.and(term2)
5705        }
5706    };
5707    if not {
5708        result = result.not();
5709    }
5710    Ok(result)
5711}
5712
5713fn plan_case<'a>(
5714    ecx: &ExprContext,
5715    operand: &'a Option<Box<Expr<Aug>>>,
5716    conditions: &'a [Expr<Aug>],
5717    results: &'a [Expr<Aug>],
5718    else_result: &'a Option<Box<Expr<Aug>>>,
5719) -> Result<HirScalarExpr, PlanError> {
5720    let mut cond_exprs = Vec::new();
5721    let mut result_exprs = Vec::new();
5722    for (c, r) in conditions.iter().zip_eq(results) {
5723        let c = match operand {
5724            Some(operand) => operand.clone().equals(c.clone()),
5725            None => c.clone(),
5726        };
5727        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5728        cond_exprs.push(cexpr);
5729        result_exprs.push(r);
5730    }
5731    result_exprs.push(match else_result {
5732        Some(else_result) => else_result,
5733        None => &Expr::Value(Value::Null),
5734    });
5735    let mut result_exprs = coerce_homogeneous_exprs(
5736        &ecx.with_name("CASE"),
5737        plan_exprs(ecx, &result_exprs)?,
5738        None,
5739    )?;
5740    let mut expr = result_exprs.pop().unwrap();
5741    assert_eq!(cond_exprs.len(), result_exprs.len());
5742    for (cexpr, rexpr) in cond_exprs
5743        .into_iter()
5744        .rev()
5745        .zip_eq(result_exprs.into_iter().rev())
5746    {
5747        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5748    }
5749    Ok(expr)
5750}
5751
5752fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5753    let (datum, scalar_type) = match l {
5754        Value::Number(s) => {
5755            let d = strconv::parse_numeric(s.as_str())?;
5756            if !s.contains(&['E', '.'][..]) {
5757                // Maybe representable as an int?
5758                if let Ok(n) = d.0.try_into() {
5759                    (Datum::Int32(n), SqlScalarType::Int32)
5760                } else if let Ok(n) = d.0.try_into() {
5761                    (Datum::Int64(n), SqlScalarType::Int64)
5762                } else {
5763                    (
5764                        Datum::Numeric(d),
5765                        SqlScalarType::Numeric { max_scale: None },
5766                    )
5767                }
5768            } else {
5769                (
5770                    Datum::Numeric(d),
5771                    SqlScalarType::Numeric { max_scale: None },
5772                )
5773            }
5774        }
5775        Value::HexString(_) => bail_unsupported!("hex string literals"),
5776        Value::Boolean(b) => match b {
5777            false => (Datum::False, SqlScalarType::Bool),
5778            true => (Datum::True, SqlScalarType::Bool),
5779        },
5780        Value::Interval(i) => {
5781            let i = literal::plan_interval(i)?;
5782            (Datum::Interval(i), SqlScalarType::Interval)
5783        }
5784        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5785        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5786    };
5787    let expr = HirScalarExpr::literal(datum, scalar_type);
5788    Ok(expr.into())
5789}
5790
5791/// The common part of the planning of non-aggregate window functions, i.e.,
5792/// scalar window functions and value window functions.
5793fn plan_window_function_non_aggr<'a>(
5794    ecx: &ExprContext,
5795    Function {
5796        name,
5797        args,
5798        filter,
5799        over,
5800        distinct,
5801    }: &'a Function<Aug>,
5802) -> Result<
5803    (
5804        bool,
5805        Vec<HirScalarExpr>,
5806        Vec<ColumnOrder>,
5807        mz_expr::WindowFrame,
5808        Vec<HirScalarExpr>,
5809        Vec<CoercibleScalarExpr>,
5810    ),
5811    PlanError,
5812> {
5813    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5814        plan_window_function_common(ecx, name, over)?;
5815
5816    if *distinct {
5817        sql_bail!(
5818            "DISTINCT specified, but {} is not an aggregate function",
5819            name
5820        );
5821    }
5822
5823    if filter.is_some() {
5824        bail_unsupported!("FILTER in non-aggregate window functions");
5825    }
5826
5827    let scalar_args = match &args {
5828        FunctionArgs::Star => {
5829            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5830        }
5831        FunctionArgs::Args { args, order_by } => {
5832            if !order_by.is_empty() {
5833                sql_bail!(
5834                    "ORDER BY specified, but {} is not an aggregate function",
5835                    name
5836                );
5837            }
5838            plan_exprs(ecx, args)?
5839        }
5840    };
5841
5842    Ok((
5843        ignore_nulls,
5844        order_by_exprs,
5845        col_orders,
5846        window_frame,
5847        partition,
5848        scalar_args,
5849    ))
5850}
5851
5852/// The common part of the planning of all window functions.
5853fn plan_window_function_common(
5854    ecx: &ExprContext,
5855    name: &<Aug as AstInfo>::ItemName,
5856    over: &Option<WindowSpec<Aug>>,
5857) -> Result<
5858    (
5859        bool,
5860        Vec<HirScalarExpr>,
5861        Vec<ColumnOrder>,
5862        mz_expr::WindowFrame,
5863        Vec<HirScalarExpr>,
5864    ),
5865    PlanError,
5866> {
5867    if !ecx.allow_windows {
5868        sql_bail!(
5869            "window functions are not allowed in {} (function {})",
5870            ecx.name,
5871            name
5872        );
5873    }
5874
5875    let window_spec = match over.as_ref() {
5876        Some(over) => over,
5877        None => sql_bail!("window function {} requires an OVER clause", name),
5878    };
5879    if window_spec.ignore_nulls && window_spec.respect_nulls {
5880        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5881    }
5882    let window_frame = match window_spec.window_frame.as_ref() {
5883        Some(frame) => plan_window_frame(frame)?,
5884        None => mz_expr::WindowFrame::default(),
5885    };
5886    let mut partition = Vec::new();
5887    for expr in &window_spec.partition_by {
5888        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5889    }
5890
5891    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5892
5893    Ok((
5894        window_spec.ignore_nulls,
5895        order_by_exprs,
5896        col_orders,
5897        window_frame,
5898        partition,
5899    ))
5900}
5901
5902fn plan_window_frame(
5903    WindowFrame {
5904        units,
5905        start_bound,
5906        end_bound,
5907    }: &WindowFrame,
5908) -> Result<mz_expr::WindowFrame, PlanError> {
5909    use mz_expr::WindowFrameBound::*;
5910    let units = window_frame_unit_ast_to_expr(units)?;
5911    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5912    let end_bound = end_bound
5913        .as_ref()
5914        .map(window_frame_bound_ast_to_expr)
5915        .unwrap_or(CurrentRow);
5916
5917    // Validate bounds according to Postgres rules
5918    match (&start_bound, &end_bound) {
5919        // Start bound can't be UNBOUNDED FOLLOWING
5920        (UnboundedFollowing, _) => {
5921            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5922        }
5923        // End bound can't be UNBOUNDED PRECEDING
5924        (_, UnboundedPreceding) => {
5925            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5926        }
5927        // Start bound should come before end bound in the list of bound definitions
5928        (CurrentRow, OffsetPreceding(_)) => {
5929            sql_bail!("frame starting from current row cannot have preceding rows")
5930        }
5931        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5932            sql_bail!("frame starting from following row cannot have preceding rows")
5933        }
5934        // The above rules are adopted from Postgres.
5935        // The following rules are Materialize-specific.
5936        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5937            // Note that the only hard limit is that partition size + offset should fit in i64, so
5938            // in theory, we could support much larger offsets than this. But for our current
5939            // performance, even 1000000 is quite big.
5940            if *o1 > 1000000 || *o2 > 1000000 {
5941                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5942            }
5943        }
5944        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5945            if *o1 > 1000000 || *o2 > 1000000 {
5946                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5947            }
5948        }
5949        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5950            if *o1 > 1000000 || *o2 > 1000000 {
5951                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5952            }
5953        }
5954        (OffsetPreceding(o), CurrentRow) => {
5955            if *o > 1000000 {
5956                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5957            }
5958        }
5959        (CurrentRow, OffsetFollowing(o)) => {
5960            if *o > 1000000 {
5961                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5962            }
5963        }
5964        // Other bounds are valid
5965        (_, _) => (),
5966    }
5967
5968    // RANGE is only supported in the default frame
5969    // https://github.com/MaterializeInc/database-issues/issues/6585
5970    if units == mz_expr::WindowFrameUnits::Range
5971        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5972    {
5973        bail_unsupported!("RANGE in non-default window frames")
5974    }
5975
5976    let frame = mz_expr::WindowFrame {
5977        units,
5978        start_bound,
5979        end_bound,
5980    };
5981    Ok(frame)
5982}
5983
5984fn window_frame_unit_ast_to_expr(
5985    unit: &WindowFrameUnits,
5986) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5987    match unit {
5988        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5989        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5990        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5991    }
5992}
5993
5994fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5995    match bound {
5996        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5997        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5998        WindowFrameBound::Preceding(Some(offset)) => {
5999            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6000        }
6001        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6002        WindowFrameBound::Following(Some(offset)) => {
6003            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6004        }
6005    }
6006}
6007
6008pub fn scalar_type_from_sql(
6009    scx: &StatementContext,
6010    data_type: &ResolvedDataType,
6011) -> Result<SqlScalarType, PlanError> {
6012    match data_type {
6013        ResolvedDataType::AnonymousList(elem_type) => {
6014            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6015            if matches!(elem_type, SqlScalarType::Char { .. }) {
6016                bail_unsupported!("char list");
6017            }
6018            Ok(SqlScalarType::List {
6019                element_type: Box::new(elem_type),
6020                custom_id: None,
6021            })
6022        }
6023        ResolvedDataType::AnonymousMap {
6024            key_type,
6025            value_type,
6026        } => {
6027            match scalar_type_from_sql(scx, key_type)? {
6028                SqlScalarType::String => {}
6029                other => sql_bail!(
6030                    "map key type must be {}, got {}",
6031                    scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6032                    scx.humanize_sql_scalar_type(&other, false)
6033                ),
6034            }
6035            Ok(SqlScalarType::Map {
6036                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6037                custom_id: None,
6038            })
6039        }
6040        ResolvedDataType::Named { id, modifiers, .. } => {
6041            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6042        }
6043        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6044    }
6045}
6046
6047pub fn scalar_type_from_catalog(
6048    catalog: &dyn SessionCatalog,
6049    id: CatalogItemId,
6050    modifiers: &[i64],
6051) -> Result<SqlScalarType, PlanError> {
6052    let entry = catalog.get_item(&id);
6053    let type_details = match entry.type_details() {
6054        Some(type_details) => type_details,
6055        None => {
6056            // Resolution should never produce a `ResolvedDataType::Named` with
6057            // an ID of a non-type, but we error gracefully just in case.
6058            sql_bail!(
6059                "internal error: {} does not refer to a type",
6060                catalog.resolve_full_name(entry.name()).to_string().quoted()
6061            );
6062        }
6063    };
6064    match &type_details.typ {
6065        CatalogType::Numeric => {
6066            let mut modifiers = modifiers.iter().fuse();
6067            let precision = match modifiers.next() {
6068                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6069                    sql_bail!(
6070                        "precision for type numeric must be between 1 and {}",
6071                        NUMERIC_DATUM_MAX_PRECISION,
6072                    );
6073                }
6074                Some(p) => Some(*p),
6075                None => None,
6076            };
6077            let scale = match modifiers.next() {
6078                Some(scale) => {
6079                    if let Some(precision) = precision {
6080                        if *scale > precision {
6081                            sql_bail!(
6082                                "scale for type numeric must be between 0 and precision {}",
6083                                precision
6084                            );
6085                        }
6086                    }
6087                    Some(NumericMaxScale::try_from(*scale)?)
6088                }
6089                None => None,
6090            };
6091            if modifiers.next().is_some() {
6092                sql_bail!("type numeric supports at most two type modifiers");
6093            }
6094            Ok(SqlScalarType::Numeric { max_scale: scale })
6095        }
6096        CatalogType::Char => {
6097            let mut modifiers = modifiers.iter().fuse();
6098            let length = match modifiers.next() {
6099                Some(l) => Some(CharLength::try_from(*l)?),
6100                None => Some(CharLength::ONE),
6101            };
6102            if modifiers.next().is_some() {
6103                sql_bail!("type character supports at most one type modifier");
6104            }
6105            Ok(SqlScalarType::Char { length })
6106        }
6107        CatalogType::VarChar => {
6108            let mut modifiers = modifiers.iter().fuse();
6109            let length = match modifiers.next() {
6110                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6111                None => None,
6112            };
6113            if modifiers.next().is_some() {
6114                sql_bail!("type character varying supports at most one type modifier");
6115            }
6116            Ok(SqlScalarType::VarChar { max_length: length })
6117        }
6118        CatalogType::Timestamp => {
6119            let mut modifiers = modifiers.iter().fuse();
6120            let precision = match modifiers.next() {
6121                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6122                None => None,
6123            };
6124            if modifiers.next().is_some() {
6125                sql_bail!("type timestamp supports at most one type modifier");
6126            }
6127            Ok(SqlScalarType::Timestamp { precision })
6128        }
6129        CatalogType::TimestampTz => {
6130            let mut modifiers = modifiers.iter().fuse();
6131            let precision = match modifiers.next() {
6132                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6133                None => None,
6134            };
6135            if modifiers.next().is_some() {
6136                sql_bail!("type timestamp with time zone supports at most one type modifier");
6137            }
6138            Ok(SqlScalarType::TimestampTz { precision })
6139        }
6140        t => {
6141            if !modifiers.is_empty() {
6142                sql_bail!(
6143                    "{} does not support type modifiers",
6144                    catalog.resolve_full_name(entry.name()).to_string()
6145                );
6146            }
6147            match t {
6148                CatalogType::Array {
6149                    element_reference: element_id,
6150                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6151                    catalog,
6152                    *element_id,
6153                    modifiers,
6154                )?))),
6155                CatalogType::List {
6156                    element_reference: element_id,
6157                    element_modifiers,
6158                } => Ok(SqlScalarType::List {
6159                    element_type: Box::new(scalar_type_from_catalog(
6160                        catalog,
6161                        *element_id,
6162                        element_modifiers,
6163                    )?),
6164                    custom_id: Some(id),
6165                }),
6166                CatalogType::Map {
6167                    key_reference: _,
6168                    key_modifiers: _,
6169                    value_reference: value_id,
6170                    value_modifiers,
6171                } => Ok(SqlScalarType::Map {
6172                    value_type: Box::new(scalar_type_from_catalog(
6173                        catalog,
6174                        *value_id,
6175                        value_modifiers,
6176                    )?),
6177                    custom_id: Some(id),
6178                }),
6179                CatalogType::Range {
6180                    element_reference: element_id,
6181                } => Ok(SqlScalarType::Range {
6182                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6183                }),
6184                CatalogType::Record { fields } => {
6185                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6186                        .iter()
6187                        .map(|f| {
6188                            let scalar_type = scalar_type_from_catalog(
6189                                catalog,
6190                                f.type_reference,
6191                                &f.type_modifiers,
6192                            )?;
6193                            Ok((
6194                                f.name.clone(),
6195                                SqlColumnType {
6196                                    scalar_type,
6197                                    nullable: true,
6198                                },
6199                            ))
6200                        })
6201                        .collect::<Result<Box<_>, PlanError>>()?;
6202                    Ok(SqlScalarType::Record {
6203                        fields: scalars,
6204                        custom_id: Some(id),
6205                    })
6206                }
6207                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6208                CatalogType::Bool => Ok(SqlScalarType::Bool),
6209                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6210                CatalogType::Date => Ok(SqlScalarType::Date),
6211                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6212                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6213                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6214                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6215                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6216                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6217                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6218                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6219                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6220                CatalogType::Interval => Ok(SqlScalarType::Interval),
6221                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6222                CatalogType::Oid => Ok(SqlScalarType::Oid),
6223                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6224                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6225                CatalogType::Pseudo => {
6226                    sql_bail!(
6227                        "cannot reference pseudo type {}",
6228                        catalog.resolve_full_name(entry.name()).to_string()
6229                    )
6230                }
6231                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6232                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6233                CatalogType::RegType => Ok(SqlScalarType::RegType),
6234                CatalogType::String => Ok(SqlScalarType::String),
6235                CatalogType::Time => Ok(SqlScalarType::Time),
6236                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6237                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6238                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6239                CatalogType::Numeric => unreachable!("handled above"),
6240                CatalogType::Char => unreachable!("handled above"),
6241                CatalogType::VarChar => unreachable!("handled above"),
6242                CatalogType::Timestamp => unreachable!("handled above"),
6243                CatalogType::TimestampTz => unreachable!("handled above"),
6244            }
6245        }
6246    }
6247}
6248
/// This is used to collect aggregates and table functions from within an `Expr`.
/// See the explanation of aggregate handling at the top of the file for more details.
struct AggregateTableFuncVisitor<'a> {
    scx: &'a StatementContext<'a>,
    /// Aggregate function calls collected so far, in visit order (deduplicated
    /// later in `into_result`).
    aggs: Vec<Function<Aug>>,
    /// True while visiting the arguments of an aggregate function call; used to
    /// reject nested aggregates.
    within_aggregate: bool,
    /// Table functions found in the SELECT list, keyed by the call itself and
    /// mapped to the synthetic identifier that replaces them in the expression.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of syntactic contexts (e.g. "CASE") in which table functions are
    /// disallowed; a non-empty stack means table functions are currently illegal.
    table_disallowed_context: Vec<&'static str>,
    /// True while visiting a SELECT list item, where table functions are
    /// extracted and replaced by identifiers.
    in_select_item: bool,
    /// Allocates unique ids for the synthetic table-function identifiers.
    id_gen: IdGen,
    /// The first error encountered during the visit, if any; checked in
    /// `into_result`.
    err: Option<PlanError>,
}
6261
6262impl<'a> AggregateTableFuncVisitor<'a> {
6263    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6264        AggregateTableFuncVisitor {
6265            scx,
6266            aggs: Vec::new(),
6267            within_aggregate: false,
6268            tables: BTreeMap::new(),
6269            table_disallowed_context: Vec::new(),
6270            in_select_item: false,
6271            id_gen: Default::default(),
6272            err: None,
6273        }
6274    }
6275
6276    fn into_result(
6277        self,
6278    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6279        match self.err {
6280            Some(err) => Err(err),
6281            None => {
6282                // Dedup while preserving the order. We don't care what the order is, but it
6283                // has to be reproducible so that EXPLAIN PLAN tests work.
6284                let mut seen = BTreeSet::new();
6285                let aggs = self
6286                    .aggs
6287                    .into_iter()
6288                    .filter(move |agg| seen.insert(agg.clone()))
6289                    .collect();
6290                Ok((aggs, self.tables))
6291            }
6292        }
6293    }
6294}
6295
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Collects aggregate calls and maintains the "table functions disallowed
    /// here" context while walking a function call.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // Visit the FILTER clause before entering the aggregate, so that
                // `within_aggregate` is not yet set for it.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                // The arguments, by contrast, are visited with `within_aggregate`
                // set (nested aggregates are errors) and with table functions
                // disallowed.
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not be nested inside other table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    /// Tracks contexts in which table functions are disallowed (CASE,
    /// COALESCE), and — within SELECT items — extracts table function calls,
    /// replacing them by synthetic identifiers recorded in `self.tables`.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a uuid identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated: the entry API
                // reuses the identifier allocated for the first occurrence.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                // We know this is okay because id is 11 characters + <=20 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Marks that we are inside a SELECT item for the duration of the visit,
    /// restoring the previous flag afterwards (SELECT items can nest).
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6417
/// Collects window function calls (function calls with an OVER clause) from an
/// expression, without descending into subqueries.
#[derive(Default)]
struct WindowFuncCollector {
    // Window function call expressions, in visit order (deduplicated later).
    window_funcs: Vec<Expr<Aug>>,
}
6422
6423impl WindowFuncCollector {
6424    fn into_result(self) -> Vec<Expr<Aug>> {
6425        // Dedup while preserving the order.
6426        let mut seen = BTreeSet::new();
6427        let window_funcs_dedupped = self
6428            .window_funcs
6429            .into_iter()
6430            .filter(move |expr| seen.insert(expr.clone()))
6431            // Reverse the order, so that in case of a nested window function call, the
6432            // inner one is evaluated first.
6433            .rev()
6434            .collect();
6435        window_funcs_dedupped
6436    }
6437}
6438
6439impl Visit<'_, Aug> for WindowFuncCollector {
6440    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6441        match expr {
6442            Expr::Function(func) => {
6443                if func.over.is_some() {
6444                    self.window_funcs.push(expr.clone());
6445                }
6446            }
6447            _ => (),
6448        }
6449        visit::visit_expr(self, expr);
6450    }
6451
6452    fn visit_query(&mut self, _query: &Query<Aug>) {
6453        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6454    }
6455}
6456
/// Specifies how long a query will live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The query's (or the expression's) result will be computed at one point in time.
    OneShot,
    /// The query (or expression) is used in a dataflow that maintains an index.
    Index,
    /// The query (or expression) is used in a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) is part of a (non-materialized) view.
    View,
    /// The expression is part of a source definition.
    Source,
}

impl QueryLifetime {
    /// Whether the result is computed at a single point in time.
    ///
    /// (This used to impact whether the query is allowed to reason about the time at which it is
    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
    /// mechanism, see `ExprPrepStyle`.)
    pub fn is_one_shot(&self) -> bool {
        // Spelled out exhaustively so that adding a variant forces a decision here.
        let result = match self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        // `is_one_shot` and `is_maintained` must stay exact complements.
        assert_eq!(!result, self.is_maintained());
        result
    }

    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
    /// turned into a `TopK`.
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
    pub fn allow_show(&self) -> bool {
        match self {
            // SUBSCRIBE allows SHOW commands!
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6516
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The CTE's name.
    pub name: String,
    /// The relation description (column names and types) of the CTE's output.
    pub desc: RelationDesc,
}
6523
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// A name manager, for interning column names that will be stored in HIR and MIR.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// Guards against unbounded recursion (and thus stack overflow) while
    /// planning deeply nested queries; shared with derived contexts.
    pub recursion_guard: RecursionGuard,
}
6541
// Exposes the query context's recursion guard so planning can use
// depth-checked recursion.
impl CheckedRecursion for QueryContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6547
impl<'a> QueryContext<'a> {
    /// Creates a root `QueryContext` for a statement: no outer scopes or
    /// relation types, no CTEs, and a fresh name manager and recursion guard.
    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
        QueryContext {
            scx,
            lifetime,
            outer_scopes: vec![],
            outer_relation_types: vec![],
            ctes: BTreeMap::new(),
            name_manager: Rc::new(RefCell::new(NameManager::new())),
            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
        }
    }

    /// Computes the type of `expr` in this context (outer relation types and
    /// parameter types are supplied from the context).
    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
    }

    /// Generate a new `QueryContext` appropriate to be used in subqueries of
    /// `self`.
    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
        let ctes = self.ctes.clone();
        // The given scope/type become the innermost outer scope/type of the
        // derived context.
        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
        let outer_relation_types = iter::once(relation_type)
            .chain(self.outer_relation_types.clone())
            .collect();
        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
        let name_manager = Rc::clone(&self.name_manager);

        QueryContext {
            scx: self.scx,
            lifetime: self.lifetime,
            outer_scopes,
            outer_relation_types,
            ctes,
            name_manager,
            recursion_guard: self.recursion_guard.clone(),
        }
    }

    /// Derives a `QueryContext` for a scope that contains no columns.
    fn empty_derived_context(&self) -> QueryContext<'a> {
        let scope = Scope::empty();
        let ty = SqlRelationType::empty();
        self.derived_context(scope, ty)
    }

    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
    /// CTE.
    pub fn resolve_table_name(
        &self,
        object: ResolvedItemName,
    ) -> Result<(HirRelationExpr, Scope), PlanError> {
        match object {
            ResolvedItemName::Item {
                id,
                full_name,
                version,
                ..
            } => {
                let item = self.scx.get_item(&id).at_version(version);
                // Items without a relation description (e.g. non-relational
                // items) cannot be used as a table expression.
                let desc = match item.relation_desc() {
                    Some(desc) => desc.clone(),
                    None => {
                        return Err(PlanError::InvalidDependency {
                            name: full_name.to_string(),
                            item_type: item.item_type().to_string(),
                        });
                    }
                };
                let expr = HirRelationExpr::Get {
                    id: Id::Global(item.global_id()),
                    typ: desc.typ().clone(),
                };

                let name = full_name.into();
                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());

                Ok((expr, scope))
            }
            ResolvedItemName::Cte { id, name } => {
                let name = name.into();
                // NOTE(review): assumes every resolved CTE id was registered in
                // `self.ctes` before being referenced — confirm against callers.
                let cte = self.ctes.get(&id).unwrap();
                let expr = HirRelationExpr::Get {
                    id: Id::Local(id),
                    typ: cte.desc.typ().clone(),
                };

                let scope = Scope::from_source(Some(name), cte.desc.iter_names());

                Ok((expr, scope))
            }
            ResolvedItemName::ContinualTask { id, name } => {
                // Continual tasks are inlined like CTEs: a `Get` of a local id.
                let cte = self.ctes.get(&id).unwrap();
                let expr = HirRelationExpr::Get {
                    id: Id::Local(id),
                    typ: cte.desc.typ().clone(),
                };

                let scope = Scope::from_source(Some(name), cte.desc.iter_names());

                Ok((expr, scope))
            }
            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
        }
    }

    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
        self.scx.humanize_sql_scalar_type(typ, postgres_compat)
    }
}
6660
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a SqlRelationType,
    /// Are aggregate functions allowed in this context
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context.
    pub allow_parameters: bool,
    /// Are window functions allowed in this context
    pub allow_windows: bool,
}
6682
// Expression planning shares the recursion guard of the enclosing query
// context.
impl CheckedRecursion for ExprContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6688
6689impl<'a> ExprContext<'a> {
6690    pub fn catalog(&self) -> &dyn SessionCatalog {
6691        self.qcx.scx.catalog
6692    }
6693
6694    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6695        let mut ecx = self.clone();
6696        ecx.name = name;
6697        ecx
6698    }
6699
6700    pub fn column_type<E>(&self, expr: &E) -> E::Type
6701    where
6702        E: AbstractExpr,
6703    {
6704        expr.typ(
6705            &self.qcx.outer_relation_types,
6706            self.relation_type,
6707            &self.qcx.scx.param_types.borrow(),
6708        )
6709    }
6710
6711    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6712    where
6713        E: AbstractExpr,
6714    {
6715        self.column_type(expr).scalar_type()
6716    }
6717
6718    fn derived_query_context(&self) -> QueryContext<'_> {
6719        let mut scope = self.scope.clone();
6720        scope.lateral_barrier = true;
6721        self.qcx.derived_context(scope, self.relation_type.clone())
6722    }
6723
6724    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6725        self.qcx.scx.require_feature_flag(flag)
6726    }
6727
6728    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6729        &self.qcx.scx.param_types
6730    }
6731
6732    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6733    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6734    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6735        self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6736    }
6737
6738    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6739        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6740    }
6741}
6742
/// Manages column names, doing lightweight string internment.
///
/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
/// to ensure maximal sharing.
// The set holds one `Arc<str>` per distinct name; `intern` hands out clones of
// these, so structurally equal names share one allocation.
#[derive(Debug, Clone)]
pub struct NameManager(BTreeSet<Arc<str>>);
6750
6751impl NameManager {
6752    /// Creates a new `NameManager`, with no interned names
6753    pub fn new() -> Self {
6754        Self(BTreeSet::new())
6755    }
6756
6757    /// Interns a string, returning a reference-counted pointer to the interned
6758    /// string.
6759    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6760        let s = s.as_ref();
6761        if let Some(interned) = self.0.get(s) {
6762            Arc::clone(interned)
6763        } else {
6764            let interned: Arc<str> = Arc::from(s);
6765            self.0.insert(Arc::clone(&interned));
6766            interned
6767        }
6768    }
6769
6770    /// Interns a string representing a reference to a `ScopeItem`, returning a
6771    /// reference-counted pointer to the interned string.
6772    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6773        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6774        //
6775        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6776        // name information. Note that as of 2025-04-09, we don't support column
6777        // renames.
6778        //
6779        // A few bad alternatives:
6780        //
6781        // (1) Store it but don't write it down. This fails because the expression
6782        //     cache will erase our names on restart.
6783        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6784        //     get the right names. But the world now and the world when we made
6785        //     those objects may be different.
6786        // (3) Just don't write down the table name. Nothing fails... for now.
6787
6788        self.intern(item.column_name.as_str())
6789    }
6790}
6791
#[cfg(test)]
mod test {
    use super::*;

    /// Ensure that `NameManager`'s string interning works as expected.
    ///
    /// In particular, structurally but not referentially identical strings should
    /// be interned to the same `Arc`ed pointer.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut nm = NameManager::new();

        let orig_hi = "hi";
        let hi = nm.intern(orig_hi);
        let hello = nm.intern("hello");

        // Distinct strings must intern to distinct allocations.
        assert_ne!(hi.as_ptr(), hello.as_ptr());

        // this static string is _likely_ the same as `orig_hi`
        let hi2 = nm.intern("hi");
        assert_eq!(hi.as_ptr(), hi2.as_ptr());

        // generate a "hi" string that doesn't get optimized to the same static string
        let s = format!(
            "{}{}",
            hi.chars().nth(0).unwrap(),
            hi2.chars().nth(1).unwrap()
        );
        // make sure that we're testing with a fresh string!
        assert_ne!(orig_hi.as_ptr(), s.as_ptr());

        // A structurally equal but freshly allocated string still interns to
        // the original allocation.
        let hi3 = nm.intern(s);
        assert_eq!(hi.as_ptr(), hi3.as_ptr());
    }
}