mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::virtual_syntax::AlgExcept;
50use mz_expr::{
51    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, TableFunc,
52    func as expr_func,
53};
54use mz_ore::assert_none;
55use mz_ore::collections::CollectionExt;
56use mz_ore::error::ErrorExt;
57use mz_ore::id_gen::IdGen;
58use mz_ore::option::FallibleMapExt;
59use mz_ore::stack::{CheckedRecursion, RecursionGuard};
60use mz_ore::str::StrExt;
61use mz_repr::adt::char::CharLength;
62use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
63use mz_repr::adt::timestamp::TimestampPrecision;
64use mz_repr::adt::varchar::VarCharMaxLength;
65use mz_repr::{
66    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector, Row,
67    RowArena, SqlColumnType, SqlRelationType, SqlScalarType, UNKNOWN_COLUMN_NAME, strconv,
68};
69use mz_sql_parser::ast::display::AstDisplay;
70use mz_sql_parser::ast::visit::Visit;
71use mz_sql_parser::ast::visit_mut::{self, VisitMut};
72use mz_sql_parser::ast::{
73    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
74    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
75    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
76    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
77    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
78    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
79    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
80    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
81};
82use mz_sql_parser::ident;
83
84use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
85use crate::func::{self, Func, FuncSpec, TableFuncImpl};
86use crate::names::{
87    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
88};
89use crate::plan::PlanError::InvalidWmrRecursionLimit;
90use crate::plan::error::PlanError;
91use crate::plan::hir::{
92    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
93    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
94    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
95    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
96};
97use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
98use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
99use crate::plan::statement::{StatementContext, StatementDesc, show};
100use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
101use crate::plan::{
102    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
103    literal, transform_ast,
104};
105use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
106use crate::session::vars::{self, FeatureFlag};
107use crate::{ORDINALITY_COL_NAME, normalize};
108
109#[derive(Debug)]
110pub struct PlannedRootQuery<E> {
111    pub expr: E,
112    pub desc: RelationDesc,
113    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
114    pub scope: Scope,
115}
116
117/// Plans a top-level query, returning the `HirRelationExpr` describing the query
118/// plan, the `RelationDesc` describing the shape of the result set, a
119/// `RowSetFinishing` describing post-processing that must occur before results
120/// are sent to the client, and the types of the parameters in the query, if any
121/// were present.
122///
123/// Note that the returned `RelationDesc` describes the expression after
124/// applying the returned `RowSetFinishing`.
125#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
126pub fn plan_root_query(
127    scx: &StatementContext,
128    mut query: Query<Aug>,
129    lifetime: QueryLifetime,
130) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
131    transform_ast::transform(scx, &mut query)?;
132    let mut qcx = QueryContext::root(scx, lifetime);
133    let PlannedQuery {
134        mut expr,
135        scope,
136        order_by,
137        limit,
138        offset,
139        project,
140        group_size_hints,
141    } = plan_query(&mut qcx, &query)?;
142
143    let mut finishing = RowSetFinishing {
144        limit,
145        offset,
146        project,
147        order_by,
148    };
149
150    // Attempt to push the finishing's ordering past its projection. This allows
151    // data to be projected down on the workers rather than the coordinator. It
152    // also improves the optimizer's demand analysis, as the optimizer can only
153    // reason about demand information in `expr` (i.e., it can't see
154    // `finishing.project`).
155    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
156
157    if lifetime.is_maintained() {
158        expr.finish_maintained(&mut finishing, group_size_hints);
159    }
160
161    let typ = qcx.relation_type(&expr);
162    let typ = SqlRelationType::new(
163        finishing
164            .project
165            .iter()
166            .map(|i| typ.column_types[*i].clone())
167            .collect(),
168    );
169    let desc = RelationDesc::new(typ, scope.column_names());
170
171    Ok(PlannedRootQuery {
172        expr,
173        desc,
174        finishing,
175        scope,
176    })
177}
178
179/// TODO(ct2): Dedup this with [plan_root_query].
180#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
181pub fn plan_ct_query(
182    qcx: &mut QueryContext,
183    mut query: Query<Aug>,
184) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
185    transform_ast::transform(qcx.scx, &mut query)?;
186    let PlannedQuery {
187        mut expr,
188        scope,
189        order_by,
190        limit,
191        offset,
192        project,
193        group_size_hints,
194    } = plan_query(qcx, &query)?;
195
196    let mut finishing = RowSetFinishing {
197        limit,
198        offset,
199        project,
200        order_by,
201    };
202
203    // Attempt to push the finishing's ordering past its projection. This allows
204    // data to be projected down on the workers rather than the coordinator. It
205    // also improves the optimizer's demand analysis, as the optimizer can only
206    // reason about demand information in `expr` (i.e., it can't see
207    // `finishing.project`).
208    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
209
210    expr.finish_maintained(&mut finishing, group_size_hints);
211
212    let typ = qcx.relation_type(&expr);
213    let typ = SqlRelationType::new(
214        finishing
215            .project
216            .iter()
217            .map(|i| typ.column_types[*i].clone())
218            .collect(),
219    );
220    let desc = RelationDesc::new(typ, scope.column_names());
221
222    Ok(PlannedRootQuery {
223        expr,
224        desc,
225        finishing,
226        scope,
227    })
228}
229
230/// Attempts to push a projection through an order by.
231///
232/// The returned bool indicates whether the pushdown was successful or not.
233/// Successful pushdown requires that all the columns referenced in `order_by`
234/// are included in `project`.
235///
236/// When successful, `expr` is wrapped in a projection node, `order_by` is
237/// rewritten to account for the pushed-down projection, and `project` is
238/// replaced with the trivial projection. When unsuccessful, no changes are made
239/// to any of the inputs.
240fn try_push_projection_order_by(
241    expr: &mut HirRelationExpr,
242    project: &mut Vec<usize>,
243    order_by: &mut Vec<ColumnOrder>,
244) -> bool {
245    let mut unproject = vec![None; expr.arity()];
246    for (out_i, in_i) in project.iter().copied().enumerate() {
247        unproject[in_i] = Some(out_i);
248    }
249    if order_by
250        .iter()
251        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
252    {
253        let trivial_project = (0..project.len()).collect();
254        *expr = expr.take().project(mem::replace(project, trivial_project));
255        for ob in order_by {
256            ob.column = unproject[ob.column].unwrap();
257        }
258        true
259    } else {
260        false
261    }
262}
263
264pub fn plan_insert_query(
265    scx: &StatementContext,
266    table_name: ResolvedItemName,
267    columns: Vec<Ident>,
268    source: InsertSource<Aug>,
269    returning: Vec<SelectItem<Aug>>,
270) -> Result<
271    (
272        CatalogItemId,
273        HirRelationExpr,
274        PlannedRootQuery<Vec<HirScalarExpr>>,
275    ),
276    PlanError,
277> {
278    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
279    let table = scx.get_item_by_resolved_name(&table_name)?;
280
281    // Validate the target of the insert.
282    if table.item_type() != CatalogItemType::Table {
283        sql_bail!(
284            "cannot insert into {} '{}'",
285            table.item_type(),
286            table_name.full_name_str()
287        );
288    }
289    let desc = table.relation_desc().expect("table has desc");
290    let mut defaults = table
291        .writable_table_details()
292        .ok_or_else(|| {
293            sql_err!(
294                "cannot insert into non-writeable table '{}'",
295                table_name.full_name_str()
296            )
297        })?
298        .to_vec();
299
300    for default in &mut defaults {
301        transform_ast::transform(scx, default)?;
302    }
303
304    if table.id().is_system() {
305        sql_bail!(
306            "cannot insert into system table '{}'",
307            table_name.full_name_str()
308        );
309    }
310
311    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
312
313    // Validate target column order.
314    let mut source_types = Vec::with_capacity(columns.len());
315    let mut ordering = Vec::with_capacity(columns.len());
316
317    if columns.is_empty() {
318        // Columns in source query must be in order. Let's guess the full shape and truncate to the
319        // right size later after planning the source query
320        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
321        ordering.extend(0..desc.arity());
322    } else {
323        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
324            .iter()
325            .enumerate()
326            .map(|(idx, (name, typ))| (name, (idx, typ)))
327            .collect();
328
329        for c in &columns {
330            if let Some((idx, typ)) = column_by_name.get(c) {
331                ordering.push(*idx);
332                source_types.push(&typ.scalar_type);
333            } else {
334                sql_bail!(
335                    "column {} of relation {} does not exist",
336                    c.quoted(),
337                    table_name.full_name_str().quoted()
338                );
339            }
340        }
341        if let Some(dup) = columns.iter().duplicates().next() {
342            sql_bail!("column {} specified more than once", dup.quoted());
343        }
344    };
345
346    // Plan the source.
347    let expr = match source {
348        InsertSource::Query(mut query) => {
349            transform_ast::transform(scx, &mut query)?;
350
351            match query {
352                // Special-case simple VALUES clauses as PostgreSQL does.
353                Query {
354                    body: SetExpr::Values(Values(values)),
355                    ctes,
356                    order_by,
357                    limit: None,
358                    offset: None,
359                } if ctes.is_empty() && order_by.is_empty() => {
360                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
361                    plan_values_insert(&qcx, &names, &source_types, &values)?
362                }
363                _ => {
364                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
365                    expr
366                }
367            }
368        }
369        InsertSource::DefaultValues => {
370            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
371        }
372    };
373
374    let expr_arity = expr.arity();
375
376    // Validate that the arity of the source query is at most the size of declared columns or the
377    // size of the table if none are declared
378    let max_columns = if columns.is_empty() {
379        desc.arity()
380    } else {
381        columns.len()
382    };
383    if expr_arity > max_columns {
384        sql_bail!("INSERT has more expressions than target columns");
385    }
386    // But it should never have less than the declared columns (or zero)
387    if expr_arity < columns.len() {
388        sql_bail!("INSERT has more target columns than expressions");
389    }
390
391    // Trim now that we know for sure the correct arity of the source query
392    source_types.truncate(expr_arity);
393    ordering.truncate(expr_arity);
394
395    // Ensure the types of the source query match the types of the target table,
396    // installing assignment casts where necessary and possible.
397    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
398        sql_err!(
399            "column {} is of type {} but expression is of type {}",
400            desc.get_name(ordering[e.column]).quoted(),
401            qcx.humanize_scalar_type(&e.target_type, false),
402            qcx.humanize_scalar_type(&e.source_type, false),
403        )
404    })?;
405
406    // Fill in any omitted columns and rearrange into correct order
407    let mut map_exprs = vec![];
408    let mut project_key = Vec::with_capacity(desc.arity());
409
410    // Maps from table column index to position in the source query
411    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
412
413    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
414    for (col_idx, (col_typ, default)) in column_details {
415        if let Some(src_idx) = col_to_source.get(&col_idx) {
416            project_key.push(*src_idx);
417        } else {
418            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
419            project_key.push(expr_arity + map_exprs.len());
420            map_exprs.push(hir);
421        }
422    }
423
424    let returning = {
425        let (scope, typ) = if let ResolvedItemName::Item {
426            full_name,
427            version: _,
428            ..
429        } = table_name
430        {
431            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
432            let typ = desc.typ().clone();
433            (scope, typ)
434        } else {
435            (Scope::empty(), SqlRelationType::empty())
436        };
437        let ecx = &ExprContext {
438            qcx: &qcx,
439            name: "RETURNING clause",
440            scope: &scope,
441            relation_type: &typ,
442            allow_aggregates: false,
443            allow_subqueries: false,
444            allow_parameters: true,
445            allow_windows: false,
446        };
447        let table_func_names = BTreeMap::new();
448        let mut output_columns = vec![];
449        let mut new_exprs = vec![];
450        let mut new_type = SqlRelationType::empty();
451        for mut si in returning {
452            transform_ast::transform(scx, &mut si)?;
453            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
454                let expr = match &select_item {
455                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
456                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
457                };
458                output_columns.push(column_name);
459                let typ = ecx.column_type(&expr);
460                new_type.column_types.push(typ);
461                new_exprs.push(expr);
462            }
463        }
464        let desc = RelationDesc::new(new_type, output_columns);
465        let desc_arity = desc.arity();
466        PlannedRootQuery {
467            expr: new_exprs,
468            desc,
469            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
470            scope,
471        }
472    };
473
474    Ok((
475        table.id(),
476        expr.map(map_exprs).project(project_key),
477        returning,
478    ))
479}
480
481/// Determines the mapping between some external data and a Materialize relation.
482///
483/// Returns the following:
484/// * [`CatalogItemId`] for the destination table.
485/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
486/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
487///   since we now return a [`MapFilterProject`].
488/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
489///   destination table.
490///
491pub fn plan_copy_item(
492    scx: &StatementContext,
493    item_name: ResolvedItemName,
494    columns: Vec<Ident>,
495) -> Result<
496    (
497        CatalogItemId,
498        RelationDesc,
499        Vec<ColumnIndex>,
500        Option<MapFilterProject>,
501    ),
502    PlanError,
503> {
504    let item = scx.get_item_by_resolved_name(&item_name)?;
505    let fullname = scx.catalog.resolve_full_name(item.name());
506    let table_desc = match item.relation_desc() {
507        Some(desc) => desc.into_owned(),
508        None => {
509            return Err(PlanError::InvalidDependency {
510                name: fullname.to_string(),
511                item_type: item.item_type(),
512            });
513        }
514    };
515    let mut ordering = Vec::with_capacity(columns.len());
516
517    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
518    // should be simplified. The reason they are currently separate code paths is so we can roll
519    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.
520
521    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
522    // FROM SOURCE ...`), then we generate an MFP.
523    //
524    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
525    // so it's not always guaranteed that our `item` is a table.
526    let mfp = if let Some(table_defaults) = item.writable_table_details() {
527        let mut table_defaults = table_defaults.to_vec();
528
529        for default in &mut table_defaults {
530            transform_ast::transform(scx, default)?;
531        }
532
533        // Fill in any omitted columns and rearrange into correct order
534        let source_column_names: Vec<_> = columns
535            .iter()
536            .cloned()
537            .map(normalize::column_name)
538            .collect();
539
540        let mut default_exprs = Vec::new();
541        let mut project_keys = Vec::with_capacity(table_desc.arity());
542
543        // For each column in the destination table, either project it from the source data, or provide
544        // an expression to fill in a default value.
545        let column_details = table_desc.iter().zip_eq(table_defaults);
546        for ((col_name, col_type), col_default) in column_details {
547            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
548            if let Some(src_idx) = maybe_src_idx {
549                project_keys.push(src_idx);
550            } else {
551                // If one a column from the table does not exist in the source data, then a default
552                // value will get appended to the end of the input Row from the source data.
553                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
554                let mir = hir.lower_uncorrelated()?;
555                project_keys.push(source_column_names.len() + default_exprs.len());
556                default_exprs.push(mir);
557            }
558        }
559
560        let mfp = MapFilterProject::new(source_column_names.len())
561            .map(default_exprs)
562            .project(project_keys);
563        Some(mfp)
564    } else {
565        None
566    };
567
568    // Create a mapping from input data to the table we're copying into.
569    let source_desc = if columns.is_empty() {
570        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
571        ordering.extend(indexes);
572
573        // The source data should be in the same order as the table.
574        table_desc
575    } else {
576        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
577        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
578            .iter_all()
579            .map(|(idx, name, typ)| (name, (*idx, typ)))
580            .collect();
581
582        let mut names = Vec::with_capacity(columns.len());
583        let mut source_types = Vec::with_capacity(columns.len());
584
585        for c in &columns {
586            if let Some((idx, typ)) = column_by_name.get(c) {
587                ordering.push(*idx);
588                source_types.push((*typ).clone());
589                names.push(c.clone());
590            } else {
591                sql_bail!(
592                    "column {} of relation {} does not exist",
593                    c.quoted(),
594                    item_name.full_name_str().quoted()
595                );
596            }
597        }
598        if let Some(dup) = columns.iter().duplicates().next() {
599            sql_bail!("column {} specified more than once", dup.quoted());
600        }
601
602        // The source data is a different shape than the destination table.
603        RelationDesc::new(SqlRelationType::new(source_types), names)
604    };
605
606    Ok((item.id(), source_desc, ordering, mfp))
607}
608
609/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
610///
611/// TODO(cf3): Merge this method with [`plan_copy_item`].
612pub fn plan_copy_from(
613    scx: &StatementContext,
614    table_name: ResolvedItemName,
615    columns: Vec<Ident>,
616) -> Result<
617    (
618        CatalogItemId,
619        RelationDesc,
620        Vec<ColumnIndex>,
621        Option<MapFilterProject>,
622    ),
623    PlanError,
624> {
625    let table = scx.get_item_by_resolved_name(&table_name)?;
626
627    // Validate the target of the insert.
628    if table.item_type() != CatalogItemType::Table {
629        sql_bail!(
630            "cannot insert into {} '{}'",
631            table.item_type(),
632            table_name.full_name_str()
633        );
634    }
635
636    let _ = table.writable_table_details().ok_or_else(|| {
637        sql_err!(
638            "cannot insert into non-writeable table '{}'",
639            table_name.full_name_str()
640        )
641    })?;
642
643    if table.id().is_system() {
644        sql_bail!(
645            "cannot insert into system table '{}'",
646            table_name.full_name_str()
647        );
648    }
649    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
650
651    Ok((id, desc, ordering, mfp))
652}
653
654/// Builds a plan that adds the default values for the missing columns and re-orders
655/// the datums in the given rows to match the order in the target table.
656pub fn plan_copy_from_rows(
657    pcx: &PlanContext,
658    catalog: &dyn SessionCatalog,
659    target_id: CatalogItemId,
660    target_name: String,
661    columns: Vec<ColumnIndex>,
662    rows: Vec<mz_repr::Row>,
663) -> Result<HirRelationExpr, PlanError> {
664    let scx = StatementContext::new(Some(pcx), catalog);
665
666    // Always copy at the latest version of the table.
667    let table = catalog
668        .try_get_item(&target_id)
669        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
670        .at_version(RelationVersionSelector::Latest);
671
672    let mut defaults = table
673        .writable_table_details()
674        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
675        .to_vec();
676
677    for default in &mut defaults {
678        transform_ast::transform(&scx, default)?;
679    }
680
681    let desc = table.relation_desc().expect("table has desc");
682    let column_types = columns
683        .iter()
684        .map(|x| desc.get_type(x).clone())
685        .map(|mut x| {
686            // Null constraint is enforced later, when inserting the row in the table.
687            // Without this, an assert is hit during lowering.
688            x.nullable = true;
689            x
690        })
691        .collect();
692    let typ = SqlRelationType::new(column_types);
693    let expr = HirRelationExpr::Constant {
694        rows,
695        typ: typ.clone(),
696    };
697
698    // Exit early with just the raw constant if we know that all columns are present
699    // and in the correct order. This lets us bypass expensive downstream optimizations
700    // more easily, as at every stage we know this expression is nothing more than
701    // a constant (as opposed to e.g. a constant with with an identity map and identity
702    // projection).
703    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
704    if columns == default {
705        return Ok(expr);
706    }
707
708    // Fill in any omitted columns and rearrange into correct order
709    let mut map_exprs = vec![];
710    let mut project_key = Vec::with_capacity(desc.arity());
711
712    // Maps from table column index to position in the source query
713    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
714
715    let column_details = desc.iter_all().zip_eq(defaults);
716    for ((col_idx, _col_name, col_typ), default) in column_details {
717        if let Some(src_idx) = col_to_source.get(&col_idx) {
718            project_key.push(*src_idx);
719        } else {
720            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
721            project_key.push(typ.arity() + map_exprs.len());
722            map_exprs.push(hir);
723        }
724    }
725
726    Ok(expr.map(map_exprs).project(project_key))
727}
728
729/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
730pub struct ReadThenWritePlan {
731    pub id: CatalogItemId,
732    /// Read portion of query.
733    ///
734    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
735    /// retractions.
736    pub selection: HirRelationExpr,
737    /// Map from column index to SET expression. Empty for DELETE statements.
738    pub assignments: BTreeMap<usize, HirScalarExpr>,
739    pub finishing: RowSetFinishing,
740}
741
742pub fn plan_delete_query(
743    scx: &StatementContext,
744    mut delete_stmt: DeleteStatement<Aug>,
745) -> Result<ReadThenWritePlan, PlanError> {
746    transform_ast::transform(scx, &mut delete_stmt)?;
747
748    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
749    plan_mutation_query_inner(
750        qcx,
751        delete_stmt.table_name,
752        delete_stmt.alias,
753        delete_stmt.using,
754        vec![],
755        delete_stmt.selection,
756    )
757}
758
759pub fn plan_update_query(
760    scx: &StatementContext,
761    mut update_stmt: UpdateStatement<Aug>,
762) -> Result<ReadThenWritePlan, PlanError> {
763    transform_ast::transform(scx, &mut update_stmt)?;
764
765    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
766
767    plan_mutation_query_inner(
768        qcx,
769        update_stmt.table_name,
770        update_stmt.alias,
771        vec![],
772        update_stmt.assignments,
773        update_stmt.selection,
774    )
775}
776
777pub fn plan_mutation_query_inner(
778    qcx: QueryContext,
779    table_name: ResolvedItemName,
780    alias: Option<TableAlias>,
781    using: Vec<TableWithJoins<Aug>>,
782    assignments: Vec<Assignment<Aug>>,
783    selection: Option<Expr<Aug>>,
784) -> Result<ReadThenWritePlan, PlanError> {
785    // Get ID and version of the relation desc.
786    let (id, version) = match table_name {
787        ResolvedItemName::Item { id, version, .. } => (id, version),
788        _ => sql_bail!("cannot mutate non-user table"),
789    };
790
791    // Perform checks on item with given ID.
792    let item = qcx.scx.get_item(&id).at_version(version);
793    if item.item_type() != CatalogItemType::Table {
794        sql_bail!(
795            "cannot mutate {} '{}'",
796            item.item_type(),
797            table_name.full_name_str()
798        );
799    }
800    let _ = item.writable_table_details().ok_or_else(|| {
801        sql_err!(
802            "cannot mutate non-writeable table '{}'",
803            table_name.full_name_str()
804        )
805    })?;
806    if id.is_system() {
807        sql_bail!(
808            "cannot mutate system table '{}'",
809            table_name.full_name_str()
810        );
811    }
812
813    // Derive structs for operation from validated table
814    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
815    let scope = plan_table_alias(scope, alias.as_ref())?;
816    let desc = item.relation_desc().expect("table has desc");
817    let relation_type = qcx.relation_type(&get);
818
819    if using.is_empty() {
820        if let Some(expr) = selection {
821            let ecx = &ExprContext {
822                qcx: &qcx,
823                name: "WHERE clause",
824                scope: &scope,
825                relation_type: &relation_type,
826                allow_aggregates: false,
827                allow_subqueries: true,
828                allow_parameters: true,
829                allow_windows: false,
830            };
831            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
832            get = get.filter(vec![expr]);
833        }
834    } else {
835        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
836    }
837
838    let mut sets = BTreeMap::new();
839    for Assignment { id, value } in assignments {
840        // Get the index and type of the column.
841        let name = normalize::column_name(id);
842        match desc.get_by_name(&name) {
843            Some((idx, typ)) => {
844                let ecx = &ExprContext {
845                    qcx: &qcx,
846                    name: "SET clause",
847                    scope: &scope,
848                    relation_type: &relation_type,
849                    allow_aggregates: false,
850                    allow_subqueries: false,
851                    allow_parameters: true,
852                    allow_windows: false,
853                };
854                let expr = plan_expr(ecx, &value)?.cast_to(
855                    ecx,
856                    CastContext::Assignment,
857                    &typ.scalar_type,
858                )?;
859
860                if sets.insert(idx, expr).is_some() {
861                    sql_bail!("column {} set twice", name)
862                }
863            }
864            None => sql_bail!("unknown column {}", name),
865        };
866    }
867
868    let finishing = RowSetFinishing {
869        order_by: vec![],
870        limit: None,
871        offset: 0,
872        project: (0..desc.arity()).collect(),
873    };
874
875    Ok(ReadThenWritePlan {
876        id,
877        selection: get,
878        finishing,
879        assignments: sets,
880    })
881}
882
883// Adjust `get` to perform an existential subquery on `using` accounting for
884// `selection`.
885//
886// If `USING`, we essentially want to rewrite the query as a correlated
887// existential subquery, i.e.
888// ```
889// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
890// ```
891// However, we can't do that directly because of esoteric rules w/r/t `lateral`
892// subqueries.
893// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
894fn handle_mutation_using_clause(
895    qcx: &QueryContext,
896    selection: Option<Expr<Aug>>,
897    using: Vec<TableWithJoins<Aug>>,
898    get: HirRelationExpr,
899    outer_scope: Scope,
900) -> Result<HirRelationExpr, PlanError> {
901    // Plan `USING` as a cross-joined `FROM` without knowledge of the
902    // statement's `FROM` target. This prevents `lateral` subqueries from
903    // "seeing" the `FROM` target.
904    let (mut using_rel_expr, using_scope) =
905        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
906            let (left, left_scope) = l;
907            plan_join(
908                qcx,
909                left,
910                left_scope,
911                &Join {
912                    relation: TableFactor::NestedJoin {
913                        join: Box::new(twj),
914                        alias: None,
915                    },
916                    join_operator: JoinOperator::CrossJoin,
917                },
918            )
919        })?;
920
921    if let Some(expr) = selection {
922        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
923        // PG-like semantics e.g. expressing ambiguous column references. We put
924        // `USING...` first for no real reason, but making a different decision
925        // would require adjusting the column references on this relation
926        // differently.
927        let on = HirScalarExpr::literal_true();
928        let joined = using_rel_expr
929            .clone()
930            .join(get.clone(), on, JoinKind::Inner);
931        let joined_scope = using_scope.product(outer_scope)?;
932        let joined_relation_type = qcx.relation_type(&joined);
933
934        let ecx = &ExprContext {
935            qcx,
936            name: "WHERE clause",
937            scope: &joined_scope,
938            relation_type: &joined_relation_type,
939            allow_aggregates: false,
940            allow_subqueries: true,
941            allow_parameters: true,
942            allow_windows: false,
943        };
944
945        // Plan the filter expression on `FROM, USING...`.
946        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
947
948        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
949        // those to the right of `using_rel_expr`) to instead be correlated to
950        // the outer relation, i.e. `get`.
951        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
952        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
953        use mz_expr::visit::Visit;
954        expr.visit_mut_post(&mut |e| {
955            if let HirScalarExpr::Column(c, _name) = e {
956                if c.column >= using_rel_arity {
957                    c.level += 1;
958                    c.column -= using_rel_arity;
959                };
960            }
961        })?;
962
963        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
964        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
965        // relation.
966        using_rel_expr = using_rel_expr.filter(vec![expr]);
967    } else {
968        // Check that scopes are at compatible (i.e. do not double-reference
969        // same table), despite lack of selection
970        let _joined_scope = using_scope.product(outer_scope)?;
971    }
972    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
973    // whether any rows are returned, and not on the contents of those rows,
974    // the output list of the subquery is normally unimportant.
975    //
976    // This means we don't need to worry about projecting/mapping any
977    // additional expressions here.
978    //
979    // https://www.postgresql.org/docs/14/functions-subquery.html
980
981    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
982    Ok(get.filter(vec![using_rel_expr.exists()]))
983}
984
985#[derive(Debug)]
986pub(crate) struct CastRelationError {
987    pub(crate) column: usize,
988    pub(crate) source_type: SqlScalarType,
989    pub(crate) target_type: SqlScalarType,
990}
991
992/// Cast a relation from one type to another using the specified type of cast.
993///
994/// The length of `target_types` must match the arity of `expr`.
995pub(crate) fn cast_relation<'a, I>(
996    qcx: &QueryContext,
997    ccx: CastContext,
998    expr: HirRelationExpr,
999    target_types: I,
1000) -> Result<HirRelationExpr, CastRelationError>
1001where
1002    I: IntoIterator<Item = &'a SqlScalarType>,
1003{
1004    let ecx = &ExprContext {
1005        qcx,
1006        name: "values",
1007        scope: &Scope::empty(),
1008        relation_type: &qcx.relation_type(&expr),
1009        allow_aggregates: false,
1010        allow_subqueries: true,
1011        allow_parameters: true,
1012        allow_windows: false,
1013    };
1014    let mut map_exprs = vec![];
1015    let mut project_key = vec![];
1016    for (i, target_typ) in target_types.into_iter().enumerate() {
1017        let expr = HirScalarExpr::column(i);
1018        // We plan every cast and check the evaluated expressions rather than
1019        // checking the types directly because of some complex casting rules
1020        // between types not expressed in `SqlScalarType` equality.
1021        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1022            Ok(cast_expr) => {
1023                if expr == cast_expr {
1024                    // Cast between types was unnecessary
1025                    project_key.push(i);
1026                } else {
1027                    // Cast between types required
1028                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
1029                    map_exprs.push(cast_expr);
1030                }
1031            }
1032            Err(_) => {
1033                return Err(CastRelationError {
1034                    column: i,
1035                    source_type: ecx.scalar_type(&expr),
1036                    target_type: target_typ.clone(),
1037                });
1038            }
1039        }
1040    }
1041    Ok(expr.map(map_exprs).project(project_key))
1042}
1043
1044/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1045/// VIEW` statement.
1046pub fn plan_as_of(
1047    scx: &StatementContext,
1048    as_of: Option<AsOf<Aug>>,
1049) -> Result<QueryWhen, PlanError> {
1050    match as_of {
1051        None => Ok(QueryWhen::Immediately),
1052        Some(as_of) => match as_of {
1053            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1054            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1055        },
1056    }
1057}
1058
1059/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1060///
1061/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1062/// - not a constant,
1063/// - not castable to MzTimestamp,
1064/// - is null,
1065/// - contains an unmaterializable function,
1066/// - some other evaluation error occurs, e.g., a division by 0,
1067/// - contains aggregates, subqueries, parameters, or window function calls.
1068pub fn plan_as_of_or_up_to(
1069    scx: &StatementContext,
1070    mut expr: Expr<Aug>,
1071) -> Result<mz_repr::Timestamp, PlanError> {
1072    let scope = Scope::empty();
1073    let desc = RelationDesc::empty();
1074    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1075    // evaluated only once.)
1076    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1077    transform_ast::transform(scx, &mut expr)?;
1078    let ecx = &ExprContext {
1079        qcx: &qcx,
1080        name: "AS OF or UP TO",
1081        scope: &scope,
1082        relation_type: desc.typ(),
1083        allow_aggregates: false,
1084        allow_subqueries: false,
1085        allow_parameters: false,
1086        allow_windows: false,
1087    };
1088    let hir = plan_expr(ecx, &expr)?.cast_to(
1089        ecx,
1090        CastContext::Assignment,
1091        &SqlScalarType::MzTimestamp,
1092    )?;
1093    if hir.contains_unmaterializable() {
1094        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1095    }
1096    // At this point, we definitely have a constant expression:
1097    // - it can't contain any unmaterializable functions;
1098    // - it can't refer to any columns.
1099    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1100    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1101    // division by 0.
1102    let timestamp = hir
1103        .into_literal_mz_timestamp()
1104        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1105    Ok(timestamp)
1106}
1107
1108/// Plans an expression in the AS position of a `CREATE SECRET`.
1109pub fn plan_secret_as(
1110    scx: &StatementContext,
1111    mut expr: Expr<Aug>,
1112) -> Result<MirScalarExpr, PlanError> {
1113    let scope = Scope::empty();
1114    let desc = RelationDesc::empty();
1115    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1116
1117    transform_ast::transform(scx, &mut expr)?;
1118
1119    let ecx = &ExprContext {
1120        qcx: &qcx,
1121        name: "AS",
1122        scope: &scope,
1123        relation_type: desc.typ(),
1124        allow_aggregates: false,
1125        allow_subqueries: false,
1126        allow_parameters: false,
1127        allow_windows: false,
1128    };
1129    let expr = plan_expr(ecx, &expr)?
1130        .type_as(ecx, &SqlScalarType::Bytes)?
1131        .lower_uncorrelated()?;
1132    Ok(expr)
1133}
1134
1135/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1136pub fn plan_webhook_validate_using(
1137    scx: &StatementContext,
1138    validate_using: CreateWebhookSourceCheck<Aug>,
1139) -> Result<WebhookValidation, PlanError> {
1140    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1141
1142    let CreateWebhookSourceCheck {
1143        options,
1144        using: mut expr,
1145    } = validate_using;
1146
1147    let mut column_typs = vec![];
1148    let mut column_names = vec![];
1149
1150    let (bodies, headers, secrets) = options
1151        .map(|o| (o.bodies, o.headers, o.secrets))
1152        .unwrap_or_default();
1153
1154    // Append all of the bodies so they can be used in the expression.
1155    let mut body_tuples = vec![];
1156    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1157        let scalar_type = use_bytes
1158            .then_some(SqlScalarType::Bytes)
1159            .unwrap_or(SqlScalarType::String);
1160        let name = alias
1161            .map(|a| a.into_string())
1162            .unwrap_or_else(|| "body".to_string());
1163
1164        column_typs.push(SqlColumnType {
1165            scalar_type,
1166            nullable: false,
1167        });
1168        column_names.push(name);
1169
1170        // Store the column index so we can be sure to provide this body correctly.
1171        let column_idx = column_typs.len() - 1;
1172        // Double check we're consistent with column names.
1173        assert_eq!(
1174            column_idx,
1175            column_names.len() - 1,
1176            "body column names and types don't match"
1177        );
1178        body_tuples.push((column_idx, use_bytes));
1179    }
1180
1181    // Append all of the headers so they can be used in the expression.
1182    let mut header_tuples = vec![];
1183
1184    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1185        let value_type = use_bytes
1186            .then_some(SqlScalarType::Bytes)
1187            .unwrap_or(SqlScalarType::String);
1188        let name = alias
1189            .map(|a| a.into_string())
1190            .unwrap_or_else(|| "headers".to_string());
1191
1192        column_typs.push(SqlColumnType {
1193            scalar_type: SqlScalarType::Map {
1194                value_type: Box::new(value_type),
1195                custom_id: None,
1196            },
1197            nullable: false,
1198        });
1199        column_names.push(name);
1200
1201        // Store the column index so we can be sure to provide this body correctly.
1202        let column_idx = column_typs.len() - 1;
1203        // Double check we're consistent with column names.
1204        assert_eq!(
1205            column_idx,
1206            column_names.len() - 1,
1207            "header column names and types don't match"
1208        );
1209        header_tuples.push((column_idx, use_bytes));
1210    }
1211
1212    // Append all secrets so they can be used in the expression.
1213    let mut validation_secrets = vec![];
1214
1215    for CreateWebhookSourceSecret {
1216        secret,
1217        alias,
1218        use_bytes,
1219    } in secrets
1220    {
1221        // Either provide the secret to the validation expression as Bytes or a String.
1222        let scalar_type = use_bytes
1223            .then_some(SqlScalarType::Bytes)
1224            .unwrap_or(SqlScalarType::String);
1225
1226        column_typs.push(SqlColumnType {
1227            scalar_type,
1228            nullable: false,
1229        });
1230        let ResolvedItemName::Item {
1231            id,
1232            full_name: FullItemName { item, .. },
1233            ..
1234        } = secret
1235        else {
1236            return Err(PlanError::InvalidSecret(Box::new(secret)));
1237        };
1238
1239        // Plan the expression using the secret's alias, if one is provided.
1240        let name = if let Some(alias) = alias {
1241            alias.into_string()
1242        } else {
1243            item
1244        };
1245        column_names.push(name);
1246
1247        // Get the column index that corresponds for this secret, so we can make sure to provide the
1248        // secrets in the correct order during evaluation.
1249        let column_idx = column_typs.len() - 1;
1250        // Double check that our column names and types match.
1251        assert_eq!(
1252            column_idx,
1253            column_names.len() - 1,
1254            "column names and types don't match"
1255        );
1256
1257        validation_secrets.push(WebhookValidationSecret {
1258            id,
1259            column_idx,
1260            use_bytes,
1261        });
1262    }
1263
1264    let relation_typ = SqlRelationType::new(column_typs);
1265    let desc = RelationDesc::new(relation_typ, column_names.clone());
1266    let scope = Scope::from_source(None, column_names);
1267
1268    transform_ast::transform(scx, &mut expr)?;
1269
1270    let ecx = &ExprContext {
1271        qcx: &qcx,
1272        name: "CHECK",
1273        scope: &scope,
1274        relation_type: desc.typ(),
1275        allow_aggregates: false,
1276        allow_subqueries: false,
1277        allow_parameters: false,
1278        allow_windows: false,
1279    };
1280    let expr = plan_expr(ecx, &expr)?
1281        .type_as(ecx, &SqlScalarType::Bool)?
1282        .lower_uncorrelated()?;
1283    let validation = WebhookValidation {
1284        expression: expr,
1285        relation_desc: desc,
1286        bodies: body_tuples,
1287        headers: header_tuples,
1288        secrets: validation_secrets,
1289    };
1290    Ok(validation)
1291}
1292
1293pub fn plan_default_expr(
1294    scx: &StatementContext,
1295    expr: &Expr<Aug>,
1296    target_ty: &SqlScalarType,
1297) -> Result<HirScalarExpr, PlanError> {
1298    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1299    let ecx = &ExprContext {
1300        qcx: &qcx,
1301        name: "DEFAULT expression",
1302        scope: &Scope::empty(),
1303        relation_type: &SqlRelationType::empty(),
1304        allow_aggregates: false,
1305        allow_subqueries: false,
1306        allow_parameters: false,
1307        allow_windows: false,
1308    };
1309    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1310    Ok(hir)
1311}
1312
1313pub fn plan_params<'a>(
1314    scx: &'a StatementContext,
1315    params: Vec<Expr<Aug>>,
1316    desc: &StatementDesc,
1317) -> Result<Params, PlanError> {
1318    if params.len() != desc.param_types.len() {
1319        sql_bail!(
1320            "expected {} params, got {}",
1321            desc.param_types.len(),
1322            params.len()
1323        );
1324    }
1325
1326    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1327
1328    let mut datums = Row::default();
1329    let mut packer = datums.packer();
1330    let mut actual_types = Vec::new();
1331    let temp_storage = &RowArena::new();
1332    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1333        transform_ast::transform(scx, &mut expr)?;
1334
1335        let ecx = execute_expr_context(&qcx);
1336        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1337        let actual_ty = ecx.scalar_type(&ex);
1338        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1339            return Err(PlanError::WrongParameterType(
1340                i + 1,
1341                ecx.humanize_scalar_type(expected_ty, false),
1342                ecx.humanize_scalar_type(&actual_ty, false),
1343            ));
1344        }
1345        let ex = ex.lower_uncorrelated()?;
1346        let evaled = ex.eval(&[], temp_storage)?;
1347        packer.push(evaled);
1348        actual_types.push(actual_ty);
1349    }
1350    Ok(Params {
1351        datums,
1352        execute_types: actual_types,
1353        expected_types: desc.param_types.clone(),
1354    })
1355}
1356
1357static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1358static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1359
1360/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
1361pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1362    ExprContext {
1363        qcx,
1364        name: "EXECUTE",
1365        scope: &EXECUTE_CONTEXT_SCOPE,
1366        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1367        allow_aggregates: false,
1368        allow_subqueries: false,
1369        allow_parameters: false,
1370        allow_windows: false,
1371    }
1372}
1373
1374/// The CastContext used when matching up the types of parameters passed to EXECUTE.
1375///
1376/// This is an assignment cast also in Postgres, see
1377/// <https://github.com/MaterializeInc/database-issues/issues/9266>
1378pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1379    LazyLock::new(|| CastContext::Assignment);
1380
1381pub fn plan_index_exprs<'a>(
1382    scx: &'a StatementContext,
1383    on_desc: &RelationDesc,
1384    exprs: Vec<Expr<Aug>>,
1385) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1386    let scope = Scope::from_source(None, on_desc.iter_names());
1387    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1388
1389    let ecx = &ExprContext {
1390        qcx: &qcx,
1391        name: "CREATE INDEX",
1392        scope: &scope,
1393        relation_type: on_desc.typ(),
1394        allow_aggregates: false,
1395        allow_subqueries: false,
1396        allow_parameters: false,
1397        allow_windows: false,
1398    };
1399    let mut out = vec![];
1400    for mut expr in exprs {
1401        transform_ast::transform(scx, &mut expr)?;
1402        let expr = plan_expr_or_col_index(ecx, &expr)?;
1403        let mut expr = expr.lower_uncorrelated()?;
1404        expr.reduce(&on_desc.typ().column_types);
1405        out.push(expr);
1406    }
1407    Ok(out)
1408}
1409
1410fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1411    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1412        Some(column) => Ok(HirScalarExpr::column(column)),
1413        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1414    }
1415}
1416
1417fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1418    match e {
1419        Expr::Value(Value::Number(n)) => {
1420            let n = n.parse::<usize>().map_err(|e| {
1421                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1422            })?;
1423            if n < 1 || n > max {
1424                sql_bail!(
1425                    "column reference {} in {} is out of range (1 - {})",
1426                    n,
1427                    name,
1428                    max
1429                );
1430            }
1431            Ok(Some(n - 1))
1432        }
1433        _ => Ok(None),
1434    }
1435}
1436
1437struct PlannedQuery {
1438    expr: HirRelationExpr,
1439    scope: Scope,
1440    order_by: Vec<ColumnOrder>,
1441    limit: Option<HirScalarExpr>,
1442    /// `offset` is either
1443    /// - an Int64 literal
1444    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
1445    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
1446    ///   later.)
1447    offset: HirScalarExpr,
1448    project: Vec<usize>,
1449    group_size_hints: GroupSizeHints,
1450}
1451
1452fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1453    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1454}
1455
1456fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1457    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1458    // for the identifiers, so that they can be re-installed before returning.
1459    let cte_bindings = plan_ctes(qcx, q)?;
1460
1461    let limit = match &q.limit {
1462        None => None,
1463        Some(Limit {
1464            quantity,
1465            with_ties: false,
1466        }) => {
1467            let ecx = &ExprContext {
1468                qcx,
1469                name: "LIMIT",
1470                scope: &Scope::empty(),
1471                relation_type: &SqlRelationType::empty(),
1472                allow_aggregates: false,
1473                allow_subqueries: true,
1474                allow_parameters: true,
1475                allow_windows: false,
1476            };
1477            let limit = plan_expr(ecx, quantity)?;
1478            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1479
1480            let limit = if limit.is_constant() {
1481                let arena = RowArena::new();
1482                let limit = limit.lower_uncorrelated()?;
1483
1484                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1485                // that the error happened in a LIMIT clause, so that we have better error msg for
1486                // something like `SELECT 5 LIMIT 'aaa'`.
1487                match limit.eval(&[], &arena)? {
1488                    d @ Datum::Int64(v) if v >= 0 => {
1489                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1490                    }
1491                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1492                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1493                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1494                }
1495            } else {
1496                // Gate non-constant LIMIT expressions behind a feature flag
1497                qcx.scx
1498                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1499                limit
1500            };
1501
1502            Some(limit)
1503        }
1504        Some(Limit {
1505            quantity: _,
1506            with_ties: true,
1507        }) => bail_unsupported!("FETCH ... WITH TIES"),
1508    };
1509
1510    let offset = match &q.offset {
1511        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1512        Some(offset) => {
1513            let ecx = &ExprContext {
1514                qcx,
1515                name: "OFFSET",
1516                scope: &Scope::empty(),
1517                relation_type: &SqlRelationType::empty(),
1518                allow_aggregates: false,
1519                allow_subqueries: false,
1520                allow_parameters: true,
1521                allow_windows: false,
1522            };
1523            let offset = plan_expr(ecx, offset)?;
1524            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1525
1526            let offset = if offset.is_constant() {
1527                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1528                let offset_value = offset_into_value(offset)?;
1529                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1530            } else {
1531                // The only case when this is allowed to not be a constant is if it contains
1532                // parameters. (In which case, we'll later check that it's a constant after
1533                // parameter binding.)
1534                if !offset.contains_parameters() {
1535                    return Err(PlanError::InvalidOffset(format!(
1536                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1537                        offset
1538                    )));
1539                }
1540                offset
1541            };
1542            offset
1543        }
1544    };
1545
1546    let mut planned_query = match &q.body {
1547        SetExpr::Select(s) => {
1548            // Extract query options.
1549            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1550            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1551
1552            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1553            PlannedQuery {
1554                expr: plan.expr,
1555                scope: plan.scope,
1556                order_by: plan.order_by,
1557                project: plan.project,
1558                limit,
1559                offset,
1560                group_size_hints,
1561            }
1562        }
1563        _ => {
1564            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1565            let ecx = &ExprContext {
1566                qcx,
1567                name: "ORDER BY clause of a set expression",
1568                scope: &scope,
1569                relation_type: &qcx.relation_type(&expr),
1570                allow_aggregates: false,
1571                allow_subqueries: true,
1572                allow_parameters: true,
1573                allow_windows: false,
1574            };
1575            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1576            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1577            let project = (0..ecx.relation_type.arity()).collect();
1578            PlannedQuery {
1579                expr: expr.map(map_exprs),
1580                scope,
1581                order_by,
1582                limit,
1583                project,
1584                offset,
1585                group_size_hints: GroupSizeHints::default(),
1586            }
1587        }
1588    };
1589
1590    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1591    match &q.ctes {
1592        CteBlock::Simple(_) => {
1593            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1594                if let Some(cte) = qcx.ctes.remove(&id) {
1595                    planned_query.expr = HirRelationExpr::Let {
1596                        name: cte.name,
1597                        id: id.clone(),
1598                        value: Box::new(value),
1599                        body: Box::new(planned_query.expr),
1600                    };
1601                }
1602                if let Some(shadowed_val) = shadowed_val {
1603                    qcx.ctes.insert(id, shadowed_val);
1604                }
1605            }
1606        }
1607        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1608            let MutRecBlockOptionExtracted {
1609                recursion_limit,
1610                return_at_recursion_limit,
1611                error_at_recursion_limit,
1612                seen: _,
1613            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1614            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
1615                (None, None, None) => None,
1616                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
1617                (None, Some(max_iters), None) => Some((max_iters, true)),
1618                (None, None, Some(max_iters)) => Some((max_iters, false)),
1619                _ => {
1620                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
1621                }
1622            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
1623                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
1624                return_at_limit: *return_at_limit,
1625            }))?;
1626
1627            let mut bindings = Vec::new();
1628            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1629                if let Some(cte) = qcx.ctes.remove(&id) {
1630                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1631                }
1632                if let Some(shadowed_val) = shadowed_val {
1633                    qcx.ctes.insert(id, shadowed_val);
1634                }
1635            }
1636            if !bindings.is_empty() {
1637                planned_query.expr = HirRelationExpr::LetRec {
1638                    limit,
1639                    bindings,
1640                    body: Box::new(planned_query.expr),
1641                }
1642            }
1643        }
1644    }
1645
1646    Ok(planned_query)
1647}
1648
1649/// Converts an OFFSET expression into a value.
1650pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1651    let offset = offset
1652        .try_into_literal_int64()
1653        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1654    if offset < 0 {
1655        return Err(PlanError::InvalidOffset(format!(
1656            "must not be negative, got {}",
1657            offset
1658        )));
1659    }
1660    Ok(offset)
1661}
1662
1663generate_extracted_config!(
1664    MutRecBlockOption,
1665    (RecursionLimit, u64),
1666    (ReturnAtRecursionLimit, u64),
1667    (ErrorAtRecursionLimit, u64)
1668);
1669
1670/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1671///
1672/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1673/// shadowed value that can be reinstalled once the planning has completed.
1674pub fn plan_ctes(
1675    qcx: &mut QueryContext,
1676    q: &Query<Aug>,
1677) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1678    // Accumulate planned expressions and shadowed descriptions.
1679    let mut result = Vec::new();
1680    // Retain the old descriptions of CTE bindings so that we can restore them
1681    // after we're done planning this SELECT.
1682    let mut shadowed_descs = BTreeMap::new();
1683
1684    // A reused identifier indicates a reused name.
1685    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1686        sql_bail!(
1687            "WITH query name {} specified more than once",
1688            normalize::ident_ref(ident).quoted()
1689        )
1690    }
1691
1692    match &q.ctes {
1693        CteBlock::Simple(ctes) => {
1694            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1695            for cte in ctes.iter() {
1696                let cte_name = normalize::ident(cte.alias.name.clone());
1697                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1698                let typ = qcx.relation_type(&val);
1699                let mut desc = RelationDesc::new(typ, scope.column_names());
1700                plan_utils::maybe_rename_columns(
1701                    format!("CTE {}", cte.alias.name),
1702                    &mut desc,
1703                    &cte.alias.columns,
1704                )?;
1705                // Capture the prior value if it exists, so that it can be re-installed.
1706                let shadowed = qcx.ctes.insert(
1707                    cte.id,
1708                    CteDesc {
1709                        name: cte_name,
1710                        desc,
1711                    },
1712                );
1713
1714                result.push((cte.id, val, shadowed));
1715            }
1716        }
1717        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1718            // Insert column types into `qcx.ctes` first for recursive bindings.
1719            for cte in ctes.iter() {
1720                let cte_name = normalize::ident(cte.name.clone());
1721                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1722                for column in cte.columns.iter() {
1723                    desc_columns.push((
1724                        normalize::column_name(column.name.clone()),
1725                        SqlColumnType {
1726                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1727                            nullable: true,
1728                        },
1729                    ));
1730                }
1731                let desc = RelationDesc::from_names_and_types(desc_columns);
1732                let shadowed = qcx.ctes.insert(
1733                    cte.id,
1734                    CteDesc {
1735                        name: cte_name,
1736                        desc,
1737                    },
1738                );
1739                // Capture the prior value if it exists, so that it can be re-installed.
1740                if let Some(shadowed) = shadowed {
1741                    shadowed_descs.insert(cte.id, shadowed);
1742                }
1743            }
1744
1745            // Plan all CTEs and validate the proposed types.
1746            for cte in ctes.iter() {
1747                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1748
1749                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1750
1751                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1752                    // Once WMR CTEs support NOT NULL constraints, check that
1753                    // nullability of derived column types are compatible.
1754                    sql_bail!(
1755                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1756                    );
1757                }
1758
1759                if !proposed_typ.keys.is_empty() {
1760                    // Once WMR CTEs support keys, check that keys exactly
1761                    // overlap.
1762                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1763                }
1764
1765                // Validate that the derived and proposed types are the same.
1766                let derived_typ = qcx.relation_type(&val);
1767
1768                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1769                    let cte_name = normalize::ident(cte.name.clone());
1770                    let proposed_typ = proposed_typ
1771                        .column_types
1772                        .iter()
1773                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1774                        .collect::<Vec<_>>();
1775                    let inferred_typ = derived_typ
1776                        .column_types
1777                        .iter()
1778                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1779                        .collect::<Vec<_>>();
1780                    Err(PlanError::RecursiveTypeMismatch(
1781                        cte_name,
1782                        proposed_typ,
1783                        inferred_typ,
1784                    ))
1785                };
1786
1787                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1788                    return type_err(proposed_typ, derived_typ);
1789                }
1790
1791                // Cast derived types to proposed types or error.
1792                let val = match cast_relation(
1793                    qcx,
1794                    // Choose `CastContext::Assignment`` because the user has
1795                    // been explicit about the types they expect. Choosing
1796                    // `CastContext::Implicit` is not "strong" enough to impose
1797                    // typmods from proposed types onto values.
1798                    CastContext::Assignment,
1799                    val,
1800                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1801                ) {
1802                    Ok(val) => val,
1803                    Err(_) => return type_err(proposed_typ, derived_typ),
1804                };
1805
1806                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1807            }
1808        }
1809    }
1810
1811    Ok(result)
1812}
1813
1814pub fn plan_nested_query(
1815    qcx: &mut QueryContext,
1816    q: &Query<Aug>,
1817) -> Result<(HirRelationExpr, Scope), PlanError> {
1818    let PlannedQuery {
1819        mut expr,
1820        scope,
1821        order_by,
1822        limit,
1823        offset,
1824        project,
1825        group_size_hints,
1826    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1827    if limit.is_some()
1828        || !offset
1829            .clone()
1830            .try_into_literal_int64()
1831            .is_ok_and(|offset| offset == 0)
1832    {
1833        expr = HirRelationExpr::top_k(
1834            expr,
1835            vec![],
1836            order_by,
1837            limit,
1838            offset,
1839            group_size_hints.limit_input_group_size,
1840        );
1841    }
1842    Ok((expr.project(project), scope))
1843}
1844
1845fn plan_set_expr(
1846    qcx: &mut QueryContext,
1847    q: &SetExpr<Aug>,
1848) -> Result<(HirRelationExpr, Scope), PlanError> {
1849    match q {
1850        SetExpr::Select(select) => {
1851            let order_by_exprs = Vec::new();
1852            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1853            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
1854            // should not have planned any ordering.
1855            assert!(plan.order_by.is_empty());
1856            Ok((plan.expr.project(plan.project), plan.scope))
1857        }
1858        SetExpr::SetOperation {
1859            op,
1860            all,
1861            left,
1862            right,
1863        } => {
1864            // Plan the LHS and RHS.
1865            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1866            let (right_expr, right_scope) =
1867                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1868
1869            // Validate that the LHS and RHS are the same width.
1870            let left_type = qcx.relation_type(&left_expr);
1871            let right_type = qcx.relation_type(&right_expr);
1872            if left_type.arity() != right_type.arity() {
1873                sql_bail!(
1874                    "each {} query must have the same number of columns: {} vs {}",
1875                    op,
1876                    left_type.arity(),
1877                    right_type.arity(),
1878                );
1879            }
1880
1881            // Match the types of the corresponding columns on the LHS and RHS
1882            // using the normal type coercion rules. This is equivalent to
1883            // `coerce_homogeneous_exprs`, but implemented in terms of
1884            // `HirRelationExpr` rather than `HirScalarExpr`.
1885            let left_ecx = &ExprContext {
1886                qcx,
1887                name: &op.to_string(),
1888                scope: &left_scope,
1889                relation_type: &left_type,
1890                allow_aggregates: false,
1891                allow_subqueries: false,
1892                allow_parameters: false,
1893                allow_windows: false,
1894            };
1895            let right_ecx = &ExprContext {
1896                qcx,
1897                name: &op.to_string(),
1898                scope: &right_scope,
1899                relation_type: &right_type,
1900                allow_aggregates: false,
1901                allow_subqueries: false,
1902                allow_parameters: false,
1903                allow_windows: false,
1904            };
1905            let mut left_casts = vec![];
1906            let mut right_casts = vec![];
1907            for (i, (left_type, right_type)) in left_type
1908                .column_types
1909                .iter()
1910                .zip_eq(right_type.column_types.iter())
1911                .enumerate()
1912            {
1913                let types = &[
1914                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1915                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1916                ];
1917                let target =
1918                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1919                match typeconv::plan_cast(
1920                    left_ecx,
1921                    CastContext::Implicit,
1922                    HirScalarExpr::column(i),
1923                    &target,
1924                ) {
1925                    Ok(expr) => left_casts.push(expr),
1926                    Err(_) => sql_bail!(
1927                        "{} types {} and {} cannot be matched",
1928                        op,
1929                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
1930                        qcx.humanize_scalar_type(&target, false),
1931                    ),
1932                }
1933                match typeconv::plan_cast(
1934                    right_ecx,
1935                    CastContext::Implicit,
1936                    HirScalarExpr::column(i),
1937                    &target,
1938                ) {
1939                    Ok(expr) => right_casts.push(expr),
1940                    Err(_) => sql_bail!(
1941                        "{} types {} and {} cannot be matched",
1942                        op,
1943                        qcx.humanize_scalar_type(&target, false),
1944                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
1945                    ),
1946                }
1947            }
1948            let lhs = if left_casts
1949                .iter()
1950                .enumerate()
1951                .any(|(i, e)| e != &HirScalarExpr::column(i))
1952            {
1953                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1954                left_expr.map(left_casts).project(project_key)
1955            } else {
1956                left_expr
1957            };
1958            let rhs = if right_casts
1959                .iter()
1960                .enumerate()
1961                .any(|(i, e)| e != &HirScalarExpr::column(i))
1962            {
1963                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1964                right_expr.map(right_casts).project(project_key)
1965            } else {
1966                right_expr
1967            };
1968
1969            let relation_expr = match op {
1970                SetOperator::Union => {
1971                    if *all {
1972                        lhs.union(rhs)
1973                    } else {
1974                        lhs.union(rhs).distinct()
1975                    }
1976                }
1977                SetOperator::Except => Hir::except(all, lhs, rhs),
1978                SetOperator::Intersect => {
1979                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
1980                    // Though we believe that render() does The Right Thing (TM)
1981                    // Also note that we do *not* need another threshold() at the end of the method chain
1982                    // because the right-hand side of the outer union only produces existing records,
1983                    // i.e., the record counts for differential data flow definitely remain non-negative.
1984                    let left_clone = lhs.clone();
1985                    if *all {
1986                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1987                    } else {
1988                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1989                            .distinct()
1990                    }
1991                }
1992            };
1993            let scope = Scope::from_source(
1994                None,
1995                // Column names are taken from the left, as in Postgres.
1996                left_scope.column_names(),
1997            );
1998
1999            Ok((relation_expr, scope))
2000        }
2001        SetExpr::Values(Values(values)) => plan_values(qcx, values),
2002        SetExpr::Table(name) => {
2003            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2004            Ok((expr, scope))
2005        }
2006        SetExpr::Query(query) => {
2007            let (expr, scope) = plan_nested_query(qcx, query)?;
2008            Ok((expr, scope))
2009        }
2010        SetExpr::Show(stmt) => {
2011            // The create SQL definition of involving this query, will have the explicit `SHOW`
2012            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
2013            // current schema of the executing user. When Materialize restarts and tries to re-plan
2014            // these queries, it will only have access to the raw `SHOW` command and have no idea
2015            // what schema to use. As a result Materialize will fail to boot.
2016            //
2017            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
2018            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
2019            // However, banning show commands in views gives us more flexibility to change their
2020            // output.
2021            //
2022            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
2023            // with all show commands expanded into their equivalent SELECT statements.
2024            if !qcx.lifetime.allow_show() {
2025                return Err(PlanError::ShowCommandInView);
2026            }
2027
2028            // Some SHOW statements are a SELECT query. Others produces Rows
2029            // directly. Convert both of these to the needed Hir and Scope.
2030            fn to_hirscope(
2031                plan: ShowCreatePlan,
2032                desc: StatementDesc,
2033            ) -> Result<(HirRelationExpr, Scope), PlanError> {
2034                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2035                let desc = desc.relation_desc.expect("must exist");
2036                let scope = Scope::from_source(None, desc.iter_names());
2037                let expr = HirRelationExpr::constant(rows, desc.into_typ());
2038                Ok((expr, scope))
2039            }
2040
2041            match stmt.clone() {
2042                ShowStatement::ShowColumns(stmt) => {
2043                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2044                }
2045                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2046                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2047                    show::describe_show_create_connection(qcx.scx, stmt)?,
2048                ),
2049                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2050                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2051                    show::describe_show_create_cluster(qcx.scx, stmt)?,
2052                ),
2053                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2054                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
2055                    show::describe_show_create_index(qcx.scx, stmt)?,
2056                ),
2057                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2058                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2059                    show::describe_show_create_sink(qcx.scx, stmt)?,
2060                ),
2061                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2062                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
2063                    show::describe_show_create_source(qcx.scx, stmt)?,
2064                ),
2065                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2066                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
2067                    show::describe_show_create_table(qcx.scx, stmt)?,
2068                ),
2069                ShowStatement::ShowCreateView(stmt) => to_hirscope(
2070                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
2071                    show::describe_show_create_view(qcx.scx, stmt)?,
2072                ),
2073                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2074                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2075                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2076                ),
2077                ShowStatement::ShowCreateType(stmt) => to_hirscope(
2078                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
2079                    show::describe_show_create_type(qcx.scx, stmt)?,
2080                ),
2081                ShowStatement::ShowObjects(stmt) => {
2082                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2083                }
2084                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2085                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2086            }
2087        }
2088    }
2089}
2090
2091/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2092fn plan_values(
2093    qcx: &QueryContext,
2094    values: &[Vec<Expr<Aug>>],
2095) -> Result<(HirRelationExpr, Scope), PlanError> {
2096    assert!(!values.is_empty());
2097
2098    let ecx = &ExprContext {
2099        qcx,
2100        name: "VALUES",
2101        scope: &Scope::empty(),
2102        relation_type: &SqlRelationType::empty(),
2103        allow_aggregates: false,
2104        allow_subqueries: true,
2105        allow_parameters: true,
2106        allow_windows: false,
2107    };
2108
2109    let ncols = values[0].len();
2110    let nrows = values.len();
2111
2112    // Arrange input expressions by columns, not rows, so that we can
2113    // call `coerce_homogeneous_exprs` on each column.
2114    let mut cols = vec![vec![]; ncols];
2115    for row in values {
2116        if row.len() != ncols {
2117            sql_bail!(
2118                "VALUES expression has varying number of columns: {} vs {}",
2119                row.len(),
2120                ncols
2121            );
2122        }
2123        for (i, v) in row.iter().enumerate() {
2124            cols[i].push(v);
2125        }
2126    }
2127
2128    // Plan each column.
2129    let mut col_iters = Vec::with_capacity(ncols);
2130    let mut col_types = Vec::with_capacity(ncols);
2131    for col in &cols {
2132        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2133        let mut col_type = ecx.column_type(&col[0]);
2134        for val in &col[1..] {
2135            col_type = col_type.union(&ecx.column_type(val))?;
2136        }
2137        col_types.push(col_type);
2138        col_iters.push(col.into_iter());
2139    }
2140
2141    // Build constant relation.
2142    let mut exprs = vec![];
2143    for _ in 0..nrows {
2144        for i in 0..ncols {
2145            exprs.push(col_iters[i].next().unwrap());
2146        }
2147    }
2148    let out = HirRelationExpr::CallTable {
2149        func: TableFunc::Wrap {
2150            width: ncols,
2151            types: col_types,
2152        },
2153        exprs,
2154    };
2155
2156    // Build column names.
2157    let mut scope = Scope::empty();
2158    for i in 0..ncols {
2159        let name = format!("column{}", i + 1);
2160        scope.items.push(ScopeItem::from_column_name(name));
2161    }
2162
2163    Ok((out, scope))
2164}
2165
2166/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2167/// statement.
2168///
2169/// This is special-cased in PostgreSQL and different enough from `plan_values`
2170/// that it is easier to use a separate function entirely. Unlike a normal
2171/// `VALUES` clause, each value is coerced to the type of the target table
2172/// via an assignment cast.
2173///
2174/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2175fn plan_values_insert(
2176    qcx: &QueryContext,
2177    target_names: &[&ColumnName],
2178    target_types: &[&SqlScalarType],
2179    values: &[Vec<Expr<Aug>>],
2180) -> Result<HirRelationExpr, PlanError> {
2181    assert!(!values.is_empty());
2182
2183    if !values.iter().map(|row| row.len()).all_equal() {
2184        sql_bail!("VALUES lists must all be the same length");
2185    }
2186
2187    let ecx = &ExprContext {
2188        qcx,
2189        name: "VALUES",
2190        scope: &Scope::empty(),
2191        relation_type: &SqlRelationType::empty(),
2192        allow_aggregates: false,
2193        allow_subqueries: true,
2194        allow_parameters: true,
2195        allow_windows: false,
2196    };
2197
2198    let mut exprs = vec![];
2199    let mut types = vec![];
2200    for row in values {
2201        if row.len() > target_names.len() {
2202            sql_bail!("INSERT has more expressions than target columns");
2203        }
2204        for (column, val) in row.into_iter().enumerate() {
2205            let target_type = &target_types[column];
2206            let val = plan_expr(ecx, val)?;
2207            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2208            let source_type = &ecx.scalar_type(&val);
2209            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2210                Ok(val) => val,
2211                Err(_) => sql_bail!(
2212                    "column {} is of type {} but expression is of type {}",
2213                    target_names[column].quoted(),
2214                    qcx.humanize_scalar_type(target_type, false),
2215                    qcx.humanize_scalar_type(source_type, false),
2216                ),
2217            };
2218            if column >= types.len() {
2219                types.push(ecx.column_type(&val));
2220            } else {
2221                types[column] = types[column].union(&ecx.column_type(&val))?;
2222            }
2223            exprs.push(val);
2224        }
2225    }
2226
2227    Ok(HirRelationExpr::CallTable {
2228        func: TableFunc::Wrap {
2229            width: values[0].len(),
2230            types,
2231        },
2232        exprs,
2233    })
2234}
2235
2236fn plan_join_identity() -> (HirRelationExpr, Scope) {
2237    let typ = SqlRelationType::new(vec![]);
2238    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2239    let scope = Scope::empty();
2240    (expr, scope)
2241}
2242
2243/// Describes how to execute a SELECT query.
2244///
2245/// `order_by` describes how to order the rows in `expr` *before* applying the
2246/// projection. The `scope` describes the columns in `expr` *after* the
2247/// projection has been applied.
2248#[derive(Debug)]
2249struct SelectPlan {
2250    expr: HirRelationExpr,
2251    scope: Scope,
2252    order_by: Vec<ColumnOrder>,
2253    project: Vec<usize>,
2254}
2255
2256generate_extracted_config!(
2257    SelectOption,
2258    (ExpectedGroupSize, u64),
2259    (AggregateInputGroupSize, u64),
2260    (DistinctOnInputGroupSize, u64),
2261    (LimitInputGroupSize, u64)
2262);
2263
2264/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2265///
2266/// Normally, the ORDER BY clause occurs after the columns specified in the
2267/// SELECT list have been projected. In a query like
2268///
2269///   CREATE TABLE (a int, b int)
2270///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2271///
2272/// it is valid to refer to `a`, because it is explicitly selected, but it would
2273/// not be valid to refer to unselected column `b`.
2274///
2275/// But PostgreSQL extends the standard to permit queries like
2276///
2277///   SELECT a FROM t ORDER BY b
2278///
2279/// where expressions in the ORDER BY clause can refer to *both* input columns
2280/// and output columns.
2281fn plan_select_from_where(
2282    qcx: &QueryContext,
2283    mut s: Select<Aug>,
2284    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2285) -> Result<SelectPlan, PlanError> {
2286    // TODO: Both `s` and `order_by_exprs` are not references because the
2287    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2288    // table function support (the UUID mapping). Attempt to change this so callers
2289    // don't need to clone the Select.
2290
2291    // Extract query options.
2292    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2293    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2294
2295    // Step 1. Handle FROM clause, including joins.
2296    let (mut relation_expr, mut from_scope) =
2297        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2298            let (left, left_scope) = l;
2299            plan_join(
2300                qcx,
2301                left,
2302                left_scope,
2303                &Join {
2304                    relation: TableFactor::NestedJoin {
2305                        join: Box::new(twj.clone()),
2306                        alias: None,
2307                    },
2308                    join_operator: JoinOperator::CrossJoin,
2309                },
2310            )
2311        })?;
2312
2313    // Step 2. Handle WHERE clause.
2314    if let Some(selection) = &s.selection {
2315        let ecx = &ExprContext {
2316            qcx,
2317            name: "WHERE clause",
2318            scope: &from_scope,
2319            relation_type: &qcx.relation_type(&relation_expr),
2320            allow_aggregates: false,
2321            allow_subqueries: true,
2322            allow_parameters: true,
2323            allow_windows: false,
2324        };
2325        let expr = plan_expr(ecx, selection)
2326            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2327            .type_as(ecx, &SqlScalarType::Bool)?;
2328        relation_expr = relation_expr.filter(vec![expr]);
2329    }
2330
2331    // Step 3. Gather aggregates and table functions.
2332    // (But skip window aggregates.)
2333    let (aggregates, table_funcs) = {
2334        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2335        visitor.visit_select_mut(&mut s);
2336        for o in order_by_exprs.iter_mut() {
2337            visitor.visit_order_by_expr_mut(o);
2338        }
2339        visitor.into_result()?
2340    };
2341    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2342    if !table_funcs.is_empty() {
2343        let (expr, scope) = plan_scalar_table_funcs(
2344            qcx,
2345            table_funcs,
2346            &mut table_func_names,
2347            &relation_expr,
2348            &from_scope,
2349        )?;
2350        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2351        from_scope = from_scope.product(scope)?;
2352    }
2353
2354    // Step 4. Expand SELECT clause.
2355    let projection = {
2356        let ecx = &ExprContext {
2357            qcx,
2358            name: "SELECT clause",
2359            scope: &from_scope,
2360            relation_type: &qcx.relation_type(&relation_expr),
2361            allow_aggregates: true,
2362            allow_subqueries: true,
2363            allow_parameters: true,
2364            allow_windows: true,
2365        };
2366        let mut out = vec![];
2367        for si in &s.projection {
2368            if *si == SelectItem::Wildcard && s.from.is_empty() {
2369                sql_bail!("SELECT * with no tables specified is not valid");
2370            }
2371            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2372        }
2373        out
2374    };
2375
2376    // Step 5. Handle GROUP BY clause.
2377    // This will also plan the aggregates gathered in Step 3.
2378    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2379    let (mut group_scope, select_all_mapping) = {
2380        // Compute GROUP BY expressions.
2381        let ecx = &ExprContext {
2382            qcx,
2383            name: "GROUP BY clause",
2384            scope: &from_scope,
2385            relation_type: &qcx.relation_type(&relation_expr),
2386            allow_aggregates: false,
2387            allow_subqueries: true,
2388            allow_parameters: true,
2389            allow_windows: false,
2390        };
2391        let mut group_key = vec![];
2392        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2393        let mut group_hir_exprs = vec![];
2394        let mut group_scope = Scope::empty();
2395        let mut select_all_mapping = BTreeMap::new();
2396
2397        for group_expr in &s.group_by {
2398            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2399            let new_column = group_key.len();
2400
2401            if let Some(group_expr) = group_expr {
2402                // Multiple AST expressions can map to the same HIR expression.
2403                // If we already have a ScopeItem for this HIR, we can add this
2404                // next AST expression to its set
2405                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2406                    existing_scope_item.exprs.insert(group_expr.clone());
2407                    continue;
2408                }
2409            }
2410
2411            let mut scope_item = if let HirScalarExpr::Column(
2412                ColumnRef {
2413                    level: 0,
2414                    column: old_column,
2415                },
2416                _name,
2417            ) = &expr
2418            {
2419                // If we later have `SELECT foo.*` then we have to find all
2420                // the `foo` items in `from_scope` and figure out where they
2421                // ended up in `group_scope`. This is really hard to do
2422                // right using SQL name resolution, so instead we just track
2423                // the movement here.
2424                select_all_mapping.insert(*old_column, new_column);
2425                let scope_item = ecx.scope.items[*old_column].clone();
2426                scope_item
2427            } else {
2428                ScopeItem::empty()
2429            };
2430
2431            if let Some(group_expr) = group_expr.cloned() {
2432                scope_item.exprs.insert(group_expr);
2433            }
2434
2435            group_key.push(from_scope.len() + group_exprs.len());
2436            group_hir_exprs.push(expr.clone());
2437            group_exprs.insert(expr, scope_item);
2438        }
2439
2440        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2441        for expr in &group_hir_exprs {
2442            if let Some(scope_item) = group_exprs.remove(expr) {
2443                group_scope.items.push(scope_item);
2444            }
2445        }
2446
2447        // Plan aggregates.
2448        let ecx = &ExprContext {
2449            qcx,
2450            name: "aggregate function",
2451            scope: &from_scope,
2452            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2453            allow_aggregates: false,
2454            allow_subqueries: true,
2455            allow_parameters: true,
2456            allow_windows: false,
2457        };
2458        let mut agg_exprs = vec![];
2459        for sql_function in aggregates {
2460            if sql_function.over.is_some() {
2461                unreachable!(
2462                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2463                );
2464            }
2465            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2466            group_scope
2467                .items
2468                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2469        }
2470        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2471            // apply GROUP BY / aggregates
2472            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2473                group_key,
2474                agg_exprs,
2475                group_size_hints.aggregate_input_group_size,
2476            );
2477
2478            // For every old column that wasn't a group key, add a scope item
2479            // that errors when referenced. We can't simply drop these items
2480            // from scope. These items need to *exist* because they might shadow
2481            // variables in outer scopes that would otherwise be valid to
2482            // reference, but accessing them needs to produce an error.
2483            for i in 0..from_scope.len() {
2484                if !select_all_mapping.contains_key(&i) {
2485                    let scope_item = &ecx.scope.items[i];
2486                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2487                        table_name: scope_item.table_name.clone(),
2488                        column_name: scope_item.column_name.clone(),
2489                        allow_unqualified_references: scope_item.allow_unqualified_references,
2490                    });
2491                }
2492            }
2493
2494            (group_scope, select_all_mapping)
2495        } else {
2496            // if no GROUP BY, aggregates or having then all columns remain in scope
2497            (
2498                from_scope.clone(),
2499                (0..from_scope.len()).map(|i| (i, i)).collect(),
2500            )
2501        }
2502    };
2503
2504    // Step 6. Handle HAVING clause.
2505    if let Some(ref having) = s.having {
2506        let ecx = &ExprContext {
2507            qcx,
2508            name: "HAVING clause",
2509            scope: &group_scope,
2510            relation_type: &qcx.relation_type(&relation_expr),
2511            allow_aggregates: true,
2512            allow_subqueries: true,
2513            allow_parameters: true,
2514            allow_windows: false,
2515        };
2516        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2517        relation_expr = relation_expr.filter(vec![expr]);
2518    }
2519
2520    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2521    // (This includes window aggregations.)
2522    //
2523    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2524    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2525    //
2526    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2527    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2528    // and can't be part of a bigger expression.
2529    // See https://www.postgresql.org/docs/current/queries-order.html:
2530    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2531    // expression"
2532    let window_funcs = {
2533        let mut visitor = WindowFuncCollector::default();
2534        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2535        // window functions are excluded from other things by `allow_windows` being false when
2536        // planning those before this code).
2537        visitor.visit_select(&s);
2538        for o in order_by_exprs.iter() {
2539            visitor.visit_order_by_expr(o);
2540        }
2541        visitor.into_result()
2542    };
2543    for window_func in window_funcs {
2544        let ecx = &ExprContext {
2545            qcx,
2546            name: "window function",
2547            scope: &group_scope,
2548            relation_type: &qcx.relation_type(&relation_expr),
2549            allow_aggregates: true,
2550            allow_subqueries: true,
2551            allow_parameters: true,
2552            allow_windows: true,
2553        };
2554        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2555        group_scope.items.push(ScopeItem::from_expr(window_func));
2556    }
2557    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2558    // been already planned now. However, we should still set `allow_windows: true` for the
2559    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2560    // msg if an OVER clause is missing from a window function.
2561
2562    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2563    if let Some(ref qualify) = s.qualify {
2564        let ecx = &ExprContext {
2565            qcx,
2566            name: "QUALIFY clause",
2567            scope: &group_scope,
2568            relation_type: &qcx.relation_type(&relation_expr),
2569            allow_aggregates: true,
2570            allow_subqueries: true,
2571            allow_parameters: true,
2572            allow_windows: true,
2573        };
2574        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2575        relation_expr = relation_expr.filter(vec![expr]);
2576    }
2577
2578    // Step 9. Handle SELECT clause.
2579    let output_columns = {
2580        let mut new_exprs = vec![];
2581        let mut new_type = qcx.relation_type(&relation_expr);
2582        let mut output_columns = vec![];
2583        for (select_item, column_name) in &projection {
2584            let ecx = &ExprContext {
2585                qcx,
2586                name: "SELECT clause",
2587                scope: &group_scope,
2588                relation_type: &new_type,
2589                allow_aggregates: true,
2590                allow_subqueries: true,
2591                allow_parameters: true,
2592                allow_windows: true,
2593            };
2594            let expr = match select_item {
2595                ExpandedSelectItem::InputOrdinal(i) => {
2596                    if let Some(column) = select_all_mapping.get(i).copied() {
2597                        HirScalarExpr::column(column)
2598                    } else {
2599                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2600                    }
2601                }
2602                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2603            };
2604            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2605                // Simple column reference; no need to map on a new expression.
2606                output_columns.push((column, column_name));
2607            } else {
2608                // Complicated expression that requires a map expression. We
2609                // update `group_scope` as we go so that future expressions that
2610                // are textually identical to this one can reuse it. This
2611                // duplicate detection is required for proper determination of
2612                // ambiguous column references with SQL92-style `ORDER BY`
2613                // items. See `plan_order_by_or_distinct_expr` for more.
2614                let typ = ecx.column_type(&expr);
2615                new_type.column_types.push(typ);
2616                new_exprs.push(expr);
2617                output_columns.push((group_scope.len(), column_name));
2618                group_scope
2619                    .items
2620                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2621            }
2622        }
2623        relation_expr = relation_expr.map(new_exprs);
2624        output_columns
2625    };
2626    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2627
2628    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2629    let order_by = {
2630        let relation_type = qcx.relation_type(&relation_expr);
2631        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2632            &ExprContext {
2633                qcx,
2634                name: "ORDER BY clause",
2635                scope: &group_scope,
2636                relation_type: &relation_type,
2637                allow_aggregates: true,
2638                allow_subqueries: true,
2639                allow_parameters: true,
2640                allow_windows: true,
2641            },
2642            &order_by_exprs,
2643            &output_columns,
2644        )?;
2645
2646        match s.distinct {
2647            None => relation_expr = relation_expr.map(map_exprs),
2648            Some(Distinct::EntireRow) => {
2649                if relation_type.arity() == 0 {
2650                    sql_bail!("SELECT DISTINCT must have at least one column");
2651                }
2652                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2653                // list, so we can't proceed if `ORDER BY` has introduced any
2654                // columns for arbitrary expressions. This matches PostgreSQL.
2655                if !try_push_projection_order_by(
2656                    &mut relation_expr,
2657                    &mut project_key,
2658                    &mut order_by,
2659                ) {
2660                    sql_bail!(
2661                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2662                    );
2663                }
2664                assert!(map_exprs.is_empty());
2665                relation_expr = relation_expr.distinct();
2666            }
2667            Some(Distinct::On(exprs)) => {
2668                let ecx = &ExprContext {
2669                    qcx,
2670                    name: "DISTINCT ON clause",
2671                    scope: &group_scope,
2672                    relation_type: &qcx.relation_type(&relation_expr),
2673                    allow_aggregates: true,
2674                    allow_subqueries: true,
2675                    allow_parameters: true,
2676                    allow_windows: true,
2677                };
2678
2679                let mut distinct_exprs = vec![];
2680                for expr in &exprs {
2681                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2682                    distinct_exprs.push(expr);
2683                }
2684
2685                let mut distinct_key = vec![];
2686
2687                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2688                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2689                // expressions, though the order of `DISTINCT ON` expressions
2690                // does not matter. This matches PostgreSQL and leaves the door
2691                // open to a future optimization where the `DISTINCT ON` and
2692                // `ORDER BY` operations happen in one pass.
2693                //
2694                // On the bright side, any columns that have already been
2695                // computed by `ORDER BY` can be reused in the distinct key.
2696                let arity = relation_type.arity();
2697                for ord in order_by.iter().take(distinct_exprs.len()) {
2698                    // The unusual construction of `expr` here is to ensure the
2699                    // temporary column expression lives long enough.
2700                    let mut expr = &HirScalarExpr::column(ord.column);
2701                    if ord.column >= arity {
2702                        expr = &map_exprs[ord.column - arity];
2703                    };
2704                    match distinct_exprs.iter().position(move |e| e == expr) {
2705                        None => sql_bail!(
2706                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2707                        ),
2708                        Some(pos) => {
2709                            distinct_exprs.remove(pos);
2710                        }
2711                    }
2712                    distinct_key.push(ord.column);
2713                }
2714
2715                // Add any remaining `DISTINCT ON` expressions to the key.
2716                for expr in distinct_exprs {
2717                    // If the expression is a reference to an existing column,
2718                    // do not introduce a new column to support it.
2719                    let column = match expr {
2720                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2721                        _ => {
2722                            map_exprs.push(expr);
2723                            arity + map_exprs.len() - 1
2724                        }
2725                    };
2726                    distinct_key.push(column);
2727                }
2728
2729                // `DISTINCT ON` is semantically a TopK with limit 1. The
2730                // columns in `ORDER BY` that are not part of the distinct key,
2731                // if there are any, determine the ordering within each group,
2732                // per PostgreSQL semantics.
2733                let distinct_len = distinct_key.len();
2734                relation_expr = HirRelationExpr::top_k(
2735                    relation_expr.map(map_exprs),
2736                    distinct_key,
2737                    order_by.iter().skip(distinct_len).cloned().collect(),
2738                    Some(HirScalarExpr::literal(
2739                        Datum::Int64(1),
2740                        SqlScalarType::Int64,
2741                    )),
2742                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2743                    group_size_hints.distinct_on_input_group_size,
2744                );
2745            }
2746        }
2747
2748        order_by
2749    };
2750
2751    // Construct a clean scope to expose outwards, where all of the state that
2752    // accumulated in the scope during planning of this SELECT is erased. The
2753    // clean scope has at most one name for each column, and the names are not
2754    // associated with any table.
2755    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2756
2757    Ok(SelectPlan {
2758        expr: relation_expr,
2759        scope,
2760        order_by,
2761        project: project_key,
2762    })
2763}
2764
2765fn plan_scalar_table_funcs(
2766    qcx: &QueryContext,
2767    table_funcs: BTreeMap<Function<Aug>, String>,
2768    table_func_names: &mut BTreeMap<String, Ident>,
2769    relation_expr: &HirRelationExpr,
2770    from_scope: &Scope,
2771) -> Result<(HirRelationExpr, Scope), PlanError> {
2772    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2773
2774    for (table_func, id) in table_funcs.iter() {
2775        table_func_names.insert(
2776            id.clone(),
2777            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
2778            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2779        );
2780    }
2781    // If there's only a single table function, we can skip generating
2782    // ordinality columns.
2783    if table_funcs.len() == 1 {
2784        let (table_func, id) = table_funcs.iter().next().unwrap();
2785        let (expr, mut scope) =
2786            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2787
2788        // A single table-function might return several columns as a record
2789        let num_cols = scope.len();
2790        for i in 0..scope.len() {
2791            scope.items[i].table_name = Some(PartialItemName {
2792                database: None,
2793                schema: None,
2794                item: id.clone(),
2795            });
2796            scope.items[i].from_single_column_function = num_cols == 1;
2797            scope.items[i].allow_unqualified_references = false;
2798        }
2799        return Ok((expr, scope));
2800    }
2801    // Otherwise, plan as usual, emulating the ROWS FROM behavior
2802    let (expr, mut scope, num_cols) =
2803        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2804
2805    // Munge the scope so table names match with the generated ids.
2806    let mut i = 0;
2807    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2808        for _ in 0..num_cols {
2809            scope.items[i].table_name = Some(PartialItemName {
2810                database: None,
2811                schema: None,
2812                item: id.clone(),
2813            });
2814            scope.items[i].from_single_column_function = num_cols == 1;
2815            scope.items[i].allow_unqualified_references = false;
2816            i += 1;
2817        }
2818        // Ordinality column. This doubles as the
2819        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
2820        // because it only needs to be NULL or not.
2821        scope.items[i].table_name = Some(PartialItemName {
2822            database: None,
2823            schema: None,
2824            item: id.clone(),
2825        });
2826        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2827        scope.items[i].allow_unqualified_references = false;
2828        i += 1;
2829    }
2830    // Coalesced ordinality column.
2831    scope.items[i].allow_unqualified_references = false;
2832    Ok((expr, scope))
2833}
2834
2835/// Plans an expression in a `GROUP BY` clause.
2836///
2837/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2838/// names/expressions defined in the `SELECT` clause. These special cases are
2839/// handled by this function; see comments within the implementation for
2840/// details.
2841fn plan_group_by_expr<'a>(
2842    ecx: &ExprContext,
2843    group_expr: &'a Expr<Aug>,
2844    projection: &'a [(ExpandedSelectItem, ColumnName)],
2845) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2846    let plan_projection = |column: usize| match &projection[column].0 {
2847        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2848        ExpandedSelectItem::Expr(expr) => {
2849            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2850        }
2851    };
2852
2853    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2854    // a special case that means to use the ith item in the SELECT clause.
2855    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2856        return plan_projection(column);
2857    }
2858
2859    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2860    // The `foo` can refer to *either* an input column or an output column. If
2861    // both exist, the input column is preferred.
2862    match group_expr {
2863        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2864            Err(PlanError::UnknownColumn {
2865                table: None,
2866                column,
2867                similar,
2868            }) => {
2869                // The expression was a simple identifier that did not match an
2870                // input column. See if it matches an output column.
2871                let mut iter = projection.iter().map(|(_expr, name)| name);
2872                if let Some(i) = iter.position(|n| *n == column) {
2873                    if iter.any(|n| *n == column) {
2874                        Err(PlanError::AmbiguousColumn(column))
2875                    } else {
2876                        plan_projection(i)
2877                    }
2878                } else {
2879                    // The name didn't match an output column either. Return the
2880                    // "unknown column" error.
2881                    Err(PlanError::UnknownColumn {
2882                        table: None,
2883                        column,
2884                        similar,
2885                    })
2886                }
2887            }
2888            res => Ok((Some(group_expr), res?)),
2889        },
2890        _ => Ok((
2891            Some(group_expr),
2892            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2893        )),
2894    }
2895}
2896
2897/// Plans a slice of `ORDER BY` expressions.
2898///
2899/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2900/// parameter.
2901///
2902/// Returns the determined column orderings and a list of scalar expressions
2903/// that must be mapped onto the underlying relation expression.
2904pub(crate) fn plan_order_by_exprs(
2905    ecx: &ExprContext,
2906    order_by_exprs: &[OrderByExpr<Aug>],
2907    output_columns: &[(usize, &ColumnName)],
2908) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2909    let mut order_by = vec![];
2910    let mut map_exprs = vec![];
2911    for obe in order_by_exprs {
2912        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2913        // If the expression is a reference to an existing column,
2914        // do not introduce a new column to support it.
2915        let column = match expr {
2916            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2917            _ => {
2918                map_exprs.push(expr);
2919                ecx.relation_type.arity() + map_exprs.len() - 1
2920            }
2921        };
2922        order_by.push(resolve_desc_and_nulls_last(obe, column));
2923    }
2924    Ok((order_by, map_exprs))
2925}
2926
2927/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2928///
2929/// The `output_columns` parameter describes, in order, the physical index and
2930/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2931/// corresponds to a `SELECT` list with a single entry named "a" that can be
2932/// found at index 3 in the underlying relation expression.
2933///
2934/// There are three cases to handle.
2935///
2936///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2937///       reference to the specified output column.
2938///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2939///       an output column, if it exists; otherwise it is a reference to an
2940///       input column.
2941///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2942///       arbitrary expressions exclusively refer to input columns, never output
2943///       columns.
2944fn plan_order_by_or_distinct_expr(
2945    ecx: &ExprContext,
2946    expr: &Expr<Aug>,
2947    output_columns: &[(usize, &ColumnName)],
2948) -> Result<HirScalarExpr, PlanError> {
2949    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2950        return Ok(HirScalarExpr::column(output_columns[i].0));
2951    }
2952
2953    if let Expr::Identifier(names) = expr {
2954        if let [name] = &names[..] {
2955            let name = normalize::column_name(name.clone());
2956            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2957            if let Some((i, _)) = iter.next() {
2958                match iter.next() {
2959                    // Per SQL92, names are not considered ambiguous if they
2960                    // refer to identical target list expressions, as in
2961                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2962                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2963                    _ => return Ok(HirScalarExpr::column(*i)),
2964                }
2965            }
2966        }
2967    }
2968
2969    plan_expr(ecx, expr)?.type_as_any(ecx)
2970}
2971
2972fn plan_table_with_joins(
2973    qcx: &QueryContext,
2974    table_with_joins: &TableWithJoins<Aug>,
2975) -> Result<(HirRelationExpr, Scope), PlanError> {
2976    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2977    for join in &table_with_joins.joins {
2978        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2979        expr = new_expr;
2980        scope = new_scope;
2981    }
2982    Ok((expr, scope))
2983}
2984
2985fn plan_table_factor(
2986    qcx: &QueryContext,
2987    table_factor: &TableFactor<Aug>,
2988) -> Result<(HirRelationExpr, Scope), PlanError> {
2989    match table_factor {
2990        TableFactor::Table { name, alias } => {
2991            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2992            let scope = plan_table_alias(scope, alias.as_ref())?;
2993            Ok((expr, scope))
2994        }
2995
2996        TableFactor::Function {
2997            function,
2998            alias,
2999            with_ordinality,
3000        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
3001
3002        TableFactor::RowsFrom {
3003            functions,
3004            alias,
3005            with_ordinality,
3006        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3007
3008        TableFactor::Derived {
3009            lateral,
3010            subquery,
3011            alias,
3012        } => {
3013            let mut qcx = (*qcx).clone();
3014            if !lateral {
3015                // Since this derived table was not marked as `LATERAL`,
3016                // make elements in outer scopes invisible until we reach the
3017                // next lateral barrier.
3018                for scope in &mut qcx.outer_scopes {
3019                    if scope.lateral_barrier {
3020                        break;
3021                    }
3022                    scope.items.clear();
3023                }
3024            }
3025            qcx.outer_scopes[0].lateral_barrier = true;
3026            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3027            let scope = plan_table_alias(scope, alias.as_ref())?;
3028            Ok((expr, scope))
3029        }
3030
3031        TableFactor::NestedJoin { join, alias } => {
3032            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3033            let scope = plan_table_alias(scope, alias.as_ref())?;
3034            Ok((expr, scope))
3035        }
3036    }
3037}
3038
3039/// Plans a `ROWS FROM` expression.
3040///
3041/// `ROWS FROM` concatenates table functions into a single table, filling in
3042/// `NULL`s in places where one table function has fewer rows than another. We
3043/// can achieve this by augmenting each table function with a row number, doing
3044/// a `FULL JOIN` between each table function on the row number and eventually
3045/// projecting away the row number columns. Concretely, the following query
3046/// using `ROWS FROM`
3047///
3048/// ```sql
3049/// SELECT
3050///     *
3051/// FROM
3052///     ROWS FROM (
3053///         generate_series(1, 2),
3054///         information_schema._pg_expandarray(ARRAY[9]),
3055///         generate_series(3, 6)
3056///     );
3057/// ```
3058///
3059/// is equivalent to the following query that does not use `ROWS FROM`:
3060///
3061/// ```sql
3062/// SELECT
3063///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3064/// FROM
3065///     generate_series(1, 2) WITH ORDINALITY AS gs1
3066///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3067///         ON gs1.ordinality = expand.ordinality
3068///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3069///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3070/// ```
3071///
3072/// Note the call to `coalesce` in the last join condition, which ensures that
3073/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3074///
3075/// This function creates a HirRelationExpr that follows the structure of the
3076/// latter query.
3077///
3078/// `with_ordinality` can be used to have the output expression contain a
3079/// single coalesced ordinality column at the end of the entire expression.
3080fn plan_rows_from(
3081    qcx: &QueryContext,
3082    functions: &[Function<Aug>],
3083    alias: Option<&TableAlias>,
3084    with_ordinality: bool,
3085) -> Result<(HirRelationExpr, Scope), PlanError> {
3086    // If there's only a single table function, planning proceeds as if `ROWS
3087    // FROM` hadn't been written at all.
3088    if let [function] = functions {
3089        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3090    }
3091
3092    // Per PostgreSQL, all scope items take the name of the first function
3093    // (unless aliased).
3094    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3095    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3096        qcx,
3097        functions,
3098        Some(functions[0].name.full_item_name().clone()),
3099    )?;
3100
3101    // Columns tracks the set of columns we will keep in the projection.
3102    let mut columns = Vec::new();
3103    let mut offset = 0;
3104    // Retain table function's non-ordinality columns.
3105    for (idx, cols) in num_cols.into_iter().enumerate() {
3106        for i in 0..cols {
3107            columns.push(offset + i);
3108        }
3109        offset += cols + 1;
3110
3111        // Remove the ordinality column from the scope, accounting for previous scope
3112        // changes from this loop.
3113        scope.items.remove(offset - idx - 1);
3114    }
3115
3116    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3117    // column. Otherwise remove it from the scope.
3118    if with_ordinality {
3119        columns.push(offset);
3120    } else {
3121        scope.items.pop();
3122    }
3123
3124    let expr = expr.project(columns);
3125
3126    let scope = plan_table_alias(scope, alias)?;
3127    Ok((expr, scope))
3128}
3129
3130/// Plans an expression coalescing multiple table functions. Each table
3131/// function is followed by its row ordinality. The entire expression is
3132/// followed by the coalesced row ordinality.
3133///
3134/// The returned Scope will set all item's table_name's to the `table_name`
3135/// parameter if it is `Some`. If `None`, they will be the name of each table
3136/// function.
3137///
3138/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3139/// each table function.
3140///
3141/// For example, with table functions tf1 returning 1 column (a) and tf2
3142/// returning 2 columns (b, c), this function will return an expr 6 columns:
3143///
3144/// - tf1.a
3145/// - tf1.ordinality
3146/// - tf2.b
3147/// - tf2.c
3148/// - tf2.ordinality
3149/// - coalesced_ordinality
3150///
3151/// And a `Vec<usize>` of `[1, 2]`.
3152fn plan_rows_from_internal<'a>(
3153    qcx: &QueryContext,
3154    functions: impl IntoIterator<Item = &'a Function<Aug>>,
3155    table_name: Option<FullItemName>,
3156) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3157    let mut functions = functions.into_iter();
3158    let mut num_cols = Vec::new();
3159
3160    // Join together each of the table functions in turn. The last column is
3161    // always the column to join against and is maintained to be the coalescence
3162    // of the row number column for all prior functions.
3163    let (mut left_expr, mut left_scope) =
3164        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3165    num_cols.push(left_scope.len() - 1);
3166    // Create the coalesced ordinality column.
3167    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3168    left_scope
3169        .items
3170        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3171
3172    for function in functions {
3173        // The right hand side of a join must be planned in a new scope.
3174        let qcx = qcx.empty_derived_context();
3175        let (right_expr, mut right_scope) =
3176            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3177        num_cols.push(right_scope.len() - 1);
3178        let left_col = left_scope.len() - 1;
3179        let right_col = left_scope.len() + right_scope.len() - 1;
3180        let on = HirScalarExpr::call_binary(
3181            HirScalarExpr::column(left_col),
3182            HirScalarExpr::column(right_col),
3183            expr_func::Eq,
3184        );
3185        left_expr = left_expr
3186            .join(right_expr, on, JoinKind::FullOuter)
3187            .map(vec![HirScalarExpr::call_variadic(
3188                VariadicFunc::Coalesce,
3189                vec![
3190                    HirScalarExpr::column(left_col),
3191                    HirScalarExpr::column(right_col),
3192                ],
3193            )]);
3194
3195        // Project off the previous iteration's coalesced column, but keep both of this
3196        // iteration's ordinality columns.
3197        left_expr = left_expr.project(
3198            (0..left_col) // non-coalesced ordinality columns from left function
3199                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
3200                .collect(),
3201        );
3202        // Move the coalesced ordinality column.
3203        right_scope.items.push(left_scope.items.pop().unwrap());
3204
3205        left_scope.items.extend(right_scope.items);
3206    }
3207
3208    Ok((left_expr, left_scope, num_cols))
3209}
3210
3211/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3212/// FROM` clause that contains other table functions. Special aliasing rules
3213/// apply.
3214fn plan_solitary_table_function(
3215    qcx: &QueryContext,
3216    function: &Function<Aug>,
3217    alias: Option<&TableAlias>,
3218    with_ordinality: bool,
3219) -> Result<(HirRelationExpr, Scope), PlanError> {
3220    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3221
3222    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3223    if single_column_function {
3224        let item = &mut scope.items[0];
3225
3226        // Mark that the function only produced a single column. This impacts
3227        // whole-row references.
3228        item.from_single_column_function = true;
3229
3230        // Strange special case for solitary table functions that output one
3231        // column whose name matches the name of the table function. If a table
3232        // alias is provided, the column name is changed to the table alias's
3233        // name. Concretely, the following query returns a column named `x`
3234        // rather than a column named `generate_series`:
3235        //
3236        //     SELECT * FROM generate_series(1, 5) AS x
3237        //
3238        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3239        // since its output column is explicitly named `value`, not
3240        // `jsonb_array_elements`.
3241        //
3242        // Note also that we may (correctly) change the column name again when
3243        // we plan the table alias below if the `alias.columns` is non-empty.
3244        if let Some(alias) = alias {
3245            if let ScopeItem {
3246                table_name: Some(table_name),
3247                column_name,
3248                ..
3249            } = item
3250            {
3251                if table_name.item.as_str() == column_name.as_str() {
3252                    *column_name = normalize::column_name(alias.name.clone());
3253                }
3254            }
3255        }
3256    }
3257
3258    let scope = plan_table_alias(scope, alias)?;
3259    Ok((expr, scope))
3260}
3261
3262/// Plans a table function.
3263///
3264/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3265/// instead to get the appropriate aliasing behavior.
3266fn plan_table_function_internal(
3267    qcx: &QueryContext,
3268    Function {
3269        name,
3270        args,
3271        filter,
3272        over,
3273        distinct,
3274    }: &Function<Aug>,
3275    with_ordinality: bool,
3276    table_name: Option<FullItemName>,
3277) -> Result<(HirRelationExpr, Scope), PlanError> {
3278    assert_none!(filter, "cannot parse table function with FILTER");
3279    assert_none!(over, "cannot parse table function with OVER");
3280    assert!(!*distinct, "cannot parse table function with DISTINCT");
3281
3282    let ecx = &ExprContext {
3283        qcx,
3284        name: "table function arguments",
3285        scope: &Scope::empty(),
3286        relation_type: &SqlRelationType::empty(),
3287        allow_aggregates: false,
3288        allow_subqueries: true,
3289        allow_parameters: true,
3290        allow_windows: false,
3291    };
3292
3293    let scalar_args = match args {
3294        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3295        FunctionArgs::Args { args, order_by } => {
3296            if !order_by.is_empty() {
3297                sql_bail!(
3298                    "ORDER BY specified, but {} is not an aggregate function",
3299                    name
3300                );
3301            }
3302            plan_exprs(ecx, args)?
3303        }
3304    };
3305
3306    let table_name = match table_name {
3307        Some(table_name) => table_name.item,
3308        None => name.full_item_name().item.clone(),
3309    };
3310
3311    let scope_name = Some(PartialItemName {
3312        database: None,
3313        schema: None,
3314        item: table_name,
3315    });
3316
3317    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3318        Func::Table(impls) => {
3319            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3320            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3321            let expr = match tf.imp {
3322                TableFuncImpl::CallTable { mut func, exprs } => {
3323                    if with_ordinality {
3324                        func = TableFunc::with_ordinality(func.clone()).ok_or(
3325                            PlanError::Unsupported {
3326                                feature: format!("WITH ORDINALITY on {}", func),
3327                                discussion_no: None,
3328                            },
3329                        )?;
3330                    }
3331                    HirRelationExpr::CallTable { func, exprs }
3332                }
3333                TableFuncImpl::Expr(expr) => {
3334                    if !with_ordinality {
3335                        expr
3336                    } else {
3337                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
3338                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
3339                        // back to the legacy implementation or error out the query.
3340                        if qcx
3341                            .scx
3342                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3343                        {
3344                            // Note that this can give an incorrect ordering, and also has an extreme
3345                            // performance problem in some cases. See the doc comment of
3346                            // `TableFuncImpl`.
3347                            tracing::error!(
3348                                %name,
3349                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3350                            );
3351                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3352                                func: WindowExprType::Scalar(ScalarWindowExpr {
3353                                    func: ScalarWindowFunc::RowNumber,
3354                                    order_by: vec![],
3355                                }),
3356                                partition_by: vec![],
3357                                order_by: vec![],
3358                            })])
3359                        } else {
3360                            bail_unsupported!(format!(
3361                                "WITH ORDINALITY or ROWS FROM with {}",
3362                                name
3363                            ));
3364                        }
3365                    }
3366                }
3367            };
3368            (expr, scope)
3369        }
3370        Func::Scalar(impls) => {
3371            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3372            let output = expr.typ(
3373                &qcx.outer_relation_types,
3374                &SqlRelationType::new(vec![]),
3375                &qcx.scx.param_types.borrow(),
3376            );
3377
3378            let relation = SqlRelationType::new(vec![output]);
3379
3380            let function_ident = Ident::new(name.full_item_name().item.clone())?;
3381            let column_name = normalize::column_name(function_ident);
3382            let name = column_name.to_string();
3383
3384            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3385
3386            let mut func = TableFunc::TabletizedScalar { relation, name };
3387            if with_ordinality {
3388                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3389                    feature: format!("WITH ORDINALITY on {}", func),
3390                    discussion_no: None,
3391                })?;
3392            }
3393            (
3394                HirRelationExpr::CallTable {
3395                    func,
3396                    exprs: vec![expr],
3397                },
3398                scope,
3399            )
3400        }
3401        o => sql_bail!(
3402            "{} functions are not supported in functions in FROM",
3403            o.class()
3404        ),
3405    };
3406
3407    if with_ordinality {
3408        scope
3409            .items
3410            .push(ScopeItem::from_name(scope_name, "ordinality"));
3411    }
3412
3413    Ok((expr, scope))
3414}
3415
3416fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3417    if let Some(TableAlias {
3418        name,
3419        columns,
3420        strict,
3421    }) = alias
3422    {
3423        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3424            sql_bail!(
3425                "{} has {} columns available but {} columns specified",
3426                name,
3427                scope.items.len(),
3428                columns.len()
3429            );
3430        }
3431
3432        let table_name = normalize::ident(name.to_owned());
3433        for (i, item) in scope.items.iter_mut().enumerate() {
3434            item.table_name = if item.allow_unqualified_references {
3435                Some(PartialItemName {
3436                    database: None,
3437                    schema: None,
3438                    item: table_name.clone(),
3439                })
3440            } else {
3441                // Columns that prohibit unqualified references are special
3442                // columns from the output of a NATURAL or USING join that can
3443                // only be referenced by their full, pre-join name. Applying an
3444                // alias to the output of that join renders those columns
3445                // inaccessible, which we accomplish here by setting the
3446                // table name to `None`.
3447                //
3448                // Concretely, consider:
3449                //
3450                //      CREATE TABLE t1 (a int);
3451                //      CREATE TABLE t2 (a int);
3452                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3453                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3454                //
3455                // In (1), the join has no alias. The underlying columns from
3456                // either side of the join can be referenced as `t1.a` and
3457                // `t2.a`, respectively, and the unqualified name `a` refers to
3458                // a column whose value is `coalesce(t1.a, t2.a)`.
3459                //
3460                // In (2), the join is aliased as `t`. The columns from either
3461                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3462                // the coalesced column can be named as either `a` or `t.a`.
3463                //
3464                // We previously had a bug [0] that mishandled this subtle
3465                // logic.
3466                //
3467                // NOTE(benesch): We could in theory choose to project away
3468                // those inaccessible columns and drop them from the scope
3469                // entirely, but that would require that this function also
3470                // take and return the `HirRelationExpr` that is being aliased,
3471                // which is a rather large refactor.
3472                //
3473                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3474                None
3475            };
3476            item.column_name = columns
3477                .get(i)
3478                .map(|a| normalize::column_name(a.clone()))
3479                .unwrap_or_else(|| item.column_name.clone());
3480        }
3481    }
3482    Ok(scope)
3483}
3484
3485// `table_func_names` is a mapping from a UUID to the original function
3486// name. The UUIDs are identifiers that have been rewritten from some table
3487// function expression, and this mapping restores the original names.
3488fn invent_column_name(
3489    ecx: &ExprContext,
3490    expr: &Expr<Aug>,
3491    table_func_names: &BTreeMap<String, Ident>,
3492) -> Option<ColumnName> {
3493    // We follow PostgreSQL exactly here, which has some complicated rules
3494    // around "high" and "low" quality names. Low quality names override other
3495    // low quality names but not high quality names.
3496    //
3497    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3498
3499    #[derive(Debug)]
3500    enum NameQuality {
3501        Low,
3502        High,
3503    }
3504
3505    fn invent(
3506        ecx: &ExprContext,
3507        expr: &Expr<Aug>,
3508        table_func_names: &BTreeMap<String, Ident>,
3509    ) -> Option<(ColumnName, NameQuality)> {
3510        match expr {
3511            Expr::Identifier(names) => {
3512                if let [name] = names.as_slice() {
3513                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3514                        return Some((
3515                            normalize::column_name(table_func_name.clone()),
3516                            NameQuality::High,
3517                        ));
3518                    }
3519                }
3520                names
3521                    .last()
3522                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3523            }
3524            Expr::Value(v) => match v {
3525                // Per PostgreSQL, `bool` and `interval` literals take on the name
3526                // of their type, but not other literal types.
3527                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3528                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3529                _ => None,
3530            },
3531            Expr::Function(func) => {
3532                let (schema, item) = match &func.name {
3533                    ResolvedItemName::Item {
3534                        qualifiers,
3535                        full_name,
3536                        ..
3537                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3538                    _ => unreachable!(),
3539                };
3540
3541                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3542                    || schema
3543                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3544                {
3545                    None
3546                } else {
3547                    Some((item.into(), NameQuality::High))
3548                }
3549            }
3550            Expr::HomogenizingFunction { function, .. } => Some((
3551                function.to_string().to_lowercase().into(),
3552                NameQuality::High,
3553            )),
3554            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3555            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3556            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3557            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3558            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
3559                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3560                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3561            },
3562            Expr::Case { else_result, .. } => {
3563                match else_result
3564                    .as_ref()
3565                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
3566                {
3567                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3568                    _ => Some(("case".into(), NameQuality::Low)),
3569                }
3570            }
3571            Expr::FieldAccess { field, .. } => {
3572                Some((normalize::column_name(field.clone()), NameQuality::High))
3573            }
3574            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3575            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
3576            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3577                // A bit silly to have to plan the query here just to get its column
3578                // name, since we throw away the planned expression, but fixing this
3579                // requires a separate semantic analysis phase.
3580                let (_expr, scope) =
3581                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
3582                scope
3583                    .items
3584                    .first()
3585                    .map(|name| (name.column_name.clone(), NameQuality::High))
3586            }
3587            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3588            _ => None,
3589        }
3590    }
3591
3592    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
3593}
3594
3595#[derive(Debug)]
3596enum ExpandedSelectItem<'a> {
3597    InputOrdinal(usize),
3598    Expr(Cow<'a, Expr<Aug>>),
3599}
3600
3601impl ExpandedSelectItem<'_> {
3602    fn as_expr(&self) -> Option<&Expr<Aug>> {
3603        match self {
3604            ExpandedSelectItem::InputOrdinal(_) => None,
3605            ExpandedSelectItem::Expr(expr) => Some(expr),
3606        }
3607    }
3608}
3609
3610fn expand_select_item<'a>(
3611    ecx: &ExprContext,
3612    s: &'a SelectItem<Aug>,
3613    table_func_names: &BTreeMap<String, Ident>,
3614) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3615    match s {
3616        SelectItem::Expr {
3617            expr: Expr::QualifiedWildcard(table_name),
3618            alias: _,
3619        } => {
3620            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3621            let table_name =
3622                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3623            let out: Vec<_> = ecx
3624                .scope
3625                .items
3626                .iter()
3627                .enumerate()
3628                .filter(|(_i, item)| item.is_from_table(&table_name))
3629                .map(|(i, item)| {
3630                    let name = item.column_name.clone();
3631                    (ExpandedSelectItem::InputOrdinal(i), name)
3632                })
3633                .collect();
3634            if out.is_empty() {
3635                sql_bail!("no table named '{}' in scope", table_name);
3636            }
3637            Ok(out)
3638        }
3639        SelectItem::Expr {
3640            expr: Expr::WildcardAccess(sql_expr),
3641            alias: _,
3642        } => {
3643            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3644            // A bit silly to have to plan the expression here just to get its
3645            // type, since we throw away the planned expression, but fixing this
3646            // requires a separate semantic analysis phase. Luckily this is an
3647            // uncommon operation and the PostgreSQL docs have a warning that
3648            // this operation is slow in Postgres too.
3649            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3650            let fields = match ecx.scalar_type(&expr) {
3651                SqlScalarType::Record { fields, .. } => fields,
3652                ty => sql_bail!(
3653                    "type {} is not composite",
3654                    ecx.humanize_scalar_type(&ty, false)
3655                ),
3656            };
3657            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3658            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3659                if let [name] = ident.as_slice() {
3660                    if let Ok(items) = ecx.scope.items_from_table(
3661                        &[],
3662                        &PartialItemName {
3663                            database: None,
3664                            schema: None,
3665                            item: name.as_str().to_string(),
3666                        },
3667                    ) {
3668                        for (_, item) in items {
3669                            if item
3670                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3671                            {
3672                                skip_cols.insert(item.column_name.clone());
3673                            }
3674                        }
3675                    }
3676                }
3677            }
3678            let items = fields
3679                .iter()
3680                .filter_map(|(name, _ty)| {
3681                    if skip_cols.contains(name) {
3682                        None
3683                    } else {
3684                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3685                            expr: sql_expr.clone(),
3686                            field: name.clone().into(),
3687                        }));
3688                        Some((item, name.clone()))
3689                    }
3690                })
3691                .collect();
3692            Ok(items)
3693        }
3694        SelectItem::Wildcard => {
3695            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3696            let items: Vec<_> = ecx
3697                .scope
3698                .items
3699                .iter()
3700                .enumerate()
3701                .filter(|(_i, item)| item.allow_unqualified_references)
3702                .map(|(i, item)| {
3703                    let name = item.column_name.clone();
3704                    (ExpandedSelectItem::InputOrdinal(i), name)
3705                })
3706                .collect();
3707
3708            Ok(items)
3709        }
3710        SelectItem::Expr { expr, alias } => {
3711            let name = alias
3712                .clone()
3713                .map(normalize::column_name)
3714                .or_else(|| invent_column_name(ecx, expr, table_func_names))
3715                .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into());
3716            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3717        }
3718    }
3719}
3720
3721fn plan_join(
3722    left_qcx: &QueryContext,
3723    left: HirRelationExpr,
3724    left_scope: Scope,
3725    join: &Join<Aug>,
3726) -> Result<(HirRelationExpr, Scope), PlanError> {
3727    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3728    let (kind, constraint) = match &join.join_operator {
3729        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3730        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3731        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3732        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3733        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3734    };
3735
3736    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3737    if !kind.can_be_correlated() {
3738        for item in &mut right_qcx.outer_scopes[0].items {
3739            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3740            // these items from scope. These items need to *exist* because they
3741            // might shadow variables in outer scopes that would otherwise be
3742            // valid to reference, but accessing them needs to produce an error.
3743            item.error_if_referenced =
3744                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3745                    table: table.cloned(),
3746                    column: column.clone(),
3747                });
3748        }
3749    }
3750    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3751
3752    let (expr, scope) = match constraint {
3753        JoinConstraint::On(expr) => {
3754            let product_scope = left_scope.product(right_scope)?;
3755            let ecx = &ExprContext {
3756                qcx: left_qcx,
3757                name: "ON clause",
3758                scope: &product_scope,
3759                relation_type: &SqlRelationType::new(
3760                    left_qcx
3761                        .relation_type(&left)
3762                        .column_types
3763                        .into_iter()
3764                        .chain(right_qcx.relation_type(&right).column_types)
3765                        .collect(),
3766                ),
3767                allow_aggregates: false,
3768                allow_subqueries: true,
3769                allow_parameters: true,
3770                allow_windows: false,
3771            };
3772            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3773            let joined = left.join(right, on, kind);
3774            (joined, product_scope)
3775        }
3776        JoinConstraint::Using { columns, alias } => {
3777            let column_names = columns
3778                .iter()
3779                .map(|ident| normalize::column_name(ident.clone()))
3780                .collect::<Vec<_>>();
3781
3782            plan_using_constraint(
3783                &column_names,
3784                left_qcx,
3785                left,
3786                left_scope,
3787                &right_qcx,
3788                right,
3789                right_scope,
3790                kind,
3791                alias.as_ref(),
3792            )?
3793        }
3794        JoinConstraint::Natural => {
3795            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3796            // have the same scx. However, it doesn't hurt to be safe.
3797            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3798            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3799            let left_column_names = left_scope.column_names();
3800            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3801            let column_names: Vec<_> = left_column_names
3802                .filter(|col| right_column_names.contains(col))
3803                .cloned()
3804                .collect();
3805            plan_using_constraint(
3806                &column_names,
3807                left_qcx,
3808                left,
3809                left_scope,
3810                &right_qcx,
3811                right,
3812                right_scope,
3813                kind,
3814                None,
3815            )?
3816        }
3817    };
3818    Ok((expr, scope))
3819}
3820
3821// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3822#[allow(clippy::too_many_arguments)]
3823fn plan_using_constraint(
3824    column_names: &[ColumnName],
3825    left_qcx: &QueryContext,
3826    left: HirRelationExpr,
3827    left_scope: Scope,
3828    right_qcx: &QueryContext,
3829    right: HirRelationExpr,
3830    right_scope: Scope,
3831    kind: JoinKind,
3832    alias: Option<&Ident>,
3833) -> Result<(HirRelationExpr, Scope), PlanError> {
3834    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3835
3836    // Cargo culting PG here; no discernable reason this must fail, but PG does
3837    // so we do, as well.
3838    let mut unique_column_names = BTreeSet::new();
3839    for c in column_names {
3840        if !unique_column_names.insert(c) {
3841            return Err(PlanError::Unsupported {
3842                feature: format!(
3843                    "column name {} appears more than once in USING clause",
3844                    c.quoted()
3845                ),
3846                discussion_no: None,
3847            });
3848        }
3849    }
3850
3851    let alias_item_name = alias.map(|alias| PartialItemName {
3852        database: None,
3853        schema: None,
3854        item: alias.clone().to_string(),
3855    });
3856
3857    if let Some(alias_item_name) = &alias_item_name {
3858        for partial_item_name in both_scope.table_names() {
3859            if partial_item_name.matches(alias_item_name) {
3860                sql_bail!(
3861                    "table name \"{}\" specified more than once",
3862                    alias_item_name
3863                )
3864            }
3865        }
3866    }
3867
3868    let ecx = &ExprContext {
3869        qcx: right_qcx,
3870        name: "USING clause",
3871        scope: &both_scope,
3872        relation_type: &SqlRelationType::new(
3873            left_qcx
3874                .relation_type(&left)
3875                .column_types
3876                .into_iter()
3877                .chain(right_qcx.relation_type(&right).column_types)
3878                .collect(),
3879        ),
3880        allow_aggregates: false,
3881        allow_subqueries: false,
3882        allow_parameters: false,
3883        allow_windows: false,
3884    };
3885
3886    let mut join_exprs = vec![];
3887    let mut map_exprs = vec![];
3888    let mut new_items = vec![];
3889    let mut join_cols = vec![];
3890    let mut hidden_cols = vec![];
3891
3892    for column_name in column_names {
3893        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3894        let (lhs, lhs_name) = left_scope.resolve_using_column(
3895            column_name,
3896            JoinSide::Left,
3897            &mut left_qcx.name_manager.borrow_mut(),
3898        )?;
3899        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3900            column_name,
3901            JoinSide::Right,
3902            &mut right_qcx.name_manager.borrow_mut(),
3903        )?;
3904
3905        // Adjust the RHS reference to its post-join location.
3906        rhs.column += left_scope.len();
3907
3908        // Join keys must be resolved to same type.
3909        let mut exprs = coerce_homogeneous_exprs(
3910            &ecx.with_name(&format!(
3911                "NATURAL/USING join column {}",
3912                column_name.quoted()
3913            )),
3914            vec![
3915                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3916                    lhs,
3917                    Arc::clone(&lhs_name),
3918                )),
3919                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3920                    rhs,
3921                    Arc::clone(&rhs_name),
3922                )),
3923            ],
3924            None,
3925        )?;
3926        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3927
3928        match kind {
3929            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3930                join_cols.push(lhs.column);
3931                hidden_cols.push(rhs.column);
3932            }
3933            JoinKind::RightOuter => {
3934                join_cols.push(rhs.column);
3935                hidden_cols.push(lhs.column);
3936            }
3937            JoinKind::FullOuter => {
3938                // Create a new column that will be the coalesced value of left
3939                // and right.
3940                join_cols.push(both_scope.items.len() + map_exprs.len());
3941                hidden_cols.push(lhs.column);
3942                hidden_cols.push(rhs.column);
3943                map_exprs.push(HirScalarExpr::call_variadic(
3944                    VariadicFunc::Coalesce,
3945                    vec![expr1.clone(), expr2.clone()],
3946                ));
3947                new_items.push(ScopeItem::from_column_name(column_name));
3948            }
3949        }
3950
3951        // If a `join_using_alias` is present, add a new scope item that accepts
3952        // only table-qualified references for each specified join column.
3953        // Unlike regular table aliases, a `join_using_alias` should not hide the
3954        // names of the joined relations.
3955        if alias_item_name.is_some() {
3956            let new_item_col = both_scope.items.len() + new_items.len();
3957            join_cols.push(new_item_col);
3958            hidden_cols.push(new_item_col);
3959
3960            new_items.push(ScopeItem::from_name(
3961                alias_item_name.clone(),
3962                column_name.clone().to_string(),
3963            ));
3964
3965            // Should be safe to use either `lhs` or `rhs` here since the column
3966            // is available in both scopes and must have the same type of the new item.
3967            // We (arbitrarily) choose the left name.
3968            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
3969        }
3970
3971        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
3972    }
3973    both_scope.items.extend(new_items);
3974
3975    // The columns from the secondary side of the join remain accessible by
3976    // their table-qualified name, but not by their column name alone. They are
3977    // also excluded from `SELECT *`.
3978    for c in hidden_cols {
3979        both_scope.items[c].allow_unqualified_references = false;
3980    }
3981
3982    // Reproject all returned elements to the front of the list.
3983    let project_key = join_cols
3984        .into_iter()
3985        .chain(0..both_scope.items.len())
3986        .unique()
3987        .collect::<Vec<_>>();
3988
3989    both_scope = both_scope.project(&project_key);
3990
3991    let on = HirScalarExpr::variadic_and(join_exprs);
3992
3993    let both = left
3994        .join(right, on, kind)
3995        .map(map_exprs)
3996        .project(project_key);
3997    Ok((both, both_scope))
3998}
3999
4000pub fn plan_expr<'a>(
4001    ecx: &'a ExprContext,
4002    e: &Expr<Aug>,
4003) -> Result<CoercibleScalarExpr, PlanError> {
4004    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4005}
4006
4007fn plan_expr_inner<'a>(
4008    ecx: &'a ExprContext,
4009    e: &Expr<Aug>,
4010) -> Result<CoercibleScalarExpr, PlanError> {
4011    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4012        // We've already calculated this expression.
4013        return Ok(HirScalarExpr::named_column(
4014            i,
4015            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4016        )
4017        .into());
4018    }
4019
4020    match e {
4021        // Names.
4022        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4023            Ok(plan_identifier(ecx, names)?.into())
4024        }
4025
4026        // Literals.
4027        Expr::Value(val) => plan_literal(val),
4028        Expr::Parameter(n) => plan_parameter(ecx, *n),
4029        Expr::Array(exprs) => plan_array(ecx, exprs, None),
4030        Expr::List(exprs) => plan_list(ecx, exprs, None),
4031        Expr::Map(exprs) => plan_map(ecx, exprs, None),
4032        Expr::Row { exprs } => plan_row(ecx, exprs),
4033
4034        // Generalized functions, operators, and casts.
4035        Expr::Op { op, expr1, expr2 } => {
4036            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4037        }
4038        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4039        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4040
4041        // Special functions and operators.
4042        Expr::Not { expr } => plan_not(ecx, expr),
4043        Expr::And { left, right } => plan_and(ecx, left, right),
4044        Expr::Or { left, right } => plan_or(ecx, left, right),
4045        Expr::IsExpr {
4046            expr,
4047            construct,
4048            negated,
4049        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4050        Expr::Case {
4051            operand,
4052            conditions,
4053            results,
4054            else_result,
4055        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4056        Expr::HomogenizingFunction { function, exprs } => {
4057            plan_homogenizing_function(ecx, function, exprs)
4058        }
4059        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4060            ecx,
4061            &None,
4062            &[l_expr.clone().equals(*r_expr.clone())],
4063            &[Expr::null()],
4064            &Some(Box::new(*l_expr.clone())),
4065        )?
4066        .into()),
4067        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4068        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4069        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4070        Expr::Like {
4071            expr,
4072            pattern,
4073            escape,
4074            case_insensitive,
4075            negated,
4076        } => Ok(plan_like(
4077            ecx,
4078            expr,
4079            pattern,
4080            escape.as_deref(),
4081            *case_insensitive,
4082            *negated,
4083        )?
4084        .into()),
4085
4086        Expr::InList {
4087            expr,
4088            list,
4089            negated,
4090        } => plan_in_list(ecx, expr, list, negated),
4091
4092        // Subqueries.
4093        Expr::Exists(query) => plan_exists(ecx, query),
4094        Expr::Subquery(query) => plan_subquery(ecx, query),
4095        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4096        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4097        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4098        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4099        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
4100        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
4101        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
4102        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
4103        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
4104        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
4105        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
4106    }
4107}
4108
4109fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4110    if !ecx.allow_parameters {
4111        // It might be clearer to return an error like "cannot use parameter
4112        // here", but this is how PostgreSQL does it, and so for now we follow
4113        // PostgreSQL.
4114        return Err(PlanError::UnknownParameter(n));
4115    }
4116    if n == 0 || n > 65536 {
4117        return Err(PlanError::UnknownParameter(n));
4118    }
4119    if ecx.param_types().borrow().contains_key(&n) {
4120        Ok(HirScalarExpr::parameter(n).into())
4121    } else {
4122        Ok(CoercibleScalarExpr::Parameter(n))
4123    }
4124}
4125
4126fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4127    let mut out = vec![];
4128    for e in exprs {
4129        out.push(plan_expr(ecx, e)?);
4130    }
4131    Ok(CoercibleScalarExpr::LiteralRecord(out))
4132}
4133
4134fn plan_cast(
4135    ecx: &ExprContext,
4136    expr: &Expr<Aug>,
4137    data_type: &ResolvedDataType,
4138) -> Result<CoercibleScalarExpr, PlanError> {
4139    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4140    let expr = match expr {
4141        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4142        // we can pass in the target type as a type hint. This is
4143        // a limited form of the coercion that we do for string literals
4144        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4145        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4146        // PostgreSQL compatibility trouble.
4147        //
4148        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4149        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4150        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4151        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4152        _ => plan_expr(ecx, expr)?,
4153    };
4154    let ecx = &ecx.with_name("CAST");
4155    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4156    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4157    Ok(expr.into())
4158}
4159
4160fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4161    let ecx = ecx.with_name("NOT argument");
4162    Ok(plan_expr(&ecx, expr)?
4163        .type_as(&ecx, &SqlScalarType::Bool)?
4164        .call_unary(UnaryFunc::Not(expr_func::Not))
4165        .into())
4166}
4167
4168fn plan_and(
4169    ecx: &ExprContext,
4170    left: &Expr<Aug>,
4171    right: &Expr<Aug>,
4172) -> Result<CoercibleScalarExpr, PlanError> {
4173    let ecx = ecx.with_name("AND argument");
4174    Ok(HirScalarExpr::variadic_and(vec![
4175        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4176        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4177    ])
4178    .into())
4179}
4180
4181fn plan_or(
4182    ecx: &ExprContext,
4183    left: &Expr<Aug>,
4184    right: &Expr<Aug>,
4185) -> Result<CoercibleScalarExpr, PlanError> {
4186    let ecx = ecx.with_name("OR argument");
4187    Ok(HirScalarExpr::variadic_or(vec![
4188        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4189        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4190    ])
4191    .into())
4192}
4193
4194fn plan_in_list(
4195    ecx: &ExprContext,
4196    lhs: &Expr<Aug>,
4197    list: &Vec<Expr<Aug>>,
4198    negated: &bool,
4199) -> Result<CoercibleScalarExpr, PlanError> {
4200    let ecx = ecx.with_name("IN list");
4201    let or = HirScalarExpr::variadic_or(
4202        list.into_iter()
4203            .map(|e| {
4204                let eq = lhs.clone().equals(e.clone());
4205                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4206            })
4207            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4208    );
4209    Ok(if *negated {
4210        or.call_unary(UnaryFunc::Not(expr_func::Not))
4211    } else {
4212        or
4213    }
4214    .into())
4215}
4216
4217fn plan_homogenizing_function(
4218    ecx: &ExprContext,
4219    function: &HomogenizingFunction,
4220    exprs: &[Expr<Aug>],
4221) -> Result<CoercibleScalarExpr, PlanError> {
4222    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4223    let expr = HirScalarExpr::call_variadic(
4224        match function {
4225            HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4226            HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4227            HomogenizingFunction::Least => VariadicFunc::Least,
4228        },
4229        coerce_homogeneous_exprs(
4230            &ecx.with_name(&function.to_string().to_lowercase()),
4231            plan_exprs(ecx, exprs)?,
4232            None,
4233        )?,
4234    );
4235    Ok(expr.into())
4236}
4237
4238fn plan_field_access(
4239    ecx: &ExprContext,
4240    expr: &Expr<Aug>,
4241    field: &Ident,
4242) -> Result<CoercibleScalarExpr, PlanError> {
4243    let field = normalize::column_name(field.clone());
4244    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4245    let ty = ecx.scalar_type(&expr);
4246    let i = match &ty {
4247        SqlScalarType::Record { fields, .. } => {
4248            fields.iter().position(|(name, _ty)| *name == field)
4249        }
4250        ty => sql_bail!(
4251            "column notation applied to type {}, which is not a composite type",
4252            ecx.humanize_scalar_type(ty, false)
4253        ),
4254    };
4255    match i {
4256        None => sql_bail!(
4257            "field {} not found in data type {}",
4258            field,
4259            ecx.humanize_scalar_type(&ty, false)
4260        ),
4261        Some(i) => Ok(expr
4262            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4263            .into()),
4264    }
4265}
4266
4267fn plan_subscript(
4268    ecx: &ExprContext,
4269    expr: &Expr<Aug>,
4270    positions: &[SubscriptPosition<Aug>],
4271) -> Result<CoercibleScalarExpr, PlanError> {
4272    assert!(
4273        !positions.is_empty(),
4274        "subscript expression must contain at least one position"
4275    );
4276
4277    let ecx = &ecx.with_name("subscripting");
4278    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4279    let ty = ecx.scalar_type(&expr);
4280    match &ty {
4281        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4282            ecx,
4283            expr,
4284            positions,
4285            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4286            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4287            // track in its backing data).
4288            if ty == SqlScalarType::Int2Vector {
4289                1
4290            } else {
4291                0
4292            },
4293        ),
4294        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4295        SqlScalarType::List { element_type, .. } => {
4296            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4297            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
4298            let n_layers = ty.unwrap_list_n_layers();
4299            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4300        }
4301        ty => sql_bail!(
4302            "cannot subscript type {}",
4303            ecx.humanize_scalar_type(ty, false)
4304        ),
4305    }
4306}
4307
4308// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4309// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4310// any were slices (i.e. included colon).
4311fn extract_scalar_subscript_from_positions<'a>(
4312    positions: &'a [SubscriptPosition<Aug>],
4313    expr_type_name: &str,
4314) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4315    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4316    for p in positions {
4317        if p.explicit_slice {
4318            sql_bail!("{} subscript does not support slices", expr_type_name);
4319        }
4320        assert!(
4321            p.end.is_none(),
4322            "index-appearing subscripts cannot have end value"
4323        );
4324        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4325    }
4326    Ok(scalar_subscripts)
4327}
4328
4329fn plan_subscript_array(
4330    ecx: &ExprContext,
4331    expr: HirScalarExpr,
4332    positions: &[SubscriptPosition<Aug>],
4333    offset: i64,
4334) -> Result<CoercibleScalarExpr, PlanError> {
4335    let mut exprs = Vec::with_capacity(positions.len() + 1);
4336    exprs.push(expr);
4337
4338    // Subscripting arrays doesn't yet support slicing, so we always want to
4339    // extract scalars or error.
4340    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4341
4342    for i in indexes {
4343        exprs.push(plan_expr(ecx, i)?.cast_to(
4344            ecx,
4345            CastContext::Explicit,
4346            &SqlScalarType::Int64,
4347        )?);
4348    }
4349
4350    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayIndex { offset }, exprs).into())
4351}
4352
4353fn plan_subscript_list(
4354    ecx: &ExprContext,
4355    mut expr: HirScalarExpr,
4356    positions: &[SubscriptPosition<Aug>],
4357    mut remaining_layers: usize,
4358    elem_type_name: &str,
4359) -> Result<CoercibleScalarExpr, PlanError> {
4360    let mut i = 0;
4361
4362    while i < positions.len() {
4363        // Take all contiguous index operations, i.e. find next slice operation.
4364        let j = positions[i..]
4365            .iter()
4366            .position(|p| p.explicit_slice)
4367            .unwrap_or(positions.len() - i);
4368        if j != 0 {
4369            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4370            let (n, e) = plan_index_list(
4371                ecx,
4372                expr,
4373                indexes.as_slice(),
4374                remaining_layers,
4375                elem_type_name,
4376            )?;
4377            remaining_layers = n;
4378            expr = e;
4379            i += j;
4380        }
4381
4382        // Take all contiguous slice operations, i.e. find next index operation.
4383        let j = positions[i..]
4384            .iter()
4385            .position(|p| !p.explicit_slice)
4386            .unwrap_or(positions.len() - i);
4387        if j != 0 {
4388            expr = plan_slice_list(
4389                ecx,
4390                expr,
4391                &positions[i..i + j],
4392                remaining_layers,
4393                elem_type_name,
4394            )?;
4395            i += j;
4396        }
4397    }
4398
4399    Ok(expr.into())
4400}
4401
4402fn plan_index_list(
4403    ecx: &ExprContext,
4404    expr: HirScalarExpr,
4405    indexes: &[&Expr<Aug>],
4406    n_layers: usize,
4407    elem_type_name: &str,
4408) -> Result<(usize, HirScalarExpr), PlanError> {
4409    let depth = indexes.len();
4410
4411    if depth > n_layers {
4412        if n_layers == 0 {
4413            sql_bail!("cannot subscript type {}", elem_type_name)
4414        } else {
4415            sql_bail!(
4416                "cannot index into {} layers; list only has {} layer{}",
4417                depth,
4418                n_layers,
4419                if n_layers == 1 { "" } else { "s" }
4420            )
4421        }
4422    }
4423
4424    let mut exprs = Vec::with_capacity(depth + 1);
4425    exprs.push(expr);
4426
4427    for i in indexes {
4428        exprs.push(plan_expr(ecx, i)?.cast_to(
4429            ecx,
4430            CastContext::Explicit,
4431            &SqlScalarType::Int64,
4432        )?);
4433    }
4434
4435    Ok((
4436        n_layers - depth,
4437        HirScalarExpr::call_variadic(VariadicFunc::ListIndex, exprs),
4438    ))
4439}
4440
4441fn plan_slice_list(
4442    ecx: &ExprContext,
4443    expr: HirScalarExpr,
4444    slices: &[SubscriptPosition<Aug>],
4445    n_layers: usize,
4446    elem_type_name: &str,
4447) -> Result<HirScalarExpr, PlanError> {
4448    if n_layers == 0 {
4449        sql_bail!("cannot subscript type {}", elem_type_name)
4450    }
4451
4452    // first arg will be list
4453    let mut exprs = Vec::with_capacity(slices.len() + 1);
4454    exprs.push(expr);
4455    // extract (start, end) parts from collected slices
4456    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4457        Ok(match position {
4458            Some(p) => {
4459                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4460            }
4461            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4462        })
4463    };
4464    for p in slices {
4465        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4466        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4467        exprs.push(start);
4468        exprs.push(end);
4469    }
4470
4471    Ok(HirScalarExpr::call_variadic(
4472        VariadicFunc::ListSliceLinear,
4473        exprs,
4474    ))
4475}
4476
4477fn plan_like(
4478    ecx: &ExprContext,
4479    expr: &Expr<Aug>,
4480    pattern: &Expr<Aug>,
4481    escape: Option<&Expr<Aug>>,
4482    case_insensitive: bool,
4483    not: bool,
4484) -> Result<HirScalarExpr, PlanError> {
4485    use CastContext::Implicit;
4486    let ecx = ecx.with_name("LIKE argument");
4487    let expr = plan_expr(&ecx, expr)?;
4488    let haystack = match ecx.scalar_type(&expr) {
4489        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4490            .type_as(&ecx, ty)?
4491            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4492        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4493    };
4494    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4495    if let Some(escape) = escape {
4496        pattern = pattern.call_binary(
4497            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4498            expr_func::LikeEscape,
4499        );
4500    }
4501    let func: BinaryFunc = if case_insensitive {
4502        expr_func::IsLikeMatchCaseInsensitive.into()
4503    } else {
4504        expr_func::IsLikeMatchCaseSensitive.into()
4505    };
4506    let like = haystack.call_binary(pattern, func);
4507    if not {
4508        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4509    } else {
4510        Ok(like)
4511    }
4512}
4513
4514fn plan_subscript_jsonb(
4515    ecx: &ExprContext,
4516    expr: HirScalarExpr,
4517    positions: &[SubscriptPosition<Aug>],
4518) -> Result<CoercibleScalarExpr, PlanError> {
4519    use CastContext::Implicit;
4520    use SqlScalarType::{Int64, String};
4521
4522    // JSONB doesn't support the slicing syntax, so simply error if you
4523    // encounter any explicit slices.
4524    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4525
4526    let mut exprs = Vec::with_capacity(subscripts.len());
4527    for s in subscripts {
4528        let subscript = plan_expr(ecx, s)?;
4529        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4530            subscript
4531        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4532            // Integers are converted to a string here and then re-parsed as an
4533            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4534            // do it.
4535            typeconv::to_string(ecx, subscript)
4536        } else {
4537            sql_bail!("jsonb subscript type must be coercible to integer or text");
4538        };
4539        exprs.push(subscript);
4540    }
4541
4542    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4543    // `expr->subscript` as you might expect.
4544    let expr = expr.call_binary(
4545        HirScalarExpr::call_variadic(
4546            VariadicFunc::ArrayCreate {
4547                elem_type: SqlScalarType::String,
4548            },
4549            exprs,
4550        ),
4551        expr_func::JsonbGetPath,
4552    );
4553    Ok(expr.into())
4554}
4555
4556fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4557    if !ecx.allow_subqueries {
4558        sql_bail!("{} does not allow subqueries", ecx.name)
4559    }
4560    let mut qcx = ecx.derived_query_context();
4561    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4562    Ok(expr.exists().into())
4563}
4564
4565fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4566    if !ecx.allow_subqueries {
4567        sql_bail!("{} does not allow subqueries", ecx.name)
4568    }
4569    let mut qcx = ecx.derived_query_context();
4570    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4571    let column_types = qcx.relation_type(&expr).column_types;
4572    if column_types.len() != 1 {
4573        sql_bail!(
4574            "Expected subselect to return 1 column, got {} columns",
4575            column_types.len()
4576        );
4577    }
4578    Ok(expr.select().into())
4579}
4580
4581fn plan_list_subquery(
4582    ecx: &ExprContext,
4583    query: &Query<Aug>,
4584) -> Result<CoercibleScalarExpr, PlanError> {
4585    plan_vector_like_subquery(
4586        ecx,
4587        query,
4588        |_| false,
4589        |elem_type| VariadicFunc::ListCreate { elem_type },
4590        |order_by| AggregateFunc::ListConcat { order_by },
4591        expr_func::ListListConcat.into(),
4592        |elem_type| {
4593            HirScalarExpr::literal(
4594                Datum::empty_list(),
4595                SqlScalarType::List {
4596                    element_type: Box::new(elem_type),
4597                    custom_id: None,
4598                },
4599            )
4600        },
4601        "list",
4602    )
4603}
4604
4605fn plan_array_subquery(
4606    ecx: &ExprContext,
4607    query: &Query<Aug>,
4608) -> Result<CoercibleScalarExpr, PlanError> {
4609    plan_vector_like_subquery(
4610        ecx,
4611        query,
4612        |elem_type| {
4613            matches!(
4614                elem_type,
4615                SqlScalarType::Char { .. }
4616                    | SqlScalarType::Array { .. }
4617                    | SqlScalarType::List { .. }
4618                    | SqlScalarType::Map { .. }
4619            )
4620        },
4621        |elem_type| VariadicFunc::ArrayCreate { elem_type },
4622        |order_by| AggregateFunc::ArrayConcat { order_by },
4623        expr_func::ArrayArrayConcat.into(),
4624        |elem_type| {
4625            HirScalarExpr::literal(
4626                Datum::empty_array(),
4627                SqlScalarType::Array(Box::new(elem_type)),
4628            )
4629        },
4630        "[]",
4631    )
4632}
4633
4634/// Generic function used to plan both array subqueries and list subqueries
4635fn plan_vector_like_subquery<F1, F2, F3, F4>(
4636    ecx: &ExprContext,
4637    query: &Query<Aug>,
4638    is_unsupported_type: F1,
4639    vector_create: F2,
4640    aggregate_concat: F3,
4641    binary_concat: BinaryFunc,
4642    empty_literal: F4,
4643    vector_type_string: &str,
4644) -> Result<CoercibleScalarExpr, PlanError>
4645where
4646    F1: Fn(&SqlScalarType) -> bool,
4647    F2: Fn(SqlScalarType) -> VariadicFunc,
4648    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4649    F4: Fn(SqlScalarType) -> HirScalarExpr,
4650{
4651    if !ecx.allow_subqueries {
4652        sql_bail!("{} does not allow subqueries", ecx.name)
4653    }
4654
4655    let mut qcx = ecx.derived_query_context();
4656    let mut planned_query = plan_query(&mut qcx, query)?;
4657    if planned_query.limit.is_some()
4658        || !planned_query
4659            .offset
4660            .clone()
4661            .try_into_literal_int64()
4662            .is_ok_and(|offset| offset == 0)
4663    {
4664        planned_query.expr = HirRelationExpr::top_k(
4665            planned_query.expr,
4666            vec![],
4667            planned_query.order_by.clone(),
4668            planned_query.limit,
4669            planned_query.offset,
4670            planned_query.group_size_hints.limit_input_group_size,
4671        );
4672    }
4673
4674    if planned_query.project.len() != 1 {
4675        sql_bail!(
4676            "Expected subselect to return 1 column, got {} columns",
4677            planned_query.project.len()
4678        );
4679    }
4680
4681    let project_column = *planned_query.project.get(0).unwrap();
4682    let elem_type = qcx
4683        .relation_type(&planned_query.expr)
4684        .column_types
4685        .get(project_column)
4686        .cloned()
4687        .unwrap()
4688        .scalar_type();
4689
4690    if is_unsupported_type(&elem_type) {
4691        bail_unsupported!(format!(
4692            "cannot build array from subquery because return type {}{}",
4693            ecx.humanize_scalar_type(&elem_type, false),
4694            vector_type_string
4695        ));
4696    }
4697
4698    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4699    // subquery above.
4700    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4701        vector_create(elem_type.clone()),
4702        vec![HirScalarExpr::column(project_column)],
4703    ))
4704    .chain(
4705        planned_query
4706            .order_by
4707            .iter()
4708            .map(|co| HirScalarExpr::column(co.column)),
4709    )
4710    .collect();
4711
4712    // However, column references for `aggregation_projection` and `aggregation_order_by`
4713    // are with reference to the `exprs` of the aggregation expression.  Here that is
4714    // `aggregation_exprs`.
4715    let aggregation_projection = vec![0];
4716    let aggregation_order_by = planned_query
4717        .order_by
4718        .into_iter()
4719        .enumerate()
4720        .map(|(i, order)| ColumnOrder { column: i, ..order })
4721        .collect();
4722
4723    let reduced_expr = planned_query
4724        .expr
4725        .reduce(
4726            vec![],
4727            vec![AggregateExpr {
4728                func: aggregate_concat(aggregation_order_by),
4729                expr: Box::new(HirScalarExpr::call_variadic(
4730                    VariadicFunc::RecordCreate {
4731                        field_names: iter::repeat(ColumnName::from(""))
4732                            .take(aggregation_exprs.len())
4733                            .collect(),
4734                    },
4735                    aggregation_exprs,
4736                )),
4737                distinct: false,
4738            }],
4739            None,
4740        )
4741        .project(aggregation_projection);
4742
4743    // If `expr` has no rows, return an empty array/list rather than NULL.
4744    Ok(reduced_expr
4745        .select()
4746        .call_binary(empty_literal(elem_type), binary_concat)
4747        .into())
4748}
4749
4750fn plan_map_subquery(
4751    ecx: &ExprContext,
4752    query: &Query<Aug>,
4753) -> Result<CoercibleScalarExpr, PlanError> {
4754    if !ecx.allow_subqueries {
4755        sql_bail!("{} does not allow subqueries", ecx.name)
4756    }
4757
4758    let mut qcx = ecx.derived_query_context();
4759    let mut query = plan_query(&mut qcx, query)?;
4760    if query.limit.is_some()
4761        || !query
4762            .offset
4763            .clone()
4764            .try_into_literal_int64()
4765            .is_ok_and(|offset| offset == 0)
4766    {
4767        query.expr = HirRelationExpr::top_k(
4768            query.expr,
4769            vec![],
4770            query.order_by.clone(),
4771            query.limit,
4772            query.offset,
4773            query.group_size_hints.limit_input_group_size,
4774        );
4775    }
4776    if query.project.len() != 2 {
4777        sql_bail!(
4778            "expected map subquery to return 2 columns, got {} columns",
4779            query.project.len()
4780        );
4781    }
4782
4783    let query_types = qcx.relation_type(&query.expr).column_types;
4784    let key_column = query.project[0];
4785    let key_type = query_types[key_column].clone().scalar_type();
4786    let value_column = query.project[1];
4787    let value_type = query_types[value_column].clone().scalar_type();
4788
4789    if key_type != SqlScalarType::String {
4790        sql_bail!("cannot build map from subquery because first column is not of type text");
4791    }
4792
4793    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4794        VariadicFunc::RecordCreate {
4795            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4796        },
4797        vec![
4798            HirScalarExpr::column(key_column),
4799            HirScalarExpr::column(value_column),
4800        ],
4801    ))
4802    .chain(
4803        query
4804            .order_by
4805            .iter()
4806            .map(|co| HirScalarExpr::column(co.column)),
4807    )
4808    .collect();
4809
4810    let expr = query
4811        .expr
4812        .reduce(
4813            vec![],
4814            vec![AggregateExpr {
4815                func: AggregateFunc::MapAgg {
4816                    order_by: query
4817                        .order_by
4818                        .into_iter()
4819                        .enumerate()
4820                        .map(|(i, order)| ColumnOrder { column: i, ..order })
4821                        .collect(),
4822                    value_type: value_type.clone(),
4823                },
4824                expr: Box::new(HirScalarExpr::call_variadic(
4825                    VariadicFunc::RecordCreate {
4826                        field_names: iter::repeat(ColumnName::from(""))
4827                            .take(aggregation_exprs.len())
4828                            .collect(),
4829                    },
4830                    aggregation_exprs,
4831                )),
4832                distinct: false,
4833            }],
4834            None,
4835        )
4836        .project(vec![0]);
4837
4838    // If `expr` has no rows, return an empty map rather than NULL.
4839    let expr = HirScalarExpr::call_variadic(
4840        VariadicFunc::Coalesce,
4841        vec![
4842            expr.select(),
4843            HirScalarExpr::literal(
4844                Datum::empty_map(),
4845                SqlScalarType::Map {
4846                    value_type: Box::new(value_type),
4847                    custom_id: None,
4848                },
4849            ),
4850        ],
4851    );
4852
4853    Ok(expr.into())
4854}
4855
4856fn plan_collate(
4857    ecx: &ExprContext,
4858    expr: &Expr<Aug>,
4859    collation: &UnresolvedItemName,
4860) -> Result<CoercibleScalarExpr, PlanError> {
4861    if collation.0.len() == 2
4862        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4863        && collation.0[1] == ident!("default")
4864    {
4865        plan_expr(ecx, expr)
4866    } else {
4867        bail_unsupported!("COLLATE");
4868    }
4869}
4870
4871/// Plans a slice of expressions.
4872///
4873/// This function is a simple convenience function for mapping [`plan_expr`]
4874/// over a slice of expressions. The planned expressions are returned in the
4875/// same order as the input. If any of the expressions fail to plan, returns an
4876/// error instead.
4877fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4878where
4879    E: std::borrow::Borrow<Expr<Aug>>,
4880{
4881    let mut out = vec![];
4882    for expr in exprs {
4883        out.push(plan_expr(ecx, expr.borrow())?);
4884    }
4885    Ok(out)
4886}
4887
4888/// Plans an `ARRAY` expression.
4889fn plan_array(
4890    ecx: &ExprContext,
4891    exprs: &[Expr<Aug>],
4892    type_hint: Option<&SqlScalarType>,
4893) -> Result<CoercibleScalarExpr, PlanError> {
4894    // Plan each element expression.
4895    let mut out = vec![];
4896    for expr in exprs {
4897        out.push(match expr {
4898            // Special case nested ARRAY expressions so we can plumb
4899            // the type hint through.
4900            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4901            _ => plan_expr(ecx, expr)?,
4902        });
4903    }
4904
4905    // Attempt to make use of the type hint.
4906    let type_hint = match type_hint {
4907        // The user has provided an explicit cast to an array type. We know the
4908        // element type to coerce to. Need to be careful, though: if there's
4909        // evidence that any of the array elements are themselves arrays, we
4910        // want to coerce to the array type, not the element type.
4911        Some(SqlScalarType::Array(elem_type)) => {
4912            let multidimensional = out.iter().any(|e| {
4913                matches!(
4914                    ecx.scalar_type(e),
4915                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4916                )
4917            });
4918            if multidimensional {
4919                type_hint
4920            } else {
4921                Some(&**elem_type)
4922            }
4923        }
4924        // The user provided an explicit cast to a non-array type. We'll have to
4925        // guess what the correct type for the array. Our caller will then
4926        // handle converting that array type to the desired non-array type.
4927        Some(_) => None,
4928        // No type hint. We'll have to guess the correct type for the array.
4929        None => None,
4930    };
4931
4932    // Coerce all elements to the same type.
4933    let (elem_type, exprs) = if exprs.is_empty() {
4934        if let Some(elem_type) = type_hint {
4935            (elem_type.clone(), vec![])
4936        } else {
4937            sql_bail!("cannot determine type of empty array");
4938        }
4939    } else {
4940        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4941        (ecx.scalar_type(&out[0]), out)
4942    };
4943
4944    // Arrays of `char` type are disallowed due to a known limitation:
4945    // https://github.com/MaterializeInc/database-issues/issues/2360.
4946    //
4947    // Arrays of `list` and `map` types are disallowed due to mind-bending
4948    // semantics.
4949    if matches!(
4950        elem_type,
4951        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4952    ) {
4953        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4954    }
4955
4956    Ok(HirScalarExpr::call_variadic(VariadicFunc::ArrayCreate { elem_type }, exprs).into())
4957}
4958
4959fn plan_list(
4960    ecx: &ExprContext,
4961    exprs: &[Expr<Aug>],
4962    type_hint: Option<&SqlScalarType>,
4963) -> Result<CoercibleScalarExpr, PlanError> {
4964    let (elem_type, exprs) = if exprs.is_empty() {
4965        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4966            (element_type.without_modifiers(), vec![])
4967        } else {
4968            sql_bail!("cannot determine type of empty list");
4969        }
4970    } else {
4971        let type_hint = match type_hint {
4972            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
4973            _ => None,
4974        };
4975
4976        let mut out = vec![];
4977        for expr in exprs {
4978            out.push(match expr {
4979                // Special case nested LIST expressions so we can plumb
4980                // the type hint through.
4981                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
4982                _ => plan_expr(ecx, expr)?,
4983            });
4984        }
4985        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
4986        (ecx.scalar_type(&out[0]).without_modifiers(), out)
4987    };
4988
4989    if matches!(elem_type, SqlScalarType::Char { .. }) {
4990        bail_unsupported!("char list");
4991    }
4992
4993    Ok(HirScalarExpr::call_variadic(VariadicFunc::ListCreate { elem_type }, exprs).into())
4994}
4995
4996fn plan_map(
4997    ecx: &ExprContext,
4998    entries: &[MapEntry<Aug>],
4999    type_hint: Option<&SqlScalarType>,
5000) -> Result<CoercibleScalarExpr, PlanError> {
5001    let (value_type, exprs) = if entries.is_empty() {
5002        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5003            (value_type.without_modifiers(), vec![])
5004        } else {
5005            sql_bail!("cannot determine type of empty map");
5006        }
5007    } else {
5008        let type_hint = match type_hint {
5009            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5010            _ => None,
5011        };
5012
5013        let mut keys = vec![];
5014        let mut values = vec![];
5015        for MapEntry { key, value } in entries {
5016            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5017            let value = match value {
5018                // Special case nested MAP expressions so we can plumb
5019                // the type hint through.
5020                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5021                _ => plan_expr(ecx, value)?,
5022            };
5023            keys.push(key);
5024            values.push(value);
5025        }
5026        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5027        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5028        let out = itertools::interleave(keys, values).collect();
5029        (value_type, out)
5030    };
5031
5032    if matches!(value_type, SqlScalarType::Char { .. }) {
5033        bail_unsupported!("char map");
5034    }
5035
5036    let expr = HirScalarExpr::call_variadic(VariadicFunc::MapBuild { value_type }, exprs);
5037    Ok(expr.into())
5038}
5039
5040/// Coerces a list of expressions such that all input expressions will be cast
5041/// to the same type. If successful, returns a new list of expressions in the
5042/// same order as the input, where each expression has the appropriate casts to
5043/// make them all of a uniform type.
5044///
5045/// If `force_type` is `Some`, the expressions are forced to the specified type
5046/// via an explicit cast. Otherwise the best common type is guessed via
5047/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5048/// implicit casts
5049///
5050/// Note that this is our implementation of Postgres' type conversion for
5051/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5052/// isn't yet used in all of those cases.
5053///
5054/// [union-type-conv]:
5055/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5056pub fn coerce_homogeneous_exprs(
5057    ecx: &ExprContext,
5058    exprs: Vec<CoercibleScalarExpr>,
5059    force_type: Option<&SqlScalarType>,
5060) -> Result<Vec<HirScalarExpr>, PlanError> {
5061    assert!(!exprs.is_empty());
5062
5063    let target_holder;
5064    let target = match force_type {
5065        Some(t) => t,
5066        None => {
5067            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5068            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5069            &target_holder
5070        }
5071    };
5072
5073    // Try to cast all expressions to `target`.
5074    let mut out = Vec::new();
5075    for expr in exprs {
5076        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5077        let ccx = match force_type {
5078            None => CastContext::Implicit,
5079            Some(_) => CastContext::Explicit,
5080        };
5081        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5082            Ok(expr) => out.push(expr),
5083            Err(_) => sql_bail!(
5084                "{} could not convert type {} to {}",
5085                ecx.name,
5086                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
5087                ecx.humanize_scalar_type(target, false),
5088            ),
5089        }
5090    }
5091    Ok(out)
5092}
5093
5094/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5095/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5096pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5097    obe: &OrderByExpr<T>,
5098    column: usize,
5099) -> ColumnOrder {
5100    let desc = !obe.asc.unwrap_or(true);
5101    ColumnOrder {
5102        column,
5103        desc,
5104        // https://www.postgresql.org/docs/14/queries-order.html
5105        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5106        nulls_last: obe.nulls_last.unwrap_or(!desc),
5107    }
5108}
5109
5110/// Plans the ORDER BY clause of a window function.
5111///
5112/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5113/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5114/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5115/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5116/// `Vec<HirScalarExpr>` that we return.
5117fn plan_function_order_by(
5118    ecx: &ExprContext,
5119    order_by: &[OrderByExpr<Aug>],
5120) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5121    let mut order_by_exprs = vec![];
5122    let mut col_orders = vec![];
5123    {
5124        for (i, obe) in order_by.iter().enumerate() {
5125            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5126            // do not support ordinal references in PostgreSQL. So we use
5127            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5128            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5129            order_by_exprs.push(expr);
5130            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5131        }
5132    }
5133    Ok((order_by_exprs, col_orders))
5134}
5135
5136/// Common part of the planning of windowed and non-windowed aggregation functions.
5137fn plan_aggregate_common(
5138    ecx: &ExprContext,
5139    Function::<Aug> {
5140        name,
5141        args,
5142        filter,
5143        over: _,
5144        distinct,
5145    }: &Function<Aug>,
5146) -> Result<AggregateExpr, PlanError> {
5147    // Normal aggregate functions, like `sum`, expect as input a single expression
5148    // which yields the datum to aggregate. Order sensitive aggregate functions,
5149    // like `jsonb_agg`, are special, and instead expect a Record whose first
5150    // element yields the datum to aggregate and whose successive elements yield
5151    // keys to order by. This expectation is hard coded within the implementation
5152    // of each of the order-sensitive aggregates. The specification of how many
5153    // order by keys to consider, and in what order, is passed via the `order_by`
5154    // field on the `AggregateFunc` variant.
5155
5156    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5157    // most, so explicitly drop it if the function doesn't care about order. This
5158    // prevents the projection into Record below from triggering on unsupported
5159    // functions.
5160
5161    let impls = match resolve_func(ecx, name, args)? {
5162        Func::Aggregate(impls) => impls,
5163        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
5164    };
5165
5166    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5167    // generalized function selection framework. The rule is simple: the user
5168    // must type `count(*)`, but the function selection framework sees an empty
5169    // parameter list, as if the user had typed `count()`. But if the user types
5170    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5171    // rules to all aggregates, not just `count`, since we may one day support
5172    // user-defined aggregates, including user-defined aggregates that take no
5173    // parameters.
5174    let (args, order_by) = match &args {
5175        FunctionArgs::Star => (vec![], vec![]),
5176        FunctionArgs::Args { args, order_by } => {
5177            if args.is_empty() {
5178                sql_bail!(
5179                    "{}(*) must be used to call a parameterless aggregate function",
5180                    ecx.qcx
5181                        .scx
5182                        .humanize_resolved_name(name)
5183                        .expect("name actually resolved")
5184                );
5185            }
5186            let args = plan_exprs(ecx, args)?;
5187            (args, order_by.clone())
5188        }
5189    };
5190
5191    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5192
5193    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5194    if let Some(filter) = &filter {
5195        // If a filter is present, as in
5196        //
5197        //     <agg>(<expr>) FILTER (WHERE <cond>)
5198        //
5199        // we plan it by essentially rewriting the expression to
5200        //
5201        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5202        //
5203        // where <identity> is the identity input for <agg>.
5204        let cond =
5205            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5206        let expr_typ = ecx.scalar_type(&expr);
5207        expr = HirScalarExpr::if_then_else(
5208            cond,
5209            expr,
5210            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5211        );
5212    }
5213
5214    let mut seen_outer = false;
5215    let mut seen_inner = false;
5216    #[allow(deprecated)]
5217    expr.visit_columns(0, &mut |depth, col| {
5218        if depth == 0 && col.level == 0 {
5219            seen_inner = true;
5220        } else if col.level > depth {
5221            seen_outer = true;
5222        }
5223    });
5224    if seen_outer && !seen_inner {
5225        bail_unsupported!(
5226            3720,
5227            "aggregate functions that refer exclusively to outer columns"
5228        );
5229    }
5230
5231    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5232    // map the needed expressions into the aggregate datum.
5233    if func.is_order_sensitive() {
5234        let field_names = iter::repeat(ColumnName::from(""))
5235            .take(1 + order_by_exprs.len())
5236            .collect();
5237        let mut exprs = vec![expr];
5238        exprs.extend(order_by_exprs);
5239        expr = HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs);
5240    }
5241
5242    Ok(AggregateExpr {
5243        func,
5244        expr: Box::new(expr),
5245        distinct: *distinct,
5246    })
5247}
5248
5249fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5250    let mut names = names.to_vec();
5251    let col_name = normalize::column_name(names.pop().unwrap());
5252
5253    // If the name is qualified, it must refer to a column in a table.
5254    if !names.is_empty() {
5255        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5256        let (i, i_name) = ecx.scope.resolve_table_column(
5257            &ecx.qcx.outer_scopes,
5258            &table_name,
5259            &col_name,
5260            &mut ecx.qcx.name_manager.borrow_mut(),
5261        )?;
5262        return Ok(HirScalarExpr::named_column(i, i_name));
5263    }
5264
5265    // If the name is unqualified, first check if it refers to a column. Track any similar names
5266    // that might exist for a better error message.
5267    let similar_names = match ecx.scope.resolve_column(
5268        &ecx.qcx.outer_scopes,
5269        &col_name,
5270        &mut ecx.qcx.name_manager.borrow_mut(),
5271    ) {
5272        Ok((i, i_name)) => {
5273            return Ok(HirScalarExpr::named_column(i, i_name));
5274        }
5275        Err(PlanError::UnknownColumn { similar, .. }) => similar,
5276        Err(e) => return Err(e),
5277    };
5278
5279    // The name doesn't refer to a column. Check if it is a whole-row reference
5280    // to a table.
5281    let items = ecx.scope.items_from_table(
5282        &ecx.qcx.outer_scopes,
5283        &PartialItemName {
5284            database: None,
5285            schema: None,
5286            item: col_name.as_str().to_owned(),
5287        },
5288    )?;
5289    match items.as_slice() {
5290        // The name doesn't refer to a table either. Return an error.
5291        [] => Err(PlanError::UnknownColumn {
5292            table: None,
5293            column: col_name,
5294            similar: similar_names,
5295        }),
5296        // The name refers to a table that is the result of a function that
5297        // returned a single column. Per PostgreSQL, this is a special case
5298        // that returns the value directly.
5299        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
5300        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5301            *column,
5302            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5303        )),
5304        // The name refers to a normal table. Return a record containing all the
5305        // columns of the table.
5306        _ => {
5307            let mut has_exists_column = None;
5308            let (exprs, field_names): (Vec<_>, Vec<_>) = items
5309                .into_iter()
5310                .filter_map(|(column, item)| {
5311                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5312                        has_exists_column = Some(column);
5313                        None
5314                    } else {
5315                        let expr = HirScalarExpr::named_column(
5316                            column,
5317                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5318                        );
5319                        let name = item.column_name.clone();
5320                        Some((expr, name))
5321                    }
5322                })
5323                .unzip();
5324            // For the special case of a table function with a single column, the single column is instead not wrapped.
5325            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5326                exprs.into_element()
5327            } else {
5328                HirScalarExpr::call_variadic(VariadicFunc::RecordCreate { field_names }, exprs)
5329            };
5330            if let Some(has_exists_column) = has_exists_column {
5331                Ok(HirScalarExpr::if_then_else(
5332                    HirScalarExpr::unnamed_column(has_exists_column)
5333                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5334                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5335                    expr,
5336                ))
5337            } else {
5338                Ok(expr)
5339            }
5340        }
5341    }
5342}
5343
5344fn plan_op(
5345    ecx: &ExprContext,
5346    op: &str,
5347    expr1: &Expr<Aug>,
5348    expr2: Option<&Expr<Aug>>,
5349) -> Result<HirScalarExpr, PlanError> {
5350    let impls = func::resolve_op(op)?;
5351    let args = match expr2 {
5352        None => plan_exprs(ecx, &[expr1])?,
5353        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5354    };
5355    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5356}
5357
5358fn plan_function<'a>(
5359    ecx: &ExprContext,
5360    f @ Function {
5361        name,
5362        args,
5363        filter,
5364        over,
5365        distinct,
5366    }: &'a Function<Aug>,
5367) -> Result<HirScalarExpr, PlanError> {
5368    let impls = match resolve_func(ecx, name, args)? {
5369        Func::Table(_) => {
5370            sql_bail!(
5371                "table functions are not allowed in {} (function {})",
5372                ecx.name,
5373                name
5374            );
5375        }
5376        Func::Scalar(impls) => {
5377            if over.is_some() {
5378                sql_bail!(
5379                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5380                );
5381            }
5382            impls
5383        }
5384        Func::ScalarWindow(impls) => {
5385            let (
5386                ignore_nulls,
5387                order_by_exprs,
5388                col_orders,
5389                _window_frame,
5390                partition_by,
5391                scalar_args,
5392            ) = plan_window_function_non_aggr(ecx, f)?;
5393
5394            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5395            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5396            // msg there is less informative.)
5397            if !scalar_args.is_empty() {
5398                if let ResolvedItemName::Item {
5399                    full_name: FullItemName { item, .. },
5400                    ..
5401                } = name
5402                {
5403                    sql_bail!(
5404                        "function {} has 0 parameters, but was called with {}",
5405                        item,
5406                        scalar_args.len()
5407                    );
5408                }
5409            }
5410
5411            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5412            // accept a window frame here without an error msg. (Postgres also does this.)
5413            // TODO: maybe we should give a notice
5414
5415            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5416
5417            if ignore_nulls {
5418                // If we ever add a scalar window function that supports ignore, then don't forget
5419                // to also update HIR EXPLAIN.
5420                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5421            }
5422
5423            return Ok(HirScalarExpr::windowing(WindowExpr {
5424                func: WindowExprType::Scalar(ScalarWindowExpr {
5425                    func,
5426                    order_by: col_orders,
5427                }),
5428                partition_by,
5429                order_by: order_by_exprs,
5430            }));
5431        }
5432        Func::ValueWindow(impls) => {
5433            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
5434                plan_window_function_non_aggr(ecx, f)?;
5435
5436            let (args_encoded, func) =
5437                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5438
5439            if ignore_nulls {
5440                match func {
5441                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5442                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5443                }
5444            }
5445
5446            return Ok(HirScalarExpr::windowing(WindowExpr {
5447                func: WindowExprType::Value(ValueWindowExpr {
5448                    func,
5449                    args: Box::new(args_encoded),
5450                    order_by: col_orders,
5451                    window_frame,
5452                    ignore_nulls, // (RESPECT NULLS is the default)
5453                }),
5454                partition_by,
5455                order_by: order_by_exprs,
5456            }));
5457        }
5458        Func::Aggregate(_) => {
5459            if f.over.is_none() {
5460                // Not a window aggregate. Something is wrong.
5461                if ecx.allow_aggregates {
5462                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5463                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5464                    sql_bail!(
5465                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5466                        name,
5467                    );
5468                } else {
5469                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5470                    // because it was in an unsupported context.
5471                    sql_bail!(
5472                        "aggregate functions are not allowed in {} (function {})",
5473                        ecx.name,
5474                        name
5475                    );
5476                }
5477            } else {
5478                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5479                    plan_window_function_common(ecx, &f.name, &f.over)?;
5480
5481                // https://github.com/MaterializeInc/database-issues/issues/6720
5482                match (&window_frame.start_bound, &window_frame.end_bound) {
5483                    (
5484                        mz_expr::WindowFrameBound::UnboundedPreceding,
5485                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5486                    )
5487                    | (
5488                        mz_expr::WindowFrameBound::UnboundedPreceding,
5489                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5490                    )
5491                    | (
5492                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5493                        mz_expr::WindowFrameBound::UnboundedFollowing,
5494                    )
5495                    | (
5496                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5497                        mz_expr::WindowFrameBound::UnboundedFollowing,
5498                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5499                    (_, _) => {} // other cases are ok
5500                }
5501
5502                if ignore_nulls {
5503                    // https://github.com/MaterializeInc/database-issues/issues/6722
5504                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5505                    // forget to also update HIR EXPLAIN.
5506                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5507                }
5508
5509                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5510
5511                if aggregate_expr.distinct {
5512                    // https://github.com/MaterializeInc/database-issues/issues/6626
5513                    bail_unsupported!("DISTINCT in window aggregates");
5514                }
5515
5516                return Ok(HirScalarExpr::windowing(WindowExpr {
5517                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5518                        aggregate_expr,
5519                        order_by: col_orders,
5520                        window_frame,
5521                    }),
5522                    partition_by,
5523                    order_by: order_by_exprs,
5524                }));
5525            }
5526        }
5527    };
5528
5529    if over.is_some() {
5530        unreachable!("If there is an OVER clause, we should have returned already above.");
5531    }
5532
5533    if *distinct {
5534        sql_bail!(
5535            "DISTINCT specified, but {} is not an aggregate function",
5536            ecx.qcx
5537                .scx
5538                .humanize_resolved_name(name)
5539                .expect("already resolved")
5540        );
5541    }
5542    if filter.is_some() {
5543        sql_bail!(
5544            "FILTER specified, but {} is not an aggregate function",
5545            ecx.qcx
5546                .scx
5547                .humanize_resolved_name(name)
5548                .expect("already resolved")
5549        );
5550    }
5551
5552    let scalar_args = match &args {
5553        FunctionArgs::Star => {
5554            sql_bail!(
5555                "* argument is invalid with non-aggregate function {}",
5556                ecx.qcx
5557                    .scx
5558                    .humanize_resolved_name(name)
5559                    .expect("already resolved")
5560            )
5561        }
5562        FunctionArgs::Args { args, order_by } => {
5563            if !order_by.is_empty() {
5564                sql_bail!(
5565                    "ORDER BY specified, but {} is not an aggregate function",
5566                    ecx.qcx
5567                        .scx
5568                        .humanize_resolved_name(name)
5569                        .expect("already resolved")
5570                );
5571            }
5572            plan_exprs(ecx, args)?
5573        }
5574    };
5575
5576    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5577}
5578
5579pub const IGNORE_NULLS_ERROR_MSG: &str =
5580    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5581
5582/// Resolves the name to a set of function implementations.
5583///
5584/// If the name does not specify a known built-in function, returns an error.
5585pub fn resolve_func(
5586    ecx: &ExprContext,
5587    name: &ResolvedItemName,
5588    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5589) -> Result<&'static Func, PlanError> {
5590    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5591        if let Ok(f) = i.func() {
5592            return Ok(f);
5593        }
5594    }
5595
5596    // Couldn't resolve function with this name, so generate verbose error
5597    // message.
5598    let cexprs = match args {
5599        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5600        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5601            if !order_by.is_empty() {
5602                sql_bail!(
5603                    "ORDER BY specified, but {} is not an aggregate function",
5604                    name
5605                );
5606            }
5607            plan_exprs(ecx, args)?
5608        }
5609    };
5610
5611    let arg_types: Vec<_> = cexprs
5612        .into_iter()
5613        .map(|ty| match ecx.scalar_type(&ty) {
5614            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5615            CoercibleScalarType::Record(_) => "record".to_string(),
5616            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5617        })
5618        .collect();
5619
5620    Err(PlanError::UnknownFunction {
5621        name: name.to_string(),
5622        arg_types,
5623    })
5624}
5625
5626fn plan_is_expr<'a>(
5627    ecx: &ExprContext,
5628    expr: &'a Expr<Aug>,
5629    construct: &IsExprConstruct<Aug>,
5630    not: bool,
5631) -> Result<HirScalarExpr, PlanError> {
5632    let expr = plan_expr(ecx, expr)?;
5633    let mut expr = match construct {
5634        IsExprConstruct::Null => {
5635            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5636            // at odds with our type coercion rules, which treat `NULL` literals
5637            // and unconstrained parameters identically. Providing a type hint
5638            // of string means we wind up supporting both.
5639            let expr = expr.type_as_any(ecx)?;
5640            expr.call_is_null()
5641        }
5642        IsExprConstruct::Unknown => {
5643            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5644            expr.call_is_null()
5645        }
5646        IsExprConstruct::True => {
5647            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5648            expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
5649        }
5650        IsExprConstruct::False => {
5651            let expr = expr.type_as(ecx, &SqlScalarType::Bool)?;
5652            expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
5653        }
5654        IsExprConstruct::DistinctFrom(expr2) => {
5655            let expr1 = expr.type_as_any(ecx)?;
5656            let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5657            // There are three cases:
5658            // 1. Both terms are non-null, in which case the result should be `a != b`.
5659            // 2. Exactly one term is null, in which case the result should be true.
5660            // 3. Both terms are null, in which case the result should be false.
5661            //
5662            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5663            let term1 = HirScalarExpr::variadic_or(vec![
5664                expr1.clone().call_binary(expr2.clone(), expr_func::NotEq),
5665                expr1.clone().call_is_null(),
5666                expr2.clone().call_is_null(),
5667            ]);
5668            let term2 = HirScalarExpr::variadic_or(vec![
5669                expr1.call_is_null().not(),
5670                expr2.call_is_null().not(),
5671            ]);
5672            term1.and(term2)
5673        }
5674    };
5675    if not {
5676        expr = expr.not();
5677    }
5678    Ok(expr)
5679}
5680
5681fn plan_case<'a>(
5682    ecx: &ExprContext,
5683    operand: &'a Option<Box<Expr<Aug>>>,
5684    conditions: &'a [Expr<Aug>],
5685    results: &'a [Expr<Aug>],
5686    else_result: &'a Option<Box<Expr<Aug>>>,
5687) -> Result<HirScalarExpr, PlanError> {
5688    let mut cond_exprs = Vec::new();
5689    let mut result_exprs = Vec::new();
5690    for (c, r) in conditions.iter().zip_eq(results) {
5691        let c = match operand {
5692            Some(operand) => operand.clone().equals(c.clone()),
5693            None => c.clone(),
5694        };
5695        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5696        cond_exprs.push(cexpr);
5697        result_exprs.push(r);
5698    }
5699    result_exprs.push(match else_result {
5700        Some(else_result) => else_result,
5701        None => &Expr::Value(Value::Null),
5702    });
5703    let mut result_exprs = coerce_homogeneous_exprs(
5704        &ecx.with_name("CASE"),
5705        plan_exprs(ecx, &result_exprs)?,
5706        None,
5707    )?;
5708    let mut expr = result_exprs.pop().unwrap();
5709    assert_eq!(cond_exprs.len(), result_exprs.len());
5710    for (cexpr, rexpr) in cond_exprs
5711        .into_iter()
5712        .rev()
5713        .zip_eq(result_exprs.into_iter().rev())
5714    {
5715        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5716    }
5717    Ok(expr)
5718}
5719
5720fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5721    let (datum, scalar_type) = match l {
5722        Value::Number(s) => {
5723            let d = strconv::parse_numeric(s.as_str())?;
5724            if !s.contains(&['E', '.'][..]) {
5725                // Maybe representable as an int?
5726                if let Ok(n) = d.0.try_into() {
5727                    (Datum::Int32(n), SqlScalarType::Int32)
5728                } else if let Ok(n) = d.0.try_into() {
5729                    (Datum::Int64(n), SqlScalarType::Int64)
5730                } else {
5731                    (
5732                        Datum::Numeric(d),
5733                        SqlScalarType::Numeric { max_scale: None },
5734                    )
5735                }
5736            } else {
5737                (
5738                    Datum::Numeric(d),
5739                    SqlScalarType::Numeric { max_scale: None },
5740                )
5741            }
5742        }
5743        Value::HexString(_) => bail_unsupported!("hex string literals"),
5744        Value::Boolean(b) => match b {
5745            false => (Datum::False, SqlScalarType::Bool),
5746            true => (Datum::True, SqlScalarType::Bool),
5747        },
5748        Value::Interval(i) => {
5749            let i = literal::plan_interval(i)?;
5750            (Datum::Interval(i), SqlScalarType::Interval)
5751        }
5752        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5753        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5754    };
5755    let expr = HirScalarExpr::literal(datum, scalar_type);
5756    Ok(expr.into())
5757}
5758
5759/// The common part of the planning of non-aggregate window functions, i.e.,
5760/// scalar window functions and value window functions.
5761fn plan_window_function_non_aggr<'a>(
5762    ecx: &ExprContext,
5763    Function {
5764        name,
5765        args,
5766        filter,
5767        over,
5768        distinct,
5769    }: &'a Function<Aug>,
5770) -> Result<
5771    (
5772        bool,
5773        Vec<HirScalarExpr>,
5774        Vec<ColumnOrder>,
5775        mz_expr::WindowFrame,
5776        Vec<HirScalarExpr>,
5777        Vec<CoercibleScalarExpr>,
5778    ),
5779    PlanError,
5780> {
5781    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5782        plan_window_function_common(ecx, name, over)?;
5783
5784    if *distinct {
5785        sql_bail!(
5786            "DISTINCT specified, but {} is not an aggregate function",
5787            name
5788        );
5789    }
5790
5791    if filter.is_some() {
5792        bail_unsupported!("FILTER in non-aggregate window functions");
5793    }
5794
5795    let scalar_args = match &args {
5796        FunctionArgs::Star => {
5797            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5798        }
5799        FunctionArgs::Args { args, order_by } => {
5800            if !order_by.is_empty() {
5801                sql_bail!(
5802                    "ORDER BY specified, but {} is not an aggregate function",
5803                    name
5804                );
5805            }
5806            plan_exprs(ecx, args)?
5807        }
5808    };
5809
5810    Ok((
5811        ignore_nulls,
5812        order_by_exprs,
5813        col_orders,
5814        window_frame,
5815        partition,
5816        scalar_args,
5817    ))
5818}
5819
5820/// The common part of the planning of all window functions.
5821fn plan_window_function_common(
5822    ecx: &ExprContext,
5823    name: &<Aug as AstInfo>::ItemName,
5824    over: &Option<WindowSpec<Aug>>,
5825) -> Result<
5826    (
5827        bool,
5828        Vec<HirScalarExpr>,
5829        Vec<ColumnOrder>,
5830        mz_expr::WindowFrame,
5831        Vec<HirScalarExpr>,
5832    ),
5833    PlanError,
5834> {
5835    if !ecx.allow_windows {
5836        sql_bail!(
5837            "window functions are not allowed in {} (function {})",
5838            ecx.name,
5839            name
5840        );
5841    }
5842
5843    let window_spec = match over.as_ref() {
5844        Some(over) => over,
5845        None => sql_bail!("window function {} requires an OVER clause", name),
5846    };
5847    if window_spec.ignore_nulls && window_spec.respect_nulls {
5848        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5849    }
5850    let window_frame = match window_spec.window_frame.as_ref() {
5851        Some(frame) => plan_window_frame(frame)?,
5852        None => mz_expr::WindowFrame::default(),
5853    };
5854    let mut partition = Vec::new();
5855    for expr in &window_spec.partition_by {
5856        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5857    }
5858
5859    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5860
5861    Ok((
5862        window_spec.ignore_nulls,
5863        order_by_exprs,
5864        col_orders,
5865        window_frame,
5866        partition,
5867    ))
5868}
5869
5870fn plan_window_frame(
5871    WindowFrame {
5872        units,
5873        start_bound,
5874        end_bound,
5875    }: &WindowFrame,
5876) -> Result<mz_expr::WindowFrame, PlanError> {
5877    use mz_expr::WindowFrameBound::*;
5878    let units = window_frame_unit_ast_to_expr(units)?;
5879    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5880    let end_bound = end_bound
5881        .as_ref()
5882        .map(window_frame_bound_ast_to_expr)
5883        .unwrap_or(CurrentRow);
5884
5885    // Validate bounds according to Postgres rules
5886    match (&start_bound, &end_bound) {
5887        // Start bound can't be UNBOUNDED FOLLOWING
5888        (UnboundedFollowing, _) => {
5889            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5890        }
5891        // End bound can't be UNBOUNDED PRECEDING
5892        (_, UnboundedPreceding) => {
5893            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5894        }
5895        // Start bound should come before end bound in the list of bound definitions
5896        (CurrentRow, OffsetPreceding(_)) => {
5897            sql_bail!("frame starting from current row cannot have preceding rows")
5898        }
5899        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5900            sql_bail!("frame starting from following row cannot have preceding rows")
5901        }
5902        // The above rules are adopted from Postgres.
5903        // The following rules are Materialize-specific.
5904        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5905            // Note that the only hard limit is that partition size + offset should fit in i64, so
5906            // in theory, we could support much larger offsets than this. But for our current
5907            // performance, even 1000000 is quite big.
5908            if *o1 > 1000000 || *o2 > 1000000 {
5909                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5910            }
5911        }
5912        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5913            if *o1 > 1000000 || *o2 > 1000000 {
5914                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5915            }
5916        }
5917        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5918            if *o1 > 1000000 || *o2 > 1000000 {
5919                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5920            }
5921        }
5922        (OffsetPreceding(o), CurrentRow) => {
5923            if *o > 1000000 {
5924                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5925            }
5926        }
5927        (CurrentRow, OffsetFollowing(o)) => {
5928            if *o > 1000000 {
5929                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5930            }
5931        }
5932        // Other bounds are valid
5933        (_, _) => (),
5934    }
5935
5936    // RANGE is only supported in the default frame
5937    // https://github.com/MaterializeInc/database-issues/issues/6585
5938    if units == mz_expr::WindowFrameUnits::Range
5939        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5940    {
5941        bail_unsupported!("RANGE in non-default window frames")
5942    }
5943
5944    let frame = mz_expr::WindowFrame {
5945        units,
5946        start_bound,
5947        end_bound,
5948    };
5949    Ok(frame)
5950}
5951
5952fn window_frame_unit_ast_to_expr(
5953    unit: &WindowFrameUnits,
5954) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5955    match unit {
5956        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5957        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5958        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5959    }
5960}
5961
5962fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5963    match bound {
5964        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5965        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5966        WindowFrameBound::Preceding(Some(offset)) => {
5967            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5968        }
5969        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5970        WindowFrameBound::Following(Some(offset)) => {
5971            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5972        }
5973    }
5974}
5975
5976pub fn scalar_type_from_sql(
5977    scx: &StatementContext,
5978    data_type: &ResolvedDataType,
5979) -> Result<SqlScalarType, PlanError> {
5980    match data_type {
5981        ResolvedDataType::AnonymousList(elem_type) => {
5982            let elem_type = scalar_type_from_sql(scx, elem_type)?;
5983            if matches!(elem_type, SqlScalarType::Char { .. }) {
5984                bail_unsupported!("char list");
5985            }
5986            Ok(SqlScalarType::List {
5987                element_type: Box::new(elem_type),
5988                custom_id: None,
5989            })
5990        }
5991        ResolvedDataType::AnonymousMap {
5992            key_type,
5993            value_type,
5994        } => {
5995            match scalar_type_from_sql(scx, key_type)? {
5996                SqlScalarType::String => {}
5997                other => sql_bail!(
5998                    "map key type must be {}, got {}",
5999                    scx.humanize_scalar_type(&SqlScalarType::String, false),
6000                    scx.humanize_scalar_type(&other, false)
6001                ),
6002            }
6003            Ok(SqlScalarType::Map {
6004                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6005                custom_id: None,
6006            })
6007        }
6008        ResolvedDataType::Named { id, modifiers, .. } => {
6009            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6010        }
6011        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
6012    }
6013}
6014
6015pub fn scalar_type_from_catalog(
6016    catalog: &dyn SessionCatalog,
6017    id: CatalogItemId,
6018    modifiers: &[i64],
6019) -> Result<SqlScalarType, PlanError> {
6020    let entry = catalog.get_item(&id);
6021    let type_details = match entry.type_details() {
6022        Some(type_details) => type_details,
6023        None => {
6024            // Resolution should never produce a `ResolvedDataType::Named` with
6025            // an ID of a non-type, but we error gracefully just in case.
6026            sql_bail!(
6027                "internal error: {} does not refer to a type",
6028                catalog.resolve_full_name(entry.name()).to_string().quoted()
6029            );
6030        }
6031    };
6032    match &type_details.typ {
6033        CatalogType::Numeric => {
6034            let mut modifiers = modifiers.iter().fuse();
6035            let precision = match modifiers.next() {
6036                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6037                    sql_bail!(
6038                        "precision for type numeric must be between 1 and {}",
6039                        NUMERIC_DATUM_MAX_PRECISION,
6040                    );
6041                }
6042                Some(p) => Some(*p),
6043                None => None,
6044            };
6045            let scale = match modifiers.next() {
6046                Some(scale) => {
6047                    if let Some(precision) = precision {
6048                        if *scale > precision {
6049                            sql_bail!(
6050                                "scale for type numeric must be between 0 and precision {}",
6051                                precision
6052                            );
6053                        }
6054                    }
6055                    Some(NumericMaxScale::try_from(*scale)?)
6056                }
6057                None => None,
6058            };
6059            if modifiers.next().is_some() {
6060                sql_bail!("type numeric supports at most two type modifiers");
6061            }
6062            Ok(SqlScalarType::Numeric { max_scale: scale })
6063        }
6064        CatalogType::Char => {
6065            let mut modifiers = modifiers.iter().fuse();
6066            let length = match modifiers.next() {
6067                Some(l) => Some(CharLength::try_from(*l)?),
6068                None => Some(CharLength::ONE),
6069            };
6070            if modifiers.next().is_some() {
6071                sql_bail!("type character supports at most one type modifier");
6072            }
6073            Ok(SqlScalarType::Char { length })
6074        }
6075        CatalogType::VarChar => {
6076            let mut modifiers = modifiers.iter().fuse();
6077            let length = match modifiers.next() {
6078                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6079                None => None,
6080            };
6081            if modifiers.next().is_some() {
6082                sql_bail!("type character varying supports at most one type modifier");
6083            }
6084            Ok(SqlScalarType::VarChar { max_length: length })
6085        }
6086        CatalogType::Timestamp => {
6087            let mut modifiers = modifiers.iter().fuse();
6088            let precision = match modifiers.next() {
6089                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6090                None => None,
6091            };
6092            if modifiers.next().is_some() {
6093                sql_bail!("type timestamp supports at most one type modifier");
6094            }
6095            Ok(SqlScalarType::Timestamp { precision })
6096        }
6097        CatalogType::TimestampTz => {
6098            let mut modifiers = modifiers.iter().fuse();
6099            let precision = match modifiers.next() {
6100                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6101                None => None,
6102            };
6103            if modifiers.next().is_some() {
6104                sql_bail!("type timestamp with time zone supports at most one type modifier");
6105            }
6106            Ok(SqlScalarType::TimestampTz { precision })
6107        }
6108        t => {
6109            if !modifiers.is_empty() {
6110                sql_bail!(
6111                    "{} does not support type modifiers",
6112                    catalog.resolve_full_name(entry.name()).to_string()
6113                );
6114            }
6115            match t {
6116                CatalogType::Array {
6117                    element_reference: element_id,
6118                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6119                    catalog,
6120                    *element_id,
6121                    modifiers,
6122                )?))),
6123                CatalogType::List {
6124                    element_reference: element_id,
6125                    element_modifiers,
6126                } => Ok(SqlScalarType::List {
6127                    element_type: Box::new(scalar_type_from_catalog(
6128                        catalog,
6129                        *element_id,
6130                        element_modifiers,
6131                    )?),
6132                    custom_id: Some(id),
6133                }),
6134                CatalogType::Map {
6135                    key_reference: _,
6136                    key_modifiers: _,
6137                    value_reference: value_id,
6138                    value_modifiers,
6139                } => Ok(SqlScalarType::Map {
6140                    value_type: Box::new(scalar_type_from_catalog(
6141                        catalog,
6142                        *value_id,
6143                        value_modifiers,
6144                    )?),
6145                    custom_id: Some(id),
6146                }),
6147                CatalogType::Range {
6148                    element_reference: element_id,
6149                } => Ok(SqlScalarType::Range {
6150                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6151                }),
6152                CatalogType::Record { fields } => {
6153                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6154                        .iter()
6155                        .map(|f| {
6156                            let scalar_type = scalar_type_from_catalog(
6157                                catalog,
6158                                f.type_reference,
6159                                &f.type_modifiers,
6160                            )?;
6161                            Ok((
6162                                f.name.clone(),
6163                                SqlColumnType {
6164                                    scalar_type,
6165                                    nullable: true,
6166                                },
6167                            ))
6168                        })
6169                        .collect::<Result<Box<_>, PlanError>>()?;
6170                    Ok(SqlScalarType::Record {
6171                        fields: scalars,
6172                        custom_id: Some(id),
6173                    })
6174                }
6175                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6176                CatalogType::Bool => Ok(SqlScalarType::Bool),
6177                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6178                CatalogType::Date => Ok(SqlScalarType::Date),
6179                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6180                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6181                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6182                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6183                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6184                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6185                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6186                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6187                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6188                CatalogType::Interval => Ok(SqlScalarType::Interval),
6189                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6190                CatalogType::Oid => Ok(SqlScalarType::Oid),
6191                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6192                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6193                CatalogType::Pseudo => {
6194                    sql_bail!(
6195                        "cannot reference pseudo type {}",
6196                        catalog.resolve_full_name(entry.name()).to_string()
6197                    )
6198                }
6199                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6200                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6201                CatalogType::RegType => Ok(SqlScalarType::RegType),
6202                CatalogType::String => Ok(SqlScalarType::String),
6203                CatalogType::Time => Ok(SqlScalarType::Time),
6204                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6205                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6206                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6207                CatalogType::Numeric => unreachable!("handled above"),
6208                CatalogType::Char => unreachable!("handled above"),
6209                CatalogType::VarChar => unreachable!("handled above"),
6210                CatalogType::Timestamp => unreachable!("handled above"),
6211                CatalogType::TimestampTz => unreachable!("handled above"),
6212            }
6213        }
6214    }
6215}
6216
6217/// This is used to collect aggregates and table functions from within an `Expr`.
6218/// See the explanation of aggregate handling at the top of the file for more details.
6219struct AggregateTableFuncVisitor<'a> {
6220    scx: &'a StatementContext<'a>,
6221    aggs: Vec<Function<Aug>>,
6222    within_aggregate: bool,
6223    tables: BTreeMap<Function<Aug>, String>,
6224    table_disallowed_context: Vec<&'static str>,
6225    in_select_item: bool,
6226    id_gen: IdGen,
6227    err: Option<PlanError>,
6228}
6229
6230impl<'a> AggregateTableFuncVisitor<'a> {
6231    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6232        AggregateTableFuncVisitor {
6233            scx,
6234            aggs: Vec::new(),
6235            within_aggregate: false,
6236            tables: BTreeMap::new(),
6237            table_disallowed_context: Vec::new(),
6238            in_select_item: false,
6239            id_gen: Default::default(),
6240            err: None,
6241        }
6242    }
6243
6244    fn into_result(
6245        self,
6246    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6247        match self.err {
6248            Some(err) => Err(err),
6249            None => {
6250                // Dedup while preserving the order. We don't care what the order is, but it
6251                // has to be reproducible so that EXPLAIN PLAN tests work.
6252                let mut seen = BTreeSet::new();
6253                let aggs = self
6254                    .aggs
6255                    .into_iter()
6256                    .filter(move |agg| seen.insert(agg.clone()))
6257                    .collect();
6258                Ok((aggs, self.tables))
6259            }
6260        }
6261    }
6262}
6263
6264impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6265    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6266        let item = match self.scx.get_item_by_resolved_name(&func.name) {
6267            Ok(i) => i,
6268            // Catching missing functions later in planning improves error messages.
6269            Err(_) => return,
6270        };
6271
6272        match item.func() {
6273            // We don't want to collect window aggregations, because these will be handled not by
6274            // plan_aggregate, but by plan_function.
6275            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6276                if self.within_aggregate {
6277                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6278                    return;
6279                }
6280                self.aggs.push(func.clone());
6281                let Function {
6282                    name: _,
6283                    args,
6284                    filter,
6285                    over: _,
6286                    distinct: _,
6287                } = func;
6288                if let Some(filter) = filter {
6289                    self.visit_expr_mut(filter);
6290                }
6291                let old_within_aggregate = self.within_aggregate;
6292                self.within_aggregate = true;
6293                self.table_disallowed_context
6294                    .push("aggregate function calls");
6295
6296                self.visit_function_args_mut(args);
6297
6298                self.within_aggregate = old_within_aggregate;
6299                self.table_disallowed_context.pop();
6300            }
6301            Ok(Func::Table { .. }) => {
6302                self.table_disallowed_context.push("other table functions");
6303                visit_mut::visit_function_mut(self, func);
6304                self.table_disallowed_context.pop();
6305            }
6306            _ => visit_mut::visit_function_mut(self, func),
6307        }
6308    }
6309
6310    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6311        // Don't go into subqueries.
6312    }
6313
6314    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6315        let (disallowed_context, func) = match expr {
6316            Expr::Case { .. } => (Some("CASE"), None),
6317            Expr::HomogenizingFunction {
6318                function: HomogenizingFunction::Coalesce,
6319                ..
6320            } => (Some("COALESCE"), None),
6321            Expr::Function(func) if self.in_select_item => {
6322                // If we're in a SELECT list, replace table functions with a uuid identifier
6323                // and save the table func so it can be planned elsewhere.
6324                let mut table_func = None;
6325                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6326                    if let Ok(Func::Table { .. }) = item.func() {
6327                        if let Some(context) = self.table_disallowed_context.last() {
6328                            self.err = Some(sql_err!(
6329                                "table functions are not allowed in {} (function {})",
6330                                context,
6331                                func.name
6332                            ));
6333                            return;
6334                        }
6335                        table_func = Some(func.clone());
6336                    }
6337                }
6338                // Since we will descend into the table func below, don't add its own disallow
6339                // context here, instead use visit_function to set that.
6340                (None, table_func)
6341            }
6342            _ => (None, None),
6343        };
6344        if let Some(func) = func {
6345            // Since we are trading out expr, we need to visit the table func here.
6346            visit_mut::visit_expr_mut(self, expr);
6347            // Don't attempt to replace table functions with unsupported syntax.
6348            if let Function {
6349                name: _,
6350                args: _,
6351                filter: None,
6352                over: None,
6353                distinct: false,
6354            } = &func
6355            {
6356                // Identical table functions can be de-duplicated.
6357                let unique_id = self.id_gen.allocate_id();
6358                let id = self
6359                    .tables
6360                    .entry(func)
6361                    .or_insert_with(|| format!("table_func_{unique_id}"));
6362                // We know this is okay because id is is 11 characters + <=20 characters, which is
6363                // less than our max length.
6364                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6365            }
6366        }
6367        if let Some(context) = disallowed_context {
6368            self.table_disallowed_context.push(context);
6369        }
6370
6371        visit_mut::visit_expr_mut(self, expr);
6372
6373        if disallowed_context.is_some() {
6374            self.table_disallowed_context.pop();
6375        }
6376    }
6377
6378    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6379        let old = self.in_select_item;
6380        self.in_select_item = true;
6381        visit_mut::visit_select_item_mut(self, si);
6382        self.in_select_item = old;
6383    }
6384}
6385
6386#[derive(Default)]
6387struct WindowFuncCollector {
6388    window_funcs: Vec<Expr<Aug>>,
6389}
6390
6391impl WindowFuncCollector {
6392    fn into_result(self) -> Vec<Expr<Aug>> {
6393        // Dedup while preserving the order.
6394        let mut seen = BTreeSet::new();
6395        let window_funcs_dedupped = self
6396            .window_funcs
6397            .into_iter()
6398            .filter(move |expr| seen.insert(expr.clone()))
6399            // Reverse the order, so that in case of a nested window function call, the
6400            // inner one is evaluated first.
6401            .rev()
6402            .collect();
6403        window_funcs_dedupped
6404    }
6405}
6406
6407impl Visit<'_, Aug> for WindowFuncCollector {
6408    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6409        match expr {
6410            Expr::Function(func) => {
6411                if func.over.is_some() {
6412                    self.window_funcs.push(expr.clone());
6413                }
6414            }
6415            _ => (),
6416        }
6417        visit::visit_expr(self, expr);
6418    }
6419
6420    fn visit_query(&mut self, _query: &Query<Aug>) {
6421        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6422    }
6423}
6424
6425/// Specifies how long a query will live.
6426#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6427pub enum QueryLifetime {
6428    /// The query's (or the expression's) result will be computed at one point in time.
6429    OneShot,
6430    /// The query (or expression) is used in a dataflow that maintains an index.
6431    Index,
6432    /// The query (or expression) is used in a dataflow that maintains a materialized view.
6433    MaterializedView,
6434    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
6435    Subscribe,
6436    /// The query (or expression) is part of a (non-materialized) view.
6437    View,
6438    /// The expression is part of a source definition.
6439    Source,
6440}
6441
6442impl QueryLifetime {
6443    /// (This used to impact whether the query is allowed to reason about the time at which it is
6444    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
6445    /// mechanism, see `ExprPrepStyle`.)
6446    pub fn is_one_shot(&self) -> bool {
6447        let result = match self {
6448            QueryLifetime::OneShot => true,
6449            QueryLifetime::Index => false,
6450            QueryLifetime::MaterializedView => false,
6451            QueryLifetime::Subscribe => false,
6452            QueryLifetime::View => false,
6453            QueryLifetime::Source => false,
6454        };
6455        assert_eq!(!result, self.is_maintained());
6456        result
6457    }
6458
6459    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
6460    /// turned into a `TopK`.
6461    pub fn is_maintained(&self) -> bool {
6462        match self {
6463            QueryLifetime::OneShot => false,
6464            QueryLifetime::Index => true,
6465            QueryLifetime::MaterializedView => true,
6466            QueryLifetime::Subscribe => true,
6467            QueryLifetime::View => true,
6468            QueryLifetime::Source => true,
6469        }
6470    }
6471
6472    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
6473    pub fn allow_show(&self) -> bool {
6474        match self {
6475            QueryLifetime::OneShot => true,
6476            QueryLifetime::Index => false,
6477            QueryLifetime::MaterializedView => false,
6478            QueryLifetime::Subscribe => true, // SUBSCRIBE allows SHOW commands!
6479            QueryLifetime::View => false,
6480            QueryLifetime::Source => false,
6481        }
6482    }
6483}
6484
6485/// Description of a CTE sufficient for query planning.
6486#[derive(Debug, Clone)]
6487pub struct CteDesc {
6488    pub name: String,
6489    pub desc: RelationDesc,
6490}
6491
6492/// The state required when planning a `Query`.
6493#[derive(Debug, Clone)]
6494pub struct QueryContext<'a> {
6495    /// The context for the containing `Statement`.
6496    pub scx: &'a StatementContext<'a>,
6497    /// The lifetime that the planned query will have.
6498    pub lifetime: QueryLifetime,
6499    /// The scopes of the outer relation expression.
6500    pub outer_scopes: Vec<Scope>,
6501    /// The type of the outer relation expressions.
6502    pub outer_relation_types: Vec<SqlRelationType>,
6503    /// CTEs for this query, mapping their assigned LocalIds to their definition.
6504    pub ctes: BTreeMap<LocalId, CteDesc>,
6505    /// A name manager, for interning column names that will be stored in HIR and MIR.
6506    pub name_manager: Rc<RefCell<NameManager>>,
6507    pub recursion_guard: RecursionGuard,
6508}
6509
6510impl CheckedRecursion for QueryContext<'_> {
6511    fn recursion_guard(&self) -> &RecursionGuard {
6512        &self.recursion_guard
6513    }
6514}
6515
6516impl<'a> QueryContext<'a> {
6517    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6518        QueryContext {
6519            scx,
6520            lifetime,
6521            outer_scopes: vec![],
6522            outer_relation_types: vec![],
6523            ctes: BTreeMap::new(),
6524            name_manager: Rc::new(RefCell::new(NameManager::new())),
6525            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6526        }
6527    }
6528
6529    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6530        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6531    }
6532
6533    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6534    /// `self`.
6535    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6536        let ctes = self.ctes.clone();
6537        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6538        let outer_relation_types = iter::once(relation_type)
6539            .chain(self.outer_relation_types.clone())
6540            .collect();
6541        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6542        let name_manager = Rc::clone(&self.name_manager);
6543
6544        QueryContext {
6545            scx: self.scx,
6546            lifetime: self.lifetime,
6547            outer_scopes,
6548            outer_relation_types,
6549            ctes,
6550            name_manager,
6551            recursion_guard: self.recursion_guard.clone(),
6552        }
6553    }
6554
6555    /// Derives a `QueryContext` for a scope that contains no columns.
6556    fn empty_derived_context(&self) -> QueryContext<'a> {
6557        let scope = Scope::empty();
6558        let ty = SqlRelationType::empty();
6559        self.derived_context(scope, ty)
6560    }
6561
6562    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6563    /// CTE.
6564    pub fn resolve_table_name(
6565        &self,
6566        object: ResolvedItemName,
6567    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6568        match object {
6569            ResolvedItemName::Item {
6570                id,
6571                full_name,
6572                version,
6573                ..
6574            } => {
6575                let item = self.scx.get_item(&id).at_version(version);
6576                let desc = match item.relation_desc() {
6577                    Some(desc) => desc.clone(),
6578                    None => {
6579                        return Err(PlanError::InvalidDependency {
6580                            name: full_name.to_string(),
6581                            item_type: item.item_type(),
6582                        });
6583                    }
6584                };
6585                let expr = HirRelationExpr::Get {
6586                    id: Id::Global(item.global_id()),
6587                    typ: desc.typ().clone(),
6588                };
6589
6590                let name = full_name.into();
6591                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6592
6593                Ok((expr, scope))
6594            }
6595            ResolvedItemName::Cte { id, name } => {
6596                let name = name.into();
6597                let cte = self.ctes.get(&id).unwrap();
6598                let expr = HirRelationExpr::Get {
6599                    id: Id::Local(id),
6600                    typ: cte.desc.typ().clone(),
6601                };
6602
6603                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6604
6605                Ok((expr, scope))
6606            }
6607            ResolvedItemName::ContinualTask { id, name } => {
6608                let cte = self.ctes.get(&id).unwrap();
6609                let expr = HirRelationExpr::Get {
6610                    id: Id::Local(id),
6611                    typ: cte.desc.typ().clone(),
6612                };
6613
6614                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6615
6616                Ok((expr, scope))
6617            }
6618            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6619        }
6620    }
6621
6622    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6623    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6624    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6625        self.scx.humanize_scalar_type(typ, postgres_compat)
6626    }
6627}
6628
6629/// A bundle of unrelated things that we need for planning `Expr`s.
6630#[derive(Debug, Clone)]
6631pub struct ExprContext<'a> {
6632    pub qcx: &'a QueryContext<'a>,
6633    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
6634    pub name: &'a str,
6635    /// The context for the `Query` that contains this `Expr`.
6636    /// The current scope.
6637    pub scope: &'a Scope,
6638    /// The type of the current relation expression upon which this scalar
6639    /// expression will be evaluated.
6640    pub relation_type: &'a SqlRelationType,
6641    /// Are aggregate functions allowed in this context
6642    pub allow_aggregates: bool,
6643    /// Are subqueries allowed in this context
6644    pub allow_subqueries: bool,
6645    /// Are parameters allowed in this context.
6646    pub allow_parameters: bool,
6647    /// Are window functions allowed in this context
6648    pub allow_windows: bool,
6649}
6650
6651impl CheckedRecursion for ExprContext<'_> {
6652    fn recursion_guard(&self) -> &RecursionGuard {
6653        &self.qcx.recursion_guard
6654    }
6655}
6656
6657impl<'a> ExprContext<'a> {
6658    pub fn catalog(&self) -> &dyn SessionCatalog {
6659        self.qcx.scx.catalog
6660    }
6661
6662    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6663        let mut ecx = self.clone();
6664        ecx.name = name;
6665        ecx
6666    }
6667
6668    pub fn column_type<E>(&self, expr: &E) -> E::Type
6669    where
6670        E: AbstractExpr,
6671    {
6672        expr.typ(
6673            &self.qcx.outer_relation_types,
6674            self.relation_type,
6675            &self.qcx.scx.param_types.borrow(),
6676        )
6677    }
6678
6679    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6680    where
6681        E: AbstractExpr,
6682    {
6683        self.column_type(expr).scalar_type()
6684    }
6685
6686    fn derived_query_context(&self) -> QueryContext<'_> {
6687        let mut scope = self.scope.clone();
6688        scope.lateral_barrier = true;
6689        self.qcx.derived_context(scope, self.relation_type.clone())
6690    }
6691
6692    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6693        self.qcx.scx.require_feature_flag(flag)
6694    }
6695
6696    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6697        &self.qcx.scx.param_types
6698    }
6699
6700    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6701    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6702    pub fn humanize_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6703        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
6704    }
6705
6706    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6707        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6708    }
6709}
6710
6711/// Manages column names, doing lightweight string internment.
6712///
6713/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
6714/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
6715/// to ensure maximal sharing.
6716#[derive(Debug, Clone)]
6717pub struct NameManager(BTreeSet<Arc<str>>);
6718
6719impl NameManager {
6720    /// Creates a new `NameManager`, with no interned names
6721    pub fn new() -> Self {
6722        Self(BTreeSet::new())
6723    }
6724
6725    /// Interns a string, returning a reference-counted pointer to the interned
6726    /// string.
6727    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6728        let s = s.as_ref();
6729        if let Some(interned) = self.0.get(s) {
6730            Arc::clone(interned)
6731        } else {
6732            let interned: Arc<str> = Arc::from(s);
6733            self.0.insert(Arc::clone(&interned));
6734            interned
6735        }
6736    }
6737
6738    /// Interns a string representing a reference to a `ScopeItem`, returning a
6739    /// reference-counted pointer to the interned string.
6740    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6741        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6742        //
6743        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6744        // name information. Note that as of 2025-04-09, we don't support column
6745        // renames.
6746        //
6747        // A few bad alternatives:
6748        //
6749        // (1) Store it but don't write it down. This fails because the expression
6750        //     cache will erase our names on restart.
6751        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6752        //     get the right names. But the world now and the world when we made
6753        //     those objects may be different.
6754        // (3) Just don't write down the table name. Nothing fails... for now.
6755
6756        self.intern(item.column_name.as_str())
6757    }
6758}
6759
6760#[cfg(test)]
6761mod test {
6762    use super::*;
6763
6764    /// Ensure that `NameManager`'s string interning works as expected.
6765    ///
6766    /// In particular, structurally but not referentially identical strings should
6767    /// be interned to the same `Arc`ed pointer.
6768    #[mz_ore::test]
6769    pub fn test_name_manager_string_interning() {
6770        let mut nm = NameManager::new();
6771
6772        let orig_hi = "hi";
6773        let hi = nm.intern(orig_hi);
6774        let hello = nm.intern("hello");
6775
6776        assert_ne!(hi.as_ptr(), hello.as_ptr());
6777
6778        // this static string is _likely_ the same as `orig_hi``
6779        let hi2 = nm.intern("hi");
6780        assert_eq!(hi.as_ptr(), hi2.as_ptr());
6781
6782        // generate a "hi" string that doesn't get optimized to the same static string
6783        let s = format!(
6784            "{}{}",
6785            hi.chars().nth(0).unwrap(),
6786            hi2.chars().nth(1).unwrap()
6787        );
6788        // make sure that we're testing with a fresh string!
6789        assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6790
6791        let hi3 = nm.intern(s);
6792        assert_eq!(hi.as_ptr(), hi3.as_ptr());
6793    }
6794}