// mz_sql/plan/query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52    func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67    RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103    literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
/// The result of planning a top-level ("root") query: the planned expression,
/// the shape of its result set, the post-processing to apply before returning
/// results, and the scope naming the output columns.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression (e.g., an `HirRelationExpr`, or `Vec<HirScalarExpr>`
    /// for a RETURNING clause).
    pub expr: E,
    /// Describes the shape of the result set. Note: per `plan_root_query`'s docs,
    /// this describes the expression *after* `finishing` has been applied.
    pub desc: RelationDesc,
    /// Post-processing (ORDER BY / LIMIT / OFFSET / projection) that must occur
    /// before results are sent to the client.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// Names the output columns of `expr`.
    pub scope: Scope,
}
116
117/// Plans a top-level query, returning the `HirRelationExpr` describing the query
118/// plan, the `RelationDesc` describing the shape of the result set, a
119/// `RowSetFinishing` describing post-processing that must occur before results
120/// are sent to the client, and the types of the parameters in the query, if any
121/// were present.
122///
123/// Note that the returned `RelationDesc` describes the expression after
124/// applying the returned `RowSetFinishing`.
125#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127    scx: &StatementContext,
128    mut query: Query<Aug>,
129    lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131    transform_ast::transform(scx, &mut query)?;
132    let mut qcx = QueryContext::root(scx, lifetime);
133    let PlannedQuery {
134        mut expr,
135        scope,
136        order_by,
137        limit,
138        offset,
139        project,
140        group_size_hints,
141    } = plan_query(&mut qcx, &query)?;
142
143    let mut finishing = RowSetFinishing {
144        limit,
145        offset,
146        project,
147        order_by,
148    };
149
150    // Attempt to push the finishing's ordering past its projection. This allows
151    // data to be projected down on the workers rather than the coordinator. It
152    // also improves the optimizer's demand analysis, as the optimizer can only
153    // reason about demand information in `expr` (i.e., it can't see
154    // `finishing.project`).
155    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157    if lifetime.is_maintained() {
158        expr.finish_maintained(&mut finishing, group_size_hints);
159    }
160
161    let typ = qcx.relation_type(&expr);
162    let typ = SqlRelationType::new(
163        finishing
164            .project
165            .iter()
166            .map(|i| typ.column_types[*i].clone())
167            .collect(),
168    );
169    let desc = RelationDesc::new(typ, scope.column_names());
170
171    Ok(PlannedRootQuery {
172        expr,
173        desc,
174        finishing,
175        scope,
176    })
177}
178
179/// TODO(ct2): Dedup this with [plan_root_query].
180#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182    qcx: &mut QueryContext,
183    mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185    transform_ast::transform(qcx.scx, &mut query)?;
186    let PlannedQuery {
187        mut expr,
188        scope,
189        order_by,
190        limit,
191        offset,
192        project,
193        group_size_hints,
194    } = plan_query(qcx, &query)?;
195
196    let mut finishing = RowSetFinishing {
197        limit,
198        offset,
199        project,
200        order_by,
201    };
202
203    // Attempt to push the finishing's ordering past its projection. This allows
204    // data to be projected down on the workers rather than the coordinator. It
205    // also improves the optimizer's demand analysis, as the optimizer can only
206    // reason about demand information in `expr` (i.e., it can't see
207    // `finishing.project`).
208    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210    expr.finish_maintained(&mut finishing, group_size_hints);
211
212    let typ = qcx.relation_type(&expr);
213    let typ = SqlRelationType::new(
214        finishing
215            .project
216            .iter()
217            .map(|i| typ.column_types[*i].clone())
218            .collect(),
219    );
220    let desc = RelationDesc::new(typ, scope.column_names());
221
222    Ok(PlannedRootQuery {
223        expr,
224        desc,
225        finishing,
226        scope,
227    })
228}
229
230/// Attempts to push a projection through an order by.
231///
232/// The returned bool indicates whether the pushdown was successful or not.
233/// Successful pushdown requires that all the columns referenced in `order_by`
234/// are included in `project`.
235///
236/// When successful, `expr` is wrapped in a projection node, `order_by` is
237/// rewritten to account for the pushed-down projection, and `project` is
238/// replaced with the trivial projection. When unsuccessful, no changes are made
239/// to any of the inputs.
240fn try_push_projection_order_by(
241    expr: &mut HirRelationExpr,
242    project: &mut Vec<usize>,
243    order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245    let mut unproject = vec![None; expr.arity()];
246    for (out_i, in_i) in project.iter().copied().enumerate() {
247        unproject[in_i] = Some(out_i);
248    }
249    if order_by
250        .iter()
251        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252    {
253        let trivial_project = (0..project.len()).collect();
254        *expr = expr.take().project(mem::replace(project, trivial_project));
255        for ob in order_by {
256            ob.column = unproject[ob.column].unwrap();
257        }
258        true
259    } else {
260        false
261    }
262}
263
/// Plans an `INSERT INTO ... [VALUES ... | query | DEFAULT VALUES] [RETURNING ...]`.
///
/// Returns the [`CatalogItemId`] of the target table, the planned
/// [`HirRelationExpr`] producing the rows to insert (cast to the table's column
/// types, with omitted columns filled from their defaults and rearranged into
/// table order), and the planned `RETURNING` expressions.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.relation_desc().expect("table has desc");
    // Per-column default expressions, used below to fill in omitted columns.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order.
    // `source_types[i]` is the expected type of the i-th source column;
    // `ordering[i]` is the table column index the i-th source column maps to.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        InsertSource::DefaultValues => {
            // `INSERT ... DEFAULT VALUES`: a single zero-column row; every
            // column is populated from `defaults` further below.
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Column not supplied by the source: map its default expression
            // onto the end and project it into place.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against the target table's columns.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // `SELECT`-style items may expand (e.g. `*`) into several columns.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
480
/// Determines the mapping between some external data and a Materialize relation.
///
/// Returns the following:
/// * [`CatalogItemId`] for the destination table.
/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
///   since we now return a [`MapFilterProject`].
/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
///   destination table.
///
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    // Items with no relation desc cannot participate in COPY.
    let table_desc = match item.relation_desc() {
        Some(desc) => desc.into_owned(),
        None => {
            return Err(PlanError::InvalidDependency {
                name: fullname.to_string(),
                item_type: item.item_type().to_string(),
            });
        }
    };
    let mut ordering = Vec::with_capacity(columns.len());

    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
    // should be simplified. The reason they are currently separate code paths is so we can roll
    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.

    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
    // FROM SOURCE ...`), then we generate an MFP.
    //
    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
    // so it's not always guaranteed that our `item` is a table.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Fill in any omitted columns and rearrange into correct order
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each column in the destination table, either project it from the source data, or provide
        // an expression to fill in a default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // If a column from the table does not exist in the source data, then a default
                // value will get appended to the end of the input Row from the source data.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    // Create a mapping from input data to the table we're copying into.
    let source_desc = if columns.is_empty() {
        // No explicit column list: the input is assumed to cover every table
        // column, in table order.
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        // The source data should be in the same order as the table.
        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }

        // The source data is a different shape than the destination table.
        RelationDesc::new(SqlRelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
608
609/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
610///
611/// TODO(cf3): Merge this method with [`plan_copy_item`].
612pub fn plan_copy_from(
613    scx: &StatementContext,
614    table_name: ResolvedItemName,
615    columns: Vec<Ident>,
616) -> Result<
617    (
618        CatalogItemId,
619        RelationDesc,
620        Vec<ColumnIndex>,
621        Option<MapFilterProject>,
622    ),
623    PlanError,
624> {
625    let table = scx.get_item_by_resolved_name(&table_name)?;
626
627    // Validate the target of the insert.
628    if table.item_type() != CatalogItemType::Table {
629        sql_bail!(
630            "cannot insert into {} '{}'",
631            table.item_type(),
632            table_name.full_name_str()
633        );
634    }
635
636    let _ = table.writable_table_details().ok_or_else(|| {
637        sql_err!(
638            "cannot insert into non-writeable table '{}'",
639            table_name.full_name_str()
640        )
641    })?;
642
643    if table.id().is_system() {
644        sql_bail!(
645            "cannot insert into system table '{}'",
646            table_name.full_name_str()
647        );
648    }
649    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
650
651    Ok((id, desc, ordering, mfp))
652}
653
/// Builds a plan that adds the default values for the missing columns and re-orders
/// the datums in the given rows to match the order in the target table.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .try_get_item(&target_id)
        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
        .at_version(RelationVersionSelector::Latest);

    // Per-column default expressions, used to fill in columns absent from `columns`.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    let desc = table.relation_desc().expect("table has desc");
    let column_types = columns
        .iter()
        .map(|x| desc.get_type(x).clone())
        .map(|mut x| {
            // Null constraint is enforced later, when inserting the row in the table.
            // Without this, an assert is hit during lowering.
            x.nullable = true;
            x
        })
        .collect();
    let typ = SqlRelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Exit early with just the raw constant if we know that all columns are present
    // and in the correct order. This lets us bypass expensive downstream optimizations
    // more easily, as at every stage we know this expression is nothing more than
    // a constant (as opposed to e.g. a constant with an identity map and identity
    // projection).
    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_all().zip_eq(defaults);
    for ((col_idx, _col_name, col_typ), default) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Column not supplied by the input rows: map its default expression
            // onto the end and project it into place.
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
728
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    /// The table being mutated.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Post-processing applied to the rows produced by `selection`.
    pub finishing: RowSetFinishing,
}
741
742pub fn plan_delete_query(
743    scx: &StatementContext,
744    mut delete_stmt: DeleteStatement<Aug>,
745) -> Result<ReadThenWritePlan, PlanError> {
746    transform_ast::transform(scx, &mut delete_stmt)?;
747
748    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
749    plan_mutation_query_inner(
750        qcx,
751        delete_stmt.table_name,
752        delete_stmt.alias,
753        delete_stmt.using,
754        vec![],
755        delete_stmt.selection,
756    )
757}
758
759pub fn plan_update_query(
760    scx: &StatementContext,
761    mut update_stmt: UpdateStatement<Aug>,
762) -> Result<ReadThenWritePlan, PlanError> {
763    transform_ast::transform(scx, &mut update_stmt)?;
764
765    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
766
767    plan_mutation_query_inner(
768        qcx,
769        update_stmt.table_name,
770        update_stmt.alias,
771        vec![],
772        update_stmt.assignments,
773        update_stmt.selection,
774    )
775}
776
/// Plans the shared core of `DELETE` and `UPDATE` statements as a
/// [`ReadThenWritePlan`].
///
/// `using` is only ever nonempty for `DELETE ... USING`; `assignments` is only
/// ever nonempty for `UPDATE ... SET`. `selection` is the optional `WHERE`
/// clause of either statement.
///
/// Errors if `table_name` does not resolve to a writeable, non-system user
/// table, if a `SET` target column is unknown or assigned twice, or if any
/// contained expression fails to plan.
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get ID and version of the relation desc.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Perform checks on item with given ID: it must be a writeable,
    // non-system table.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    // We only need to know that writable details exist; the plan itself does
    // not use them here.
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Derive structs for operation from validated table
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.relation_desc().expect("table has desc");
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // Without `USING`, the `WHERE` clause is a plain filter on the target
        // table. Note that subqueries are allowed here but aggregates are not.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // With `USING`, the filter becomes an existential subquery; see
        // `handle_mutation_using_clause`.
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each `SET` assignment, keyed by the target column's index.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        // Get the index and type of the column.
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                // NOTE(review): unlike the WHERE clause above, SET values may
                // not contain subqueries — presumably intentional; confirm.
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                // Assignment-cast the value to the column's declared type.
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // No ordering or limiting: project all columns of the table as-is.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
882
// Adjust `get` to perform an existential subquery on `using` accounting for
// `selection`.
//
// If `USING`, we essentially want to rewrite the query as a correlated
// existential subquery, i.e.
// ```
// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
// ```
// However, we can't do that directly because of esoteric rules w/r/t `lateral`
// subqueries.
// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan `USING` as a cross-joined `FROM` without knowledge of the
    // statement's `FROM` target. This prevents `lateral` subqueries from
    // "seeing" the `FROM` target.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
        // PG-like semantics e.g. expressing ambiguous column references. We put
        // `USING...` first for no real reason, but making a different decision
        // would require adjusting the column references on this relation
        // differently.
        //
        // The join predicate is a literal `true` (cross join); the real
        // filtering happens via the planned `WHERE` expression below.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression on `FROM, USING...`.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
        // those to the right of `using_rel_expr`) to instead be correlated to
        // the outer relation, i.e. `get`.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    // Bump the level so the reference points one scope out
                    // (to `get`), and re-base the column index accordingly.
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
        // relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Check that scopes are at compatible (i.e. do not double-reference
        // same table), despite lack of selection
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
    // whether any rows are returned, and not on the contents of those rows,
    // the output list of the subquery is normally unimportant.
    //
    // This means we don't need to worry about projecting/mapping any
    // additional expressions here.
    //
    // https://www.postgresql.org/docs/14/functions-subquery.html

    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
984
/// Error returned by [`cast_relation`] when a column of the relation cannot be
/// cast to its requested target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Index of the column that failed to cast.
    pub(crate) column: usize,
    /// The column's planned (source) type.
    pub(crate) source_type: SqlScalarType,
    /// The requested target type.
    pub(crate) target_type: SqlScalarType,
}
991
/// Cast a relation from one type to another using the specified type of cast.
///
/// The length of `target_types` must match the arity of `expr`.
///
/// Columns that already have the target type are passed through untouched;
/// all others are wrapped in a cast expression via a map/project pair.
///
/// # Errors
///
/// Returns a [`CastRelationError`] identifying the first column for which no
/// valid cast exists.
pub(crate) fn cast_relation<'a, I>(
    qcx: &QueryContext,
    ccx: CastContext,
    expr: HirRelationExpr,
    target_types: I,
) -> Result<HirRelationExpr, CastRelationError>
where
    I: IntoIterator<Item = &'a SqlScalarType>,
{
    let ecx = &ExprContext {
        qcx,
        name: "values",
        scope: &Scope::empty(),
        relation_type: &qcx.relation_type(&expr),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };
    // Casts that change the value are appended via `map`; `project_key`
    // selects either the original column or its mapped replacement.
    let mut map_exprs = vec![];
    let mut project_key = vec![];
    for (i, target_typ) in target_types.into_iter().enumerate() {
        let expr = HirScalarExpr::column(i);
        // We plan every cast and check the evaluated expressions rather than
        // checking the types directly because of some complex casting rules
        // between types not expressed in `SqlScalarType` equality.
        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
            Ok(cast_expr) => {
                if expr == cast_expr {
                    // Cast between types was unnecessary
                    project_key.push(i);
                } else {
                    // Cast between types required
                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
                    map_exprs.push(cast_expr);
                }
            }
            Err(_) => {
                return Err(CastRelationError {
                    column: i,
                    source_type: ecx.scalar_type(&expr),
                    target_type: target_typ.clone(),
                });
            }
        }
    }
    Ok(expr.map(map_exprs).project(project_key))
}
1043
1044/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1045/// VIEW` statement.
1046pub fn plan_as_of(
1047    scx: &StatementContext,
1048    as_of: Option<AsOf<Aug>>,
1049) -> Result<QueryWhen, PlanError> {
1050    match as_of {
1051        None => Ok(QueryWhen::Immediately),
1052        Some(as_of) => match as_of {
1053            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1054            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1055        },
1056    }
1057}
1058
/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
///
/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
/// - not a constant,
/// - not castable to MzTimestamp,
/// - is null,
/// - contains an unmaterializable function,
/// - some other evaluation error occurs, e.g., a division by 0,
/// - contains aggregates, subqueries, parameters, or window function calls.
pub fn plan_as_of_or_up_to(
    scx: &StatementContext,
    mut expr: Expr<Aug>,
) -> Result<mz_repr::Timestamp, PlanError> {
    // Planned against an empty relation: the expression cannot reference any
    // columns.
    let scope = Scope::empty();
    let desc = RelationDesc::empty();
    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
    // evaluated only once.)
    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    transform_ast::transform(scx, &mut expr)?;
    let ecx = &ExprContext {
        qcx: &qcx,
        name: "AS OF or UP TO",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    // Assignment-cast the planned expression to MzTimestamp.
    let hir = plan_expr(ecx, &expr)?.cast_to(
        ecx,
        CastContext::Assignment,
        &SqlScalarType::MzTimestamp,
    )?;
    if hir.contains_unmaterializable() {
        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
    }
    // At this point, we definitely have a constant expression:
    // - it can't contain any unmaterializable functions;
    // - it can't refer to any columns.
    // But the following can still fail due to a variety of reasons: most commonly, the cast can
    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
    // division by 0.
    let timestamp = hir
        .into_literal_mz_timestamp()
        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
    Ok(timestamp)
}
1107
1108/// Plans an expression in the AS position of a `CREATE SECRET`.
1109pub fn plan_secret_as(
1110    scx: &StatementContext,
1111    mut expr: Expr<Aug>,
1112) -> Result<MirScalarExpr, PlanError> {
1113    let scope = Scope::empty();
1114    let desc = RelationDesc::empty();
1115    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1116
1117    transform_ast::transform(scx, &mut expr)?;
1118
1119    let ecx = &ExprContext {
1120        qcx: &qcx,
1121        name: "AS",
1122        scope: &scope,
1123        relation_type: desc.typ(),
1124        allow_aggregates: false,
1125        allow_subqueries: false,
1126        allow_parameters: false,
1127        allow_windows: false,
1128    };
1129    let expr = plan_expr(ecx, &expr)?
1130        .type_as(ecx, &SqlScalarType::Bytes)?
1131        .lower_uncorrelated(scx.catalog.system_vars())?;
1132    Ok(expr)
1133}
1134
/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
///
/// Builds a relation of the declared bodies, headers, and secrets (in that
/// order), plans `using` as a boolean expression over that relation, and
/// records for each column how it should be supplied at evaluation time
/// (by index, and as bytes or as a string).
pub fn plan_webhook_validate_using(
    scx: &StatementContext,
    validate_using: CreateWebhookSourceCheck<Aug>,
) -> Result<WebhookValidation, PlanError> {
    let qcx = QueryContext::root(scx, QueryLifetime::Source);

    let CreateWebhookSourceCheck {
        options,
        using: mut expr,
    } = validate_using;

    // `column_typs` and `column_names` are built up in lockstep; the asserts
    // below double-check that invariant.
    let mut column_typs = vec![];
    let mut column_names = vec![];

    let (bodies, headers, secrets) = options
        .map(|o| (o.bodies, o.headers, o.secrets))
        .unwrap_or_default();

    // Append all of the bodies so they can be used in the expression.
    let mut body_tuples = vec![];
    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
        // Expose the body as either bytes or text, per `use_bytes`.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "body".to_string());

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this body correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "body column names and types don't match"
        );
        body_tuples.push((column_idx, use_bytes));
    }

    // Append all of the headers so they can be used in the expression.
    let mut header_tuples = vec![];

    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
        // Headers are exposed as a map from header name to value.
        let value_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        let name = alias
            .map(|a| a.into_string())
            .unwrap_or_else(|| "headers".to_string());

        column_typs.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(value_type),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push(name);

        // Store the column index so we can be sure to provide this header correctly.
        let column_idx = column_typs.len() - 1;
        // Double check we're consistent with column names.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        header_tuples.push((column_idx, use_bytes));
    }

    // Append all secrets so they can be used in the expression.
    let mut validation_secrets = vec![];

    for CreateWebhookSourceSecret {
        secret,
        alias,
        use_bytes,
    } in secrets
    {
        // Either provide the secret to the validation expression as Bytes or a String.
        let scalar_type = use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);

        column_typs.push(SqlColumnType {
            scalar_type,
            nullable: false,
        });
        let ResolvedItemName::Item {
            id,
            full_name: FullItemName { item, .. },
            ..
        } = secret
        else {
            return Err(PlanError::InvalidSecret(Box::new(secret)));
        };

        // Plan the expression using the secret's alias, if one is provided.
        let name = if let Some(alias) = alias {
            alias.into_string()
        } else {
            item
        };
        column_names.push(name);

        // Get the column index that corresponds to this secret, so we can make sure to provide the
        // secrets in the correct order during evaluation.
        let column_idx = column_typs.len() - 1;
        // Double check that our column names and types match.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "column names and types don't match"
        );

        validation_secrets.push(WebhookValidationSecret {
            id,
            column_idx,
            use_bytes,
        });
    }

    // Assemble the relation the CHECK expression is planned against.
    let relation_typ = SqlRelationType::new(column_typs);
    let desc = RelationDesc::new(relation_typ, column_names.clone());
    let scope = Scope::from_source(None, column_names);

    transform_ast::transform(scx, &mut expr)?;

    let ecx = &ExprContext {
        qcx: &qcx,
        name: "CHECK",
        scope: &scope,
        relation_type: desc.typ(),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };
    // The CHECK expression must be a boolean and lowerable without
    // correlation.
    let expr = plan_expr(ecx, &expr)?
        .type_as(ecx, &SqlScalarType::Bool)?
        .lower_uncorrelated(scx.catalog.system_vars())?;
    let validation = WebhookValidation {
        expression: expr,
        relation_desc: desc,
        bodies: body_tuples,
        headers: header_tuples,
        secrets: validation_secrets,
    };
    Ok(validation)
}
1292
1293pub fn plan_default_expr(
1294    scx: &StatementContext,
1295    expr: &Expr<Aug>,
1296    target_ty: &SqlScalarType,
1297) -> Result<HirScalarExpr, PlanError> {
1298    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1299    let ecx = &ExprContext {
1300        qcx: &qcx,
1301        name: "DEFAULT expression",
1302        scope: &Scope::empty(),
1303        relation_type: &SqlRelationType::empty(),
1304        allow_aggregates: false,
1305        allow_subqueries: false,
1306        allow_parameters: false,
1307        allow_windows: false,
1308    };
1309    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1310    Ok(hir)
1311}
1312
/// Plans and eagerly evaluates the parameters supplied to an `EXECUTE`
/// statement against the parameter types recorded in `desc`.
///
/// Each parameter expression is planned in an empty scope (see
/// [`execute_expr_context`]), checked for castability to the expected type,
/// lowered, and evaluated to a datum that is packed into the result row.
///
/// # Errors
///
/// - if the number of parameters differs from `desc.param_types`;
/// - [`PlanError::WrongParameterType`] if no assignment cast exists from a
///   parameter's actual type to its expected type;
/// - any error arising from planning, lowering, or evaluating an expression.
pub fn plan_params<'a>(
    scx: &'a StatementContext,
    params: Vec<Expr<Aug>>,
    desc: &StatementDesc,
) -> Result<Params, PlanError> {
    if params.len() != desc.param_types.len() {
        sql_bail!(
            "expected {} params, got {}",
            desc.param_types.len(),
            params.len()
        );
    }

    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);

    let mut datums = Row::default();
    let mut packer = datums.packer();
    let mut actual_types = Vec::new();
    let temp_storage = &RowArena::new();
    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
        transform_ast::transform(scx, &mut expr)?;

        let ecx = execute_expr_context(&qcx);
        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
        let actual_ty = ecx.scalar_type(&ex);
        // We only verify here that an assignment cast to the expected type
        // *exists*; the value is recorded with its actual type.
        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
            return Err(PlanError::WrongParameterType(
                i + 1,
                ecx.humanize_scalar_type(expected_ty, false),
                ecx.humanize_scalar_type(&actual_ty, false),
            ));
        }
        // Evaluate the parameter now, while we have a catalog in hand.
        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
        let evaled = ex.eval(&[], temp_storage)?;
        packer.push(evaled);
        actual_types.push(actual_ty);
    }
    Ok(Params {
        datums,
        execute_types: actual_types,
        expected_types: desc.param_types.clone(),
    })
}
1356
/// Empty scope shared by all `EXECUTE` parameter expression contexts.
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Empty relation type shared by all `EXECUTE` parameter expression contexts.
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);

/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
///
/// Parameter expressions are planned against an empty scope: they may not
/// reference columns, aggregates, subqueries, parameters, or window functions.
pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
    ExprContext {
        qcx,
        name: "EXECUTE",
        scope: &EXECUTE_CONTEXT_SCOPE,
        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    }
}
1373
/// The CastContext used when matching up the types of parameters passed to EXECUTE.
///
/// This is an assignment cast also in Postgres, see
/// <https://github.com/MaterializeInc/database-issues/issues/9266>
///
/// Used by `plan_params` when checking, via `plan_hypothetical_cast`, that a
/// supplied parameter can be cast to its expected type.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1380
1381pub fn plan_index_exprs<'a>(
1382    scx: &'a StatementContext,
1383    on_desc: &RelationDesc,
1384    exprs: Vec<Expr<Aug>>,
1385) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1386    let scope = Scope::from_source(None, on_desc.iter_names());
1387    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1388
1389    let ecx = &ExprContext {
1390        qcx: &qcx,
1391        name: "CREATE INDEX",
1392        scope: &scope,
1393        relation_type: on_desc.typ(),
1394        allow_aggregates: false,
1395        allow_subqueries: false,
1396        allow_parameters: false,
1397        allow_windows: false,
1398    };
1399    let mut out = vec![];
1400    for mut expr in exprs {
1401        transform_ast::transform(scx, &mut expr)?;
1402        let expr = plan_expr_or_col_index(ecx, &expr)?;
1403        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1404        expr.reduce(&on_desc.typ().column_types);
1405        out.push(expr);
1406    }
1407    Ok(out)
1408}
1409
1410fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1411    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1412        Some(column) => Ok(HirScalarExpr::column(column)),
1413        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1414    }
1415}
1416
1417fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1418    match e {
1419        Expr::Value(Value::Number(n)) => {
1420            let n = n.parse::<usize>().map_err(|e| {
1421                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1422            })?;
1423            if n < 1 || n > max {
1424                sql_bail!(
1425                    "column reference {} in {} is out of range (1 - {})",
1426                    n,
1427                    name,
1428                    max
1429                );
1430            }
1431            Ok(Some(n - 1))
1432        }
1433        _ => Ok(None),
1434    }
1435}
1436
/// The output of planning a `Query`: a relation expression plus the ordering
/// and finishing information that applies on top of it.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// Scope describing the output columns of `expr`.
    scope: Scope,
    /// `ORDER BY` keys, referring to columns of the (possibly mapped) `expr`.
    order_by: Vec<ColumnOrder>,
    /// Planned `LIMIT` expression, if one was given.
    limit: Option<HirScalarExpr>,
    /// `offset` is either
    /// - an Int64 literal
    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
    ///   later.)
    offset: HirScalarExpr,
    /// Columns of `expr` to retain, in output order.
    project: Vec<usize>,
    /// Group size hints extracted from the query's `OPTIONS`.
    group_size_hints: GroupSizeHints,
}
1451
/// Plans a `Query` into a [`PlannedQuery`].
///
/// Thin wrapper around [`plan_query_inner`] that performs the recursion-depth
/// check, guarding against stack overflow on deeply nested queries.
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1455
/// Plans a `Query`: its CTEs, `LIMIT`, `OFFSET`, body, and `ORDER BY`.
/// Call via [`plan_query`], which adds the recursion-depth check.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
    // for the identifiers, so that they can be re-installed before returning.
    let cte_bindings = plan_ctes(qcx, q)?;

    // Plan `LIMIT`. A constant LIMIT is eagerly evaluated to a non-negative
    // Int64 (or NULL) literal; non-constant LIMITs are feature-flag gated.
    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let limit = if limit.is_constant() {
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;

                // TODO: Don't use ? on eval, but instead wrap the error and add the information
                // that the error happened in a LIMIT clause, so that we have better error msg for
                // something like `SELECT 5 LIMIT 'aaa'`.
                match limit.eval(&[], &arena)? {
                    d @ Datum::Int64(v) if v >= 0 => {
                        HirScalarExpr::literal(d, SqlScalarType::Int64)
                    }
                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Gate non-constant LIMIT expressions behind a feature flag
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };

    // Plan `OFFSET`. Unlike LIMIT, it defaults to a literal 0 and must be a
    // constant unless it contains parameters (see `PlannedQuery::offset`).
    let offset = match &q.offset {
        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
        Some(offset) => {
            let ecx = &ExprContext {
                qcx,
                name: "OFFSET",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: true,
                allow_windows: false,
            };
            let offset = plan_expr(ecx, offset)?;
            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let offset = if offset.is_constant() {
                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
                let offset_value = offset_into_value(offset)?;
                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
            } else {
                // The only case when this is allowed to not be a constant is if it contains
                // parameters. (In which case, we'll later check that it's a constant after
                // parameter binding.)
                if !offset.contains_parameters() {
                    return Err(PlanError::InvalidOffset(format!(
                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
                        offset
                    )));
                }
                offset
            };
            offset
        }
    };

    // Plan the query body. A plain SELECT takes the fast path (which also
    // extracts group-size hints from its OPTIONS); any other set expression
    // is planned generically, with ORDER BY handled here.
    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract query options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // Project away any columns `map_exprs` appended for ordering.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Wrap in reverse order so the first CTE ends up outermost.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            // At most one of the three recursion-limit options may be given;
            // each implies a different value of `return_at_limit`.
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            let limit = match (
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
            ) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => {
                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
                }
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit(
                        "More than one recursion limit given. \
                         Please give at most one of RECURSION LIMIT, \
                         ERROR AT RECURSION LIMIT, \
                         RETURN AT RECURSION LIMIT."
                            .to_owned(),
                    ));
                }
            }
            .try_map(|(max_iters, return_at_limit)| {
                Ok::<LetRecLimit, PlanError>(LetRecLimit {
                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
                        "Recursion limit has to be greater than 0.".to_owned(),
                    ))?,
                    return_at_limit: *return_at_limit,
                })
            })?;

            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1665
1666/// Converts an OFFSET expression into a value.
1667pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1668    let offset = offset
1669        .try_into_literal_int64()
1670        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1671    if offset < 0 {
1672        return Err(PlanError::InvalidOffset(format!(
1673            "must not be negative, got {}",
1674            offset
1675        )));
1676    }
1677    Ok(offset)
1678}
1679
// Extracts the recursion-limit options of a `WITH MUTUALLY RECURSIVE` block
// (`RECURSION LIMIT`, `RETURN AT RECURSION LIMIT`, `ERROR AT RECURSION LIMIT`)
// into a `MutRecBlockOptionExtracted` struct; at most one of the three may be
// set (enforced where the struct is consumed).
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1686
1687/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1688///
1689/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1690/// shadowed value that can be reinstalled once the planning has completed.
1691pub fn plan_ctes(
1692    qcx: &mut QueryContext,
1693    q: &Query<Aug>,
1694) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1695    // Accumulate planned expressions and shadowed descriptions.
1696    let mut result = Vec::new();
1697    // Retain the old descriptions of CTE bindings so that we can restore them
1698    // after we're done planning this SELECT.
1699    let mut shadowed_descs = BTreeMap::new();
1700
1701    // A reused identifier indicates a reused name.
1702    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1703        sql_bail!(
1704            "WITH query name {} specified more than once",
1705            normalize::ident_ref(ident).quoted()
1706        )
1707    }
1708
1709    match &q.ctes {
1710        CteBlock::Simple(ctes) => {
1711            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1712            for cte in ctes.iter() {
1713                let cte_name = normalize::ident(cte.alias.name.clone());
1714                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1715                let typ = qcx.relation_type(&val);
1716                let mut desc = RelationDesc::new(typ, scope.column_names());
1717                plan_utils::maybe_rename_columns(
1718                    format!("CTE {}", cte.alias.name),
1719                    &mut desc,
1720                    &cte.alias.columns,
1721                )?;
1722                // Capture the prior value if it exists, so that it can be re-installed.
1723                let shadowed = qcx.ctes.insert(
1724                    cte.id,
1725                    CteDesc {
1726                        name: cte_name,
1727                        desc,
1728                    },
1729                );
1730
1731                result.push((cte.id, val, shadowed));
1732            }
1733        }
1734        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1735            // Insert column types into `qcx.ctes` first for recursive bindings.
1736            for cte in ctes.iter() {
1737                let cte_name = normalize::ident(cte.name.clone());
1738                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1739                for column in cte.columns.iter() {
1740                    desc_columns.push((
1741                        normalize::column_name(column.name.clone()),
1742                        SqlColumnType {
1743                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1744                            nullable: true,
1745                        },
1746                    ));
1747                }
1748                let desc = RelationDesc::from_names_and_types(desc_columns);
1749                let shadowed = qcx.ctes.insert(
1750                    cte.id,
1751                    CteDesc {
1752                        name: cte_name,
1753                        desc,
1754                    },
1755                );
1756                // Capture the prior value if it exists, so that it can be re-installed.
1757                if let Some(shadowed) = shadowed {
1758                    shadowed_descs.insert(cte.id, shadowed);
1759                }
1760            }
1761
1762            // Plan all CTEs and validate the proposed types.
1763            for cte in ctes.iter() {
1764                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1765
1766                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1767
1768                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1769                    // Once WMR CTEs support NOT NULL constraints, check that
1770                    // nullability of derived column types are compatible.
1771                    sql_bail!(
1772                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1773                    );
1774                }
1775
1776                if !proposed_typ.keys.is_empty() {
1777                    // Once WMR CTEs support keys, check that keys exactly
1778                    // overlap.
1779                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1780                }
1781
1782                // Validate that the derived and proposed types are the same.
1783                let derived_typ = qcx.relation_type(&val);
1784
1785                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1786                    let cte_name = normalize::ident(cte.name.clone());
1787                    let proposed_typ = proposed_typ
1788                        .column_types
1789                        .iter()
1790                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1791                        .collect::<Vec<_>>();
1792                    let inferred_typ = derived_typ
1793                        .column_types
1794                        .iter()
1795                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1796                        .collect::<Vec<_>>();
1797                    Err(PlanError::RecursiveTypeMismatch(
1798                        cte_name,
1799                        proposed_typ,
1800                        inferred_typ,
1801                    ))
1802                };
1803
1804                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1805                    return type_err(proposed_typ, derived_typ);
1806                }
1807
1808                // Cast derived types to proposed types or error.
1809                let val = match cast_relation(
1810                    qcx,
1811                    // Choose `CastContext::Assignment`` because the user has
1812                    // been explicit about the types they expect. Choosing
1813                    // `CastContext::Implicit` is not "strong" enough to impose
1814                    // typmods from proposed types onto values.
1815                    CastContext::Assignment,
1816                    val,
1817                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1818                ) {
1819                    Ok(val) => val,
1820                    Err(_) => return type_err(proposed_typ, derived_typ),
1821                };
1822
1823                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1824            }
1825        }
1826    }
1827
1828    Ok(result)
1829}
1830
1831pub fn plan_nested_query(
1832    qcx: &mut QueryContext,
1833    q: &Query<Aug>,
1834) -> Result<(HirRelationExpr, Scope), PlanError> {
1835    let PlannedQuery {
1836        mut expr,
1837        scope,
1838        order_by,
1839        limit,
1840        offset,
1841        project,
1842        group_size_hints,
1843    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1844    if limit.is_some()
1845        || !offset
1846            .clone()
1847            .try_into_literal_int64()
1848            .is_ok_and(|offset| offset == 0)
1849    {
1850        expr = HirRelationExpr::top_k(
1851            expr,
1852            vec![],
1853            order_by,
1854            limit,
1855            offset,
1856            group_size_hints.limit_input_group_size,
1857        );
1858    }
1859    Ok((expr.project(project), scope))
1860}
1861
/// Plans a `SetExpr`: a bare `SELECT`, a set operation
/// (`UNION`/`EXCEPT`/`INTERSECT`), a `VALUES` list, a `TABLE` reference,
/// a parenthesized subquery, or a `SHOW` statement.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
            // should not have planned any ordering.
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan the LHS and RHS.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Validate that the LHS and RHS are the same width.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Match the types of the corresponding columns on the LHS and RHS
            // using the normal type coercion rules. This is equivalent to
            // `coerce_homogeneous_exprs`, but implemented in terms of
            // `HirRelationExpr` rather than `HirScalarExpr`.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                // For each column pair, guess the best common type and plan
                // an implicit cast from each side to that type.
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&target, false),
                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Only apply the casts when at least one is not the identity
            // column reference, to avoid a needless Map/Project. The mapped
            // cast results land in columns [arity, 2*arity), which the
            // projection then selects.
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
                    // Though we believe that render() does The Right Thing (TM)
                    // Also note that we do *not* need another threshold() at the end of the method chain
                    // because the right-hand side of the outer union only produces existing records,
                    // i.e., the record counts for differential data flow definitely remain non-negative.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            let scope = Scope::from_source(
                None,
                // Column names are taken from the left, as in Postgres.
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // The create SQL definition of involving this query, will have the explicit `SHOW`
            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
            // current schema of the executing user. When Materialize restarts and tries to re-plan
            // these queries, it will only have access to the raw `SHOW` command and have no idea
            // what schema to use. As a result Materialize will fail to boot.
            //
            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
            // However, banning show commands in views gives us more flexibility to change their
            // output.
            //
            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
            // with all show commands expanded into their equivalent SELECT statements.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Some SHOW statements are a SELECT query. Others produces Rows
            // directly. Convert both of these to the needed Hir and Scope.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let scope = Scope::from_source(None, desc.iter_names());
                let expr = HirRelationExpr::constant(rows, desc.into_typ());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateType(stmt) => to_hirscope(
                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
                    show::describe_show_create_type(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2107
2108/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2109fn plan_values(
2110    qcx: &QueryContext,
2111    values: &[Vec<Expr<Aug>>],
2112) -> Result<(HirRelationExpr, Scope), PlanError> {
2113    assert!(!values.is_empty());
2114
2115    let ecx = &ExprContext {
2116        qcx,
2117        name: "VALUES",
2118        scope: &Scope::empty(),
2119        relation_type: &SqlRelationType::empty(),
2120        allow_aggregates: false,
2121        allow_subqueries: true,
2122        allow_parameters: true,
2123        allow_windows: false,
2124    };
2125
2126    let ncols = values[0].len();
2127    let nrows = values.len();
2128
2129    // Arrange input expressions by columns, not rows, so that we can
2130    // call `coerce_homogeneous_exprs` on each column.
2131    let mut cols = vec![vec![]; ncols];
2132    for row in values {
2133        if row.len() != ncols {
2134            sql_bail!(
2135                "VALUES expression has varying number of columns: {} vs {}",
2136                row.len(),
2137                ncols
2138            );
2139        }
2140        for (i, v) in row.iter().enumerate() {
2141            cols[i].push(v);
2142        }
2143    }
2144
2145    // Plan each column.
2146    let mut col_iters = Vec::with_capacity(ncols);
2147    let mut col_types = Vec::with_capacity(ncols);
2148    for col in &cols {
2149        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2150        let mut col_type = ecx.column_type(&col[0]);
2151        for val in &col[1..] {
2152            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2153        }
2154        col_types.push(col_type);
2155        col_iters.push(col.into_iter());
2156    }
2157
2158    // Build constant relation.
2159    let mut exprs = vec![];
2160    for _ in 0..nrows {
2161        for i in 0..ncols {
2162            exprs.push(col_iters[i].next().unwrap());
2163        }
2164    }
2165    let out = HirRelationExpr::CallTable {
2166        func: TableFunc::Wrap {
2167            width: ncols,
2168            types: col_types,
2169        },
2170        exprs,
2171    };
2172
2173    // Build column names.
2174    let mut scope = Scope::empty();
2175    for i in 0..ncols {
2176        let name = format!("column{}", i + 1);
2177        scope.items.push(ScopeItem::from_column_name(name));
2178    }
2179
2180    Ok((out, scope))
2181}
2182
2183/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2184/// statement.
2185///
2186/// This is special-cased in PostgreSQL and different enough from `plan_values`
2187/// that it is easier to use a separate function entirely. Unlike a normal
2188/// `VALUES` clause, each value is coerced to the type of the target table
2189/// via an assignment cast.
2190///
2191/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2192fn plan_values_insert(
2193    qcx: &QueryContext,
2194    target_names: &[&ColumnName],
2195    target_types: &[&SqlScalarType],
2196    values: &[Vec<Expr<Aug>>],
2197) -> Result<HirRelationExpr, PlanError> {
2198    assert!(!values.is_empty());
2199
2200    if !values.iter().map(|row| row.len()).all_equal() {
2201        sql_bail!("VALUES lists must all be the same length");
2202    }
2203
2204    let ecx = &ExprContext {
2205        qcx,
2206        name: "VALUES",
2207        scope: &Scope::empty(),
2208        relation_type: &SqlRelationType::empty(),
2209        allow_aggregates: false,
2210        allow_subqueries: true,
2211        allow_parameters: true,
2212        allow_windows: false,
2213    };
2214
2215    let mut exprs = vec![];
2216    let mut types = vec![];
2217    for row in values {
2218        if row.len() > target_names.len() {
2219            sql_bail!("INSERT has more expressions than target columns");
2220        }
2221        for (column, val) in row.into_iter().enumerate() {
2222            let target_type = &target_types[column];
2223            let val = plan_expr(ecx, val)?;
2224            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2225            let source_type = &ecx.scalar_type(&val);
2226            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2227                Ok(val) => val,
2228                Err(_) => sql_bail!(
2229                    "column {} is of type {} but expression is of type {}",
2230                    target_names[column].quoted(),
2231                    qcx.humanize_scalar_type(target_type, false),
2232                    qcx.humanize_scalar_type(source_type, false),
2233                ),
2234            };
2235            if column >= types.len() {
2236                types.push(ecx.column_type(&val));
2237            } else {
2238                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2239            }
2240            exprs.push(val);
2241        }
2242    }
2243
2244    Ok(HirRelationExpr::CallTable {
2245        func: TableFunc::Wrap {
2246            width: values[0].len(),
2247            types,
2248        },
2249        exprs,
2250    })
2251}
2252
2253fn plan_join_identity() -> (HirRelationExpr, Scope) {
2254    let typ = SqlRelationType::new(vec![]);
2255    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2256    let scope = Scope::empty();
2257    (expr, scope)
2258}
2259
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The relation expression producing the (unprojected) rows.
    expr: HirRelationExpr,
    /// Names for the columns that remain after `project` is applied.
    scope: Scope,
    /// Ordering of `expr`'s columns, applied before the projection.
    order_by: Vec<ColumnOrder>,
    /// Indices of the columns of `expr` to output, in output order.
    project: Vec<usize>,
}
2272
// Extracts the group-size query hint options of a `SELECT` (consumed via
// `SelectOptionExtracted` and converted into `GroupSizeHints`): the legacy
// `EXPECTED GROUP SIZE` plus the per-operator aggregate/distinct-on/limit
// input group size hints.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2280
2281/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2282///
2283/// Normally, the ORDER BY clause occurs after the columns specified in the
2284/// SELECT list have been projected. In a query like
2285///
2286///   CREATE TABLE (a int, b int)
2287///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2288///
2289/// it is valid to refer to `a`, because it is explicitly selected, but it would
2290/// not be valid to refer to unselected column `b`.
2291///
2292/// But PostgreSQL extends the standard to permit queries like
2293///
2294///   SELECT a FROM t ORDER BY b
2295///
2296/// where expressions in the ORDER BY clause can refer to *both* input columns
2297/// and output columns.
2298fn plan_select_from_where(
2299    qcx: &QueryContext,
2300    mut s: Select<Aug>,
2301    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2302) -> Result<SelectPlan, PlanError> {
2303    // TODO: Both `s` and `order_by_exprs` are not references because the
2304    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2305    // table function support (the UUID mapping). Attempt to change this so callers
2306    // don't need to clone the Select.
2307
2308    // Extract query options.
2309    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2310    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2311
2312    // Step 1. Handle FROM clause, including joins.
2313    let (mut relation_expr, mut from_scope) =
2314        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2315            let (left, left_scope) = l;
2316            plan_join(
2317                qcx,
2318                left,
2319                left_scope,
2320                &Join {
2321                    relation: TableFactor::NestedJoin {
2322                        join: Box::new(twj.clone()),
2323                        alias: None,
2324                    },
2325                    join_operator: JoinOperator::CrossJoin,
2326                },
2327            )
2328        })?;
2329
2330    // Step 2. Handle WHERE clause.
2331    if let Some(selection) = &s.selection {
2332        let ecx = &ExprContext {
2333            qcx,
2334            name: "WHERE clause",
2335            scope: &from_scope,
2336            relation_type: &qcx.relation_type(&relation_expr),
2337            allow_aggregates: false,
2338            allow_subqueries: true,
2339            allow_parameters: true,
2340            allow_windows: false,
2341        };
2342        let expr = plan_expr(ecx, selection)
2343            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2344            .type_as(ecx, &SqlScalarType::Bool)?;
2345        relation_expr = relation_expr.filter(vec![expr]);
2346    }
2347
2348    // Step 3. Gather aggregates and table functions.
2349    // (But skip window aggregates.)
2350    let (aggregates, table_funcs) = {
2351        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2352        visitor.visit_select_mut(&mut s);
2353        for o in order_by_exprs.iter_mut() {
2354            visitor.visit_order_by_expr_mut(o);
2355        }
2356        visitor.into_result()?
2357    };
2358    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2359    if !table_funcs.is_empty() {
2360        let (expr, scope) = plan_scalar_table_funcs(
2361            qcx,
2362            table_funcs,
2363            &mut table_func_names,
2364            &relation_expr,
2365            &from_scope,
2366        )?;
2367        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2368        from_scope = from_scope.product(scope)?;
2369    }
2370
2371    // Step 4. Expand SELECT clause.
2372    let projection = {
2373        let ecx = &ExprContext {
2374            qcx,
2375            name: "SELECT clause",
2376            scope: &from_scope,
2377            relation_type: &qcx.relation_type(&relation_expr),
2378            allow_aggregates: true,
2379            allow_subqueries: true,
2380            allow_parameters: true,
2381            allow_windows: true,
2382        };
2383        let mut out = vec![];
2384        for si in &s.projection {
2385            if *si == SelectItem::Wildcard && s.from.is_empty() {
2386                sql_bail!("SELECT * with no tables specified is not valid");
2387            }
2388            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2389        }
2390        out
2391    };
2392
2393    // Step 5. Handle GROUP BY clause.
2394    // This will also plan the aggregates gathered in Step 3.
2395    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2396    let (mut group_scope, select_all_mapping) = {
2397        // Compute GROUP BY expressions.
2398        let ecx = &ExprContext {
2399            qcx,
2400            name: "GROUP BY clause",
2401            scope: &from_scope,
2402            relation_type: &qcx.relation_type(&relation_expr),
2403            allow_aggregates: false,
2404            allow_subqueries: true,
2405            allow_parameters: true,
2406            allow_windows: false,
2407        };
2408        let mut group_key = vec![];
2409        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2410        let mut group_hir_exprs = vec![];
2411        let mut group_scope = Scope::empty();
2412        let mut select_all_mapping = BTreeMap::new();
2413
2414        for group_expr in &s.group_by {
2415            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2416            let new_column = group_key.len();
2417
2418            if let Some(group_expr) = group_expr {
2419                // Multiple AST expressions can map to the same HIR expression.
2420                // If we already have a ScopeItem for this HIR, we can add this
2421                // next AST expression to its set
2422                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2423                    existing_scope_item.exprs.insert(group_expr.clone());
2424                    continue;
2425                }
2426            }
2427
2428            let mut scope_item = if let HirScalarExpr::Column(
2429                ColumnRef {
2430                    level: 0,
2431                    column: old_column,
2432                },
2433                _name,
2434            ) = &expr
2435            {
2436                // If we later have `SELECT foo.*` then we have to find all
2437                // the `foo` items in `from_scope` and figure out where they
2438                // ended up in `group_scope`. This is really hard to do
2439                // right using SQL name resolution, so instead we just track
2440                // the movement here.
2441                select_all_mapping.insert(*old_column, new_column);
2442                let scope_item = ecx.scope.items[*old_column].clone();
2443                scope_item
2444            } else {
2445                ScopeItem::empty()
2446            };
2447
2448            if let Some(group_expr) = group_expr.cloned() {
2449                scope_item.exprs.insert(group_expr);
2450            }
2451
2452            group_key.push(from_scope.len() + group_exprs.len());
2453            group_hir_exprs.push(expr.clone());
2454            group_exprs.insert(expr, scope_item);
2455        }
2456
2457        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2458        for expr in &group_hir_exprs {
2459            if let Some(scope_item) = group_exprs.remove(expr) {
2460                group_scope.items.push(scope_item);
2461            }
2462        }
2463
2464        // Plan aggregates.
2465        let ecx = &ExprContext {
2466            qcx,
2467            name: "aggregate function",
2468            scope: &from_scope,
2469            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2470            allow_aggregates: false,
2471            allow_subqueries: true,
2472            allow_parameters: true,
2473            allow_windows: false,
2474        };
2475        let mut agg_exprs = vec![];
2476        for sql_function in aggregates {
2477            if sql_function.over.is_some() {
2478                unreachable!(
2479                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2480                );
2481            }
2482            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2483            group_scope
2484                .items
2485                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2486        }
2487        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2488            // apply GROUP BY / aggregates
2489            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2490                group_key,
2491                agg_exprs,
2492                group_size_hints.aggregate_input_group_size,
2493            );
2494
2495            // For every old column that wasn't a group key, add a scope item
2496            // that errors when referenced. We can't simply drop these items
2497            // from scope. These items need to *exist* because they might shadow
2498            // variables in outer scopes that would otherwise be valid to
2499            // reference, but accessing them needs to produce an error.
2500            for i in 0..from_scope.len() {
2501                if !select_all_mapping.contains_key(&i) {
2502                    let scope_item = &ecx.scope.items[i];
2503                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2504                        table_name: scope_item.table_name.clone(),
2505                        column_name: scope_item.column_name.clone(),
2506                        allow_unqualified_references: scope_item.allow_unqualified_references,
2507                    });
2508                }
2509            }
2510
2511            (group_scope, select_all_mapping)
2512        } else {
2513            // if no GROUP BY, aggregates or having then all columns remain in scope
2514            (
2515                from_scope.clone(),
2516                (0..from_scope.len()).map(|i| (i, i)).collect(),
2517            )
2518        }
2519    };
2520
2521    // Step 6. Handle HAVING clause.
2522    if let Some(ref having) = s.having {
2523        let ecx = &ExprContext {
2524            qcx,
2525            name: "HAVING clause",
2526            scope: &group_scope,
2527            relation_type: &qcx.relation_type(&relation_expr),
2528            allow_aggregates: true,
2529            allow_subqueries: true,
2530            allow_parameters: true,
2531            allow_windows: false,
2532        };
2533        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2534        relation_expr = relation_expr.filter(vec![expr]);
2535    }
2536
2537    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2538    // (This includes window aggregations.)
2539    //
2540    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2541    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2542    //
2543    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2544    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2545    // and can't be part of a bigger expression.
2546    // See https://www.postgresql.org/docs/current/queries-order.html:
2547    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2548    // expression"
2549    let window_funcs = {
2550        let mut visitor = WindowFuncCollector::default();
2551        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2552        // window functions are excluded from other things by `allow_windows` being false when
2553        // planning those before this code).
2554        visitor.visit_select(&s);
2555        for o in order_by_exprs.iter() {
2556            visitor.visit_order_by_expr(o);
2557        }
2558        visitor.into_result()
2559    };
2560    for window_func in window_funcs {
2561        let ecx = &ExprContext {
2562            qcx,
2563            name: "window function",
2564            scope: &group_scope,
2565            relation_type: &qcx.relation_type(&relation_expr),
2566            allow_aggregates: true,
2567            allow_subqueries: true,
2568            allow_parameters: true,
2569            allow_windows: true,
2570        };
2571        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2572        group_scope.items.push(ScopeItem::from_expr(window_func));
2573    }
2574    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2575    // been already planned now. However, we should still set `allow_windows: true` for the
2576    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2577    // msg if an OVER clause is missing from a window function.
2578
2579    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2580    if let Some(ref qualify) = s.qualify {
2581        let ecx = &ExprContext {
2582            qcx,
2583            name: "QUALIFY clause",
2584            scope: &group_scope,
2585            relation_type: &qcx.relation_type(&relation_expr),
2586            allow_aggregates: true,
2587            allow_subqueries: true,
2588            allow_parameters: true,
2589            allow_windows: true,
2590        };
2591        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2592        relation_expr = relation_expr.filter(vec![expr]);
2593    }
2594
2595    // Step 9. Handle SELECT clause.
2596    let output_columns = {
2597        let mut new_exprs = vec![];
2598        let mut new_type = qcx.relation_type(&relation_expr);
2599        let mut output_columns = vec![];
2600        for (select_item, column_name) in &projection {
2601            let ecx = &ExprContext {
2602                qcx,
2603                name: "SELECT clause",
2604                scope: &group_scope,
2605                relation_type: &new_type,
2606                allow_aggregates: true,
2607                allow_subqueries: true,
2608                allow_parameters: true,
2609                allow_windows: true,
2610            };
2611            let expr = match select_item {
2612                ExpandedSelectItem::InputOrdinal(i) => {
2613                    if let Some(column) = select_all_mapping.get(i).copied() {
2614                        HirScalarExpr::column(column)
2615                    } else {
2616                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2617                    }
2618                }
2619                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2620            };
2621            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2622                // Simple column reference; no need to map on a new expression.
2623                output_columns.push((column, column_name));
2624            } else {
2625                // Complicated expression that requires a map expression. We
2626                // update `group_scope` as we go so that future expressions that
2627                // are textually identical to this one can reuse it. This
2628                // duplicate detection is required for proper determination of
2629                // ambiguous column references with SQL92-style `ORDER BY`
2630                // items. See `plan_order_by_or_distinct_expr` for more.
2631                let typ = ecx.column_type(&expr);
2632                new_type.column_types.push(typ);
2633                new_exprs.push(expr);
2634                output_columns.push((group_scope.len(), column_name));
2635                group_scope
2636                    .items
2637                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2638            }
2639        }
2640        relation_expr = relation_expr.map(new_exprs);
2641        output_columns
2642    };
2643    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2644
2645    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2646    let order_by = {
2647        let relation_type = qcx.relation_type(&relation_expr);
2648        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2649            &ExprContext {
2650                qcx,
2651                name: "ORDER BY clause",
2652                scope: &group_scope,
2653                relation_type: &relation_type,
2654                allow_aggregates: true,
2655                allow_subqueries: true,
2656                allow_parameters: true,
2657                allow_windows: true,
2658            },
2659            &order_by_exprs,
2660            &output_columns,
2661        )?;
2662
2663        match s.distinct {
2664            None => relation_expr = relation_expr.map(map_exprs),
2665            Some(Distinct::EntireRow) => {
2666                if relation_type.arity() == 0 {
2667                    sql_bail!("SELECT DISTINCT must have at least one column");
2668                }
2669                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2670                // list, so we can't proceed if `ORDER BY` has introduced any
2671                // columns for arbitrary expressions. This matches PostgreSQL.
2672                if !try_push_projection_order_by(
2673                    &mut relation_expr,
2674                    &mut project_key,
2675                    &mut order_by,
2676                ) {
2677                    sql_bail!(
2678                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2679                    );
2680                }
2681                assert!(map_exprs.is_empty());
2682                relation_expr = relation_expr.distinct();
2683            }
2684            Some(Distinct::On(exprs)) => {
2685                let ecx = &ExprContext {
2686                    qcx,
2687                    name: "DISTINCT ON clause",
2688                    scope: &group_scope,
2689                    relation_type: &qcx.relation_type(&relation_expr),
2690                    allow_aggregates: true,
2691                    allow_subqueries: true,
2692                    allow_parameters: true,
2693                    allow_windows: true,
2694                };
2695
2696                let mut distinct_exprs = vec![];
2697                for expr in &exprs {
2698                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2699                    distinct_exprs.push(expr);
2700                }
2701
2702                let mut distinct_key = vec![];
2703
2704                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2705                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2706                // expressions, though the order of `DISTINCT ON` expressions
2707                // does not matter. This matches PostgreSQL and leaves the door
2708                // open to a future optimization where the `DISTINCT ON` and
2709                // `ORDER BY` operations happen in one pass.
2710                //
2711                // On the bright side, any columns that have already been
2712                // computed by `ORDER BY` can be reused in the distinct key.
2713                let arity = relation_type.arity();
2714                for ord in order_by.iter().take(distinct_exprs.len()) {
2715                    // The unusual construction of `expr` here is to ensure the
2716                    // temporary column expression lives long enough.
2717                    let mut expr = &HirScalarExpr::column(ord.column);
2718                    if ord.column >= arity {
2719                        expr = &map_exprs[ord.column - arity];
2720                    };
2721                    match distinct_exprs.iter().position(move |e| e == expr) {
2722                        None => sql_bail!(
2723                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2724                        ),
2725                        Some(pos) => {
2726                            distinct_exprs.remove(pos);
2727                        }
2728                    }
2729                    distinct_key.push(ord.column);
2730                }
2731
2732                // Add any remaining `DISTINCT ON` expressions to the key.
2733                for expr in distinct_exprs {
2734                    // If the expression is a reference to an existing column,
2735                    // do not introduce a new column to support it.
2736                    let column = match expr {
2737                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2738                        _ => {
2739                            map_exprs.push(expr);
2740                            arity + map_exprs.len() - 1
2741                        }
2742                    };
2743                    distinct_key.push(column);
2744                }
2745
2746                // `DISTINCT ON` is semantically a TopK with limit 1. The
2747                // columns in `ORDER BY` that are not part of the distinct key,
2748                // if there are any, determine the ordering within each group,
2749                // per PostgreSQL semantics.
2750                let distinct_len = distinct_key.len();
2751                relation_expr = HirRelationExpr::top_k(
2752                    relation_expr.map(map_exprs),
2753                    distinct_key,
2754                    order_by.iter().skip(distinct_len).cloned().collect(),
2755                    Some(HirScalarExpr::literal(
2756                        Datum::Int64(1),
2757                        SqlScalarType::Int64,
2758                    )),
2759                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2760                    group_size_hints.distinct_on_input_group_size,
2761                );
2762            }
2763        }
2764
2765        order_by
2766    };
2767
2768    // Construct a clean scope to expose outwards, where all of the state that
2769    // accumulated in the scope during planning of this SELECT is erased. The
2770    // clean scope has at most one name for each column, and the names are not
2771    // associated with any table.
2772    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2773
2774    Ok(SelectPlan {
2775        expr: relation_expr,
2776        scope,
2777        order_by,
2778        project: project_key,
2779    })
2780}
2781
/// Plans the table functions that were gathered from scalar position (e.g.
/// `SELECT generate_series(1, 3)`) by the caller's `AggregateTableFuncVisitor`
/// pass.
///
/// Each table function arrives paired with a generated identifier (the
/// `String` values of `table_funcs`). This function records the mapping from
/// that identifier back to the function's original name in `table_func_names`
/// (consumed by `expand_select_item` in the caller), and returns a relation
/// expression producing the functions' output columns together with a `Scope`
/// describing them. The caller joins the returned relation onto the current
/// `FROM` relation.
fn plan_scalar_table_funcs(
    qcx: &QueryContext,
    table_funcs: BTreeMap<Function<Aug>, String>,
    table_func_names: &mut BTreeMap<String, Ident>,
    relation_expr: &HirRelationExpr,
    from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Plan the table functions in a context derived from the current FROM
    // clause (it carries `from_scope`), so their arguments can refer to the
    // FROM clause's columns.
    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));

    // Record the original function name for each generated id.
    for (table_func, id) in table_funcs.iter() {
        table_func_names.insert(
            id.clone(),
            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
        );
    }
    // If there's only a single table function, we can skip generating
    // ordinality columns.
    if table_funcs.len() == 1 {
        let (table_func, id) = table_funcs.iter().next().unwrap();
        let (expr, mut scope) =
            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;

        // A single table-function might return several columns as a record
        let num_cols = scope.len();
        for i in 0..scope.len() {
            // Attribute every output column to the generated id, and require
            // qualified references so these columns cannot be named
            // accidentally without the id.
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
        }
        return Ok((expr, scope));
    }
    // Otherwise, plan as usual, emulating the ROWS FROM behavior
    let (expr, mut scope, num_cols) =
        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;

    // Munge the scope so table names match with the generated ids.
    //
    // `num_cols` gives the arity of each table function, in the same order as
    // `table_funcs` (`zip_eq` panics if the counts disagree). Walk the scope
    // items in lockstep via `i`, attributing each function's columns — plus
    // its trailing per-function ordinality column — to that function's id.
    let mut i = 0;
    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
        for _ in 0..num_cols {
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
            i += 1;
        }
        // Ordinality column. This doubles as the
        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
        // because it only needs to be NULL or not.
        scope.items[i].table_name = Some(PartialItemName {
            database: None,
            schema: None,
            item: id.clone(),
        });
        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
        scope.items[i].allow_unqualified_references = false;
        i += 1;
    }
    // Coalesced ordinality column.
    scope.items[i].allow_unqualified_references = false;
    Ok((expr, scope))
}
2851
2852/// Plans an expression in a `GROUP BY` clause.
2853///
2854/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2855/// names/expressions defined in the `SELECT` clause. These special cases are
2856/// handled by this function; see comments within the implementation for
2857/// details.
2858fn plan_group_by_expr<'a>(
2859    ecx: &ExprContext,
2860    group_expr: &'a Expr<Aug>,
2861    projection: &'a [(ExpandedSelectItem, ColumnName)],
2862) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2863    let plan_projection = |column: usize| match &projection[column].0 {
2864        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2865        ExpandedSelectItem::Expr(expr) => {
2866            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2867        }
2868    };
2869
2870    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2871    // a special case that means to use the ith item in the SELECT clause.
2872    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2873        return plan_projection(column);
2874    }
2875
2876    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2877    // The `foo` can refer to *either* an input column or an output column. If
2878    // both exist, the input column is preferred.
2879    match group_expr {
2880        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2881            Err(PlanError::UnknownColumn {
2882                table: None,
2883                column,
2884                similar,
2885            }) => {
2886                // The expression was a simple identifier that did not match an
2887                // input column. See if it matches an output column.
2888                let mut iter = projection.iter().map(|(_expr, name)| name);
2889                if let Some(i) = iter.position(|n| *n == column) {
2890                    if iter.any(|n| *n == column) {
2891                        Err(PlanError::AmbiguousColumn(column))
2892                    } else {
2893                        plan_projection(i)
2894                    }
2895                } else {
2896                    // The name didn't match an output column either. Return the
2897                    // "unknown column" error.
2898                    Err(PlanError::UnknownColumn {
2899                        table: None,
2900                        column,
2901                        similar,
2902                    })
2903                }
2904            }
2905            res => Ok((Some(group_expr), res?)),
2906        },
2907        _ => Ok((
2908            Some(group_expr),
2909            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2910        )),
2911    }
2912}
2913
2914/// Plans a slice of `ORDER BY` expressions.
2915///
2916/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2917/// parameter.
2918///
2919/// Returns the determined column orderings and a list of scalar expressions
2920/// that must be mapped onto the underlying relation expression.
2921pub(crate) fn plan_order_by_exprs(
2922    ecx: &ExprContext,
2923    order_by_exprs: &[OrderByExpr<Aug>],
2924    output_columns: &[(usize, &ColumnName)],
2925) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2926    let mut order_by = vec![];
2927    let mut map_exprs = vec![];
2928    for obe in order_by_exprs {
2929        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2930        // If the expression is a reference to an existing column,
2931        // do not introduce a new column to support it.
2932        let column = match expr {
2933            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2934            _ => {
2935                map_exprs.push(expr);
2936                ecx.relation_type.arity() + map_exprs.len() - 1
2937            }
2938        };
2939        order_by.push(resolve_desc_and_nulls_last(obe, column));
2940    }
2941    Ok((order_by, map_exprs))
2942}
2943
2944/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2945///
2946/// The `output_columns` parameter describes, in order, the physical index and
2947/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2948/// corresponds to a `SELECT` list with a single entry named "a" that can be
2949/// found at index 3 in the underlying relation expression.
2950///
2951/// There are three cases to handle.
2952///
2953///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2954///       reference to the specified output column.
2955///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2956///       an output column, if it exists; otherwise it is a reference to an
2957///       input column.
2958///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2959///       arbitrary expressions exclusively refer to input columns, never output
2960///       columns.
2961fn plan_order_by_or_distinct_expr(
2962    ecx: &ExprContext,
2963    expr: &Expr<Aug>,
2964    output_columns: &[(usize, &ColumnName)],
2965) -> Result<HirScalarExpr, PlanError> {
2966    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2967        return Ok(HirScalarExpr::column(output_columns[i].0));
2968    }
2969
2970    if let Expr::Identifier(names) = expr {
2971        if let [name] = &names[..] {
2972            let name = normalize::column_name(name.clone());
2973            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2974            if let Some((i, _)) = iter.next() {
2975                match iter.next() {
2976                    // Per SQL92, names are not considered ambiguous if they
2977                    // refer to identical target list expressions, as in
2978                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2979                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2980                    _ => return Ok(HirScalarExpr::column(*i)),
2981                }
2982            }
2983        }
2984    }
2985
2986    plan_expr(ecx, expr)?.type_as_any(ecx)
2987}
2988
2989fn plan_table_with_joins(
2990    qcx: &QueryContext,
2991    table_with_joins: &TableWithJoins<Aug>,
2992) -> Result<(HirRelationExpr, Scope), PlanError> {
2993    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2994    for join in &table_with_joins.joins {
2995        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2996        expr = new_expr;
2997        scope = new_scope;
2998    }
2999    Ok((expr, scope))
3000}
3001
3002fn plan_table_factor(
3003    qcx: &QueryContext,
3004    table_factor: &TableFactor<Aug>,
3005) -> Result<(HirRelationExpr, Scope), PlanError> {
3006    match table_factor {
3007        TableFactor::Table { name, alias } => {
3008            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
3009            let scope = plan_table_alias(scope, alias.as_ref())?;
3010            Ok((expr, scope))
3011        }
3012
3013        TableFactor::Function {
3014            function,
3015            alias,
3016            with_ordinality,
3017        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3018
3019        TableFactor::RowsFrom {
3020            functions,
3021            alias,
3022            with_ordinality,
3023        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3024
3025        TableFactor::Derived {
3026            lateral,
3027            subquery,
3028            alias,
3029        } => {
3030            let mut qcx = (*qcx).clone();
3031            if !lateral {
3032                // Since this derived table was not marked as `LATERAL`,
3033                // make elements in outer scopes invisible until we reach the
3034                // next lateral barrier.
3035                for scope in &mut qcx.outer_scopes {
3036                    if scope.lateral_barrier {
3037                        break;
3038                    }
3039                    scope.items.clear();
3040                }
3041            }
3042            qcx.outer_scopes[0].lateral_barrier = true;
3043            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3044            let scope = plan_table_alias(scope, alias.as_ref())?;
3045            Ok((expr, scope))
3046        }
3047
3048        TableFactor::NestedJoin { join, alias } => {
3049            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3050            let scope = plan_table_alias(scope, alias.as_ref())?;
3051            Ok((expr, scope))
3052        }
3053    }
3054}
3055
3056/// Plans a `ROWS FROM` expression.
3057///
3058/// `ROWS FROM` concatenates table functions into a single table, filling in
3059/// `NULL`s in places where one table function has fewer rows than another. We
3060/// can achieve this by augmenting each table function with a row number, doing
3061/// a `FULL JOIN` between each table function on the row number and eventually
3062/// projecting away the row number columns. Concretely, the following query
3063/// using `ROWS FROM`
3064///
3065/// ```sql
3066/// SELECT
3067///     *
3068/// FROM
3069///     ROWS FROM (
3070///         generate_series(1, 2),
3071///         information_schema._pg_expandarray(ARRAY[9]),
3072///         generate_series(3, 6)
3073///     );
3074/// ```
3075///
3076/// is equivalent to the following query that does not use `ROWS FROM`:
3077///
3078/// ```sql
3079/// SELECT
3080///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3081/// FROM
3082///     generate_series(1, 2) WITH ORDINALITY AS gs1
3083///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3084///         ON gs1.ordinality = expand.ordinality
3085///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3086///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3087/// ```
3088///
3089/// Note the call to `coalesce` in the last join condition, which ensures that
3090/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3091///
3092/// This function creates a HirRelationExpr that follows the structure of the
3093/// latter query.
3094///
3095/// `with_ordinality` can be used to have the output expression contain a
3096/// single coalesced ordinality column at the end of the entire expression.
3097fn plan_rows_from(
3098    qcx: &QueryContext,
3099    functions: &[Function<Aug>],
3100    alias: Option<&TableAlias>,
3101    with_ordinality: bool,
3102) -> Result<(HirRelationExpr, Scope), PlanError> {
3103    // If there's only a single table function, planning proceeds as if `ROWS
3104    // FROM` hadn't been written at all.
3105    if let [function] = functions {
3106        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3107    }
3108
3109    // Per PostgreSQL, all scope items take the name of the first function
3110    // (unless aliased).
3111    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3112    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3113        qcx,
3114        functions,
3115        Some(functions[0].name.full_item_name().clone()),
3116    )?;
3117
3118    // Columns tracks the set of columns we will keep in the projection.
3119    let mut columns = Vec::new();
3120    let mut offset = 0;
3121    // Retain table function's non-ordinality columns.
3122    for (idx, cols) in num_cols.into_iter().enumerate() {
3123        for i in 0..cols {
3124            columns.push(offset + i);
3125        }
3126        offset += cols + 1;
3127
3128        // Remove the ordinality column from the scope, accounting for previous scope
3129        // changes from this loop.
3130        scope.items.remove(offset - idx - 1);
3131    }
3132
3133    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3134    // column. Otherwise remove it from the scope.
3135    if with_ordinality {
3136        columns.push(offset);
3137    } else {
3138        scope.items.pop();
3139    }
3140
3141    let expr = expr.project(columns);
3142
3143    let scope = plan_table_alias(scope, alias)?;
3144    Ok((expr, scope))
3145}
3146
/// Plans an expression coalescing multiple table functions. Each table
/// function is followed by its row ordinality. The entire expression is
/// followed by the coalesced row ordinality.
///
/// The returned Scope will set all item's table_name's to the `table_name`
/// parameter if it is `Some`. If `None`, they will be the name of each table
/// function.
///
/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
/// each table function.
///
/// For example, with table functions tf1 returning 1 column (a) and tf2
/// returning 2 columns (b, c), this function will return an expr with 6
/// columns:
///
/// - tf1.a
/// - tf1.ordinality
/// - tf2.b
/// - tf2.c
/// - tf2.ordinality
/// - coalesced_ordinality
///
/// And a `Vec<usize>` of `[1, 2]`.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Join together each of the table functions in turn. The last column is
    // always the column to join against and is maintained to be the coalescence
    // of the row number column for all prior functions.
    //
    // NOTE(review): the `unwrap` assumes the caller never passes an empty
    // iterator; `plan_rows_from` only calls this with two or more functions —
    // confirm no other callers can pass zero.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    // The scope currently ends with the first function's ordinality column,
    // so its data-column count is one less than the scope length.
    num_cols.push(left_scope.len() - 1);
    // Create the coalesced ordinality column: initially just a copy of the
    // first function's ordinality column (the current last column).
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));

    for function in functions {
        // The right hand side of a join must be planned in a new scope.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // `left_col` is the running coalesced ordinality (last column of the
        // left side); `right_col` is this function's ordinality (last column
        // of the right side) in the joined relation.
        let left_col = left_scope.len() - 1;
        let right_col = left_scope.len() + right_scope.len() - 1;
        let on = HirScalarExpr::call_binary(
            HirScalarExpr::column(left_col),
            HirScalarExpr::column(right_col),
            expr_func::Eq,
        );
        // FULL JOIN on ordinality equality, then append a fresh coalesced
        // ordinality so the next iteration aligns against whichever side has
        // more rows.
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::call_variadic(
                VariadicFunc::Coalesce,
                vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            )]);

        // Project off the previous iteration's coalesced column, but keep both of this
        // iteration's ordinality columns.
        left_expr = left_expr.project(
            (0..left_col) // left columns, minus the old coalesced column at `left_col`
                .chain(left_col + 1..right_col + 2) // right columns plus the new coalesced column
                .collect(),
        );
        // Move the coalesced ordinality scope item to the end so the scope
        // matches the projected column order.
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3227
3228/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3229/// FROM` clause that contains other table functions. Special aliasing rules
3230/// apply.
3231fn plan_solitary_table_function(
3232    qcx: &QueryContext,
3233    function: &Function<Aug>,
3234    alias: Option<&TableAlias>,
3235    with_ordinality: bool,
3236) -> Result<(HirRelationExpr, Scope), PlanError> {
3237    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3238
3239    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3240    if single_column_function {
3241        let item = &mut scope.items[0];
3242
3243        // Mark that the function only produced a single column. This impacts
3244        // whole-row references.
3245        item.from_single_column_function = true;
3246
3247        // Strange special case for solitary table functions that output one
3248        // column whose name matches the name of the table function. If a table
3249        // alias is provided, the column name is changed to the table alias's
3250        // name. Concretely, the following query returns a column named `x`
3251        // rather than a column named `generate_series`:
3252        //
3253        //     SELECT * FROM generate_series(1, 5) AS x
3254        //
3255        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3256        // since its output column is explicitly named `value`, not
3257        // `jsonb_array_elements`.
3258        //
3259        // Note also that we may (correctly) change the column name again when
3260        // we plan the table alias below if the `alias.columns` is non-empty.
3261        if let Some(alias) = alias {
3262            if let ScopeItem {
3263                table_name: Some(table_name),
3264                column_name,
3265                ..
3266            } = item
3267            {
3268                if table_name.item.as_str() == column_name.as_str() {
3269                    *column_name = normalize::column_name(alias.name.clone());
3270                }
3271            }
3272        }
3273    }
3274
3275    let scope = plan_table_alias(scope, alias)?;
3276    Ok((expr, scope))
3277}
3278
/// Plans a table function.
///
/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
/// instead to get the appropriate aliasing behavior.
///
/// If `with_ordinality` is true, the output relation is extended with an
/// `ordinality` column and the returned scope gains a matching trailing item.
/// If `table_name` is provided, it overrides the function's own name as the
/// table name for the returned scope's items.
fn plan_table_function_internal(
    qcx: &QueryContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &Function<Aug>,
    with_ordinality: bool,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // These modifiers are not expected to reach this point for table
    // functions; their presence indicates an internal error, not a user error.
    assert_none!(filter, "cannot parse table function with FILTER");
    assert_none!(over, "cannot parse table function with OVER");
    assert!(!*distinct, "cannot parse table function with DISTINCT");

    // Table function arguments are planned against an empty scope: they may
    // not reference columns, aggregates, or window functions, but subqueries
    // and parameters are allowed.
    let ecx = &ExprContext {
        qcx,
        name: "table function arguments",
        scope: &Scope::empty(),
        relation_type: &SqlRelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let scalar_args = match args {
        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    // The scope's table name defaults to the function's own name unless the
    // caller supplied an override (e.g. `ROWS FROM` names every member after
    // the first function).
    let table_name = match table_name {
        Some(table_name) => table_name.item,
        None => name.full_item_name().item.clone(),
    };

    let scope_name = Some(PartialItemName {
        database: None,
        schema: None,
        item: table_name,
    });

    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
        // A true table function.
        Func::Table(impls) => {
            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
            let expr = match tf.imp {
                TableFuncImpl::CallTable { mut func, exprs } => {
                    if with_ordinality {
                        // Wrap the function so it emits an extra ordinality
                        // column; `with_ordinality` returns `None` for
                        // functions that don't support this.
                        func = TableFunc::with_ordinality(func.clone()).ok_or(
                            PlanError::Unsupported {
                                feature: format!("WITH ORDINALITY on {}", func),
                                discussion_no: None,
                            },
                        )?;
                    }
                    HirRelationExpr::CallTable { func, exprs }
                }
                TableFuncImpl::Expr(expr) => {
                    if !with_ordinality {
                        expr
                    } else {
                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
                        // back to the legacy implementation or error out the query.
                        if qcx
                            .scx
                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
                        {
                            // Note that this can give an incorrect ordering, and also has an extreme
                            // performance problem in some cases. See the doc comment of
                            // `TableFuncImpl`.
                            tracing::error!(
                                %name,
                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
                            );
                            // Legacy fallback: number the rows with a
                            // `row_number()` scalar window function over an
                            // empty ORDER BY.
                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
                                func: WindowExprType::Scalar(ScalarWindowExpr {
                                    func: ScalarWindowFunc::RowNumber,
                                    order_by: vec![],
                                }),
                                partition_by: vec![],
                                order_by: vec![],
                            })])
                        } else {
                            bail_unsupported!(format!(
                                "WITH ORDINALITY or ROWS FROM with {}",
                                name
                            ));
                        }
                    }
                }
            };
            (expr, scope)
        }
        // A scalar function in FROM position: evaluate it and present the
        // result as a one-column table.
        Func::Scalar(impls) => {
            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let output = expr.typ(
                &qcx.outer_relation_types,
                &SqlRelationType::new(vec![]),
                &qcx.scx.param_types.borrow(),
            );

            let relation = SqlRelationType::new(vec![output]);

            // The output column is named after the function itself.
            let function_ident = Ident::new(name.full_item_name().item.clone())?;
            let column_name = normalize::column_name(function_ident);
            let name = column_name.to_string();

            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);

            let mut func = TableFunc::TabletizedScalar { relation, name };
            if with_ordinality {
                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
                    feature: format!("WITH ORDINALITY on {}", func),
                    discussion_no: None,
                })?;
            }
            (
                HirRelationExpr::CallTable {
                    func,
                    exprs: vec![expr],
                },
                scope,
            )
        }
        o => sql_bail!(
            "{} functions are not supported in functions in FROM",
            o.class()
        ),
    };

    // Expose the ordinality column (added above) in the scope.
    if with_ordinality {
        scope
            .items
            .push(ScopeItem::from_name(scope_name, "ordinality"));
    }

    Ok((expr, scope))
}
3432
3433fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3434    if let Some(TableAlias {
3435        name,
3436        columns,
3437        strict,
3438    }) = alias
3439    {
3440        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3441            sql_bail!(
3442                "{} has {} columns available but {} columns specified",
3443                name,
3444                scope.items.len(),
3445                columns.len()
3446            );
3447        }
3448
3449        let table_name = normalize::ident(name.to_owned());
3450        for (i, item) in scope.items.iter_mut().enumerate() {
3451            item.table_name = if item.allow_unqualified_references {
3452                Some(PartialItemName {
3453                    database: None,
3454                    schema: None,
3455                    item: table_name.clone(),
3456                })
3457            } else {
3458                // Columns that prohibit unqualified references are special
3459                // columns from the output of a NATURAL or USING join that can
3460                // only be referenced by their full, pre-join name. Applying an
3461                // alias to the output of that join renders those columns
3462                // inaccessible, which we accomplish here by setting the
3463                // table name to `None`.
3464                //
3465                // Concretely, consider:
3466                //
3467                //      CREATE TABLE t1 (a int);
3468                //      CREATE TABLE t2 (a int);
3469                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3470                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3471                //
3472                // In (1), the join has no alias. The underlying columns from
3473                // either side of the join can be referenced as `t1.a` and
3474                // `t2.a`, respectively, and the unqualified name `a` refers to
3475                // a column whose value is `coalesce(t1.a, t2.a)`.
3476                //
3477                // In (2), the join is aliased as `t`. The columns from either
3478                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3479                // the coalesced column can be named as either `a` or `t.a`.
3480                //
3481                // We previously had a bug [0] that mishandled this subtle
3482                // logic.
3483                //
3484                // NOTE(benesch): We could in theory choose to project away
3485                // those inaccessible columns and drop them from the scope
3486                // entirely, but that would require that this function also
3487                // take and return the `HirRelationExpr` that is being aliased,
3488                // which is a rather large refactor.
3489                //
3490                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3491                None
3492            };
3493            item.column_name = columns
3494                .get(i)
3495                .map(|a| normalize::column_name(a.clone()))
3496                .unwrap_or_else(|| item.column_name.clone());
3497        }
3498    }
3499    Ok(scope)
3500}
3501
3502// `table_func_names` is a mapping from a UUID to the original function
3503// name. The UUIDs are identifiers that have been rewritten from some table
3504// function expression, and this mapping restores the original names.
3505fn invent_column_name(
3506    ecx: &ExprContext,
3507    expr: &Expr<Aug>,
3508    table_func_names: &BTreeMap<String, Ident>,
3509) -> Option<ColumnName> {
3510    // We follow PostgreSQL exactly here, which has some complicated rules
3511    // around "high" and "low" quality names. Low quality names override other
3512    // low quality names but not high quality names.
3513    //
3514    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3515
3516    #[derive(Debug)]
3517    enum NameQuality {
3518        Low,
3519        High,
3520    }
3521
3522    fn invent(
3523        ecx: &ExprContext,
3524        expr: &Expr<Aug>,
3525        table_func_names: &BTreeMap<String, Ident>,
3526    ) -> Option<(ColumnName, NameQuality)> {
3527        match expr {
3528            Expr::Identifier(names) => {
3529                if let [name] = names.as_slice() {
3530                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3531                        return Some((
3532                            normalize::column_name(table_func_name.clone()),
3533                            NameQuality::High,
3534                        ));
3535                    }
3536                }
3537                names
3538                    .last()
3539                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3540            }
3541            Expr::Value(v) => match v {
3542                // Per PostgreSQL, `bool` and `interval` literals take on the name
3543                // of their type, but not other literal types.
3544                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3545                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3546                _ => None,
3547            },
3548            Expr::Function(func) => {
3549                let (schema, item) = match &func.name {
3550                    ResolvedItemName::Item {
3551                        qualifiers,
3552                        full_name,
3553                        ..
3554                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3555                    _ => unreachable!(),
3556                };
3557
3558                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3559                    || schema
3560                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3561                {
3562                    None
3563                } else {
3564                    Some((item.into(), NameQuality::High))
3565                }
3566            }
3567            Expr::HomogenizingFunction { function, .. } => Some((
3568                function.to_string().to_lowercase().into(),
3569                NameQuality::High,
3570            )),
3571            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3572            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3573            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3574            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3575            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3576                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3577                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3578            },
3579            Expr::Case { else_result, .. } => {
3580                match else_result
3581                    .as_ref()
3582                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
3583                {
3584                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3585                    _ => Some(("case".into(), NameQuality::Low)),
3586                }
3587            }
3588            Expr::FieldAccess { field, .. } => {
3589                Some((normalize::column_name(field.clone()), NameQuality::High))
3590            }
3591            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3592            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3593            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3594                // A bit silly to have to plan the query here just to get its column
3595                // name, since we throw away the planned expression, but fixing this
3596                // requires a separate semantic analysis phase.
3597                let (_expr, scope) =
3598                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3599                scope
3600                    .items
3601                    .first()
3602                    .map(|name| (name.column_name.clone(), NameQuality::High))
3603            }
3604            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3605            _ => None,
3606        }
3607    }
3608
3609    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3610}
3611
/// A `SELECT` list item after wildcard expansion: either a reference to an
/// input column (by ordinal position) or a scalar expression to evaluate.
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// References the input column at the given ordinal position.
    InputOrdinal(usize),
    /// A scalar expression, possibly synthesized during expansion (hence the
    /// `Cow`).
    Expr(Cow<'a, Expr<Aug>>),
}
3617
3618impl ExpandedSelectItem<'_> {
3619    fn as_expr(&self) -> Option<&Expr<Aug>> {
3620        match self {
3621            ExpandedSelectItem::InputOrdinal(_) => None,
3622            ExpandedSelectItem::Expr(expr) => Some(expr),
3623        }
3624    }
3625}
3626
3627fn expand_select_item<'a>(
3628    ecx: &ExprContext,
3629    s: &'a SelectItem<Aug>,
3630    table_func_names: &BTreeMap<String, Ident>,
3631) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3632    match s {
3633        SelectItem::Expr {
3634            expr: Expr::QualifiedWildcard(table_name),
3635            alias: _,
3636        } => {
3637            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3638            let table_name =
3639                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3640            let out: Vec<_> = ecx
3641                .scope
3642                .items
3643                .iter()
3644                .enumerate()
3645                .filter(|(_i, item)| item.is_from_table(&table_name))
3646                .map(|(i, item)| {
3647                    let name = item.column_name.clone();
3648                    (ExpandedSelectItem::InputOrdinal(i), name)
3649                })
3650                .collect();
3651            if out.is_empty() {
3652                sql_bail!("no table named '{}' in scope", table_name);
3653            }
3654            Ok(out)
3655        }
3656        SelectItem::Expr {
3657            expr: Expr::WildcardAccess(sql_expr),
3658            alias: _,
3659        } => {
3660            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3661            // A bit silly to have to plan the expression here just to get its
3662            // type, since we throw away the planned expression, but fixing this
3663            // requires a separate semantic analysis phase. Luckily this is an
3664            // uncommon operation and the PostgreSQL docs have a warning that
3665            // this operation is slow in Postgres too.
3666            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3667            let fields = match ecx.scalar_type(&expr) {
3668                SqlScalarType::Record { fields, .. } => fields,
3669                ty => sql_bail!(
3670                    "type {} is not composite",
3671                    ecx.humanize_scalar_type(&ty, false)
3672                ),
3673            };
3674            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3675            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3676                if let [name] = ident.as_slice() {
3677                    if let Ok(items) = ecx.scope.items_from_table(
3678                        &[],
3679                        &PartialItemName {
3680                            database: None,
3681                            schema: None,
3682                            item: name.as_str().to_string(),
3683                        },
3684                    ) {
3685                        for (_, item) in items {
3686                            if item
3687                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3688                            {
3689                                skip_cols.insert(item.column_name.clone());
3690                            }
3691                        }
3692                    }
3693                }
3694            }
3695            let items = fields
3696                .iter()
3697                .filter_map(|(name, _ty)| {
3698                    if skip_cols.contains(name) {
3699                        None
3700                    } else {
3701                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3702                            expr: sql_expr.clone(),
3703                            field: name.clone().into(),
3704                        }));
3705                        Some((item, name.clone()))
3706                    }
3707                })
3708                .collect();
3709            Ok(items)
3710        }
3711        SelectItem::Wildcard => {
3712            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3713            let items: Vec<_> = ecx
3714                .scope
3715                .items
3716                .iter()
3717                .enumerate()
3718                .filter(|(_i, item)| item.allow_unqualified_references)
3719                .map(|(i, item)| {
3720                    let name = item.column_name.clone();
3721                    (ExpandedSelectItem::InputOrdinal(i), name)
3722                })
3723                .collect();
3724
3725            Ok(items)
3726        }
3727        SelectItem::Expr { expr, alias } => {
3728            let name = alias
3729                .clone()
3730                .map(normalize::column_name)
3731                .or_else(|| invent_column_name(ecx, expr, table_func_names))
3732                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3733            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3734        }
3735    }
3736}
3737
/// Plans a single join, combining the already-planned left-hand side with the
/// join's right-hand relation according to the join kind and constraint
/// (`ON`, `USING`, or `NATURAL`).
fn plan_join(
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    join: &Join<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // A cross join is planned as an inner join whose condition is `TRUE`.
    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
    let (kind, constraint) = match &join.join_operator {
        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
    };

    // The right-hand side is planned in a context derived from the left, so
    // that lateral references to the left's columns can resolve (when the
    // join kind permits correlation).
    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
    if !kind.can_be_correlated() {
        for item in &mut right_qcx.outer_scopes[0].items {
            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
            // these items from scope. These items need to *exist* because they
            // might shadow variables in outer scopes that would otherwise be
            // valid to reference, but accessing them needs to produce an error.
            item.error_if_referenced =
                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
                    table: table.cloned(),
                    column: column.clone(),
                });
        }
    }
    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;

    let (expr, scope) = match constraint {
        // `ON <expr>`: plan the predicate over the concatenation of both
        // sides' scopes and require it to be a boolean.
        JoinConstraint::On(expr) => {
            let product_scope = left_scope.product(right_scope)?;
            let ecx = &ExprContext {
                qcx: left_qcx,
                name: "ON clause",
                scope: &product_scope,
                relation_type: &SqlRelationType::new(
                    left_qcx
                        .relation_type(&left)
                        .column_types
                        .into_iter()
                        .chain(right_qcx.relation_type(&right).column_types)
                        .collect(),
                ),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            let joined = left.join(right, on, kind);
            (joined, product_scope)
        }
        // `USING (cols)`: join on equality of the explicitly named columns.
        JoinConstraint::Using { columns, alias } => {
            let column_names = columns
                .iter()
                .map(|ident| normalize::column_name(ident.clone()))
                .collect::<Vec<_>>();

            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                alias.as_ref(),
            )?
        }
        // `NATURAL`: join on equality of every column name common to both
        // sides.
        JoinConstraint::Natural => {
            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
            // have the same scx. However, it doesn't hurt to be safe.
            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
            let left_column_names = left_scope.column_names();
            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
            let column_names: Vec<_> = left_column_names
                .filter(|col| right_column_names.contains(col))
                .cloned()
                .collect();
            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                None,
            )?
        }
    };
    Ok((expr, scope))
}
3837
// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
/// Plans a join whose constraint is `USING (columns)` or `NATURAL` (the caller
/// computes the shared column list for `NATURAL` and passes it here).
///
/// For each join column the column is resolved on both sides, the two
/// references are coerced to a common type, and an equality predicate is added
/// to the join condition. The returned scope places the join columns first,
/// hides the secondary side's copies from unqualified reference and
/// `SELECT *`, and (for `FULL OUTER`) introduces a coalesced column.
#[allow(clippy::too_many_arguments)]
fn plan_using_constraint(
    column_names: &[ColumnName],
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    right_qcx: &QueryContext,
    right: HirRelationExpr,
    right_scope: Scope,
    kind: JoinKind,
    alias: Option<&Ident>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let mut both_scope = left_scope.clone().product(right_scope.clone())?;

    // Cargo culting PG here; no discernable reason this must fail, but PG does
    // so we do, as well.
    let mut unique_column_names = BTreeSet::new();
    for c in column_names {
        if !unique_column_names.insert(c) {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "column name {} appears more than once in USING clause",
                    c.quoted()
                ),
                discussion_no: None,
            });
        }
    }

    // `USING (...) AS <alias>` (a "join_using_alias") names the join columns.
    let alias_item_name = alias.map(|alias| PartialItemName {
        database: None,
        schema: None,
        item: alias.clone().to_string(),
    });

    // The alias must not collide with any table name already in scope.
    if let Some(alias_item_name) = &alias_item_name {
        for partial_item_name in both_scope.table_names() {
            if partial_item_name.matches(alias_item_name) {
                sql_bail!(
                    "table name \"{}\" specified more than once",
                    alias_item_name
                )
            }
        }
    }

    // Expression context over the concatenated left-then-right columns, used
    // below to coerce each pair of join keys to a common type.
    let ecx = &ExprContext {
        qcx: right_qcx,
        name: "USING clause",
        scope: &both_scope,
        relation_type: &SqlRelationType::new(
            left_qcx
                .relation_type(&left)
                .column_types
                .into_iter()
                .chain(right_qcx.relation_type(&right).column_types)
                .collect(),
        ),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let mut join_exprs = vec![];
    let mut map_exprs = vec![];
    let mut new_items = vec![];
    let mut join_cols = vec![];
    let mut hidden_cols = vec![];

    for column_name in column_names {
        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
        let (lhs, lhs_name) = left_scope.resolve_using_column(
            column_name,
            JoinSide::Left,
            &mut left_qcx.name_manager.borrow_mut(),
        )?;
        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
            column_name,
            JoinSide::Right,
            &mut right_qcx.name_manager.borrow_mut(),
        )?;

        // Adjust the RHS reference to its post-join location.
        rhs.column += left_scope.len();

        // Join keys must be resolved to same type.
        let mut exprs = coerce_homogeneous_exprs(
            &ecx.with_name(&format!(
                "NATURAL/USING join column {}",
                column_name.quoted()
            )),
            vec![
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    lhs,
                    Arc::clone(&lhs_name),
                )),
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    rhs,
                    Arc::clone(&rhs_name),
                )),
            ],
            None,
        )?;
        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));

        // Decide which side's copy of the join column is the visible one; the
        // other copy (or copies) becomes hidden below.
        match kind {
            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
                join_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
            }
            JoinKind::RightOuter => {
                join_cols.push(rhs.column);
                hidden_cols.push(lhs.column);
            }
            JoinKind::FullOuter => {
                // Create a new column that will be the coalesced value of left
                // and right.
                join_cols.push(both_scope.items.len() + map_exprs.len());
                hidden_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
                map_exprs.push(HirScalarExpr::call_variadic(
                    VariadicFunc::Coalesce,
                    vec![expr1.clone(), expr2.clone()],
                ));
                new_items.push(ScopeItem::from_column_name(column_name));
            }
        }

        // If a `join_using_alias` is present, add a new scope item that accepts
        // only table-qualified references for each specified join column.
        // Unlike regular table aliases, a `join_using_alias` should not hide the
        // names of the joined relations.
        if alias_item_name.is_some() {
            let new_item_col = both_scope.items.len() + new_items.len();
            join_cols.push(new_item_col);
            hidden_cols.push(new_item_col);

            new_items.push(ScopeItem::from_name(
                alias_item_name.clone(),
                column_name.clone().to_string(),
            ));

            // Should be safe to use either `lhs` or `rhs` here since the column
            // is available in both scopes and must have the same type of the new item.
            // We (arbitrarily) choose the left name.
            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
        }

        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
    }
    both_scope.items.extend(new_items);

    // The columns from the secondary side of the join remain accessible by
    // their table-qualified name, but not by their column name alone. They are
    // also excluded from `SELECT *`.
    for c in hidden_cols {
        both_scope.items[c].allow_unqualified_references = false;
    }

    // Reproject all returned elements to the front of the list.
    let project_key = join_cols
        .into_iter()
        .chain(0..both_scope.items.len())
        .unique()
        .collect::<Vec<_>>();

    both_scope = both_scope.project(&project_key);

    // The join condition is the conjunction of all per-column equalities.
    let on = HirScalarExpr::variadic_and(join_exprs);

    let both = left
        .join(right, on, kind)
        .map(map_exprs)
        .project(project_key);
    Ok((both, both_scope))
}
4016
4017pub fn plan_expr<'a>(
4018    ecx: &'a ExprContext,
4019    e: &Expr<Aug>,
4020) -> Result<CoercibleScalarExpr, PlanError> {
4021    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4022}
4023
/// Dispatches planning of a single `Expr` AST node to the appropriate
/// `plan_*` helper, after first checking whether the expression was already
/// computed (e.g. precomputed aggregates — see the module docs) and is
/// available as a column in the current scope.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
        // We've already calculated this expression.
        return Ok(HirScalarExpr::named_column(
            i,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )
        .into());
    }

    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Generalized functions, operators, and casts.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Special functions and operators.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // `NULLIF(l, r)` is planned as `CASE WHEN l = r THEN NULL ELSE l END`.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        // These variants are removed by earlier desugaring passes, so reaching
        // them here is a planner bug.
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
4125
4126fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4127    if !ecx.allow_parameters {
4128        // It might be clearer to return an error like "cannot use parameter
4129        // here", but this is how PostgreSQL does it, and so for now we follow
4130        // PostgreSQL.
4131        return Err(PlanError::UnknownParameter(n));
4132    }
4133    if n == 0 || n > 65536 {
4134        return Err(PlanError::UnknownParameter(n));
4135    }
4136    if ecx.param_types().borrow().contains_key(&n) {
4137        Ok(HirScalarExpr::parameter(n).into())
4138    } else {
4139        Ok(CoercibleScalarExpr::Parameter(n))
4140    }
4141}
4142
4143fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4144    let mut out = vec![];
4145    for e in exprs {
4146        out.push(plan_expr(ecx, e)?);
4147    }
4148    Ok(CoercibleScalarExpr::LiteralRecord(out))
4149}
4150
4151fn plan_cast(
4152    ecx: &ExprContext,
4153    expr: &Expr<Aug>,
4154    data_type: &ResolvedDataType,
4155) -> Result<CoercibleScalarExpr, PlanError> {
4156    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4157    let expr = match expr {
4158        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4159        // we can pass in the target type as a type hint. This is
4160        // a limited form of the coercion that we do for string literals
4161        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4162        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4163        // PostgreSQL compatibility trouble.
4164        //
4165        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4166        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4167        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4168        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4169        _ => plan_expr(ecx, expr)?,
4170    };
4171    let ecx = &ecx.with_name("CAST");
4172    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4173    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4174    Ok(expr.into())
4175}
4176
4177fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4178    let ecx = ecx.with_name("NOT argument");
4179    Ok(plan_expr(&ecx, expr)?
4180        .type_as(&ecx, &SqlScalarType::Bool)?
4181        .call_unary(UnaryFunc::Not(expr_func::Not))
4182        .into())
4183}
4184
4185fn plan_and(
4186    ecx: &ExprContext,
4187    left: &Expr<Aug>,
4188    right: &Expr<Aug>,
4189) -> Result<CoercibleScalarExpr, PlanError> {
4190    let ecx = ecx.with_name("AND argument");
4191    Ok(HirScalarExpr::variadic_and(vec![
4192        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4193        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4194    ])
4195    .into())
4196}
4197
4198fn plan_or(
4199    ecx: &ExprContext,
4200    left: &Expr<Aug>,
4201    right: &Expr<Aug>,
4202) -> Result<CoercibleScalarExpr, PlanError> {
4203    let ecx = ecx.with_name("OR argument");
4204    Ok(HirScalarExpr::variadic_or(vec![
4205        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4206        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4207    ])
4208    .into())
4209}
4210
4211fn plan_in_list(
4212    ecx: &ExprContext,
4213    lhs: &Expr<Aug>,
4214    list: &Vec<Expr<Aug>>,
4215    negated: &bool,
4216) -> Result<CoercibleScalarExpr, PlanError> {
4217    let ecx = ecx.with_name("IN list");
4218    let or = HirScalarExpr::variadic_or(
4219        list.into_iter()
4220            .map(|e| {
4221                let eq = lhs.clone().equals(e.clone());
4222                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4223            })
4224            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4225    );
4226    Ok(if *negated {
4227        or.call_unary(UnaryFunc::Not(expr_func::Not))
4228    } else {
4229        or
4230    }
4231    .into())
4232}
4233
4234fn plan_homogenizing_function(
4235    ecx: &ExprContext,
4236    function: &HomogenizingFunction,
4237    exprs: &[Expr<Aug>],
4238) -> Result<CoercibleScalarExpr, PlanError> {
4239    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4240    let expr = HirScalarExpr::call_variadic(
4241        match function {
4242            HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4243            HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4244            HomogenizingFunction::Least => VariadicFunc::Least,
4245        },
4246        coerce_homogeneous_exprs(
4247            &ecx.with_name(&function.to_string().to_lowercase()),
4248            plan_exprs(ecx, exprs)?,
4249            None,
4250        )?,
4251    );
4252    Ok(expr.into())
4253}
4254
4255fn plan_field_access(
4256    ecx: &ExprContext,
4257    expr: &Expr<Aug>,
4258    field: &Ident,
4259) -> Result<CoercibleScalarExpr, PlanError> {
4260    let field = normalize::column_name(field.clone());
4261    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4262    let ty = ecx.scalar_type(&expr);
4263    let i = match &ty {
4264        SqlScalarType::Record { fields, .. } => {
4265            fields.iter().position(|(name, _ty)| *name == field)
4266        }
4267        ty => sql_bail!(
4268            "column notation applied to type {}, which is not a composite type",
4269            ecx.humanize_scalar_type(ty, false)
4270        ),
4271    };
4272    match i {
4273        None => sql_bail!(
4274            "field {} not found in data type {}",
4275            field,
4276            ecx.humanize_scalar_type(&ty, false)
4277        ),
4278        Some(i) => Ok(expr
4279            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4280            .into()),
4281    }
4282}
4283
4284fn plan_subscript(
4285    ecx: &ExprContext,
4286    expr: &Expr<Aug>,
4287    positions: &[SubscriptPosition<Aug>],
4288) -> Result<CoercibleScalarExpr, PlanError> {
4289    assert!(
4290        !positions.is_empty(),
4291        "subscript expression must contain at least one position"
4292    );
4293
4294    let ecx = &ecx.with_name("subscripting");
4295    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4296    let ty = ecx.scalar_type(&expr);
4297    match &ty {
4298        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4299            ecx,
4300            expr,
4301            positions,
4302            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4303            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4304            // track in its backing data).
4305            if ty == SqlScalarType::Int2Vector {
4306                1
4307            } else {
4308                0
4309            },
4310        ),
4311        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4312        SqlScalarType::List { element_type, .. } => {
4313            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4314            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4315            let n_layers = ty.unwrap_list_n_layers();
4316            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4317        }
4318        ty => sql_bail!(
4319            "cannot subscript type {}",
4320            ecx.humanize_scalar_type(ty, false)
4321        ),
4322    }
4323}
4324
4325// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4326// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4327// any were slices (i.e. included colon).
4328fn extract_scalar_subscript_from_positions<'a>(
4329    positions: &'a [SubscriptPosition<Aug>],
4330    expr_type_name: &str,
4331) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4332    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4333    for p in positions {
4334        if p.explicit_slice {
4335            sql_bail!("{} subscript does not support slices", expr_type_name);
4336        }
4337        assert!(
4338            p.end.is_none(),
4339            "index-appearing subscripts cannot have end value"
4340        );
4341        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4342    }
4343    Ok(scalar_subscripts)
4344}
4345
4346fn plan_subscript_array(
4347    ecx: &ExprContext,
4348    expr: HirScalarExpr,
4349    positions: &[SubscriptPosition<Aug>],
4350    offset: i64,
4351) -> Result<CoercibleScalarExpr, PlanError> {
4352    let mut exprs = Vec::with_capacity(positions.len() + 1);
4353    exprs.push(expr);
4354
4355    // Subscripting arrays doesn't yet support slicing, so we always want to
4356    // extract scalars or error.
4357    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4358
4359    for i in indexes {
4360        exprs.push(plan_expr(ecx, i)?.cast_to(
4361            ecx,
4362            CastContext::Explicit,
4363            &SqlScalarType::Int64,
4364        )?);
4365    }
4366
4367    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4368}
4369
/// Plans subscripting on a list value, which may mix index (`[i]`) and slice
/// (`[a:b]`) operations. The positions are processed left to right as
/// alternating runs of contiguous indexes and contiguous slices; each index
/// run peels off list layers, tracked in `remaining_layers`.
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    // Cursor into `positions`; advanced by the length of each run below.
    let mut i = 0;

    while i < positions.len() {
        // Take all contiguous index operations, i.e. find next slice operation.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            // Empty type name: these positions were already validated as
            // non-slices by the `position` check above, so the error path in
            // the extraction cannot fire here.
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Take all contiguous slice operations, i.e. find next index operation.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4418
4419fn plan_index_list(
4420    ecx: &ExprContext,
4421    expr: HirScalarExpr,
4422    indexes: &[&Expr<Aug>],
4423    n_layers: usize,
4424    elem_type_name: &str,
4425) -> Result<(usize, HirScalarExpr), PlanError> {
4426    let depth = indexes.len();
4427
4428    if depth > n_layers {
4429        if n_layers == 0 {
4430            sql_bail!("cannot subscript type {}", elem_type_name)
4431        } else {
4432            sql_bail!(
4433                "cannot index into {} layers; list only has {} layer{}",
4434                depth,
4435                n_layers,
4436                if n_layers == 1 { "" } else { "s" }
4437            )
4438        }
4439    }
4440
4441    let mut exprs = Vec::with_capacity(depth + 1);
4442    exprs.push(expr);
4443
4444    for i in indexes {
4445        exprs.push(plan_expr(ecx, i)?.cast_to(
4446            ecx,
4447            CastContext::Explicit,
4448            &SqlScalarType::Int64,
4449        )?);
4450    }
4451
4452    Ok((
4453        n_layers - depth,
4454        HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4455    ))
4456}
4457
4458fn plan_slice_list(
4459    ecx: &ExprContext,
4460    expr: HirScalarExpr,
4461    slices: &[SubscriptPosition<Aug>],
4462    n_layers: usize,
4463    elem_type_name: &str,
4464) -> Result<HirScalarExpr, PlanError> {
4465    if n_layers == 0 {
4466        sql_bail!("cannot subscript type {}", elem_type_name)
4467    }
4468
4469    // first arg will be list
4470    let mut exprs = Vec::with_capacity(slices.len() + 1);
4471    exprs.push(expr);
4472    // extract (start, end) parts from collected slices
4473    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4474        Ok(match position {
4475            Some(p) => {
4476                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4477            }
4478            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4479        })
4480    };
4481    for p in slices {
4482        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4483        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4484        exprs.push(start);
4485        exprs.push(end);
4486    }
4487
4488    Ok(HirScalarExpr::call_variadic(
4489        VariadicFunc::ListSliceLinear,
4490        exprs,
4491    ))
4492}
4493
4494fn plan_like(
4495    ecx: &ExprContext,
4496    expr: &Expr<Aug>,
4497    pattern: &Expr<Aug>,
4498    escape: Option<&Expr<Aug>>,
4499    case_insensitive: bool,
4500    not: bool,
4501) -> Result<HirScalarExpr, PlanError> {
4502    use CastContext::Implicit;
4503    let ecx = ecx.with_name("LIKE argument");
4504    let expr = plan_expr(&ecx, expr)?;
4505    let haystack = match ecx.scalar_type(&expr) {
4506        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4507            .type_as(&ecx, ty)?
4508            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4509        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4510    };
4511    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4512    if let Some(escape) = escape {
4513        pattern = pattern.call_binary(
4514            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4515            expr_func::LikeEscape,
4516        );
4517    }
4518    let func: BinaryFunc = if case_insensitive {
4519        expr_func::IsLikeMatchCaseInsensitive.into()
4520    } else {
4521        expr_func::IsLikeMatchCaseSensitive.into()
4522    };
4523    let like = haystack.call_binary(pattern, func);
4524    if not {
4525        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4526    } else {
4527        Ok(like)
4528    }
4529}
4530
4531fn plan_subscript_jsonb(
4532    ecx: &ExprContext,
4533    expr: HirScalarExpr,
4534    positions: &[SubscriptPosition<Aug>],
4535) -> Result<CoercibleScalarExpr, PlanError> {
4536    use CastContext::Implicit;
4537    use SqlScalarType::{Int64, String};
4538
4539    // JSONB doesn't support the slicing syntax, so simply error if you
4540    // encounter any explicit slices.
4541    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4542
4543    let mut exprs = Vec::with_capacity(subscripts.len());
4544    for s in subscripts {
4545        let subscript = plan_expr(ecx, s)?;
4546        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4547            subscript
4548        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4549            // Integers are converted to a string here and then re-parsed as an
4550            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4551            // do it.
4552            typeconv::to_string(ecx, subscript)
4553        } else {
4554            sql_bail!("jsonb subscript type must be coercible to integer or text");
4555        };
4556        exprs.push(subscript);
4557    }
4558
4559    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4560    // `expr->subscript` as you might expect.
4561    let expr = expr.call_binary(
4562        HirScalarExpr::call_variadic(
4563            VariadicFunc::ArrayCreate {
4564                elem_type: SqlScalarType::String,
4565            },
4566            exprs,
4567        ),
4568        expr_func::JsonbGetPath,
4569    );
4570    Ok(expr.into())
4571}
4572
4573fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4574    if !ecx.allow_subqueries {
4575        sql_bail!("{} does not allow subqueries", ecx.name)
4576    }
4577    let mut qcx = ecx.derived_query_context();
4578    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4579    Ok(expr.exists().into())
4580}
4581
4582fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4583    if !ecx.allow_subqueries {
4584        sql_bail!("{} does not allow subqueries", ecx.name)
4585    }
4586    let mut qcx = ecx.derived_query_context();
4587    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4588    let column_types = qcx.relation_type(&expr).column_types;
4589    if column_types.len() != 1 {
4590        sql_bail!(
4591            "Expected subselect to return 1 column, got {} columns",
4592            column_types.len()
4593        );
4594    }
4595    Ok(expr.select().into())
4596}
4597
/// Plans a `LIST(<query>)` subquery by delegating to
/// `plan_vector_like_subquery` with list-specific construction, concatenation,
/// and empty-literal callbacks.
fn plan_list_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    plan_vector_like_subquery(
        ecx,
        query,
        // No element type is unsupported for lists.
        |_| false,
        |elem_type| VariadicFunc::ListCreate { elem_type },
        |order_by| AggregateFunc::ListConcat { order_by },
        expr_func::ListListConcat.into(),
        // The literal produced when the subquery returns no rows.
        |elem_type| {
            HirScalarExpr::literal(
                Datum::empty_list(),
                SqlScalarType::List {
                    element_type: Box::new(elem_type),
                    custom_id: None,
                },
            )
        },
        "list",
    )
}
4621
/// Plans an `ARRAY(<query>)` subquery by delegating to
/// `plan_vector_like_subquery` with array-specific construction,
/// concatenation, and empty-literal callbacks.
fn plan_array_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    plan_vector_like_subquery(
        ecx,
        query,
        // Element types that cannot appear inside an array.
        |elem_type| {
            matches!(
                elem_type,
                SqlScalarType::Char { .. }
                    | SqlScalarType::Array { .. }
                    | SqlScalarType::List { .. }
                    | SqlScalarType::Map { .. }
            )
        },
        |elem_type| VariadicFunc::ArrayCreate { elem_type },
        |order_by| AggregateFunc::ArrayConcat { order_by },
        expr_func::ArrayArrayConcat.into(),
        // The literal produced when the subquery returns no rows.
        |elem_type| {
            HirScalarExpr::literal(
                Datum::empty_array(),
                SqlScalarType::Array(Box::new(elem_type)),
            )
        },
        "[]",
    )
}
4650
/// Generic function used to plan both array subqueries and list subqueries
///
/// Plans `query`, which must return exactly one column, and aggregates its
/// rows into a single vector-like value (array or list). The type-specific
/// pieces are injected by the caller:
///
/// * `is_unsupported_type` rejects element types the vector cannot hold;
/// * `vector_create` builds a single-element vector from one row's value;
/// * `aggregate_concat` is the order-sensitive aggregate that concatenates
///   the per-row vectors;
/// * `binary_concat` concatenates the aggregate's result with
///   `empty_literal` so that an empty subquery yields an empty vector
///   rather than NULL;
/// * `vector_type_string` is the suffix used in the unsupported-type error
///   message.
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&SqlScalarType) -> bool,
    F2: Fn(SqlScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(SqlScalarType) -> HirScalarExpr,
{
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // If the query has a LIMIT or a nonzero OFFSET, enforce them with a
    // `TopK` over an empty group key, so that the rows fed to the aggregate
    // below are exactly the rows the query would return.
    if planned_query.limit.is_some()
        || !planned_query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }

    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }

    let project_column = *planned_query.project.get(0).unwrap();
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();

    // NOTE(review): the message says "array" regardless of the vector kind;
    // in practice only array subqueries supply a predicate that can fire
    // (lists pass `|_| false`), so the wording is accurate today.
    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_scalar_type(&elem_type, false),
            vector_type_string
        ));
    }

    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
    // subquery above.
    //
    // The first expression wraps the projected value in a single-element
    // vector; the remaining expressions carry the ORDER BY key columns so
    // the aggregate can sort on them.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        vector_create(elem_type.clone()),
        vec![HirScalarExpr::column(project_column)],
    ))
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // However, column references for `aggregation_projection` and `aggregation_order_by`
    // are with reference to the `exprs` of the aggregation expression.  Here that is
    // `aggregation_exprs`.
    let aggregation_projection = vec![0];
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();

    // Reduce over an empty group key: the whole subquery collapses to one
    // row holding the concatenated vector. The order-by keys ride along
    // inside a record so the order-sensitive aggregate can see them.
    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::call_variadic(
                    VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);

    // If `expr` has no rows, return an empty array/list rather than NULL.
    Ok(reduced_expr
        .select()
        .call_binary(empty_literal(elem_type), binary_concat)
        .into())
}
4766
/// Plans a `MAP(<subquery>)` expression.
///
/// The subquery must return exactly two columns: the first (the key) must
/// be of type `text`; the second supplies the values. The rows are
/// aggregated into a single map value via `MapAgg`, ordered by the query's
/// ORDER BY so that later keys win on duplicates per `MapAgg` semantics.
/// An empty subquery yields an empty map rather than NULL.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // If the query has a LIMIT or a nonzero OFFSET, enforce them with a
    // `TopK` so the aggregation below sees only the selected rows.
    if query.limit.is_some()
        || !query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    // Map keys are always `text`.
    if key_type != SqlScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // The first expression packs each row's (key, value) pair into a record;
    // the remaining expressions carry the ORDER BY key columns so the
    // order-sensitive `MapAgg` can sort on them.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        VariadicFunc::RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    ))
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // Reduce over an empty group key: the subquery collapses to one row
    // holding the aggregated map. The `ColumnOrder`s are re-based to index
    // into `aggregation_exprs` (the aggregate's record fields) rather than
    // the subquery's output columns.
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::call_variadic(
                    VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // If `expr` has no rows, return an empty map rather than NULL.
    let expr = HirScalarExpr::call_variadic(
        VariadicFunc::Coalesce,
        vec![
            expr.select(),
            HirScalarExpr::literal(
                Datum::empty_map(),
                SqlScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    );

    Ok(expr.into())
}
4872
4873fn plan_collate(
4874    ecx: &ExprContext,
4875    expr: &Expr<Aug>,
4876    collation: &UnresolvedItemName,
4877) -> Result<CoercibleScalarExpr, PlanError> {
4878    if collation.0.len() == 2
4879        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4880        && collation.0[1] == ident!("default")
4881    {
4882        plan_expr(ecx, expr)
4883    } else {
4884        bail_unsupported!("COLLATE");
4885    }
4886}
4887
4888/// Plans a slice of expressions.
4889///
4890/// This function is a simple convenience function for mapping [`plan_expr`]
4891/// over a slice of expressions. The planned expressions are returned in the
4892/// same order as the input. If any of the expressions fail to plan, returns an
4893/// error instead.
4894fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4895where
4896    E: std::borrow::Borrow<Expr<Aug>>,
4897{
4898    let mut out = vec![];
4899    for expr in exprs {
4900        out.push(plan_expr(ecx, expr.borrow())?);
4901    }
4902    Ok(out)
4903}
4904
4905/// Plans an `ARRAY` expression.
4906fn plan_array(
4907    ecx: &ExprContext,
4908    exprs: &[Expr<Aug>],
4909    type_hint: Option<&SqlScalarType>,
4910) -> Result<CoercibleScalarExpr, PlanError> {
4911    // Plan each element expression.
4912    let mut out = vec![];
4913    for expr in exprs {
4914        out.push(match expr {
4915            // Special case nested ARRAY expressions so we can plumb
4916            // the type hint through.
4917            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4918            _ => plan_expr(ecx, expr)?,
4919        });
4920    }
4921
4922    // Attempt to make use of the type hint.
4923    let type_hint = match type_hint {
4924        // The user has provided an explicit cast to an array type. We know the
4925        // element type to coerce to. Need to be careful, though: if there's
4926        // evidence that any of the array elements are themselves arrays, we
4927        // want to coerce to the array type, not the element type.
4928        Some(SqlScalarType::Array(elem_type)) => {
4929            let multidimensional = out.iter().any(|e| {
4930                matches!(
4931                    ecx.scalar_type(e),
4932                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4933                )
4934            });
4935            if multidimensional {
4936                type_hint
4937            } else {
4938                Some(&**elem_type)
4939            }
4940        }
4941        // The user provided an explicit cast to a non-array type. We'll have to
4942        // guess what the correct type for the array. Our caller will then
4943        // handle converting that array type to the desired non-array type.
4944        Some(_) => None,
4945        // No type hint. We'll have to guess the correct type for the array.
4946        None => None,
4947    };
4948
4949    // Coerce all elements to the same type.
4950    let (elem_type, exprs) = if exprs.is_empty() {
4951        if let Some(elem_type) = type_hint {
4952            (elem_type.clone(), vec![])
4953        } else {
4954            sql_bail!("cannot determine type of empty array");
4955        }
4956    } else {
4957        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4958        (ecx.scalar_type(&out[0]), out)
4959    };
4960
4961    // Arrays of `char` type are disallowed due to a known limitation:
4962    // https://github.com/MaterializeInc/database-issues/issues/2360.
4963    //
4964    // Arrays of `list` and `map` types are disallowed due to mind-bending
4965    // semantics.
4966    if matches!(
4967        elem_type,
4968        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4969    ) {
4970        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4971    }
4972
4973    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4974}
4975
4976fn plan_list(
4977    ecx: &ExprContext,
4978    exprs: &[Expr<Aug>],
4979    type_hint: Option<&SqlScalarType>,
4980) -> Result<CoercibleScalarExpr, PlanError> {
4981    let (elem_type, exprs) = if exprs.is_empty() {
4982        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4983            (element_type.without_modifiers(), vec![])
4984        } else {
4985            sql_bail!("cannot determine type of empty list");
4986        }
4987    } else {
4988        let type_hint = match type_hint {
4989            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4990            _ => None,
4991        };
4992
4993        let mut out = vec![];
4994        for expr in exprs {
4995            out.push(match expr {
4996                // Special case nested LIST expressions so we can plumb
4997                // the type hint through.
4998                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4999                _ => plan_expr(ecx, expr)?,
5000            });
5001        }
5002        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5003        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5004    };
5005
5006    if matches!(elem_type, SqlScalarType::Char { .. }) {
5007        bail_unsupported!("char list");
5008    }
5009
5010    Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
5011}
5012
5013fn plan_map(
5014    ecx: &ExprContext,
5015    entries: &[MapEntry<Aug>],
5016    type_hint: Option<&SqlScalarType>,
5017) -> Result<CoercibleScalarExpr, PlanError> {
5018    let (value_type, exprs) = if entries.is_empty() {
5019        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5020            (value_type.without_modifiers(), vec![])
5021        } else {
5022            sql_bail!("cannot determine type of empty map");
5023        }
5024    } else {
5025        let type_hint = match type_hint {
5026            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5027            _ => None,
5028        };
5029
5030        let mut keys = vec![];
5031        let mut values = vec![];
5032        for MapEntry { key, value } in entries {
5033            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5034            let value = match value {
5035                // Special case nested MAP expressions so we can plumb
5036                // the type hint through.
5037                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5038                _ => plan_expr(ecx, value)?,
5039            };
5040            keys.push(key);
5041            values.push(value);
5042        }
5043        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5044        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5045        let out = itertools::interleave(keys, values).collect();
5046        (value_type, out)
5047    };
5048
5049    if matches!(value_type, SqlScalarType::Char { .. }) {
5050        bail_unsupported!("char map");
5051    }
5052
5053    let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5054    Ok(expr.into())
5055}
5056
5057/// Coerces a list of expressions such that all input expressions will be cast
5058/// to the same type. If successful, returns a new list of expressions in the
5059/// same order as the input, where each expression has the appropriate casts to
5060/// make them all of a uniform type.
5061///
5062/// If `force_type` is `Some`, the expressions are forced to the specified type
5063/// via an explicit cast. Otherwise the best common type is guessed via
5064/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5065/// implicit casts
5066///
5067/// Note that this is our implementation of Postgres' type conversion for
5068/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5069/// isn't yet used in all of those cases.
5070///
5071/// [union-type-conv]:
5072/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5073pub fn coerce_homogeneous_exprs(
5074    ecx: &ExprContext,
5075    exprs: Vec<CoercibleScalarExpr>,
5076    force_type: Option<&SqlScalarType>,
5077) -> Result<Vec<HirScalarExpr>, PlanError> {
5078    assert!(!exprs.is_empty());
5079
5080    let target_holder;
5081    let target = match force_type {
5082        Some(t) => t,
5083        None => {
5084            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5085            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5086            &target_holder
5087        }
5088    };
5089
5090    // Try to cast all expressions to `target`.
5091    let mut out = Vec::new();
5092    for expr in exprs {
5093        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5094        let ccx = match force_type {
5095            None => CastContext::Implicit,
5096            Some(_) => CastContext::Explicit,
5097        };
5098        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5099            Ok(expr) => out.push(expr),
5100            Err(_) => sql_bail!(
5101                "{} could not convert type {} to {}",
5102                ecx.name,
5103                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5104                ecx.humanize_scalar_type(target, false),
5105            ),
5106        }
5107    }
5108    Ok(out)
5109}
5110
5111/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5112/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5113pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5114    obe: &OrderByExpr<T>,
5115    column: usize,
5116) -> ColumnOrder {
5117    let desc = !obe.asc.unwrap_or(true);
5118    ColumnOrder {
5119        column,
5120        desc,
5121        // https://www.postgresql.org/docs/14/queries-order.html
5122        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5123        nulls_last: obe.nulls_last.unwrap_or(!desc),
5124    }
5125}
5126
5127/// Plans the ORDER BY clause of a window function.
5128///
5129/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5130/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5131/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5132/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5133/// `Vec<HirScalarExpr>` that we return.
5134fn plan_function_order_by(
5135    ecx: &ExprContext,
5136    order_by: &[OrderByExpr<Aug>],
5137) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5138    let mut order_by_exprs = vec![];
5139    let mut col_orders = vec![];
5140    {
5141        for (i, obe) in order_by.iter().enumerate() {
5142            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5143            // do not support ordinal references in PostgreSQL. So we use
5144            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5145            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5146            order_by_exprs.push(expr);
5147            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5148        }
5149    }
5150    Ok((order_by_exprs, col_orders))
5151}
5152
/// Common part of the planning of windowed and non-windowed aggregation functions.
///
/// Resolves the aggregate implementation for `name`, plans its arguments,
/// FILTER clause, and ORDER BY keys, and returns the assembled
/// `AggregateExpr`. The `over` clause is deliberately ignored here; window
/// handling happens in the callers.
fn plan_aggregate_common(
    ecx: &ExprContext,
    Function::<Aug> {
        name,
        args,
        filter,
        over: _,
        distinct,
    }: &Function<Aug>,
) -> Result<AggregateExpr, PlanError> {
    // Normal aggregate functions, like `sum`, expect as input a single expression
    // which yields the datum to aggregate. Order sensitive aggregate functions,
    // like `jsonb_agg`, are special, and instead expect a Record whose first
    // element yields the datum to aggregate and whose successive elements yield
    // keys to order by. This expectation is hard coded within the implementation
    // of each of the order-sensitive aggregates. The specification of how many
    // order by keys to consider, and in what order, is passed via the `order_by`
    // field on the `AggregateFunc` variant.

    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
    // most, so explicitly drop it if the function doesn't care about order. This
    // prevents the projection into Record below from triggering on unsupported
    // functions.

    let impls = match resolve_func(ecx, name, args)? {
        Func::Aggregate(impls) => impls,
        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
    };

    // We follow PostgreSQL's rule here for mapping `count(*)` into the
    // generalized function selection framework. The rule is simple: the user
    // must type `count(*)`, but the function selection framework sees an empty
    // parameter list, as if the user had typed `count()`. But if the user types
    // `count()` directly, that is an error. Like PostgreSQL, we apply these
    // rules to all aggregates, not just `count`, since we may one day support
    // user-defined aggregates, including user-defined aggregates that take no
    // parameters.
    let (args, order_by) = match &args {
        FunctionArgs::Star => (vec![], vec![]),
        FunctionArgs::Args { args, order_by } => {
            if args.is_empty() {
                sql_bail!(
                    "{}(*) must be used to call a parameterless aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("name actually resolved")
                );
            }
            let args = plan_exprs(ecx, args)?;
            (args, order_by.clone())
        }
    };

    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;

    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
    if let Some(filter) = &filter {
        // If a filter is present, as in
        //
        //     <agg>(<expr>) FILTER (WHERE <cond>)
        //
        // we plan it by essentially rewriting the expression to
        //
        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
        //
        // where <identity> is the identity input for <agg>.
        let cond =
            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
        let expr_typ = ecx.scalar_type(&expr);
        expr = HirScalarExpr::if_then_else(
            cond,
            expr,
            HirScalarExpr::literal(func.identity_datum(), expr_typ),
        );
    }

    // Reject aggregates whose argument refers exclusively to outer (correlated)
    // columns, which we do not support yet (see issue 3720 below).
    let mut seen_outer = false;
    let mut seen_inner = false;
    #[allow(deprecated)]
    expr.visit_columns(0, &mut |depth, col| {
        if depth == 0 && col.level == 0 {
            seen_inner = true;
        } else if col.level > depth {
            seen_outer = true;
        }
    });
    if seen_outer && !seen_inner {
        bail_unsupported!(
            3720,
            "aggregate functions that refer exclusively to outer columns"
        );
    }

    // If a function supports ORDER BY (even if there was no ORDER BY specified),
    // map the needed expressions into the aggregate datum.
    if func.is_order_sensitive() {
        let field_names = iter::repeat(ColumnName::from(""))
            .take(1 + order_by_exprs.len())
            .collect();
        let mut exprs = vec![expr];
        exprs.extend(order_by_exprs);
        expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
    }

    Ok(AggregateExpr {
        func,
        expr: Box::new(expr),
        distinct: *distinct,
    })
}
5265
/// Plans a (possibly qualified) identifier.
///
/// Resolution proceeds in order: a qualified name (`table.column`) must be
/// a column of a table in scope; an unqualified name is first tried as a
/// column, then as a whole-row reference to a table, in which case a record
/// containing all of the table's columns is produced (with PostgreSQL's
/// special case for single-column table functions).
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    // The last component is the column name; anything before it qualifies it.
    let col_name = normalize::column_name(names.pop().unwrap());

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let (i, i_name) = ecx.scope.resolve_table_column(
            &ecx.qcx.outer_scopes,
            &table_name,
            &col_name,
            &mut ecx.qcx.name_manager.borrow_mut(),
        )?;
        return Ok(HirScalarExpr::named_column(i, i_name));
    }

    // If the name is unqualified, first check if it refers to a column. Track any similar names
    // that might exist for a better error message.
    let similar_names = match ecx.scope.resolve_column(
        &ecx.qcx.outer_scopes,
        &col_name,
        &mut ecx.qcx.name_manager.borrow_mut(),
    ) {
        Ok((i, i_name)) => {
            return Ok(HirScalarExpr::named_column(i, i_name));
        }
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name doesn't refer to a column. Check if it is a whole-row reference
    // to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name doesn't refer to a table either. Return an error.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The name refers to a table that is the result of a function that
        // returned a single column. Per PostgreSQL, this is a special case
        // that returns the value directly.
        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
            *column,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )),
        // The name refers to a normal table. Return a record containing all the
        // columns of the table.
        _ => {
            // Any synthetic "exists" column (from a table function in the
            // target list) is excluded from the record but remembered so we
            // can produce NULL when the table function emitted no row.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::named_column(
                            column,
                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
                        );
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // For the special case of a table function with a single column, the single column is instead not wrapped.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
            };
            if let Some(has_exists_column) = has_exists_column {
                Ok(HirScalarExpr::if_then_else(
                    HirScalarExpr::unnamed_column(has_exists_column)
                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
                    expr,
                ))
            } else {
                Ok(expr)
            }
        }
    }
}
5360
5361fn plan_op(
5362    ecx: &ExprContext,
5363    op: &str,
5364    expr1: &Expr<Aug>,
5365    expr2: Option<&Expr<Aug>>,
5366) -> Result<HirScalarExpr, PlanError> {
5367    let impls = func::resolve_op(op)?;
5368    let args = match expr2 {
5369        None => plan_exprs(ecx, &[expr1])?,
5370        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5371    };
5372    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5373}
5374
5375fn plan_function<'a>(
5376    ecx: &ExprContext,
5377    f @ Function {
5378        name,
5379        args,
5380        filter,
5381        over,
5382        distinct,
5383    }: &'a Function<Aug>,
5384) -> Result<HirScalarExpr, PlanError> {
5385    let impls = match resolve_func(ecx, name, args)? {
5386        Func::Table(_) => {
5387            sql_bail!(
5388                "table functions are not allowed in {} (function {})",
5389                ecx.name,
5390                name
5391            );
5392        }
5393        Func::Scalar(impls) => {
5394            if over.is_some() {
5395                sql_bail!(
5396                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5397                );
5398            }
5399            impls
5400        }
5401        Func::ScalarWindow(impls) => {
5402            let (
5403                ignore_nulls,
5404                order_by_exprs,
5405                col_orders,
5406                _window_frame,
5407                partition_by,
5408                scalar_args,
5409            ) = plan_window_function_non_aggr(ecx, f)?;
5410
5411            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5412            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5413            // msg there is less informative.)
5414            if !scalar_args.is_empty() {
5415                if let ResolvedItemName::Item {
5416                    full_name: FullItemName { item, .. },
5417                    ..
5418                } = name
5419                {
5420                    sql_bail!(
5421                        "function {} has 0 parameters, but was called with {}",
5422                        item,
5423                        scalar_args.len()
5424                    );
5425                }
5426            }
5427
5428            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5429            // accept a window frame here without an error msg. (Postgres also does this.)
5430            // TODO: maybe we should give a notice
5431
5432            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5433
5434            if ignore_nulls {
5435                // If we ever add a scalar window function that supports ignore, then don't forget
5436                // to also update HIR EXPLAIN.
5437                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5438            }
5439
5440            return Ok(HirScalarExpr::windowing(WindowExpr {
5441                func: WindowExprType::Scalar(ScalarWindowExpr {
5442                    func,
5443                    order_by: col_orders,
5444                }),
5445                partition_by,
5446                order_by: order_by_exprs,
5447            }));
5448        }
5449        Func::ValueWindow(impls) => {
5450            let window_plan = plan_window_function_non_aggr(ecx, f)?;
5451            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5452                window_plan;
5453
5454            let (args_encoded, func) =
5455                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5456
5457            if ignore_nulls {
5458                match func {
5459                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5460                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5461                }
5462            }
5463
5464            return Ok(HirScalarExpr::windowing(WindowExpr {
5465                func: WindowExprType::Value(ValueWindowExpr {
5466                    func,
5467                    args: Box::new(args_encoded),
5468                    order_by: col_orders,
5469                    window_frame,
5470                    ignore_nulls, // (RESPECT NULLS is the default)
5471                }),
5472                partition_by,
5473                order_by: order_by_exprs,
5474            }));
5475        }
5476        Func::Aggregate(_) => {
5477            if f.over.is_none() {
5478                // Not a window aggregate. Something is wrong.
5479                if ecx.allow_aggregates {
5480                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5481                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5482                    sql_bail!(
5483                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5484                        name,
5485                    );
5486                } else {
5487                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5488                    // because it was in an unsupported context.
5489                    sql_bail!(
5490                        "aggregate functions are not allowed in {} (function {})",
5491                        ecx.name,
5492                        name
5493                    );
5494                }
5495            } else {
5496                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5497                    plan_window_function_common(ecx, &f.name, &f.over)?;
5498
5499                // https://github.com/MaterializeInc/database-issues/issues/6720
5500                match (&window_frame.start_bound, &window_frame.end_bound) {
5501                    (
5502                        mz_expr::WindowFrameBound::UnboundedPreceding,
5503                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5504                    )
5505                    | (
5506                        mz_expr::WindowFrameBound::UnboundedPreceding,
5507                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5508                    )
5509                    | (
5510                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5511                        mz_expr::WindowFrameBound::UnboundedFollowing,
5512                    )
5513                    | (
5514                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5515                        mz_expr::WindowFrameBound::UnboundedFollowing,
5516                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5517                    (_, _) => {} // other cases are ok
5518                }
5519
5520                if ignore_nulls {
5521                    // https://github.com/MaterializeInc/database-issues/issues/6722
5522                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5523                    // forget to also update HIR EXPLAIN.
5524                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5525                }
5526
5527                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5528
5529                if aggregate_expr.distinct {
5530                    // https://github.com/MaterializeInc/database-issues/issues/6626
5531                    bail_unsupported!("DISTINCT in window aggregates");
5532                }
5533
5534                return Ok(HirScalarExpr::windowing(WindowExpr {
5535                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5536                        aggregate_expr,
5537                        order_by: col_orders,
5538                        window_frame,
5539                    }),
5540                    partition_by,
5541                    order_by: order_by_exprs,
5542                }));
5543            }
5544        }
5545    };
5546
5547    if over.is_some() {
5548        unreachable!("If there is an OVER clause, we should have returned already above.");
5549    }
5550
5551    if *distinct {
5552        sql_bail!(
5553            "DISTINCT specified, but {} is not an aggregate function",
5554            ecx.qcx
5555                .scx
5556                .humanize_resolved_name(name)
5557                .expect("already resolved")
5558        );
5559    }
5560    if filter.is_some() {
5561        sql_bail!(
5562            "FILTER specified, but {} is not an aggregate function",
5563            ecx.qcx
5564                .scx
5565                .humanize_resolved_name(name)
5566                .expect("already resolved")
5567        );
5568    }
5569
5570    let scalar_args = match &args {
5571        FunctionArgs::Star => {
5572            sql_bail!(
5573                "* argument is invalid with non-aggregate function {}",
5574                ecx.qcx
5575                    .scx
5576                    .humanize_resolved_name(name)
5577                    .expect("already resolved")
5578            )
5579        }
5580        FunctionArgs::Args { args, order_by } => {
5581            if !order_by.is_empty() {
5582                sql_bail!(
5583                    "ORDER BY specified, but {} is not an aggregate function",
5584                    ecx.qcx
5585                        .scx
5586                        .humanize_resolved_name(name)
5587                        .expect("already resolved")
5588                );
5589            }
5590            plan_exprs(ecx, args)?
5591        }
5592    };
5593
5594    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5595}
5596
/// Error message used when IGNORE NULLS / RESPECT NULLS is specified for a
/// window function that doesn't support it. (Only LAG and LEAD support it;
/// see the `ignore_nulls` handling in the window function planning above.)
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5599
5600/// Resolves the name to a set of function implementations.
5601///
5602/// If the name does not specify a known built-in function, returns an error.
5603pub fn resolve_func(
5604    ecx: &ExprContext,
5605    name: &ResolvedItemName,
5606    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5607) -> Result<&'static Func, PlanError> {
5608    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5609        if let Ok(f) = i.func() {
5610            return Ok(f);
5611        }
5612    }
5613
5614    // Couldn't resolve function with this name, so generate verbose error
5615    // message.
5616    let cexprs = match args {
5617        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5618        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5619            if !order_by.is_empty() {
5620                sql_bail!(
5621                    "ORDER BY specified, but {} is not an aggregate function",
5622                    name
5623                );
5624            }
5625            plan_exprs(ecx, args)?
5626        }
5627    };
5628
5629    let arg_types: Vec<_> = cexprs
5630        .into_iter()
5631        .map(|ty| match ecx.scalar_type(&ty) {
5632            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5633            CoercibleScalarType::Record(_) => "record".to_string(),
5634            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5635        })
5636        .collect();
5637
5638    Err(PlanError::UnknownFunction {
5639        name: name.to_string(),
5640        arg_types,
5641    })
5642}
5643
5644fn plan_is_expr<'a>(
5645    ecx: &ExprContext,
5646    expr: &'a Expr<Aug>,
5647    construct: &IsExprConstruct<Aug>,
5648    not: bool,
5649) -> Result<HirScalarExpr, PlanError> {
5650    let expr_hir = plan_expr(ecx, expr)?;
5651
5652    let mut result = match construct {
5653        IsExprConstruct::Null => {
5654            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5655            // at odds with our type coercion rules, which treat `NULL` literals
5656            // and unconstrained parameters identically. Providing a type hint
5657            // of string means we wind up supporting both.
5658            expr_hir.type_as_any(ecx)?.call_is_null()
5659        }
5660        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5661        IsExprConstruct::True => expr_hir
5662            .type_as(ecx, &SqlScalarType::Bool)?
5663            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5664        IsExprConstruct::False => expr_hir
5665            .type_as(ecx, &SqlScalarType::Bool)?
5666            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5667        IsExprConstruct::DistinctFrom(expr2) => {
5668            // There are three cases:
5669            // 1. Both terms are non-null, in which case the result should be `a != b`.
5670            // 2. Exactly one term is null, in which case the result should be true.
5671            // 3. Both terms are null, in which case the result should be false.
5672            //
5673            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5674
5675            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
5676            // construct an AST expression for `expr != expr2` and plan it to get proper type
5677            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
5678            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5679            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5680
5681            let expr1_hir = expr_hir.type_as_any(ecx)?;
5682            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5683
5684            let term1 = HirScalarExpr::variadic_or(vec![
5685                ne_hir,
5686                expr1_hir.clone().call_is_null(),
5687                expr2_hir.clone().call_is_null(),
5688            ]);
5689            let term2 = HirScalarExpr::variadic_or(vec![
5690                expr1_hir.call_is_null().not(),
5691                expr2_hir.call_is_null().not(),
5692            ]);
5693            term1.and(term2)
5694        }
5695    };
5696    if not {
5697        result = result.not();
5698    }
5699    Ok(result)
5700}
5701
5702fn plan_case<'a>(
5703    ecx: &ExprContext,
5704    operand: &'a Option<Box<Expr<Aug>>>,
5705    conditions: &'a [Expr<Aug>],
5706    results: &'a [Expr<Aug>],
5707    else_result: &'a Option<Box<Expr<Aug>>>,
5708) -> Result<HirScalarExpr, PlanError> {
5709    let mut cond_exprs = Vec::new();
5710    let mut result_exprs = Vec::new();
5711    for (c, r) in conditions.iter().zip_eq(results) {
5712        let c = match operand {
5713            Some(operand) => operand.clone().equals(c.clone()),
5714            None => c.clone(),
5715        };
5716        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5717        cond_exprs.push(cexpr);
5718        result_exprs.push(r);
5719    }
5720    result_exprs.push(match else_result {
5721        Some(else_result) => else_result,
5722        None => &Expr::Value(Value::Null),
5723    });
5724    let mut result_exprs = coerce_homogeneous_exprs(
5725        &ecx.with_name("CASE"),
5726        plan_exprs(ecx, &result_exprs)?,
5727        None,
5728    )?;
5729    let mut expr = result_exprs.pop().unwrap();
5730    assert_eq!(cond_exprs.len(), result_exprs.len());
5731    for (cexpr, rexpr) in cond_exprs
5732        .into_iter()
5733        .rev()
5734        .zip_eq(result_exprs.into_iter().rev())
5735    {
5736        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5737    }
5738    Ok(expr)
5739}
5740
5741fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5742    let (datum, scalar_type) = match l {
5743        Value::Number(s) => {
5744            let d = strconv::parse_numeric(s.as_str())?;
5745            if !s.contains(&['E', '.'][..]) {
5746                // Maybe representable as an int?
5747                if let Ok(n) = d.0.try_into() {
5748                    (Datum::Int32(n), SqlScalarType::Int32)
5749                } else if let Ok(n) = d.0.try_into() {
5750                    (Datum::Int64(n), SqlScalarType::Int64)
5751                } else {
5752                    (
5753                        Datum::Numeric(d),
5754                        SqlScalarType::Numeric { max_scale: None },
5755                    )
5756                }
5757            } else {
5758                (
5759                    Datum::Numeric(d),
5760                    SqlScalarType::Numeric { max_scale: None },
5761                )
5762            }
5763        }
5764        Value::HexString(_) => bail_unsupported!("hex string literals"),
5765        Value::Boolean(b) => match b {
5766            false => (Datum::False, SqlScalarType::Bool),
5767            true => (Datum::True, SqlScalarType::Bool),
5768        },
5769        Value::Interval(i) => {
5770            let i = literal::plan_interval(i)?;
5771            (Datum::Interval(i), SqlScalarType::Interval)
5772        }
5773        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5774        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5775    };
5776    let expr = HirScalarExpr::literal(datum, scalar_type);
5777    Ok(expr.into())
5778}
5779
5780/// The common part of the planning of non-aggregate window functions, i.e.,
5781/// scalar window functions and value window functions.
5782fn plan_window_function_non_aggr<'a>(
5783    ecx: &ExprContext,
5784    Function {
5785        name,
5786        args,
5787        filter,
5788        over,
5789        distinct,
5790    }: &'a Function<Aug>,
5791) -> Result<
5792    (
5793        bool,
5794        Vec<HirScalarExpr>,
5795        Vec<ColumnOrder>,
5796        mz_expr::WindowFrame,
5797        Vec<HirScalarExpr>,
5798        Vec<CoercibleScalarExpr>,
5799    ),
5800    PlanError,
5801> {
5802    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5803        plan_window_function_common(ecx, name, over)?;
5804
5805    if *distinct {
5806        sql_bail!(
5807            "DISTINCT specified, but {} is not an aggregate function",
5808            name
5809        );
5810    }
5811
5812    if filter.is_some() {
5813        bail_unsupported!("FILTER in non-aggregate window functions");
5814    }
5815
5816    let scalar_args = match &args {
5817        FunctionArgs::Star => {
5818            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5819        }
5820        FunctionArgs::Args { args, order_by } => {
5821            if !order_by.is_empty() {
5822                sql_bail!(
5823                    "ORDER BY specified, but {} is not an aggregate function",
5824                    name
5825                );
5826            }
5827            plan_exprs(ecx, args)?
5828        }
5829    };
5830
5831    Ok((
5832        ignore_nulls,
5833        order_by_exprs,
5834        col_orders,
5835        window_frame,
5836        partition,
5837        scalar_args,
5838    ))
5839}
5840
5841/// The common part of the planning of all window functions.
5842fn plan_window_function_common(
5843    ecx: &ExprContext,
5844    name: &<Aug as AstInfo>::ItemName,
5845    over: &Option<WindowSpec<Aug>>,
5846) -> Result<
5847    (
5848        bool,
5849        Vec<HirScalarExpr>,
5850        Vec<ColumnOrder>,
5851        mz_expr::WindowFrame,
5852        Vec<HirScalarExpr>,
5853    ),
5854    PlanError,
5855> {
5856    if !ecx.allow_windows {
5857        sql_bail!(
5858            "window functions are not allowed in {} (function {})",
5859            ecx.name,
5860            name
5861        );
5862    }
5863
5864    let window_spec = match over.as_ref() {
5865        Some(over) => over,
5866        None => sql_bail!("window function {} requires an OVER clause", name),
5867    };
5868    if window_spec.ignore_nulls && window_spec.respect_nulls {
5869        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5870    }
5871    let window_frame = match window_spec.window_frame.as_ref() {
5872        Some(frame) => plan_window_frame(frame)?,
5873        None => mz_expr::WindowFrame::default(),
5874    };
5875    let mut partition = Vec::new();
5876    for expr in &window_spec.partition_by {
5877        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5878    }
5879
5880    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5881
5882    Ok((
5883        window_spec.ignore_nulls,
5884        order_by_exprs,
5885        col_orders,
5886        window_frame,
5887        partition,
5888    ))
5889}
5890
5891fn plan_window_frame(
5892    WindowFrame {
5893        units,
5894        start_bound,
5895        end_bound,
5896    }: &WindowFrame,
5897) -> Result<mz_expr::WindowFrame, PlanError> {
5898    use mz_expr::WindowFrameBound::*;
5899    let units = window_frame_unit_ast_to_expr(units)?;
5900    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5901    let end_bound = end_bound
5902        .as_ref()
5903        .map(window_frame_bound_ast_to_expr)
5904        .unwrap_or(CurrentRow);
5905
5906    // Validate bounds according to Postgres rules
5907    match (&start_bound, &end_bound) {
5908        // Start bound can't be UNBOUNDED FOLLOWING
5909        (UnboundedFollowing, _) => {
5910            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5911        }
5912        // End bound can't be UNBOUNDED PRECEDING
5913        (_, UnboundedPreceding) => {
5914            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5915        }
5916        // Start bound should come before end bound in the list of bound definitions
5917        (CurrentRow, OffsetPreceding(_)) => {
5918            sql_bail!("frame starting from current row cannot have preceding rows")
5919        }
5920        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5921            sql_bail!("frame starting from following row cannot have preceding rows")
5922        }
5923        // The above rules are adopted from Postgres.
5924        // The following rules are Materialize-specific.
5925        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5926            // Note that the only hard limit is that partition size + offset should fit in i64, so
5927            // in theory, we could support much larger offsets than this. But for our current
5928            // performance, even 1000000 is quite big.
5929            if *o1 > 1000000 || *o2 > 1000000 {
5930                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5931            }
5932        }
5933        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5934            if *o1 > 1000000 || *o2 > 1000000 {
5935                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5936            }
5937        }
5938        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5939            if *o1 > 1000000 || *o2 > 1000000 {
5940                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5941            }
5942        }
5943        (OffsetPreceding(o), CurrentRow) => {
5944            if *o > 1000000 {
5945                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5946            }
5947        }
5948        (CurrentRow, OffsetFollowing(o)) => {
5949            if *o > 1000000 {
5950                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5951            }
5952        }
5953        // Other bounds are valid
5954        (_, _) => (),
5955    }
5956
5957    // RANGE is only supported in the default frame
5958    // https://github.com/MaterializeInc/database-issues/issues/6585
5959    if units == mz_expr::WindowFrameUnits::Range
5960        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5961    {
5962        bail_unsupported!("RANGE in non-default window frames")
5963    }
5964
5965    let frame = mz_expr::WindowFrame {
5966        units,
5967        start_bound,
5968        end_bound,
5969    };
5970    Ok(frame)
5971}
5972
5973fn window_frame_unit_ast_to_expr(
5974    unit: &WindowFrameUnits,
5975) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5976    match unit {
5977        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5978        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5979        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5980    }
5981}
5982
5983fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5984    match bound {
5985        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5986        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5987        WindowFrameBound::Preceding(Some(offset)) => {
5988            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5989        }
5990        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5991        WindowFrameBound::Following(Some(offset)) => {
5992            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5993        }
5994    }
5995}
5996
5997pub fn scalar_type_from_sql(
5998    scx: &StatementContext,
5999    data_type: &ResolvedDataType,
6000) -> Result<SqlScalarType, PlanError> {
6001    match data_type {
6002        ResolvedDataType::AnonymousList(elem_type) => {
6003            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6004            if matches!(elem_type, SqlScalarType::Char { .. }) {
6005                bail_unsupported!("char list");
6006            }
6007            Ok(SqlScalarType::List {
6008                element_type: Box::new(elem_type),
6009                custom_id: None,
6010            })
6011        }
6012        ResolvedDataType::AnonymousMap {
6013            key_type,
6014            value_type,
6015        } => {
6016            match scalar_type_from_sql(scx, key_type)? {
6017                SqlScalarType::String => {}
6018                other => sql_bail!(
6019                    "map key type must be {}, got {}",
6020                    scx.humanize_scalar_type(&SqlScalarType::String, false),
6021                    scx.humanize_scalar_type(&other, false)
6022                ),
6023            }
6024            Ok(SqlScalarType::Map {
6025                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6026                custom_id: None,
6027            })
6028        }
6029        ResolvedDataType::Named { id, modifiers, .. } => {
6030            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6031        }
6032        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6033    }
6034}
6035
/// Converts a catalog type identified by `id`, together with any SQL type
/// modifiers (e.g. `numeric(10, 2)` has modifiers `[10, 2]`), into a
/// `SqlScalarType`.
///
/// Errors if `id` does not refer to a type, if a modifier value is invalid for
/// the type, or if modifiers are supplied for a type that accepts none.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<SqlScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            // Resolution should never produce a `ResolvedDataType::Named` with
            // an ID of a non-type, but we error gracefully just in case.
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    // The types below accept modifiers, so they are handled individually;
    // every other type rejects modifiers and is handled by the catch-all arm.
    match &type_details.typ {
        CatalogType::Numeric => {
            // `numeric` accepts up to two modifiers: precision, then scale.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    // Scale may not exceed the declared precision.
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            Ok(SqlScalarType::Numeric { max_scale: scale })
        }
        CatalogType::Char => {
            // `char` defaults to length 1 when no modifier is given.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(SqlScalarType::Char { length })
        }
        CatalogType::VarChar => {
            // `varchar` is unbounded when no modifier is given.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(SqlScalarType::VarChar { max_length: length })
        }
        CatalogType::Timestamp => {
            // Optional fractional-seconds precision.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(SqlScalarType::Timestamp { precision })
        }
        CatalogType::TimestampTz => {
            // Optional fractional-seconds precision.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(SqlScalarType::TimestampTz { precision })
        }
        t => {
            // None of the remaining types accept modifiers.
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // Container types recurse into the catalog for their element
                // types; custom container types keep their catalog id.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(SqlScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(SqlScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(SqlScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    // Record fields are always treated as nullable here.
                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                SqlColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(SqlScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
                CatalogType::Bool => Ok(SqlScalarType::Bool),
                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
                CatalogType::Date => Ok(SqlScalarType::Date),
                CatalogType::Float32 => Ok(SqlScalarType::Float32),
                CatalogType::Float64 => Ok(SqlScalarType::Float64),
                CatalogType::Int16 => Ok(SqlScalarType::Int16),
                CatalogType::Int32 => Ok(SqlScalarType::Int32),
                CatalogType::Int64 => Ok(SqlScalarType::Int64),
                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
                CatalogType::Interval => Ok(SqlScalarType::Interval),
                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
                CatalogType::Oid => Ok(SqlScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
                CatalogType::Pseudo => {
                    // Pseudo-types (e.g. `any`) cannot appear as column types.
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
                CatalogType::RegType => Ok(SqlScalarType::RegType),
                CatalogType::String => Ok(SqlScalarType::String),
                CatalogType::Time => Ok(SqlScalarType::Time),
                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
6237
/// This is used to collect aggregates and table functions from within an `Expr`.
/// See the explanation of aggregate handling at the top of the file for more details.
struct AggregateTableFuncVisitor<'a> {
    /// Context of the statement being planned; used to resolve function names.
    scx: &'a StatementContext<'a>,
    /// Aggregate function calls collected so far, in visit order
    /// (deduplicated later in `into_result`).
    aggs: Vec<Function<Aug>>,
    /// True while visiting the arguments of an aggregate call; used to reject
    /// nested aggregate functions.
    within_aggregate: bool,
    /// Table function calls found in SELECT items, mapped to the synthetic
    /// identifier that replaces each of them in the expression.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of syntactic contexts (e.g. "CASE") in which table functions are
    /// disallowed; a non-empty stack means a table function here is an error.
    table_disallowed_context: Vec<&'static str>,
    /// True while visiting a SELECT item, where table functions are extracted
    /// and replaced by identifiers.
    in_select_item: bool,
    /// Source of unique ids for the synthetic table function identifiers.
    id_gen: IdGen,
    /// First error encountered during the visit, if any; surfaced by `into_result`.
    err: Option<PlanError>,
}
6250
6251impl<'a> AggregateTableFuncVisitor<'a> {
6252    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6253        AggregateTableFuncVisitor {
6254            scx,
6255            aggs: Vec::new(),
6256            within_aggregate: false,
6257            tables: BTreeMap::new(),
6258            table_disallowed_context: Vec::new(),
6259            in_select_item: false,
6260            id_gen: Default::default(),
6261            err: None,
6262        }
6263    }
6264
6265    fn into_result(
6266        self,
6267    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6268        match self.err {
6269            Some(err) => Err(err),
6270            None => {
6271                // Dedup while preserving the order. We don't care what the order is, but it
6272                // has to be reproducible so that EXPLAIN PLAN tests work.
6273                let mut seen = BTreeSet::new();
6274                let aggs = self
6275                    .aggs
6276                    .into_iter()
6277                    .filter(move |agg| seen.insert(agg.clone()))
6278                    .collect();
6279                Ok((aggs, self.tables))
6280            }
6281        }
6282    }
6283}
6284
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    // Collects aggregate calls (enforcing that they are not nested) and pushes
    // "no table functions here" contexts while inside aggregates and table
    // functions.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // Visit the FILTER clause before entering the aggregate context:
                // `within_aggregate` stays false for it, so (unlike the argument
                // list) it may itself contain aggregate function calls.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                // Restore the previous state once the argument list is done.
                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not appear nested inside another table
                // function's arguments.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    // Tracks expression contexts in which table functions are disallowed, and
    // (within SELECT items) replaces table function calls with synthetic
    // identifiers, recording the original call in `self.tables`.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        // `disallowed_context` is a context name to push for the duration of this
        // expression's children; `func` is a table function call to extract.
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a uuid identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                // We know this is okay because id is 11 characters + <=20 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        // Visit the (possibly just-replaced) expression's children.
        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    // Marks that we are inside a SELECT item, which enables the table function
    // extraction in `visit_expr_mut`.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6406
/// Collects window function call expressions (function calls with an OVER
/// clause), without descending into subqueries.
#[derive(Default)]
struct WindowFuncCollector {
    /// Window function calls, in the order they were encountered.
    window_funcs: Vec<Expr<Aug>>,
}
6411
6412impl WindowFuncCollector {
6413    fn into_result(self) -> Vec<Expr<Aug>> {
6414        // Dedup while preserving the order.
6415        let mut seen = BTreeSet::new();
6416        let window_funcs_dedupped = self
6417            .window_funcs
6418            .into_iter()
6419            .filter(move |expr| seen.insert(expr.clone()))
6420            // Reverse the order, so that in case of a nested window function call, the
6421            // inner one is evaluated first.
6422            .rev()
6423            .collect();
6424        window_funcs_dedupped
6425    }
6426}
6427
6428impl Visit<'_, Aug> for WindowFuncCollector {
6429    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6430        match expr {
6431            Expr::Function(func) => {
6432                if func.over.is_some() {
6433                    self.window_funcs.push(expr.clone());
6434                }
6435            }
6436            _ => (),
6437        }
6438        visit::visit_expr(self, expr);
6439    }
6440
6441    fn visit_query(&mut self, _query: &Query<Aug>) {
6442        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6443    }
6444}
6445
/// Specifies how long a query will live.
///
/// Every variant except `OneShot` corresponds to a maintained dataflow; see
/// `is_maintained`.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The query's (or the expression's) result will be computed at one point in time.
    OneShot,
    /// The query (or expression) is used in a dataflow that maintains an index.
    Index,
    /// The query (or expression) is used in a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) is part of a (non-materialized) view.
    View,
    /// The expression is part of a source definition.
    Source,
}
6462
6463impl QueryLifetime {
6464    /// (This used to impact whether the query is allowed to reason about the time at which it is
6465    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
6466    /// mechanism, see `ExprPrepStyle`.)
6467    pub fn is_one_shot(&self) -> bool {
6468        let result = match self {
6469            QueryLifetime::OneShot => true,
6470            QueryLifetime::Index => false,
6471            QueryLifetime::MaterializedView => false,
6472            QueryLifetime::Subscribe => false,
6473            QueryLifetime::View => false,
6474            QueryLifetime::Source => false,
6475        };
6476        assert_eq!(!result, self.is_maintained());
6477        result
6478    }
6479
6480    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
6481    /// turned into a `TopK`.
6482    pub fn is_maintained(&self) -> bool {
6483        match self {
6484            QueryLifetime::OneShot => false,
6485            QueryLifetime::Index => true,
6486            QueryLifetime::MaterializedView => true,
6487            QueryLifetime::Subscribe => true,
6488            QueryLifetime::View => true,
6489            QueryLifetime::Source => true,
6490        }
6491    }
6492
6493    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
6494    pub fn allow_show(&self) -> bool {
6495        match self {
6496            QueryLifetime::OneShot => true,
6497            QueryLifetime::Index => false,
6498            QueryLifetime::MaterializedView => false,
6499            QueryLifetime::Subscribe => true, // SUBSCRIBE allows SHOW commands!
6500            QueryLifetime::View => false,
6501            QueryLifetime::Source => false,
6502        }
6503    }
6504}
6505
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The name of the CTE.
    pub name: String,
    /// The relation description (column names and types) of the CTE's output.
    pub desc: RelationDesc,
}
6512
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// A name manager, for interning column names that will be stored in HIR and MIR.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// Bounds the depth of recursive planning (see the `CheckedRecursion` impl);
    /// cloned into contexts derived for subqueries.
    pub recursion_guard: RecursionGuard,
}
6530
// Planning recursion on a `QueryContext` is bounded by the guard stored on the
// context itself.
impl CheckedRecursion for QueryContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6536
6537impl<'a> QueryContext<'a> {
6538    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6539        QueryContext {
6540            scx,
6541            lifetime,
6542            outer_scopes: vec![],
6543            outer_relation_types: vec![],
6544            ctes: BTreeMap::new(),
6545            name_manager: Rc::new(RefCell::new(NameManager::new())),
6546            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6547        }
6548    }
6549
6550    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6551        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6552    }
6553
6554    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6555    /// `self`.
6556    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6557        let ctes = self.ctes.clone();
6558        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6559        let outer_relation_types = iter::once(relation_type)
6560            .chain(self.outer_relation_types.clone())
6561            .collect();
6562        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6563        let name_manager = Rc::clone(&self.name_manager);
6564
6565        QueryContext {
6566            scx: self.scx,
6567            lifetime: self.lifetime,
6568            outer_scopes,
6569            outer_relation_types,
6570            ctes,
6571            name_manager,
6572            recursion_guard: self.recursion_guard.clone(),
6573        }
6574    }
6575
6576    /// Derives a `QueryContext` for a scope that contains no columns.
6577    fn empty_derived_context(&self) -> QueryContext<'a> {
6578        let scope = Scope::empty();
6579        let ty = SqlRelationType::empty();
6580        self.derived_context(scope, ty)
6581    }
6582
6583    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6584    /// CTE.
6585    pub fn resolve_table_name(
6586        &self,
6587        object: ResolvedItemName,
6588    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6589        match object {
6590            ResolvedItemName::Item {
6591                id,
6592                full_name,
6593                version,
6594                ..
6595            } => {
6596                let item = self.scx.get_item(&id).at_version(version);
6597                let desc = match item.relation_desc() {
6598                    Some(desc) => desc.clone(),
6599                    None => {
6600                        return Err(PlanError::InvalidDependency {
6601                            name: full_name.to_string(),
6602                            item_type: item.item_type().to_string(),
6603                        });
6604                    }
6605                };
6606                let expr = HirRelationExpr::Get {
6607                    id: Id::Global(item.global_id()),
6608                    typ: desc.typ().clone(),
6609                };
6610
6611                let name = full_name.into();
6612                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6613
6614                Ok((expr, scope))
6615            }
6616            ResolvedItemName::Cte { id, name } => {
6617                let name = name.into();
6618                let cte = self.ctes.get(&id).unwrap();
6619                let expr = HirRelationExpr::Get {
6620                    id: Id::Local(id),
6621                    typ: cte.desc.typ().clone(),
6622                };
6623
6624                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6625
6626                Ok((expr, scope))
6627            }
6628            ResolvedItemName::ContinualTask { id, name } => {
6629                let cte = self.ctes.get(&id).unwrap();
6630                let expr = HirRelationExpr::Get {
6631                    id: Id::Local(id),
6632                    typ: cte.desc.typ().clone(),
6633                };
6634
6635                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6636
6637                Ok((expr, scope))
6638            }
6639            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6640        }
6641    }
6642
6643    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6644    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6645    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6646        self.scx.humanize_scalar_type(typ, postgres_compat)
6647    }
6648}
6649
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a SqlRelationType,
    /// Are aggregate functions allowed in this context
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context.
    pub allow_parameters: bool,
    /// Are window functions allowed in this context
    pub allow_windows: bool,
}
6671
// Expression planning shares the recursion guard of its enclosing query context.
impl CheckedRecursion for ExprContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6677
6678impl<'a> ExprContext<'a> {
6679    pub fn catalog(&self) -> &dyn SessionCatalog {
6680        self.qcx.scx.catalog
6681    }
6682
6683    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6684        let mut ecx = self.clone();
6685        ecx.name = name;
6686        ecx
6687    }
6688
6689    pub fn column_type<E>(&self, expr: &E) -> E::Type
6690    where
6691        E: AbstractExpr,
6692    {
6693        expr.typ(
6694            &self.qcx.outer_relation_types,
6695            self.relation_type,
6696            &self.qcx.scx.param_types.borrow(),
6697        )
6698    }
6699
6700    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6701    where
6702        E: AbstractExpr,
6703    {
6704        self.column_type(expr).scalar_type()
6705    }
6706
6707    fn derived_query_context(&self) -> QueryContext<'_> {
6708        let mut scope = self.scope.clone();
6709        scope.lateral_barrier = true;
6710        self.qcx.derived_context(scope, self.relation_type.clone())
6711    }
6712
6713    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6714        self.qcx.scx.require_feature_flag(flag)
6715    }
6716
6717    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6718        &self.qcx.scx.param_types
6719    }
6720
6721    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6722    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6723    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6724        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6725    }
6726
6727    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6728        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6729    }
6730}
6731
/// Manages column names, doing lightweight string interning.
///
/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
/// to ensure maximal sharing: interning the same string twice yields pointers
/// to a single shared allocation.
#[derive(Debug, Clone)]
pub struct NameManager(BTreeSet<Arc<str>>);
6739
6740impl NameManager {
6741    /// Creates a new `NameManager`, with no interned names
6742    pub fn new() -> Self {
6743        Self(BTreeSet::new())
6744    }
6745
6746    /// Interns a string, returning a reference-counted pointer to the interned
6747    /// string.
6748    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6749        let s = s.as_ref();
6750        if let Some(interned) = self.0.get(s) {
6751            Arc::clone(interned)
6752        } else {
6753            let interned: Arc<str> = Arc::from(s);
6754            self.0.insert(Arc::clone(&interned));
6755            interned
6756        }
6757    }
6758
6759    /// Interns a string representing a reference to a `ScopeItem`, returning a
6760    /// reference-counted pointer to the interned string.
6761    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6762        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6763        //
6764        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6765        // name information. Note that as of 2025-04-09, we don't support column
6766        // renames.
6767        //
6768        // A few bad alternatives:
6769        //
6770        // (1) Store it but don't write it down. This fails because the expression
6771        //     cache will erase our names on restart.
6772        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6773        //     get the right names. But the world now and the world when we made
6774        //     those objects may be different.
6775        // (3) Just don't write down the table name. Nothing fails... for now.
6776
6777        self.intern(item.column_name.as_str())
6778    }
6779}
6780
#[cfg(test)]
mod test {
    use super::*;

    /// Ensure that `NameManager`'s string interning works as expected.
    ///
    /// In particular, structurally but not referentially identical strings should
    /// be interned to the same `Arc`ed pointer.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut nm = NameManager::new();

        let original = "hi";
        let first = nm.intern(original);
        let other = nm.intern("hello");

        // Distinct strings intern to distinct allocations.
        assert_ne!(first.as_ptr(), other.as_ptr());

        // This "hi" literal is _likely_ the very same static string as `original`,
        // so it must certainly intern to the same pointer.
        let second = nm.intern("hi");
        assert_eq!(first.as_ptr(), second.as_ptr());

        // Build a "hi" at runtime so it can't be optimized into the same static
        // string as the literal...
        let runtime_hi = format!(
            "{}{}",
            first.chars().nth(0).unwrap(),
            second.chars().nth(1).unwrap()
        );
        // ...and make sure that we're really testing with a fresh string!
        assert_ne!(original.as_ptr(), runtime_hi.as_ptr());

        let third = nm.intern(runtime_hi);
        assert_eq!(first.as_ptr(), third.as_ptr());
    }
}