Skip to main content

mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50    ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51    MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
56    TableFunc, func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
70use mz_repr::{
71    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
72    ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
73    UNKNOWN_COLUMN_NAME, strconv,
74};
75use mz_sql_parser::ast::display::AstDisplay;
76use mz_sql_parser::ast::visit::Visit;
77use mz_sql_parser::ast::visit_mut::{self, VisitMut};
78use mz_sql_parser::ast::{
79    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
80    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
81    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
82    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
83    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
84    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
85    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
86    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
87};
88use mz_sql_parser::ident;
89
90use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
91use crate::func::{self, Func, FuncSpec, TableFuncImpl};
92use crate::names::{
93    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
94};
95use crate::plan::PlanError::InvalidWmrRecursionLimit;
96use crate::plan::error::PlanError;
97use crate::plan::hir::{
98    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
99    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
100    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
101    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
102};
103use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
104use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
105use crate::plan::statement::{StatementContext, StatementDesc, show};
106use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
107use crate::plan::{
108    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
109    literal, transform_ast,
110};
111use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
112use crate::session::vars::{self, FeatureFlag};
113use crate::{ORDINALITY_COL_NAME, normalize};
114
/// The result of planning a "root" (outermost) query: the planned expression
/// together with everything a caller needs to interpret its output.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression. `E` is `HirRelationExpr` for an ordinary root
    /// query, or `Vec<HirScalarExpr>` for e.g. an INSERT's RETURNING clause.
    pub expr: E,
    /// The shape of the result set, as it looks *after* `finishing` is applied.
    pub desc: RelationDesc,
    /// Post-processing (ORDER BY / LIMIT / OFFSET / projection) that must be
    /// applied before results are sent to the client.
    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
    /// The name-resolution scope of the query's output columns.
    pub scope: Scope,
}
122
/// Plans a top-level query, returning the `HirRelationExpr` describing the query
/// plan, the `RelationDesc` describing the shape of the result set, a
/// `RowSetFinishing` describing post-processing that must occur before results
/// are sent to the client, and the types of the parameters in the query, if any
/// were present.
///
/// Note that the returned `RelationDesc` describes the expression after
/// applying the returned `RowSetFinishing`.
#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
pub fn plan_root_query(
    scx: &StatementContext,
    mut query: Query<Aug>,
    lifetime: QueryLifetime,
) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
    // Rewrite the AST in place before planning proper.
    transform_ast::transform(scx, &mut query)?;
    let mut qcx = QueryContext::root(scx, lifetime);
    let PlannedQuery {
        mut expr,
        scope,
        order_by,
        limit,
        offset,
        project,
        group_size_hints,
    } = plan_query(&mut qcx, &query)?;

    // The outermost ORDER BY / LIMIT / OFFSET / projection become the row-set
    // finishing, applied after the dataflow's results are produced.
    let mut finishing = RowSetFinishing {
        limit,
        offset,
        project,
        order_by,
    };

    // Attempt to push the finishing's ordering past its projection. This allows
    // data to be projected down on the workers rather than the coordinator. It
    // also improves the optimizer's demand analysis, as the optimizer can only
    // reason about demand information in `expr` (i.e., it can't see
    // `finishing.project`).
    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);

    // For maintained query lifetimes, fold the finishing into the expression
    // itself where possible.
    if lifetime.is_maintained() {
        expr.finish_maintained(&mut finishing, group_size_hints);
    }

    // Describe the result set as it will look *after* the finishing's
    // projection has been applied (see the doc comment above).
    let typ = qcx.relation_type(&expr);
    let typ = SqlRelationType::new(
        finishing
            .project
            .iter()
            .map(|i| typ.column_types[*i].clone())
            .collect(),
    );
    let desc = RelationDesc::new(typ, scope.column_names());

    Ok(PlannedRootQuery {
        expr,
        desc,
        finishing,
        scope,
    })
}
184
185/// Attempts to push a projection through an order by.
186///
187/// The returned bool indicates whether the pushdown was successful or not.
188/// Successful pushdown requires that all the columns referenced in `order_by`
189/// are included in `project`.
190///
191/// When successful, `expr` is wrapped in a projection node, `order_by` is
192/// rewritten to account for the pushed-down projection, and `project` is
193/// replaced with the trivial projection. When unsuccessful, no changes are made
194/// to any of the inputs.
195fn try_push_projection_order_by(
196    expr: &mut HirRelationExpr,
197    project: &mut Vec<usize>,
198    order_by: &mut Vec<ColumnOrder>,
199) -> bool {
200    let mut unproject = vec![None; expr.arity()];
201    for (out_i, in_i) in project.iter().copied().enumerate() {
202        unproject[in_i] = Some(out_i);
203    }
204    if order_by
205        .iter()
206        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
207    {
208        let trivial_project = (0..project.len()).collect();
209        *expr = expr.take().project(mem::replace(project, trivial_project));
210        for ob in order_by {
211            ob.column = unproject[ob.column].unwrap();
212        }
213        true
214    } else {
215        false
216    }
217}
218
/// Plans an `INSERT INTO ... [RETURNING ...]` statement.
///
/// Returns the target table's [`CatalogItemId`], the [`HirRelationExpr`]
/// producing the rows to insert (with omitted columns filled in from their
/// defaults and all columns rearranged into table order), and the planned
/// `RETURNING` projection.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
    // Per-column default expressions; only writable tables have them.
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    // Default expressions are ASTs and need the same rewrites as the query.
    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order.
    // `ordering[i]` is the table column index that source column `i` targets;
    // `source_types[i]` is the type that source column `i` must be cast to.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        InsertSource::DefaultValues => {
            // A single zero-column row; every column will be filled in from
            // its default below.
            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).quoted(),
            qcx.humanize_sql_scalar_type(&e.target_type, false),
            qcx.humanize_sql_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Column is present in the source: project it through.
            project_key.push(*src_idx);
        } else {
            // Column is omitted: append its default expression and project
            // from the appended position.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against the table's own scope (so RETURNING
    // can reference table columns), producing one scalar expr per output.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), SqlRelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: true,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = SqlRelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // Expand wildcards (e.g. `RETURNING *`) into individual items.
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
437
/// Determines the mapping between some external data and a Materialize relation.
///
/// Returns the following:
/// * [`CatalogItemId`] for the destination table.
/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
///   since we now return a [`MapFilterProject`].
/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
///   destination table.
///
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    // Items without a relation description (e.g. non-relational objects)
    // cannot participate in COPY.
    let table_desc = match item.relation_desc() {
        Some(desc) => desc.into_owned(),
        None => {
            return Err(PlanError::InvalidDependency {
                name: fullname.to_string(),
                item_type: item.item_type().to_string(),
            });
        }
    };
    let mut ordering = Vec::with_capacity(columns.len());

    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
    // should be simplified. The reason they are currently separate code paths is so we can roll
    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.

    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
    // FROM SOURCE ...`), then we generate an MFP.
    //
    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
    // so it's not always guaranteed that our `item` is a table.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        // Default expressions are ASTs and need the same rewrites as queries.
        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Fill in any omitted columns and rearrange into correct order
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each column in the destination table, either project it from the source data, or provide
        // an expression to fill in a default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // If a column from the table does not exist in the source data, then a default
                // value will get appended to the end of the input Row from the source data.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    // Create a mapping from input data to the table we're copying into.
    let source_desc = if columns.is_empty() {
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        // The source data should be in the same order as the table.
        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        // Resolve each user-specified column against the table, preserving the
        // user's column order for the source relation.
        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.quoted());
        }

        // The source data is a different shape than the destination table.
        RelationDesc::new(SqlRelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
565
566/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
567///
568/// TODO(cf3): Merge this method with [`plan_copy_item`].
569pub fn plan_copy_from(
570    scx: &StatementContext,
571    table_name: ResolvedItemName,
572    columns: Vec<Ident>,
573) -> Result<
574    (
575        CatalogItemId,
576        RelationDesc,
577        Vec<ColumnIndex>,
578        Option<MapFilterProject>,
579    ),
580    PlanError,
581> {
582    let table = scx.get_item_by_resolved_name(&table_name)?;
583
584    // Validate the target of the insert.
585    if table.item_type() != CatalogItemType::Table {
586        sql_bail!(
587            "cannot insert into {} '{}'",
588            table.item_type(),
589            table_name.full_name_str()
590        );
591    }
592
593    let _ = table.writable_table_details().ok_or_else(|| {
594        sql_err!(
595            "cannot insert into non-writeable table '{}'",
596            table_name.full_name_str()
597        )
598    })?;
599
600    if table.id().is_system() {
601        sql_bail!(
602            "cannot insert into system table '{}'",
603            table_name.full_name_str()
604        );
605    }
606    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
607
608    Ok((id, desc, ordering, mfp))
609}
610
/// Builds a plan that adds the default values for the missing columns and re-orders
/// the datums in the given rows to match the order in the target table.
///
/// `columns` names (by [`ColumnIndex`]) which table columns the datums in each
/// of `rows` correspond to, in order; any table column not listed is filled in
/// from its default expression.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    target_id: CatalogItemId,
    target_name: String,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .try_get_item(&target_id)
        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
        .at_version(RelationVersionSelector::Latest);

    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    // Default expressions are ASTs and need the same rewrites as queries.
    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    let desc = table
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
    let column_types = columns
        .iter()
        .map(|x| desc.get_type(x).clone())
        .map(|mut x| {
            // Null constraint is enforced later, when inserting the row in the table.
            // Without this, an assert is hit during lowering.
            x.nullable = true;
            x
        })
        .collect();
    let typ = SqlRelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Exit early with just the raw constant if we know that all columns are present
    // and in the correct order. This lets us bypass expensive downstream optimizations
    // more easily, as at every stage we know this expression is nothing more than
    // a constant (as opposed to e.g. a constant with an identity map and identity
    // projection).
    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_all().zip_eq(defaults);
    for ((col_idx, _col_name, col_typ), default) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Column was provided: project it from its input position.
            project_key.push(*src_idx);
        } else {
            // Column omitted: append its default and project from there.
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
687
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    /// The catalog ID of the table being mutated.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Post-processing for the read portion (trivial: identity projection, no
    /// ordering or limit).
    pub finishing: RowSetFinishing,
}
700
701pub fn plan_delete_query(
702    scx: &StatementContext,
703    mut delete_stmt: DeleteStatement<Aug>,
704) -> Result<ReadThenWritePlan, PlanError> {
705    transform_ast::transform(scx, &mut delete_stmt)?;
706
707    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
708    plan_mutation_query_inner(
709        qcx,
710        delete_stmt.table_name,
711        delete_stmt.alias,
712        delete_stmt.using,
713        vec![],
714        delete_stmt.selection,
715    )
716}
717
718pub fn plan_update_query(
719    scx: &StatementContext,
720    mut update_stmt: UpdateStatement<Aug>,
721) -> Result<ReadThenWritePlan, PlanError> {
722    transform_ast::transform(scx, &mut update_stmt)?;
723
724    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
725
726    plan_mutation_query_inner(
727        qcx,
728        update_stmt.table_name,
729        update_stmt.alias,
730        vec![],
731        update_stmt.assignments,
732        update_stmt.selection,
733    )
734}
735
/// Shared planning logic for DELETE and UPDATE statements.
///
/// Validates the mutation target (user-writable, non-system table), plans the
/// read portion (the table scan plus WHERE / USING filtering), and plans any
/// SET assignments. `assignments` is empty for DELETE; `using` is empty for
/// UPDATE (see the respective callers).
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get ID and version of the relation desc.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Perform checks on item with given ID.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Derive structs for operation from validated table
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.relation_desc().expect("table has desc");
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // Plain `WHERE` filter directly over the table.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // `DELETE ... USING ...`: the filter is planned as an existential
        // subquery over the USING relations (see `handle_mutation_using_clause`).
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each `SET <column> = <expr>` assignment, installing an assignment
    // cast to the column's type and rejecting duplicate targets.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        // Get the index and type of the column.
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // The read portion needs no post-processing beyond the identity projection.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
841
// Adjust `get` to perform an existential subquery on `using` accounting for
// `selection`.
//
// If `USING`, we essentially want to rewrite the query as a correlated
// existential subquery, i.e.
// ```
// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
// ```
// However, we can't do that directly because of esoteric rules w/r/t `lateral`
// subqueries.
// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
//
// Returns `get` filtered by `EXISTS (<planned using/selection>)`. `outer_scope`
// is the scope of `get` (the mutation's target table, possibly aliased).
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan `USING` as a cross-joined `FROM` without knowledge of the
    // statement's `FROM` target. This prevents `lateral` subqueries from
    // "seeing" the `FROM` target.
    //
    // Each `TableWithJoins` is wrapped in a `NestedJoin` and cross-joined onto
    // the accumulator, starting from the join identity (a single empty row).
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
        // PG-like semantics e.g. expressing ambiguous column references. We put
        // `USING...` first for no real reason, but making a different decision
        // would require adjusting the column references on this relation
        // differently.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression on `FROM, USING...`.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;

        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
        // those to the right of `using_rel_expr`) to instead be correlated to
        // the outer relation, i.e. `get`. Bumping `level` by one turns the
        // reference into an outer-column reference; subtracting the `USING`
        // arity re-bases the column index onto `get`'s columns.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c, _name) = e {
                if c.column >= using_rel_arity {
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
        // relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Check that scopes are at compatible (i.e. do not double-reference
        // same table), despite lack of selection
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
    // whether any rows are returned, and not on the contents of those rows,
    // the output list of the subquery is normally unimportant.
    //
    // This means we don't need to worry about projecting/mapping any
    // additional expressions here.
    //
    // https://www.postgresql.org/docs/14/functions-subquery.html

    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
943
/// Error returned by [`cast_relation`] when a column of the input relation
/// cannot be cast to its requested target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// Zero-based index of the column whose cast failed.
    pub(crate) column: usize,
    /// The type the column actually has.
    pub(crate) source_type: SqlScalarType,
    /// The type the cast was attempting to produce.
    pub(crate) target_type: SqlScalarType,
}
950
951/// Cast a relation from one type to another using the specified type of cast.
952///
953/// The length of `target_types` must match the arity of `expr`.
954pub(crate) fn cast_relation<'a, I>(
955    qcx: &QueryContext,
956    ccx: CastContext,
957    expr: HirRelationExpr,
958    target_types: I,
959) -> Result<HirRelationExpr, CastRelationError>
960where
961    I: IntoIterator<Item = &'a SqlScalarType>,
962{
963    let ecx = &ExprContext {
964        qcx,
965        name: "values",
966        scope: &Scope::empty(),
967        relation_type: &qcx.relation_type(&expr),
968        allow_aggregates: false,
969        allow_subqueries: true,
970        allow_parameters: true,
971        allow_windows: false,
972    };
973    let mut map_exprs = vec![];
974    let mut project_key = vec![];
975    for (i, target_typ) in target_types.into_iter().enumerate() {
976        let expr = HirScalarExpr::column(i);
977        // We plan every cast and check the evaluated expressions rather than
978        // checking the types directly because of some complex casting rules
979        // between types not expressed in `SqlScalarType` equality.
980        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
981            Ok(cast_expr) => {
982                if expr == cast_expr {
983                    // Cast between types was unnecessary
984                    project_key.push(i);
985                } else {
986                    // Cast between types required
987                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
988                    map_exprs.push(cast_expr);
989                }
990            }
991            Err(_) => {
992                return Err(CastRelationError {
993                    column: i,
994                    source_type: ecx.scalar_type(&expr),
995                    target_type: target_typ.clone(),
996                });
997            }
998        }
999    }
1000    Ok(expr.map(map_exprs).project(project_key))
1001}
1002
1003/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1004/// VIEW` statement.
1005pub fn plan_as_of(
1006    scx: &StatementContext,
1007    as_of: Option<AsOf<Aug>>,
1008) -> Result<QueryWhen, PlanError> {
1009    match as_of {
1010        None => Ok(QueryWhen::Immediately),
1011        Some(as_of) => match as_of {
1012            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1013            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1014        },
1015    }
1016}
1017
1018/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1019///
1020/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1021/// - not a constant,
1022/// - not castable to MzTimestamp,
1023/// - is null,
1024/// - contains an unmaterializable function,
1025/// - some other evaluation error occurs, e.g., a division by 0,
1026/// - contains aggregates, subqueries, parameters, or window function calls.
1027pub fn plan_as_of_or_up_to(
1028    scx: &StatementContext,
1029    mut expr: Expr<Aug>,
1030) -> Result<mz_repr::Timestamp, PlanError> {
1031    let scope = Scope::empty();
1032    let desc = RelationDesc::empty();
1033    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1034    // evaluated only once.)
1035    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1036    transform_ast::transform(scx, &mut expr)?;
1037    let ecx = &ExprContext {
1038        qcx: &qcx,
1039        name: "AS OF or UP TO",
1040        scope: &scope,
1041        relation_type: desc.typ(),
1042        allow_aggregates: false,
1043        allow_subqueries: false,
1044        allow_parameters: false,
1045        allow_windows: false,
1046    };
1047    let hir = plan_expr(ecx, &expr)?.cast_to(
1048        ecx,
1049        CastContext::Assignment,
1050        &SqlScalarType::MzTimestamp,
1051    )?;
1052    if hir.contains_unmaterializable() {
1053        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1054    }
1055    // At this point, we definitely have a constant expression:
1056    // - it can't contain any unmaterializable functions;
1057    // - it can't refer to any columns.
1058    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1059    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1060    // division by 0.
1061    let timestamp = hir
1062        .into_literal_mz_timestamp()
1063        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1064    Ok(timestamp)
1065}
1066
1067/// Plans an expression in the AS position of a `CREATE SECRET`.
1068pub fn plan_secret_as(
1069    scx: &StatementContext,
1070    mut expr: Expr<Aug>,
1071) -> Result<MirScalarExpr, PlanError> {
1072    let scope = Scope::empty();
1073    let desc = RelationDesc::empty();
1074    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1075
1076    transform_ast::transform(scx, &mut expr)?;
1077
1078    let ecx = &ExprContext {
1079        qcx: &qcx,
1080        name: "AS",
1081        scope: &scope,
1082        relation_type: desc.typ(),
1083        allow_aggregates: false,
1084        allow_subqueries: false,
1085        allow_parameters: false,
1086        allow_windows: false,
1087    };
1088    let expr = plan_expr(ecx, &expr)?
1089        .type_as(ecx, &SqlScalarType::Bytes)?
1090        .lower_uncorrelated(scx.catalog.system_vars())?;
1091    Ok(expr)
1092}
1093
1094/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1095pub fn plan_webhook_validate_using(
1096    scx: &StatementContext,
1097    validate_using: CreateWebhookSourceCheck<Aug>,
1098) -> Result<WebhookValidation, PlanError> {
1099    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1100
1101    let CreateWebhookSourceCheck {
1102        options,
1103        using: mut expr,
1104    } = validate_using;
1105
1106    let mut column_typs = vec![];
1107    let mut column_names = vec![];
1108
1109    let (bodies, headers, secrets) = options
1110        .map(|o| (o.bodies, o.headers, o.secrets))
1111        .unwrap_or_default();
1112
1113    // Append all of the bodies so they can be used in the expression.
1114    let mut body_tuples = vec![];
1115    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1116        let scalar_type = use_bytes
1117            .then_some(SqlScalarType::Bytes)
1118            .unwrap_or(SqlScalarType::String);
1119        let name = alias
1120            .map(|a| a.into_string())
1121            .unwrap_or_else(|| "body".to_string());
1122
1123        column_typs.push(SqlColumnType {
1124            scalar_type,
1125            nullable: false,
1126        });
1127        column_names.push(name);
1128
1129        // Store the column index so we can be sure to provide this body correctly.
1130        let column_idx = column_typs.len() - 1;
1131        // Double check we're consistent with column names.
1132        assert_eq!(
1133            column_idx,
1134            column_names.len() - 1,
1135            "body column names and types don't match"
1136        );
1137        body_tuples.push((column_idx, use_bytes));
1138    }
1139
1140    // Append all of the headers so they can be used in the expression.
1141    let mut header_tuples = vec![];
1142
1143    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1144        let value_type = use_bytes
1145            .then_some(SqlScalarType::Bytes)
1146            .unwrap_or(SqlScalarType::String);
1147        let name = alias
1148            .map(|a| a.into_string())
1149            .unwrap_or_else(|| "headers".to_string());
1150
1151        column_typs.push(SqlColumnType {
1152            scalar_type: SqlScalarType::Map {
1153                value_type: Box::new(value_type),
1154                custom_id: None,
1155            },
1156            nullable: false,
1157        });
1158        column_names.push(name);
1159
1160        // Store the column index so we can be sure to provide this body correctly.
1161        let column_idx = column_typs.len() - 1;
1162        // Double check we're consistent with column names.
1163        assert_eq!(
1164            column_idx,
1165            column_names.len() - 1,
1166            "header column names and types don't match"
1167        );
1168        header_tuples.push((column_idx, use_bytes));
1169    }
1170
1171    // Append all secrets so they can be used in the expression.
1172    let mut validation_secrets = vec![];
1173
1174    for CreateWebhookSourceSecret {
1175        secret,
1176        alias,
1177        use_bytes,
1178    } in secrets
1179    {
1180        // Either provide the secret to the validation expression as Bytes or a String.
1181        let scalar_type = use_bytes
1182            .then_some(SqlScalarType::Bytes)
1183            .unwrap_or(SqlScalarType::String);
1184
1185        column_typs.push(SqlColumnType {
1186            scalar_type,
1187            nullable: false,
1188        });
1189        let ResolvedItemName::Item {
1190            id,
1191            full_name: FullItemName { item, .. },
1192            ..
1193        } = secret
1194        else {
1195            return Err(PlanError::InvalidSecret(Box::new(secret)));
1196        };
1197
1198        // Plan the expression using the secret's alias, if one is provided.
1199        let name = if let Some(alias) = alias {
1200            alias.into_string()
1201        } else {
1202            item
1203        };
1204        column_names.push(name);
1205
1206        // Get the column index that corresponds for this secret, so we can make sure to provide the
1207        // secrets in the correct order during evaluation.
1208        let column_idx = column_typs.len() - 1;
1209        // Double check that our column names and types match.
1210        assert_eq!(
1211            column_idx,
1212            column_names.len() - 1,
1213            "column names and types don't match"
1214        );
1215
1216        validation_secrets.push(WebhookValidationSecret {
1217            id,
1218            column_idx,
1219            use_bytes,
1220        });
1221    }
1222
1223    let relation_typ = SqlRelationType::new(column_typs);
1224    let desc = RelationDesc::new(relation_typ, column_names.clone());
1225    let scope = Scope::from_source(None, column_names);
1226
1227    transform_ast::transform(scx, &mut expr)?;
1228
1229    let ecx = &ExprContext {
1230        qcx: &qcx,
1231        name: "CHECK",
1232        scope: &scope,
1233        relation_type: desc.typ(),
1234        allow_aggregates: false,
1235        allow_subqueries: false,
1236        allow_parameters: false,
1237        allow_windows: false,
1238    };
1239    let expr = plan_expr(ecx, &expr)?
1240        .type_as(ecx, &SqlScalarType::Bool)?
1241        .lower_uncorrelated(scx.catalog.system_vars())?;
1242    let validation = WebhookValidation {
1243        expression: expr,
1244        relation_desc: desc,
1245        bodies: body_tuples,
1246        headers: header_tuples,
1247        secrets: validation_secrets,
1248    };
1249    Ok(validation)
1250}
1251
1252pub fn plan_default_expr(
1253    scx: &StatementContext,
1254    expr: &Expr<Aug>,
1255    target_ty: &SqlScalarType,
1256) -> Result<HirScalarExpr, PlanError> {
1257    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1258    let ecx = &ExprContext {
1259        qcx: &qcx,
1260        name: "DEFAULT expression",
1261        scope: &Scope::empty(),
1262        relation_type: &SqlRelationType::empty(),
1263        allow_aggregates: false,
1264        allow_subqueries: false,
1265        allow_parameters: false,
1266        allow_windows: false,
1267    };
1268    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1269    Ok(hir)
1270}
1271
1272pub fn plan_params<'a>(
1273    scx: &'a StatementContext,
1274    params: Vec<Expr<Aug>>,
1275    desc: &StatementDesc,
1276) -> Result<Params, PlanError> {
1277    if params.len() != desc.param_types.len() {
1278        sql_bail!(
1279            "expected {} params, got {}",
1280            desc.param_types.len(),
1281            params.len()
1282        );
1283    }
1284
1285    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1286
1287    let mut datums = Row::default();
1288    let mut packer = datums.packer();
1289    let mut actual_types = Vec::new();
1290    let temp_storage = &RowArena::new();
1291    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1292        transform_ast::transform(scx, &mut expr)?;
1293
1294        let ecx = execute_expr_context(&qcx);
1295        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1296        let actual_ty = ecx.scalar_type(&ex);
1297        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1298            return Err(PlanError::WrongParameterType(
1299                i + 1,
1300                ecx.humanize_sql_scalar_type(expected_ty, false),
1301                ecx.humanize_sql_scalar_type(&actual_ty, false),
1302            ));
1303        }
1304        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1305        let evaled = ex.eval(&[], temp_storage)?;
1306        packer.push(evaled);
1307        actual_types.push(actual_ty);
1308    }
1309    Ok(Params {
1310        datums,
1311        execute_types: actual_types,
1312        expected_types: desc.param_types.clone(),
1313    })
1314}
1315
/// Empty scope shared by every EXECUTE expression context (see [`execute_expr_context`]).
static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
/// Empty relation type shared by every EXECUTE expression context.
static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1318
/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
///
/// EXECUTE parameters are planned in an empty scope and may not contain
/// aggregates, subqueries, parameters, or window functions. The scope and
/// relation type are borrowed from process-wide statics so the returned
/// context only needs to borrow `qcx` from the caller.
pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
    ExprContext {
        qcx,
        name: "EXECUTE",
        scope: &EXECUTE_CONTEXT_SCOPE,
        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    }
}
1332
/// The CastContext used when matching up the types of parameters passed to EXECUTE.
///
/// This is an assignment cast also in Postgres, see
/// <https://github.com/MaterializeInc/database-issues/issues/9266>
///
/// Dereference (`*EXECUTE_CAST_CONTEXT`) to obtain the `CastContext` value.
pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
    LazyLock::new(|| CastContext::Assignment);
1339
1340pub fn plan_index_exprs<'a>(
1341    scx: &'a StatementContext,
1342    on_desc: &RelationDesc,
1343    exprs: Vec<Expr<Aug>>,
1344) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1345    let scope = Scope::from_source(None, on_desc.iter_names());
1346    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1347
1348    let ecx = &ExprContext {
1349        qcx: &qcx,
1350        name: "CREATE INDEX",
1351        scope: &scope,
1352        relation_type: on_desc.typ(),
1353        allow_aggregates: false,
1354        allow_subqueries: false,
1355        allow_parameters: false,
1356        allow_windows: false,
1357    };
1358    let repr_col_types: Vec<ReprColumnType> = on_desc
1359        .typ()
1360        .column_types
1361        .iter()
1362        .map(ReprColumnType::from)
1363        .collect();
1364    let mut out = vec![];
1365    for mut expr in exprs {
1366        transform_ast::transform(scx, &mut expr)?;
1367        let expr = plan_expr_or_col_index(ecx, &expr)?;
1368        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1369        expr.reduce(&repr_col_types);
1370        out.push(expr);
1371    }
1372    Ok(out)
1373}
1374
1375fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1376    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1377        Some(column) => Ok(HirScalarExpr::column(column)),
1378        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1379    }
1380}
1381
1382fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1383    match e {
1384        Expr::Value(Value::Number(n)) => {
1385            let n = n.parse::<usize>().map_err(|e| {
1386                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1387            })?;
1388            if n < 1 || n > max {
1389                sql_bail!(
1390                    "column reference {} in {} is out of range (1 - {})",
1391                    n,
1392                    name,
1393                    max
1394                );
1395            }
1396            Ok(Some(n - 1))
1397        }
1398        _ => Ok(None),
1399    }
1400}
1401
/// The result of planning a `Query`: the relation expression plus the
/// finishing information (ordering, limit/offset, projection) and group-size
/// hints extracted from the query's options.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// Scope describing the output columns of `expr`.
    scope: Scope,
    /// `ORDER BY` keys over the columns of `expr`.
    order_by: Vec<ColumnOrder>,
    /// Planned `LIMIT` expression, if any.
    limit: Option<HirScalarExpr>,
    /// `offset` is either
    /// - an Int64 literal
    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
    ///   later.)
    offset: HirScalarExpr,
    /// Columns of `expr` to output, in order.
    project: Vec<usize>,
    /// Hints extracted from the query options (e.g. expected group sizes).
    group_size_hints: GroupSizeHints,
}
1416
/// Plans a `Query`, guarding against stack overflow from deeply nested
/// queries before delegating to [`plan_query_inner`].
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1420
/// Plans a `Query`: its CTEs, LIMIT, OFFSET, body, and ORDER BY, wrapping the
/// result in `Let`/`LetRec` bindings for the CTEs.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
    // for the identifiers, so that they can be re-installed before returning.
    let cte_bindings = plan_ctes(qcx, q)?;

    // Plan LIMIT. A constant LIMIT is evaluated to an Int64 (or NULL) literal
    // here; a non-constant LIMIT is allowed only behind a feature flag.
    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let limit = if limit.is_constant() {
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;

                // TODO: Don't use ? on eval, but instead wrap the error and add the information
                // that the error happened in a LIMIT clause, so that we have better error msg for
                // something like `SELECT 5 LIMIT 'aaa'`.
                match limit.eval(&[], &arena)? {
                    d @ Datum::Int64(v) if v >= 0 => {
                        HirScalarExpr::literal(d, SqlScalarType::Int64)
                    }
                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Gate non-constant LIMIT expressions behind a feature flag
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };

    // Plan OFFSET. Defaults to the literal 0; a non-constant OFFSET is only
    // allowed if it contains parameters (re-checked after parameter binding).
    let offset = match &q.offset {
        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
        Some(offset) => {
            let ecx = &ExprContext {
                qcx,
                name: "OFFSET",
                scope: &Scope::empty(),
                relation_type: &SqlRelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: true,
                allow_windows: false,
            };
            let offset = plan_expr(ecx, offset)?;
            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;

            let offset = if offset.is_constant() {
                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
                let offset_value = offset_into_value(offset)?;
                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
            } else {
                // The only case when this is allowed to not be a constant is if it contains
                // parameters. (In which case, we'll later check that it's a constant after
                // parameter binding.)
                if !offset.contains_parameters() {
                    return Err(PlanError::InvalidOffset(format!(
                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
                        offset
                    )));
                }
                offset
            };
            offset
        }
    };

    // Plan the query body. A plain SELECT carries its own ORDER BY and
    // options; any other set expression gets its ORDER BY planned here.
    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract query options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Wrap from the innermost binding outwards, hence `.rev()`.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            // At most one of the three recursion-limit options may be given;
            // each implies a different "return at limit" behavior.
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            let limit = match (
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
            ) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => {
                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
                }
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit(
                        "More than one recursion limit given. \
                         Please give at most one of RECURSION LIMIT, \
                         ERROR AT RECURSION LIMIT, \
                         RETURN AT RECURSION LIMIT."
                            .to_owned(),
                    ));
                }
            }
            // A limit of 0 is rejected; NonZeroU64 enforces this statically below.
            .try_map(|(max_iters, return_at_limit)| {
                Ok::<LetRecLimit, PlanError>(LetRecLimit {
                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
                        "Recursion limit has to be greater than 0.".to_owned(),
                    ))?,
                    return_at_limit: *return_at_limit,
                })
            })?;

            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1630
1631/// Converts an OFFSET expression into a value.
1632pub fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1633    let offset = offset
1634        .try_into_literal_int64()
1635        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1636    if offset < 0 {
1637        return Err(PlanError::InvalidOffset(format!(
1638            "must not be negative, got {}",
1639            offset
1640        )));
1641    }
1642    Ok(offset)
1643}
1644
// Generates `MutRecBlockOptionExtracted`, which collects the recursion-limit
// options (`RECURSION LIMIT`, `RETURN AT RECURSION LIMIT`, `ERROR AT RECURSION
// LIMIT`) that may appear on a `WITH MUTUALLY RECURSIVE` block. At most one of
// the three may be given; planning rejects combinations.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1651
/// Creates plans for CTEs and introduces them to `qcx.ctes`.
///
/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
/// shadowed value that can be reinstalled once the planning has completed.
pub fn plan_ctes(
    qcx: &mut QueryContext,
    q: &Query<Aug>,
) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
    // Accumulate planned expressions and shadowed descriptions.
    let mut result = Vec::new();
    // Retain the old descriptions of CTE bindings so that we can restore them
    // after we're done planning this SELECT.
    let mut shadowed_descs = BTreeMap::new();

    // A reused identifier indicates a reused name.
    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
        sql_bail!(
            "WITH query name {} specified more than once",
            normalize::ident_ref(ident).quoted()
        )
    }

    match &q.ctes {
        CteBlock::Simple(ctes) => {
            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
            // Each CTE may reference those planned before it, since its
            // description is inserted into `qcx.ctes` before the next is planned.
            for cte in ctes.iter() {
                let cte_name = normalize::ident(cte.alias.name.clone());
                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
                let typ = qcx.relation_type(&val);
                let mut desc = RelationDesc::new(typ, scope.column_names());
                plan_utils::maybe_rename_columns(
                    format!("CTE {}", cte.alias.name),
                    &mut desc,
                    &cte.alias.columns,
                )?;
                // Capture the prior value if it exists, so that it can be re-installed.
                let shadowed = qcx.ctes.insert(
                    cte.id,
                    CteDesc {
                        name: cte_name,
                        desc,
                    },
                );

                result.push((cte.id, val, shadowed));
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
            // Insert column types into `qcx.ctes` first for recursive bindings.
            // WMR CTEs declare their column types up front, so references among
            // them (including self-references) resolve against these
            // declarations rather than against planned expressions.
            for cte in ctes.iter() {
                let cte_name = normalize::ident(cte.name.clone());
                // NOTE(review): `cte.columns.len()` looks like the intended
                // capacity hint here; `capacity()` merely over-reserves.
                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
                for column in cte.columns.iter() {
                    desc_columns.push((
                        normalize::column_name(column.name.clone()),
                        SqlColumnType {
                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
                            // Declared columns are always nullable; NOT NULL
                            // constraints are rejected below.
                            nullable: true,
                        },
                    ));
                }
                let desc = RelationDesc::from_names_and_types(desc_columns);
                let shadowed = qcx.ctes.insert(
                    cte.id,
                    CteDesc {
                        name: cte_name,
                        desc,
                    },
                );
                // Capture the prior value if it exists, so that it can be re-installed.
                if let Some(shadowed) = shadowed {
                    shadowed_descs.insert(cte.id, shadowed);
                }
            }

            // Plan all CTEs and validate the proposed types.
            for cte in ctes.iter() {
                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;

                // The declared (proposed) type for this CTE, installed in the
                // first pass above.
                let proposed_typ = qcx.ctes[&cte.id].desc.typ();

                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
                    // Once WMR CTEs support NOT NULL constraints, check that
                    // nullability of derived column types are compatible.
                    sql_bail!(
                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
                    );
                }

                if !proposed_typ.keys.is_empty() {
                    // Once WMR CTEs support keys, check that keys exactly
                    // overlap.
                    sql_bail!("[internal error]: WMR CTEs do not support keys");
                }

                // Validate that the derived and proposed types are the same.
                let derived_typ = qcx.relation_type(&val);

                // Builds the type-mismatch error, humanizing both column type
                // lists for the user-facing message.
                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
                    let cte_name = normalize::ident(cte.name.clone());
                    let proposed_typ = proposed_typ
                        .column_types
                        .iter()
                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
                        .collect::<Vec<_>>();
                    let inferred_typ = derived_typ
                        .column_types
                        .iter()
                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
                        .collect::<Vec<_>>();
                    Err(PlanError::RecursiveTypeMismatch(
                        cte_name,
                        proposed_typ,
                        inferred_typ,
                    ))
                };

                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
                    return type_err(proposed_typ, derived_typ);
                }

                // Cast derived types to proposed types or error.
                let val = match cast_relation(
                    qcx,
                    // Choose `CastContext::Assignment`` because the user has
                    // been explicit about the types they expect. Choosing
                    // `CastContext::Implicit` is not "strong" enough to impose
                    // typmods from proposed types onto values.
                    CastContext::Assignment,
                    val,
                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
                ) {
                    Ok(val) => val,
                    Err(_) => return type_err(proposed_typ, derived_typ),
                };

                // Hand any shadowed binding back to the caller so it can be
                // re-installed once planning completes.
                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
            }
        }
    }

    Ok(result)
}
1795
1796pub fn plan_nested_query(
1797    qcx: &mut QueryContext,
1798    q: &Query<Aug>,
1799) -> Result<(HirRelationExpr, Scope), PlanError> {
1800    let PlannedQuery {
1801        mut expr,
1802        scope,
1803        order_by,
1804        limit,
1805        offset,
1806        project,
1807        group_size_hints,
1808    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1809    if limit.is_some()
1810        || !offset
1811            .clone()
1812            .try_into_literal_int64()
1813            .is_ok_and(|offset| offset == 0)
1814    {
1815        expr = HirRelationExpr::top_k(
1816            expr,
1817            vec![],
1818            order_by,
1819            limit,
1820            offset,
1821            group_size_hints.limit_input_group_size,
1822        );
1823    }
1824    Ok((expr.project(project), scope))
1825}
1826
/// Plans one arm of a query body: a plain SELECT, a set operation
/// (UNION/EXCEPT/INTERSECT), a VALUES clause, a TABLE reference, a
/// parenthesized query, or (where permitted) a SHOW statement.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
            // should not have planned any ordering.
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan the LHS and RHS.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Validate that the LHS and RHS are the same width.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Match the types of the corresponding columns on the LHS and RHS
            // using the normal type coercion rules. This is equivalent to
            // `coerce_homogeneous_exprs`, but implemented in terms of
            // `HirRelationExpr` rather than `HirScalarExpr`.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column pair, pick a common target type and plan an
            // implicit cast of each side's column to that target.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip_eq(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_sql_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_sql_scalar_type(&target, false),
                        qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Apply the casts only when at least one is not the identity
            // (i.e., not a bare column reference); the map appends the cast
            // columns, so the projection keeps only the appended ones.
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            // Combine the two sides according to the set operator; non-ALL
            // variants apply a final `distinct`.
            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
                    // Though we believe that render() does The Right Thing (TM)
                    // Also note that we do *not* need another threshold() at the end of the method chain
                    // because the right-hand side of the outer union only produces existing records,
                    // i.e., the record counts for differential data flow definitely remain non-negative.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            let scope = Scope::from_source(
                None,
                // Column names are taken from the left, as in Postgres.
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // The create SQL definition of involving this query, will have the explicit `SHOW`
            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
            // current schema of the executing user. When Materialize restarts and tries to re-plan
            // these queries, it will only have access to the raw `SHOW` command and have no idea
            // what schema to use. As a result Materialize will fail to boot.
            //
            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
            // However, banning show commands in views gives us more flexibility to change their
            // output.
            //
            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
            // with all show commands expanded into their equivalent SELECT statements.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Some SHOW statements are a SELECT query. Others produces Rows
            // directly. Convert both of these to the needed Hir and Scope.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.ok_or_else(|| {
                    internal_err!("statement description missing relation descriptor")
                })?;
                let scope = Scope::from_source(None, desc.iter_names());
                let expr = HirRelationExpr::constant(rows, desc.into_typ());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateType(stmt) => to_hirscope(
                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
                    show::describe_show_create_type(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
2074
2075/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2076fn plan_values(
2077    qcx: &QueryContext,
2078    values: &[Vec<Expr<Aug>>],
2079) -> Result<(HirRelationExpr, Scope), PlanError> {
2080    assert!(!values.is_empty());
2081
2082    let ecx = &ExprContext {
2083        qcx,
2084        name: "VALUES",
2085        scope: &Scope::empty(),
2086        relation_type: &SqlRelationType::empty(),
2087        allow_aggregates: false,
2088        allow_subqueries: true,
2089        allow_parameters: true,
2090        allow_windows: false,
2091    };
2092
2093    let ncols = values[0].len();
2094    let nrows = values.len();
2095
2096    // Arrange input expressions by columns, not rows, so that we can
2097    // call `coerce_homogeneous_exprs` on each column.
2098    let mut cols = vec![vec![]; ncols];
2099    for row in values {
2100        if row.len() != ncols {
2101            sql_bail!(
2102                "VALUES expression has varying number of columns: {} vs {}",
2103                row.len(),
2104                ncols
2105            );
2106        }
2107        for (i, v) in row.iter().enumerate() {
2108            cols[i].push(v);
2109        }
2110    }
2111
2112    // Plan each column.
2113    let mut col_iters = Vec::with_capacity(ncols);
2114    let mut col_types = Vec::with_capacity(ncols);
2115    for col in &cols {
2116        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2117        let mut col_type = ecx.column_type(&col[0]);
2118        for val in &col[1..] {
2119            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2120        }
2121        col_types.push(col_type);
2122        col_iters.push(col.into_iter());
2123    }
2124
2125    // Build constant relation.
2126    let mut exprs = vec![];
2127    for _ in 0..nrows {
2128        for i in 0..ncols {
2129            exprs.push(col_iters[i].next().unwrap());
2130        }
2131    }
2132    let out = HirRelationExpr::CallTable {
2133        func: TableFunc::Wrap {
2134            width: ncols,
2135            types: col_types,
2136        },
2137        exprs,
2138    };
2139
2140    // Build column names.
2141    let mut scope = Scope::empty();
2142    for i in 0..ncols {
2143        let name = format!("column{}", i + 1);
2144        scope.items.push(ScopeItem::from_column_name(name));
2145    }
2146
2147    Ok((out, scope))
2148}
2149
2150/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2151/// statement.
2152///
2153/// This is special-cased in PostgreSQL and different enough from `plan_values`
2154/// that it is easier to use a separate function entirely. Unlike a normal
2155/// `VALUES` clause, each value is coerced to the type of the target table
2156/// via an assignment cast.
2157///
2158/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2159fn plan_values_insert(
2160    qcx: &QueryContext,
2161    target_names: &[&ColumnName],
2162    target_types: &[&SqlScalarType],
2163    values: &[Vec<Expr<Aug>>],
2164) -> Result<HirRelationExpr, PlanError> {
2165    assert!(!values.is_empty());
2166
2167    if !values.iter().map(|row| row.len()).all_equal() {
2168        sql_bail!("VALUES lists must all be the same length");
2169    }
2170
2171    let ecx = &ExprContext {
2172        qcx,
2173        name: "VALUES",
2174        scope: &Scope::empty(),
2175        relation_type: &SqlRelationType::empty(),
2176        allow_aggregates: false,
2177        allow_subqueries: true,
2178        allow_parameters: true,
2179        allow_windows: false,
2180    };
2181
2182    let mut exprs = vec![];
2183    let mut types = vec![];
2184    for row in values {
2185        if row.len() > target_names.len() {
2186            sql_bail!("INSERT has more expressions than target columns");
2187        }
2188        for (column, val) in row.into_iter().enumerate() {
2189            let target_type = &target_types[column];
2190            let val = plan_expr(ecx, val)?;
2191            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2192            let source_type = &ecx.scalar_type(&val);
2193            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2194                Ok(val) => val,
2195                Err(_) => sql_bail!(
2196                    "column {} is of type {} but expression is of type {}",
2197                    target_names[column].quoted(),
2198                    qcx.humanize_sql_scalar_type(target_type, false),
2199                    qcx.humanize_sql_scalar_type(source_type, false),
2200                ),
2201            };
2202            if column >= types.len() {
2203                types.push(ecx.column_type(&val));
2204            } else {
2205                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2206            }
2207            exprs.push(val);
2208        }
2209    }
2210
2211    Ok(HirRelationExpr::CallTable {
2212        func: TableFunc::Wrap {
2213            width: values[0].len(),
2214            types,
2215        },
2216        exprs,
2217    })
2218}
2219
2220fn plan_join_identity() -> (HirRelationExpr, Scope) {
2221    let typ = SqlRelationType::new(vec![]);
2222    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2223    let scope = Scope::empty();
2224    (expr, scope)
2225}
2226
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The planned relation expression, prior to applying `project`.
    expr: HirRelationExpr,
    /// Names for the columns that remain after `project` is applied.
    scope: Scope,
    /// Ordering of the rows of `expr`, interpreted before the projection.
    order_by: Vec<ColumnOrder>,
    /// Indexes of the columns of `expr` to retain, in output order.
    project: Vec<usize>,
}
2239
// Generates `SelectOptionExtracted`, which collects the group size hints that
// may appear in a SELECT's `OPTIONS (...)` clause.
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2247
2248/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2249///
2250/// Normally, the ORDER BY clause occurs after the columns specified in the
2251/// SELECT list have been projected. In a query like
2252///
2253///   CREATE TABLE (a int, b int)
2254///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2255///
2256/// it is valid to refer to `a`, because it is explicitly selected, but it would
2257/// not be valid to refer to unselected column `b`.
2258///
2259/// But PostgreSQL extends the standard to permit queries like
2260///
2261///   SELECT a FROM t ORDER BY b
2262///
2263/// where expressions in the ORDER BY clause can refer to *both* input columns
2264/// and output columns.
2265fn plan_select_from_where(
2266    qcx: &QueryContext,
2267    mut s: Select<Aug>,
2268    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2269) -> Result<SelectPlan, PlanError> {
2270    // TODO: Both `s` and `order_by_exprs` are not references because the
2271    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2272    // table function support (the UUID mapping). Attempt to change this so callers
2273    // don't need to clone the Select.
2274
2275    // Extract query options.
2276    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2277    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2278
2279    // Step 1. Handle FROM clause, including joins.
2280    let (mut relation_expr, mut from_scope) =
2281        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2282            let (left, left_scope) = l;
2283            plan_join(
2284                qcx,
2285                left,
2286                left_scope,
2287                &Join {
2288                    relation: TableFactor::NestedJoin {
2289                        join: Box::new(twj.clone()),
2290                        alias: None,
2291                    },
2292                    join_operator: JoinOperator::CrossJoin,
2293                },
2294            )
2295        })?;
2296
2297    // Step 2. Handle WHERE clause.
2298    if let Some(selection) = &s.selection {
2299        let ecx = &ExprContext {
2300            qcx,
2301            name: "WHERE clause",
2302            scope: &from_scope,
2303            relation_type: &qcx.relation_type(&relation_expr),
2304            allow_aggregates: false,
2305            allow_subqueries: true,
2306            allow_parameters: true,
2307            allow_windows: false,
2308        };
2309        let expr = plan_expr(ecx, selection)
2310            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2311            .type_as(ecx, &SqlScalarType::Bool)?;
2312        relation_expr = relation_expr.filter(vec![expr]);
2313    }
2314
2315    // Step 3. Gather aggregates and table functions.
2316    // (But skip window aggregates.)
2317    let (aggregates, table_funcs) = {
2318        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2319        visitor.visit_select_mut(&mut s);
2320        for o in order_by_exprs.iter_mut() {
2321            visitor.visit_order_by_expr_mut(o);
2322        }
2323        visitor.into_result()?
2324    };
2325    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2326    if !table_funcs.is_empty() {
2327        let (expr, scope) = plan_scalar_table_funcs(
2328            qcx,
2329            table_funcs,
2330            &mut table_func_names,
2331            &relation_expr,
2332            &from_scope,
2333        )?;
2334        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2335        from_scope = from_scope.product(scope)?;
2336    }
2337
2338    // Step 4. Expand SELECT clause.
2339    let projection = {
2340        let ecx = &ExprContext {
2341            qcx,
2342            name: "SELECT clause",
2343            scope: &from_scope,
2344            relation_type: &qcx.relation_type(&relation_expr),
2345            allow_aggregates: true,
2346            allow_subqueries: true,
2347            allow_parameters: true,
2348            allow_windows: true,
2349        };
2350        let mut out = vec![];
2351        for si in &s.projection {
2352            if *si == SelectItem::Wildcard && s.from.is_empty() {
2353                sql_bail!("SELECT * with no tables specified is not valid");
2354            }
2355            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2356        }
2357        out
2358    };
2359
2360    // Step 5. Handle GROUP BY clause.
2361    // This will also plan the aggregates gathered in Step 3.
2362    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2363    let (mut group_scope, select_all_mapping) = {
2364        // Compute GROUP BY expressions.
2365        let ecx = &ExprContext {
2366            qcx,
2367            name: "GROUP BY clause",
2368            scope: &from_scope,
2369            relation_type: &qcx.relation_type(&relation_expr),
2370            allow_aggregates: false,
2371            allow_subqueries: true,
2372            allow_parameters: true,
2373            allow_windows: false,
2374        };
2375        let mut group_key = vec![];
2376        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2377        let mut group_hir_exprs = vec![];
2378        let mut group_scope = Scope::empty();
2379        let mut select_all_mapping = BTreeMap::new();
2380
2381        for group_expr in &s.group_by {
2382            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2383            let new_column = group_key.len();
2384
2385            if let Some(group_expr) = group_expr {
2386                // Multiple AST expressions can map to the same HIR expression.
2387                // If we already have a ScopeItem for this HIR, we can add this
2388                // next AST expression to its set
2389                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2390                    existing_scope_item.exprs.insert(group_expr.clone());
2391                    continue;
2392                }
2393            }
2394
2395            let mut scope_item = if let HirScalarExpr::Column(
2396                ColumnRef {
2397                    level: 0,
2398                    column: old_column,
2399                },
2400                _name,
2401            ) = &expr
2402            {
2403                // If we later have `SELECT foo.*` then we have to find all
2404                // the `foo` items in `from_scope` and figure out where they
2405                // ended up in `group_scope`. This is really hard to do
2406                // right using SQL name resolution, so instead we just track
2407                // the movement here.
2408                select_all_mapping.insert(*old_column, new_column);
2409                let scope_item = ecx.scope.items[*old_column].clone();
2410                scope_item
2411            } else {
2412                ScopeItem::empty()
2413            };
2414
2415            if let Some(group_expr) = group_expr.cloned() {
2416                scope_item.exprs.insert(group_expr);
2417            }
2418
2419            group_key.push(from_scope.len() + group_exprs.len());
2420            group_hir_exprs.push(expr.clone());
2421            group_exprs.insert(expr, scope_item);
2422        }
2423
2424        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2425        for expr in &group_hir_exprs {
2426            if let Some(scope_item) = group_exprs.remove(expr) {
2427                group_scope.items.push(scope_item);
2428            }
2429        }
2430
2431        // Plan aggregates.
2432        let ecx = &ExprContext {
2433            qcx,
2434            name: "aggregate function",
2435            scope: &from_scope,
2436            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2437            allow_aggregates: false,
2438            allow_subqueries: true,
2439            allow_parameters: true,
2440            allow_windows: false,
2441        };
2442        let mut agg_exprs = vec![];
2443        for sql_function in aggregates {
2444            if sql_function.over.is_some() {
2445                unreachable!(
2446                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2447                );
2448            }
2449            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2450            group_scope
2451                .items
2452                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2453        }
2454        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2455            // apply GROUP BY / aggregates
2456            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2457                group_key,
2458                agg_exprs,
2459                group_size_hints.aggregate_input_group_size,
2460            );
2461
2462            // For every old column that wasn't a group key, add a scope item
2463            // that errors when referenced. We can't simply drop these items
2464            // from scope. These items need to *exist* because they might shadow
2465            // variables in outer scopes that would otherwise be valid to
2466            // reference, but accessing them needs to produce an error.
2467            for i in 0..from_scope.len() {
2468                if !select_all_mapping.contains_key(&i) {
2469                    let scope_item = &ecx.scope.items[i];
2470                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2471                        table_name: scope_item.table_name.clone(),
2472                        column_name: scope_item.column_name.clone(),
2473                        allow_unqualified_references: scope_item.allow_unqualified_references,
2474                    });
2475                }
2476            }
2477
2478            (group_scope, select_all_mapping)
2479        } else {
2480            // if no GROUP BY, aggregates or having then all columns remain in scope
2481            (
2482                from_scope.clone(),
2483                (0..from_scope.len()).map(|i| (i, i)).collect(),
2484            )
2485        }
2486    };
2487
2488    // Step 6. Handle HAVING clause.
2489    if let Some(ref having) = s.having {
2490        let ecx = &ExprContext {
2491            qcx,
2492            name: "HAVING clause",
2493            scope: &group_scope,
2494            relation_type: &qcx.relation_type(&relation_expr),
2495            allow_aggregates: true,
2496            allow_subqueries: true,
2497            allow_parameters: true,
2498            allow_windows: false,
2499        };
2500        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2501        relation_expr = relation_expr.filter(vec![expr]);
2502    }
2503
2504    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2505    // (This includes window aggregations.)
2506    //
2507    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2508    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2509    //
2510    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2511    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2512    // and can't be part of a bigger expression.
2513    // See https://www.postgresql.org/docs/current/queries-order.html:
2514    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2515    // expression"
2516    let window_funcs = {
2517        let mut visitor = WindowFuncCollector::default();
2518        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2519        // window functions are excluded from other things by `allow_windows` being false when
2520        // planning those before this code).
2521        visitor.visit_select(&s);
2522        for o in order_by_exprs.iter() {
2523            visitor.visit_order_by_expr(o);
2524        }
2525        visitor.into_result()
2526    };
2527    for window_func in window_funcs {
2528        let ecx = &ExprContext {
2529            qcx,
2530            name: "window function",
2531            scope: &group_scope,
2532            relation_type: &qcx.relation_type(&relation_expr),
2533            allow_aggregates: true,
2534            allow_subqueries: true,
2535            allow_parameters: true,
2536            allow_windows: true,
2537        };
2538        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2539        group_scope.items.push(ScopeItem::from_expr(window_func));
2540    }
2541    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2542    // been already planned now. However, we should still set `allow_windows: true` for the
2543    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2544    // msg if an OVER clause is missing from a window function.
2545
2546    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2547    if let Some(ref qualify) = s.qualify {
2548        let ecx = &ExprContext {
2549            qcx,
2550            name: "QUALIFY clause",
2551            scope: &group_scope,
2552            relation_type: &qcx.relation_type(&relation_expr),
2553            allow_aggregates: true,
2554            allow_subqueries: true,
2555            allow_parameters: true,
2556            allow_windows: true,
2557        };
2558        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2559        relation_expr = relation_expr.filter(vec![expr]);
2560    }
2561
2562    // Step 9. Handle SELECT clause.
2563    let output_columns = {
2564        let mut new_exprs = vec![];
2565        let mut new_type = qcx.relation_type(&relation_expr);
2566        let mut output_columns = vec![];
2567        for (select_item, column_name) in &projection {
2568            let ecx = &ExprContext {
2569                qcx,
2570                name: "SELECT clause",
2571                scope: &group_scope,
2572                relation_type: &new_type,
2573                allow_aggregates: true,
2574                allow_subqueries: true,
2575                allow_parameters: true,
2576                allow_windows: true,
2577            };
2578            let expr = match select_item {
2579                ExpandedSelectItem::InputOrdinal(i) => {
2580                    if let Some(column) = select_all_mapping.get(i).copied() {
2581                        HirScalarExpr::column(column)
2582                    } else {
2583                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2584                    }
2585                }
2586                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2587            };
2588            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2589                // Simple column reference; no need to map on a new expression.
2590                output_columns.push((column, column_name));
2591            } else {
2592                // Complicated expression that requires a map expression. We
2593                // update `group_scope` as we go so that future expressions that
2594                // are textually identical to this one can reuse it. This
2595                // duplicate detection is required for proper determination of
2596                // ambiguous column references with SQL92-style `ORDER BY`
2597                // items. See `plan_order_by_or_distinct_expr` for more.
2598                let typ = ecx.column_type(&expr);
2599                new_type.column_types.push(typ);
2600                new_exprs.push(expr);
2601                output_columns.push((group_scope.len(), column_name));
2602                group_scope
2603                    .items
2604                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2605            }
2606        }
2607        relation_expr = relation_expr.map(new_exprs);
2608        output_columns
2609    };
2610    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2611
2612    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2613    let order_by = {
2614        let relation_type = qcx.relation_type(&relation_expr);
2615        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2616            &ExprContext {
2617                qcx,
2618                name: "ORDER BY clause",
2619                scope: &group_scope,
2620                relation_type: &relation_type,
2621                allow_aggregates: true,
2622                allow_subqueries: true,
2623                allow_parameters: true,
2624                allow_windows: true,
2625            },
2626            &order_by_exprs,
2627            &output_columns,
2628        )?;
2629
2630        match s.distinct {
2631            None => relation_expr = relation_expr.map(map_exprs),
2632            Some(Distinct::EntireRow) => {
2633                if relation_type.arity() == 0 {
2634                    sql_bail!("SELECT DISTINCT must have at least one column");
2635                }
2636                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2637                // list, so we can't proceed if `ORDER BY` has introduced any
2638                // columns for arbitrary expressions. This matches PostgreSQL.
2639                if !try_push_projection_order_by(
2640                    &mut relation_expr,
2641                    &mut project_key,
2642                    &mut order_by,
2643                ) {
2644                    sql_bail!(
2645                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2646                    );
2647                }
2648                assert!(map_exprs.is_empty());
2649                relation_expr = relation_expr.distinct();
2650            }
2651            Some(Distinct::On(exprs)) => {
2652                let ecx = &ExprContext {
2653                    qcx,
2654                    name: "DISTINCT ON clause",
2655                    scope: &group_scope,
2656                    relation_type: &qcx.relation_type(&relation_expr),
2657                    allow_aggregates: true,
2658                    allow_subqueries: true,
2659                    allow_parameters: true,
2660                    allow_windows: true,
2661                };
2662
2663                let mut distinct_exprs = vec![];
2664                for expr in &exprs {
2665                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2666                    distinct_exprs.push(expr);
2667                }
2668
2669                let mut distinct_key = vec![];
2670
2671                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2672                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2673                // expressions, though the order of `DISTINCT ON` expressions
2674                // does not matter. This matches PostgreSQL and leaves the door
2675                // open to a future optimization where the `DISTINCT ON` and
2676                // `ORDER BY` operations happen in one pass.
2677                //
2678                // On the bright side, any columns that have already been
2679                // computed by `ORDER BY` can be reused in the distinct key.
2680                let arity = relation_type.arity();
2681                for ord in order_by.iter().take(distinct_exprs.len()) {
2682                    // The unusual construction of `expr` here is to ensure the
2683                    // temporary column expression lives long enough.
2684                    let mut expr = &HirScalarExpr::column(ord.column);
2685                    if ord.column >= arity {
2686                        expr = &map_exprs[ord.column - arity];
2687                    };
2688                    match distinct_exprs.iter().position(move |e| e == expr) {
2689                        None => sql_bail!(
2690                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2691                        ),
2692                        Some(pos) => {
2693                            distinct_exprs.remove(pos);
2694                        }
2695                    }
2696                    distinct_key.push(ord.column);
2697                }
2698
2699                // Add any remaining `DISTINCT ON` expressions to the key.
2700                for expr in distinct_exprs {
2701                    // If the expression is a reference to an existing column,
2702                    // do not introduce a new column to support it.
2703                    let column = match expr {
2704                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2705                        _ => {
2706                            map_exprs.push(expr);
2707                            arity + map_exprs.len() - 1
2708                        }
2709                    };
2710                    distinct_key.push(column);
2711                }
2712
2713                // `DISTINCT ON` is semantically a TopK with limit 1. The
2714                // columns in `ORDER BY` that are not part of the distinct key,
2715                // if there are any, determine the ordering within each group,
2716                // per PostgreSQL semantics.
2717                let distinct_len = distinct_key.len();
2718                relation_expr = HirRelationExpr::top_k(
2719                    relation_expr.map(map_exprs),
2720                    distinct_key,
2721                    order_by.iter().skip(distinct_len).cloned().collect(),
2722                    Some(HirScalarExpr::literal(
2723                        Datum::Int64(1),
2724                        SqlScalarType::Int64,
2725                    )),
2726                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2727                    group_size_hints.distinct_on_input_group_size,
2728                );
2729            }
2730        }
2731
2732        order_by
2733    };
2734
2735    // Construct a clean scope to expose outwards, where all of the state that
2736    // accumulated in the scope during planning of this SELECT is erased. The
2737    // clean scope has at most one name for each column, and the names are not
2738    // associated with any table.
2739    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2740
2741    Ok(SelectPlan {
2742        expr: relation_expr,
2743        scope,
2744        order_by,
2745        project: project_key,
2746    })
2747}
2748
/// Plans table functions that appeared in the `SELECT` list (rather than in
/// `FROM`), producing a relation expression that the caller joins onto the
/// existing `relation_expr`, along with the scope for the new columns.
///
/// As a side effect, fills `table_func_names` with an entry mapping each
/// generated id to the function's item name; the caller passes that map on to
/// `expand_select_item`.
///
/// When more than one table function is present, this emulates `ROWS FROM`
/// semantics via `plan_rows_from_internal`: each function contributes its
/// output columns plus an ordinality column, followed by a single coalesced
/// ordinality column at the very end.
fn plan_scalar_table_funcs(
    qcx: &QueryContext,
    table_funcs: BTreeMap<Function<Aug>, String>,
    table_func_names: &mut BTreeMap<String, Ident>,
    relation_expr: &HirRelationExpr,
    from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Derived context carrying the current scope and relation type, used for
    // planning the table functions themselves.
    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));

    // Record id -> function-name for every table function, so later phases can
    // refer to the function by its original name.
    for (table_func, id) in table_funcs.iter() {
        table_func_names.insert(
            id.clone(),
            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
        );
    }
    // If there's only a single table function, we can skip generating
    // ordinality columns.
    if table_funcs.len() == 1 {
        let (table_func, id) = table_funcs.iter().next().unwrap();
        let (expr, mut scope) =
            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;

        // A single table-function might return several columns as a record
        let num_cols = scope.len();
        for i in 0..scope.len() {
            // Rename every output column's table to the generated id so
            // references resolve against the synthetic name.
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
        }
        return Ok((expr, scope));
    }
    if table_funcs.keys().any(is_repeat_row) {
        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
        // error message would be misleading, because it would refer to WITH ORDINALITY.
        bail_unsupported!(format!(
            "{} in a SELECT clause with multiple table functions",
            REPEAT_ROW_NAME
        ));
    }
    // Otherwise, plan as usual, emulating the ROWS FROM behavior
    let (expr, mut scope, num_cols) =
        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;

    // Munge the scope so table names match with the generated ids.
    //
    // `i` walks the scope items in order: for each function, first its
    // `num_cols` data columns, then its one ordinality column.
    let mut i = 0;
    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
        for _ in 0..num_cols {
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
            i += 1;
        }
        // Ordinality column. This doubles as the
        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
        // because it only needs to be NULL or not.
        scope.items[i].table_name = Some(PartialItemName {
            database: None,
            schema: None,
            item: id.clone(),
        });
        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
        scope.items[i].allow_unqualified_references = false;
        i += 1;
    }
    // Coalesced ordinality column.
    scope.items[i].allow_unqualified_references = false;
    Ok((expr, scope))
}
2826
2827/// Plans an expression in a `GROUP BY` clause.
2828///
2829/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2830/// names/expressions defined in the `SELECT` clause. These special cases are
2831/// handled by this function; see comments within the implementation for
2832/// details.
2833fn plan_group_by_expr<'a>(
2834    ecx: &ExprContext,
2835    group_expr: &'a Expr<Aug>,
2836    projection: &'a [(ExpandedSelectItem, ColumnName)],
2837) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2838    let plan_projection = |column: usize| match &projection[column].0 {
2839        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2840        ExpandedSelectItem::Expr(expr) => {
2841            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2842        }
2843    };
2844
2845    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2846    // a special case that means to use the ith item in the SELECT clause.
2847    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2848        return plan_projection(column);
2849    }
2850
2851    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2852    // The `foo` can refer to *either* an input column or an output column. If
2853    // both exist, the input column is preferred.
2854    match group_expr {
2855        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2856            Err(PlanError::UnknownColumn {
2857                table: None,
2858                column,
2859                similar,
2860            }) => {
2861                // The expression was a simple identifier that did not match an
2862                // input column. See if it matches an output column.
2863                let mut iter = projection.iter().map(|(_expr, name)| name);
2864                if let Some(i) = iter.position(|n| *n == column) {
2865                    if iter.any(|n| *n == column) {
2866                        Err(PlanError::AmbiguousColumn(column))
2867                    } else {
2868                        plan_projection(i)
2869                    }
2870                } else {
2871                    // The name didn't match an output column either. Return the
2872                    // "unknown column" error.
2873                    Err(PlanError::UnknownColumn {
2874                        table: None,
2875                        column,
2876                        similar,
2877                    })
2878                }
2879            }
2880            res => Ok((Some(group_expr), res?)),
2881        },
2882        _ => Ok((
2883            Some(group_expr),
2884            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2885        )),
2886    }
2887}
2888
2889/// Plans a slice of `ORDER BY` expressions.
2890///
2891/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2892/// parameter.
2893///
2894/// Returns the determined column orderings and a list of scalar expressions
2895/// that must be mapped onto the underlying relation expression.
2896pub(crate) fn plan_order_by_exprs(
2897    ecx: &ExprContext,
2898    order_by_exprs: &[OrderByExpr<Aug>],
2899    output_columns: &[(usize, &ColumnName)],
2900) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2901    let mut order_by = vec![];
2902    let mut map_exprs = vec![];
2903    for obe in order_by_exprs {
2904        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2905        // If the expression is a reference to an existing column,
2906        // do not introduce a new column to support it.
2907        let column = match expr {
2908            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2909            _ => {
2910                map_exprs.push(expr);
2911                ecx.relation_type.arity() + map_exprs.len() - 1
2912            }
2913        };
2914        order_by.push(resolve_desc_and_nulls_last(obe, column));
2915    }
2916    Ok((order_by, map_exprs))
2917}
2918
2919/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2920///
2921/// The `output_columns` parameter describes, in order, the physical index and
2922/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2923/// corresponds to a `SELECT` list with a single entry named "a" that can be
2924/// found at index 3 in the underlying relation expression.
2925///
2926/// There are three cases to handle.
2927///
2928///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2929///       reference to the specified output column.
2930///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2931///       an output column, if it exists; otherwise it is a reference to an
2932///       input column.
2933///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2934///       arbitrary expressions exclusively refer to input columns, never output
2935///       columns.
2936fn plan_order_by_or_distinct_expr(
2937    ecx: &ExprContext,
2938    expr: &Expr<Aug>,
2939    output_columns: &[(usize, &ColumnName)],
2940) -> Result<HirScalarExpr, PlanError> {
2941    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2942        return Ok(HirScalarExpr::column(output_columns[i].0));
2943    }
2944
2945    if let Expr::Identifier(names) = expr {
2946        if let [name] = &names[..] {
2947            let name = normalize::column_name(name.clone());
2948            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2949            if let Some((i, _)) = iter.next() {
2950                match iter.next() {
2951                    // Per SQL92, names are not considered ambiguous if they
2952                    // refer to identical target list expressions, as in
2953                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2954                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2955                    _ => return Ok(HirScalarExpr::column(*i)),
2956                }
2957            }
2958        }
2959    }
2960
2961    plan_expr(ecx, expr)?.type_as_any(ecx)
2962}
2963
2964fn plan_table_with_joins(
2965    qcx: &QueryContext,
2966    table_with_joins: &TableWithJoins<Aug>,
2967) -> Result<(HirRelationExpr, Scope), PlanError> {
2968    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2969    for join in &table_with_joins.joins {
2970        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2971        expr = new_expr;
2972        scope = new_scope;
2973    }
2974    Ok((expr, scope))
2975}
2976
/// Plans a single `FROM`-list item (table factor) into a relation expression
/// and the scope describing its columns.
fn plan_table_factor(
    qcx: &QueryContext,
    table_factor: &TableFactor<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match table_factor {
        // A plain table/view reference, optionally aliased.
        TableFactor::Table { name, alias } => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            let scope = plan_table_alias(scope, alias.as_ref())?;
            Ok((expr, scope))
        }

        // A single table function call, e.g. `generate_series(...)`.
        TableFactor::Function {
            function,
            alias,
            with_ordinality,
        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),

        // `ROWS FROM (f(...), g(...), ...)`.
        TableFactor::RowsFrom {
            functions,
            alias,
            with_ordinality,
        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),

        // A derived table (subquery), possibly `LATERAL`.
        TableFactor::Derived {
            lateral,
            subquery,
            alias,
        } => {
            let mut qcx = (*qcx).clone();
            if !lateral {
                // Since this derived table was not marked as `LATERAL`,
                // make elements in outer scopes invisible until we reach the
                // next lateral barrier.
                for scope in &mut qcx.outer_scopes {
                    if scope.lateral_barrier {
                        break;
                    }
                    scope.items.clear();
                }
            }
            // Mark the innermost outer scope as a barrier so nested derived
            // tables stop their clearing pass here.
            qcx.outer_scopes[0].lateral_barrier = true;
            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
            let scope = plan_table_alias(scope, alias.as_ref())?;
            Ok((expr, scope))
        }

        // A parenthesized join, e.g. `(a JOIN b ON ...)`, optionally aliased.
        TableFactor::NestedJoin { join, alias } => {
            let (expr, scope) = plan_table_with_joins(qcx, join)?;
            let scope = plan_table_alias(scope, alias.as_ref())?;
            Ok((expr, scope))
        }
    }
}
3030
/// Plans a `ROWS FROM` expression.
///
/// `ROWS FROM` concatenates table functions into a single table, filling in
/// `NULL`s in places where one table function has fewer rows than another. We
/// can achieve this by augmenting each table function with a row number, doing
/// a `FULL JOIN` between each table function on the row number and eventually
/// projecting away the row number columns. Concretely, the following query
/// using `ROWS FROM`
///
/// ```sql
/// SELECT
///     *
/// FROM
///     ROWS FROM (
///         generate_series(1, 2),
///         information_schema._pg_expandarray(ARRAY[9]),
///         generate_series(3, 6)
///     );
/// ```
///
/// is equivalent to the following query that does not use `ROWS FROM`:
///
/// ```sql
/// SELECT
///     gs1.generate_series, expand.x, expand.n, gs3.generate_series
/// FROM
///     generate_series(1, 2) WITH ORDINALITY AS gs1
///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
///         ON gs1.ordinality = expand.ordinality
///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
/// ```
///
/// Note the call to `coalesce` in the last join condition, which ensures that
/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
///
/// This function creates a HirRelationExpr that follows the structure of the
/// latter query.
///
/// `with_ordinality` can be used to have the output expression contain a
/// single coalesced ordinality column at the end of the entire expression.
fn plan_rows_from(
    qcx: &QueryContext,
    functions: &[Function<Aug>],
    alias: Option<&TableAlias>,
    with_ordinality: bool,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // The `repeat_row` function is not supported in ROWS FROM.
    if functions.iter().any(is_repeat_row) {
        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
        // error message would be misleading, because it would refer to WITH ORDINALITY instead of
        // ROWS FROM.
        bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
    }

    // If there's only a single table function, planning proceeds as if `ROWS
    // FROM` hadn't been written at all.
    if let [function] = functions {
        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
    }

    // Per PostgreSQL, all scope items take the name of the first function
    // (unless aliased).
    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
    let (expr, mut scope, num_cols) = plan_rows_from_internal(
        qcx,
        functions,
        Some(functions[0].name.full_item_name().clone()),
    )?;

    // Columns tracks the set of columns we will keep in the projection.
    let mut columns = Vec::new();
    let mut offset = 0;
    // Retain table function's non-ordinality columns.
    for (idx, cols) in num_cols.into_iter().enumerate() {
        for i in 0..cols {
            columns.push(offset + i);
        }
        offset += cols + 1;

        // Remove the ordinality column from the scope, accounting for previous scope
        // changes from this loop: after `idx` earlier removals, the ordinality
        // column originally at index `offset - 1` now sits at `offset - 1 - idx`.
        scope.items.remove(offset - idx - 1);
    }

    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
    // column. Otherwise remove it from the scope.
    if with_ordinality {
        columns.push(offset);
    } else {
        scope.items.pop();
    }

    let expr = expr.project(columns);

    let scope = plan_table_alias(scope, alias)?;
    Ok((expr, scope))
}
3129
3130fn is_repeat_row(f: &Function<Aug>) -> bool {
3131    f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3132}
3133
/// Plans an expression coalescing multiple table functions. Each table
/// function is followed by its row ordinality. The entire expression is
/// followed by the coalesced row ordinality.
///
/// The returned Scope will set all item's table_name's to the `table_name`
/// parameter if it is `Some`. If `None`, they will be the name of each table
/// function.
///
/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
/// each table function.
///
/// For example, with table functions tf1 returning 1 column (a) and tf2
/// returning 2 columns (b, c), this function will return an expr 6 columns:
///
/// - tf1.a
/// - tf1.ordinality
/// - tf2.b
/// - tf2.c
/// - tf2.ordinality
/// - coalesced_ordinality
///
/// And a `Vec<usize>` of `[1, 2]`.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Join together each of the table functions in turn. The last column is
    // always the column to join against and is maintained to be the coalescence
    // of the row number column for all prior functions.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    num_cols.push(left_scope.len() - 1);
    // Create the coalesced ordinality column. For the first function this is
    // simply a copy of its own ordinality column (the last column).
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));

    for function in functions {
        // The right hand side of a join must be planned in a new scope.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // `left_col` indexes the running coalesced ordinality column (the last
        // left column); `right_col` indexes the right function's ordinality
        // column once the join places the right columns after the left ones.
        let left_col = left_scope.len() - 1;
        let right_col = left_scope.len() + right_scope.len() - 1;
        let on = HirScalarExpr::call_binary(
            HirScalarExpr::column(left_col),
            HirScalarExpr::column(right_col),
            expr_func::Eq,
        );
        // FULL JOIN on equal ordinalities, then append a fresh coalesced
        // ordinality so the next iteration aligns against whichever side has
        // more rows.
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::call_variadic(
                Coalesce,
                vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            )]);

        // Project off the previous iteration's coalesced column, but keep both of this
        // iteration's ordinality columns.
        left_expr = left_expr.project(
            (0..left_col) // left columns, minus the old coalesced column at `left_col`
                .chain(left_col + 1..right_col + 2) // right columns plus the new coalesced column
                .collect(),
        );
        // Move the coalesced ordinality column.
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3214
3215/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3216/// FROM` clause that contains other table functions. Special aliasing rules
3217/// apply.
3218fn plan_solitary_table_function(
3219    qcx: &QueryContext,
3220    function: &Function<Aug>,
3221    alias: Option<&TableAlias>,
3222    with_ordinality: bool,
3223) -> Result<(HirRelationExpr, Scope), PlanError> {
3224    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3225
3226    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3227    if single_column_function {
3228        let item = &mut scope.items[0];
3229
3230        // Mark that the function only produced a single column. This impacts
3231        // whole-row references.
3232        item.from_single_column_function = true;
3233
3234        // Strange special case for solitary table functions that output one
3235        // column whose name matches the name of the table function. If a table
3236        // alias is provided, the column name is changed to the table alias's
3237        // name. Concretely, the following query returns a column named `x`
3238        // rather than a column named `generate_series`:
3239        //
3240        //     SELECT * FROM generate_series(1, 5) AS x
3241        //
3242        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3243        // since its output column is explicitly named `value`, not
3244        // `jsonb_array_elements`.
3245        //
3246        // Note also that we may (correctly) change the column name again when
3247        // we plan the table alias below if the `alias.columns` is non-empty.
3248        if let Some(alias) = alias {
3249            if let ScopeItem {
3250                table_name: Some(table_name),
3251                column_name,
3252                ..
3253            } = item
3254            {
3255                if table_name.item.as_str() == column_name.as_str() {
3256                    *column_name = normalize::column_name(alias.name.clone());
3257                }
3258            }
3259        }
3260    }
3261
3262    let scope = plan_table_alias(scope, alias)?;
3263    Ok((expr, scope))
3264}
3265
/// Plans a table function.
///
/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
/// instead to get the appropriate aliasing behavior.
///
/// If `with_ordinality` is true, the output gains a trailing `ordinality`
/// column. If `table_name` is `Some`, it overrides the function's own name as
/// the table name for the resulting scope.
fn plan_table_function_internal(
    qcx: &QueryContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &Function<Aug>,
    with_ordinality: bool,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // The parser should never produce FILTER, OVER, or DISTINCT on a table
    // function, so these are internal invariants rather than user errors.
    assert_none!(filter, "cannot parse table function with FILTER");
    assert_none!(over, "cannot parse table function with OVER");
    assert!(!*distinct, "cannot parse table function with DISTINCT");

    // Table function arguments are planned in an empty scope: they cannot
    // reference columns, aggregates, or window functions.
    let ecx = &ExprContext {
        qcx,
        name: "table function arguments",
        scope: &Scope::empty(),
        relation_type: &SqlRelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let scalar_args = match args {
        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    // Scope table name: the caller-provided override, or else the function's
    // own name.
    let table_name = match table_name {
        Some(table_name) => table_name.item,
        None => name.full_item_name().item.clone(),
    };

    let scope_name = Some(PartialItemName {
        database: None,
        schema: None,
        item: table_name,
    });

    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
        Func::Table(impls) => {
            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
            let expr = match tf.imp {
                // Built-in table function: wrap with ordinality support if
                // requested (not every `TableFunc` supports it).
                TableFuncImpl::CallTable { mut func, exprs } => {
                    if with_ordinality {
                        func = TableFunc::with_ordinality(func.clone()).ok_or(
                            PlanError::Unsupported {
                                feature: format!("WITH ORDINALITY on {}", func),
                                discussion_no: None,
                            },
                        )?;
                    }
                    HirRelationExpr::CallTable { func, exprs }
                }
                TableFuncImpl::Expr(expr) => {
                    if !with_ordinality {
                        expr
                    } else {
                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
                        // back to the legacy implementation or error out the query.
                        if qcx
                            .scx
                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
                        {
                            // Note that this can give an incorrect ordering, and also has an extreme
                            // performance problem in some cases. See the doc comment of
                            // `TableFuncImpl`.
                            tracing::error!(
                                %name,
                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
                            );
                            // The legacy implementation synthesizes the ordinality
                            // column with a `row_number()` scalar window function.
                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
                                func: WindowExprType::Scalar(ScalarWindowExpr {
                                    func: ScalarWindowFunc::RowNumber,
                                    order_by: vec![],
                                }),
                                partition_by: vec![],
                                order_by: vec![],
                            })])
                        } else {
                            bail_unsupported!(format!(
                                "WITH ORDINALITY or ROWS FROM with {}",
                                name
                            ));
                        }
                    }
                }
            };
            (expr, scope)
        }
        Func::Scalar(impls) => {
            // A scalar function in FROM position is "tabletized": wrapped in a
            // `TableFunc::TabletizedScalar` whose single output column is
            // named after the function.
            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let output = expr.typ(
                &qcx.outer_relation_types,
                &SqlRelationType::new(vec![]),
                &qcx.scx.param_types.borrow(),
            );

            let relation = SqlRelationType::new(vec![output]);

            let function_ident = Ident::new(name.full_item_name().item.clone())?;
            let column_name = normalize::column_name(function_ident);
            let name = column_name.to_string();

            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);

            let mut func = TableFunc::TabletizedScalar { relation, name };
            if with_ordinality {
                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
                    feature: format!("WITH ORDINALITY on {}", func),
                    discussion_no: None,
                })?;
            }
            (
                HirRelationExpr::CallTable {
                    func,
                    exprs: vec![expr],
                },
                scope,
            )
        }
        o => sql_bail!(
            "{} functions are not supported in functions in FROM",
            o.class()
        ),
    };

    // Append the `ordinality` scope item to match the expression's extra
    // output column.
    if with_ordinality {
        scope
            .items
            .push(ScopeItem::from_name(scope_name, "ordinality"));
    }

    Ok((expr, scope))
}
3419
3420fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3421    if let Some(TableAlias {
3422        name,
3423        columns,
3424        strict,
3425    }) = alias
3426    {
3427        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3428            sql_bail!(
3429                "{} has {} columns available but {} columns specified",
3430                name,
3431                scope.items.len(),
3432                columns.len()
3433            );
3434        }
3435
3436        let table_name = normalize::ident(name.to_owned());
3437        for (i, item) in scope.items.iter_mut().enumerate() {
3438            item.table_name = if item.allow_unqualified_references {
3439                Some(PartialItemName {
3440                    database: None,
3441                    schema: None,
3442                    item: table_name.clone(),
3443                })
3444            } else {
3445                // Columns that prohibit unqualified references are special
3446                // columns from the output of a NATURAL or USING join that can
3447                // only be referenced by their full, pre-join name. Applying an
3448                // alias to the output of that join renders those columns
3449                // inaccessible, which we accomplish here by setting the
3450                // table name to `None`.
3451                //
3452                // Concretely, consider:
3453                //
3454                //      CREATE TABLE t1 (a int);
3455                //      CREATE TABLE t2 (a int);
3456                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3457                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3458                //
3459                // In (1), the join has no alias. The underlying columns from
3460                // either side of the join can be referenced as `t1.a` and
3461                // `t2.a`, respectively, and the unqualified name `a` refers to
3462                // a column whose value is `coalesce(t1.a, t2.a)`.
3463                //
3464                // In (2), the join is aliased as `t`. The columns from either
3465                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3466                // the coalesced column can be named as either `a` or `t.a`.
3467                //
3468                // We previously had a bug [0] that mishandled this subtle
3469                // logic.
3470                //
3471                // NOTE(benesch): We could in theory choose to project away
3472                // those inaccessible columns and drop them from the scope
3473                // entirely, but that would require that this function also
3474                // take and return the `HirRelationExpr` that is being aliased,
3475                // which is a rather large refactor.
3476                //
3477                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3478                None
3479            };
3480            item.column_name = columns
3481                .get(i)
3482                .map(|a| normalize::column_name(a.clone()))
3483                .unwrap_or_else(|| item.column_name.clone());
3484        }
3485    }
3486    Ok(scope)
3487}
3488
/// Attempts to invent a column name for `expr`, following PostgreSQL's naming
/// rules; returns `Ok(None)` when no name suggests itself.
///
/// `table_func_names` is a mapping from a UUID to the original function
/// name. The UUIDs are identifiers that have been rewritten from some table
/// function expression, and this mapping restores the original names.
fn invent_column_name(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Option<ColumnName>, PlanError> {
    // We follow PostgreSQL exactly here, which has some complicated rules
    // around "high" and "low" quality names. Low quality names override other
    // low quality names but not high quality names.
    //
    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728

    #[derive(Debug)]
    enum NameQuality {
        Low,
        High,
    }

    // Recursive worker: returns the invented name together with its quality so
    // that wrapping expressions (e.g. casts, CASE) can decide whether to keep
    // an inner name or substitute their own.
    fn invent(
        ecx: &ExprContext,
        expr: &Expr<Aug>,
        table_func_names: &BTreeMap<String, Ident>,
    ) -> Result<Option<(ColumnName, NameQuality)>, PlanError> {
        Ok(match expr {
            Expr::Identifier(names) => {
                // A bare identifier that was rewritten from a table function
                // call takes the original function's name.
                if let [name] = names.as_slice() {
                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
                        return Ok(Some((
                            normalize::column_name(table_func_name.clone()),
                            NameQuality::High,
                        )));
                    }
                }
                names
                    .last()
                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
            }
            Expr::Value(v) => match v {
                // Per PostgreSQL, `bool` and `interval` literals take on the name
                // of their type, but not other literal types.
                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
                _ => None,
            },
            Expr::Function(func) => {
                let (schema, item) = match &func.name {
                    ResolvedItemName::Item {
                        qualifiers,
                        full_name,
                        ..
                    } => (&qualifiers.schema_spec, full_name.item.clone()),
                    // Name resolution should have rejected anything other than
                    // `Item` for a function call.
                    _ => {
                        bail_internal!("function name did not resolve to an item: {:?}", func.name)
                    }
                };

                // Functions in the internal and unsafe schemas do not lend
                // their names to output columns.
                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
                    || schema
                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
                {
                    None
                } else {
                    Some((item.into(), NameQuality::High))
                }
            }
            Expr::HomogenizingFunction { function, .. } => Some((
                function.to_string().to_lowercase().into(),
                NameQuality::High,
            )),
            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
            // A cast keeps a high-quality inner name; otherwise it contributes
            // the target type's name at low quality.
            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names)? {
                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
            },
            // CASE prefers a high-quality name from its ELSE branch; otherwise
            // it is simply named "case", at low quality.
            Expr::Case { else_result, .. } => {
                let inner = match else_result.as_ref() {
                    Some(else_result) => invent(ecx, else_result, table_func_names)?,
                    None => None,
                };
                match inner {
                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                    _ => Some(("case".into(), NameQuality::Low)),
                }
            }
            Expr::FieldAccess { field, .. } => {
                Some((normalize::column_name(field.clone()), NameQuality::High))
            }
            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
            // A subscript takes its name from the expression being subscripted.
            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names)?,
            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
                // A bit silly to have to plan the query here just to get its column
                // name, since we throw away the planned expression, but fixing this
                // requires a separate semantic analysis phase.
                //
                // We deliberately swallow planning errors here: if the subquery
                // doesn't plan, we just don't invent a name for it; the real
                // planning attempt elsewhere will surface the error.
                let Ok((_expr, scope)) = plan_nested_query(&mut ecx.derived_query_context(), query)
                else {
                    return Ok(None);
                };
                scope
                    .items
                    .first()
                    .map(|name| (name.column_name.clone(), NameQuality::High))
            }
            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
            _ => None,
        })
    }

    // Callers only care about the name itself, not its quality.
    Ok(invent(ecx, expr, table_func_names)?.map(|(name, _quality)| name))
}
3609
/// A `SELECT` item after wildcard expansion: either a reference to an existing
/// input column, or an expression that still needs to be planned.
#[derive(Debug)]
enum ExpandedSelectItem<'a> {
    /// The input column at this ordinal position in the current scope.
    InputOrdinal(usize),
    /// A (possibly synthesized) scalar expression to plan.
    Expr(Cow<'a, Expr<Aug>>),
}
3615
3616impl ExpandedSelectItem<'_> {
3617    fn as_expr(&self) -> Option<&Expr<Aug>> {
3618        match self {
3619            ExpandedSelectItem::InputOrdinal(_) => None,
3620            ExpandedSelectItem::Expr(expr) => Some(expr),
3621        }
3622    }
3623}
3624
/// Expands a single `SELECT` item into the output columns it produces, each
/// paired with the name that output column should carry.
///
/// Wildcard forms (`*`, `tbl.*`, and `expr.*`) expand to zero or more columns;
/// any other expression expands to exactly one.
fn expand_select_item<'a>(
    ecx: &ExprContext,
    s: &'a SelectItem<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
    match s {
        SelectItem::Expr {
            expr: Expr::QualifiedWildcard(table_name),
            alias: _,
        } => {
            // Wildcard expansion depends on the columns currently in scope;
            // record that this statement used one.
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let table_name =
                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
            // `tbl.*`: select every scope item belonging to the named table,
            // referencing each by its ordinal position.
            let out: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.is_from_table(&table_name))
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            if out.is_empty() {
                sql_bail!("no table named '{}' in scope", table_name);
            }
            Ok(out)
        }
        SelectItem::Expr {
            expr: Expr::WildcardAccess(sql_expr),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            // A bit silly to have to plan the expression here just to get its
            // type, since we throw away the planned expression, but fixing this
            // requires a separate semantic analysis phase. Luckily this is an
            // uncommon operation and the PostgreSQL docs have a warning that
            // this operation is slow in Postgres too.
            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
            let fields = match ecx.scalar_type(&expr) {
                SqlScalarType::Record { fields, .. } => fields,
                ty => sql_bail!(
                    "type {} is not composite",
                    ecx.humanize_sql_scalar_type(&ty, false)
                ),
            };
            // Synthetic "exists" columns introduced for table functions that
            // appeared in the target list must not show up in the expansion of
            // `expr.*`; collect their names so we can skip them below.
            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
            if let Expr::Identifier(ident) = sql_expr.as_ref() {
                if let [name] = ident.as_slice() {
                    if let Ok(items) = ecx.scope.items_from_table(
                        &[],
                        &PartialItemName {
                            database: None,
                            schema: None,
                            item: name.as_str().to_string(),
                        },
                    ) {
                        for (_, item) in items {
                            if item
                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
                            {
                                skip_cols.insert(item.column_name.clone());
                            }
                        }
                    }
                }
            }
            // Expand each remaining record field into a `FieldAccess` on the
            // original expression.
            let items = fields
                .iter()
                .filter_map(|(name, _ty)| {
                    if skip_cols.contains(name) {
                        None
                    } else {
                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
                            expr: sql_expr.clone(),
                            field: name.clone().into(),
                        }));
                        Some((item, name.clone()))
                    }
                })
                .collect();
            Ok(items)
        }
        SelectItem::Wildcard => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            // `*` expands to every column in scope that permits unqualified
            // references (e.g., excluding the hidden pre-join halves of
            // NATURAL/USING join columns).
            let items: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.allow_unqualified_references)
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();

            Ok(items)
        }
        SelectItem::Expr { expr, alias } => {
            // A plain expression: use the explicit alias if given, otherwise
            // try to invent a name, falling back to the unknown-column name.
            let name = match alias.clone().map(normalize::column_name) {
                Some(name) => name,
                None => invent_column_name(ecx, expr, table_func_names)?
                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into()),
            };
            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
        }
    }
}
3735
/// Plans a single `JOIN` clause, combining the already-planned left input
/// (`left`/`left_scope`) with the join's right-hand relation and constraint.
///
/// Returns the joined `HirRelationExpr` along with the `Scope` describing its
/// output columns.
fn plan_join(
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    join: &Join<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // A CROSS JOIN is planned as an inner join whose predicate is always true.
    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
    let (kind, constraint) = match &join.join_operator {
        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
    };

    // The right-hand side is planned in a context derived from the left so
    // that it can (when the join kind permits) reference the left's columns.
    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
    if !kind.can_be_correlated() {
        for item in &mut right_qcx.outer_scopes[0].items {
            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
            // these items from scope. These items need to *exist* because they
            // might shadow variables in outer scopes that would otherwise be
            // valid to reference, but accessing them needs to produce an error.
            item.error_if_referenced =
                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
                    table: table.cloned(),
                    column: column.clone(),
                });
        }
    }
    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;

    let (expr, scope) = match constraint {
        JoinConstraint::On(expr) => {
            // The ON clause is planned against the concatenation of both
            // inputs' columns and must type as a boolean.
            let product_scope = left_scope.product(right_scope)?;
            let ecx = &ExprContext {
                qcx: left_qcx,
                name: "ON clause",
                scope: &product_scope,
                relation_type: &SqlRelationType::new(
                    left_qcx
                        .relation_type(&left)
                        .column_types
                        .into_iter()
                        .chain(right_qcx.relation_type(&right).column_types)
                        .collect(),
                ),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
            let joined = left.join(right, on, kind);
            (joined, product_scope)
        }
        JoinConstraint::Using { columns, alias } => {
            let column_names = columns
                .iter()
                .map(|ident| normalize::column_name(ident.clone()))
                .collect::<Vec<_>>();

            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                alias.as_ref(),
            )?
        }
        JoinConstraint::Natural => {
            // A NATURAL join is a USING join over the column names common to
            // both inputs (in the left input's column order).
            //
            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
            // have the same scx. However, it doesn't hurt to be safe.
            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
            let left_column_names = left_scope.column_names();
            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
            let column_names: Vec<_> = left_column_names
                .filter(|col| right_column_names.contains(col))
                .cloned()
                .collect();
            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                None,
            )?
        }
    };
    Ok((expr, scope))
}
3835
// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
/// Plans a `USING (<columns>)` join constraint. `NATURAL` joins also funnel
/// through here after the caller computes the shared column names.
///
/// For each named column, the left and right instances are equated (after
/// being coerced to a common type). The resulting scope presents a single
/// instance of each join column at the front of the output, with the
/// redundant copies hidden from unqualified reference and `SELECT *`. An
/// optional `alias` (`JOIN ... USING (...) AS alias`) additionally exposes
/// the join columns under that table alias.
#[allow(clippy::too_many_arguments)]
fn plan_using_constraint(
    column_names: &[ColumnName],
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    right_qcx: &QueryContext,
    right: HirRelationExpr,
    right_scope: Scope,
    kind: JoinKind,
    alias: Option<&Ident>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let mut both_scope = left_scope.clone().product(right_scope.clone())?;

    // Cargo culting PG here; no discernable reason this must fail, but PG does
    // so we do, as well.
    let mut unique_column_names = BTreeSet::new();
    for c in column_names {
        if !unique_column_names.insert(c) {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "column name {} appears more than once in USING clause",
                    c.quoted()
                ),
                discussion_no: None,
            });
        }
    }

    let alias_item_name = alias.map(|alias| PartialItemName {
        database: None,
        schema: None,
        item: alias.clone().to_string(),
    });

    // The USING alias must not collide with a table name already in scope.
    if let Some(alias_item_name) = &alias_item_name {
        for partial_item_name in both_scope.table_names() {
            if partial_item_name.matches(alias_item_name) {
                sql_bail!(
                    "table name \"{}\" specified more than once",
                    alias_item_name
                )
            }
        }
    }

    let ecx = &ExprContext {
        qcx: right_qcx,
        name: "USING clause",
        scope: &both_scope,
        relation_type: &SqlRelationType::new(
            left_qcx
                .relation_type(&left)
                .column_types
                .into_iter()
                .chain(right_qcx.relation_type(&right).column_types)
                .collect(),
        ),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    // Accumulated per join column below:
    // - `join_exprs`: the `lhs = rhs` equality predicates;
    // - `map_exprs`: columns appended after the join (coalesced values and
    //   alias columns);
    // - `new_items`: scope items for those appended columns;
    // - `join_cols`: columns to present at the front of the output;
    // - `hidden_cols`: columns excluded from unqualified reference.
    let mut join_exprs = vec![];
    let mut map_exprs = vec![];
    let mut new_items = vec![];
    let mut join_cols = vec![];
    let mut hidden_cols = vec![];

    for column_name in column_names {
        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
        let (lhs, lhs_name) = left_scope.resolve_using_column(
            column_name,
            JoinSide::Left,
            &mut left_qcx.name_manager.borrow_mut(),
        )?;
        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
            column_name,
            JoinSide::Right,
            &mut right_qcx.name_manager.borrow_mut(),
        )?;

        // Adjust the RHS reference to its post-join location.
        rhs.column += left_scope.len();

        // Join keys must be resolved to same type.
        let mut exprs = coerce_homogeneous_exprs(
            &ecx.with_name(&format!(
                "NATURAL/USING join column {}",
                column_name.quoted()
            )),
            vec![
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    lhs,
                    Arc::clone(&lhs_name),
                )),
                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
                    rhs,
                    Arc::clone(&rhs_name),
                )),
            ],
            None,
        )?;
        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));

        // Which input's copy of the column remains visible depends on the
        // join kind; a FULL join instead synthesizes a coalesced column.
        match kind {
            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
                join_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
            }
            JoinKind::RightOuter => {
                join_cols.push(rhs.column);
                hidden_cols.push(lhs.column);
            }
            JoinKind::FullOuter => {
                // Create a new column that will be the coalesced value of left
                // and right.
                join_cols.push(both_scope.items.len() + map_exprs.len());
                hidden_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
                map_exprs.push(HirScalarExpr::call_variadic(
                    Coalesce,
                    vec![expr1.clone(), expr2.clone()],
                ));
                new_items.push(ScopeItem::from_column_name(column_name));
            }
        }

        // If a `join_using_alias` is present, add a new scope item that accepts
        // only table-qualified references for each specified join column.
        // Unlike regular table aliases, a `join_using_alias` should not hide the
        // names of the joined relations.
        if alias_item_name.is_some() {
            let new_item_col = both_scope.items.len() + new_items.len();
            join_cols.push(new_item_col);
            hidden_cols.push(new_item_col);

            new_items.push(ScopeItem::from_name(
                alias_item_name.clone(),
                column_name.clone().to_string(),
            ));

            // Should be safe to use either `lhs` or `rhs` here since the column
            // is available in both scopes and must have the same type of the new item.
            // We (arbitrarily) choose the left name.
            map_exprs.push(HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name)));
        }

        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
    }
    both_scope.items.extend(new_items);

    // The columns from the secondary side of the join remain accessible by
    // their table-qualified name, but not by their column name alone. They are
    // also excluded from `SELECT *`.
    for c in hidden_cols {
        both_scope.items[c].allow_unqualified_references = false;
    }

    // Reproject all returned elements to the front of the list.
    let project_key = join_cols
        .into_iter()
        .chain(0..both_scope.items.len())
        .unique()
        .collect::<Vec<_>>();

    both_scope = both_scope.project(&project_key);

    let on = HirScalarExpr::variadic_and(join_exprs);

    let both = left
        .join(right, on, kind)
        .map(map_exprs)
        .project(project_key);
    Ok((both, both_scope))
}
4014
4015pub fn plan_expr<'a>(
4016    ecx: &'a ExprContext,
4017    e: &Expr<Aug>,
4018) -> Result<CoercibleScalarExpr, PlanError> {
4019    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4020}
4021
/// The implementation behind [`plan_expr`]: plans a single AST expression
/// node, dispatching on its kind.
///
/// Expressions that already appear in the scope (e.g. expressions precomputed
/// during aggregate planning) are resolved to column references rather than
/// being planned again.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
        // We've already calculated this expression.
        return Ok(HirScalarExpr::named_column(
            i,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )
        .into());
    }

    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Generalized functions, operators, and casts.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Special functions and operators.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // NULLIF(l, r) is planned as CASE WHEN l = r THEN NULL ELSE l END.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        // The remaining variants are rewritten into other forms before
        // planning; encountering one here is a bug in the desugaring pass.
        Expr::Nested(_) => bail_internal!("Expr::Nested should have been desugared"),
        Expr::InSubquery { .. } => {
            bail_internal!("Expr::InSubquery should have been desugared")
        }
        Expr::AnyExpr { .. } => {
            bail_internal!("Expr::AnyExpr should have been desugared")
        }
        Expr::AllExpr { .. } => {
            bail_internal!("Expr::AllExpr should have been desugared")
        }
        Expr::AnySubquery { .. } => {
            bail_internal!("Expr::AnySubquery should have been desugared")
        }
        Expr::AllSubquery { .. } => {
            bail_internal!("Expr::AllSubquery should have been desugared")
        }
        Expr::Between { .. } => {
            bail_internal!("Expr::Between should have been desugared")
        }
    }
}
4135
4136fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4137    if !ecx.allow_parameters {
4138        // It might be clearer to return an error like "cannot use parameter
4139        // here", but this is how PostgreSQL does it, and so for now we follow
4140        // PostgreSQL.
4141        return Err(PlanError::UnknownParameter(n));
4142    }
4143    if n == 0 || n > 65536 {
4144        return Err(PlanError::UnknownParameter(n));
4145    }
4146    if ecx.param_types().borrow().contains_key(&n) {
4147        Ok(HirScalarExpr::parameter(n).into())
4148    } else {
4149        Ok(CoercibleScalarExpr::Parameter(n))
4150    }
4151}
4152
4153fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4154    let mut out = vec![];
4155    for e in exprs {
4156        out.push(plan_expr(ecx, e)?);
4157    }
4158    Ok(CoercibleScalarExpr::LiteralRecord(out))
4159}
4160
4161fn plan_cast(
4162    ecx: &ExprContext,
4163    expr: &Expr<Aug>,
4164    data_type: &ResolvedDataType,
4165) -> Result<CoercibleScalarExpr, PlanError> {
4166    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4167    let expr = match expr {
4168        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4169        // we can pass in the target type as a type hint. This is
4170        // a limited form of the coercion that we do for string literals
4171        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4172        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4173        // PostgreSQL compatibility trouble.
4174        //
4175        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4176        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4177        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4178        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4179        _ => plan_expr(ecx, expr)?,
4180    };
4181    let ecx = &ecx.with_name("CAST");
4182    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4183    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4184    Ok(expr.into())
4185}
4186
4187fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4188    let ecx = ecx.with_name("NOT argument");
4189    Ok(plan_expr(&ecx, expr)?
4190        .type_as(&ecx, &SqlScalarType::Bool)?
4191        .call_unary(UnaryFunc::Not(expr_func::Not))
4192        .into())
4193}
4194
4195fn plan_and(
4196    ecx: &ExprContext,
4197    left: &Expr<Aug>,
4198    right: &Expr<Aug>,
4199) -> Result<CoercibleScalarExpr, PlanError> {
4200    let ecx = ecx.with_name("AND argument");
4201    Ok(HirScalarExpr::variadic_and(vec![
4202        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4203        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4204    ])
4205    .into())
4206}
4207
4208fn plan_or(
4209    ecx: &ExprContext,
4210    left: &Expr<Aug>,
4211    right: &Expr<Aug>,
4212) -> Result<CoercibleScalarExpr, PlanError> {
4213    let ecx = ecx.with_name("OR argument");
4214    Ok(HirScalarExpr::variadic_or(vec![
4215        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4216        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217    ])
4218    .into())
4219}
4220
4221fn plan_in_list(
4222    ecx: &ExprContext,
4223    lhs: &Expr<Aug>,
4224    list: &Vec<Expr<Aug>>,
4225    negated: &bool,
4226) -> Result<CoercibleScalarExpr, PlanError> {
4227    let ecx = ecx.with_name("IN list");
4228    let or = HirScalarExpr::variadic_or(
4229        list.into_iter()
4230            .map(|e| {
4231                let eq = lhs.clone().equals(e.clone());
4232                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4233            })
4234            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4235    );
4236    Ok(if *negated {
4237        or.call_unary(UnaryFunc::Not(expr_func::Not))
4238    } else {
4239        or
4240    }
4241    .into())
4242}
4243
4244fn plan_homogenizing_function(
4245    ecx: &ExprContext,
4246    function: &HomogenizingFunction,
4247    exprs: &[Expr<Aug>],
4248) -> Result<CoercibleScalarExpr, PlanError> {
4249    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4250    let expr = HirScalarExpr::call_variadic(
4251        match function {
4252            HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4253            HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4254            HomogenizingFunction::Least => VariadicFunc::from(Least),
4255        },
4256        coerce_homogeneous_exprs(
4257            &ecx.with_name(&function.to_string().to_lowercase()),
4258            plan_exprs(ecx, exprs)?,
4259            None,
4260        )?,
4261    );
4262    Ok(expr.into())
4263}
4264
4265fn plan_field_access(
4266    ecx: &ExprContext,
4267    expr: &Expr<Aug>,
4268    field: &Ident,
4269) -> Result<CoercibleScalarExpr, PlanError> {
4270    let field = normalize::column_name(field.clone());
4271    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4272    let ty = ecx.scalar_type(&expr);
4273    let i = match &ty {
4274        SqlScalarType::Record { fields, .. } => {
4275            fields.iter().position(|(name, _ty)| *name == field)
4276        }
4277        ty => sql_bail!(
4278            "column notation applied to type {}, which is not a composite type",
4279            ecx.humanize_sql_scalar_type(ty, false)
4280        ),
4281    };
4282    match i {
4283        None => sql_bail!(
4284            "field {} not found in data type {}",
4285            field,
4286            ecx.humanize_sql_scalar_type(&ty, false)
4287        ),
4288        Some(i) => Ok(expr
4289            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4290            .into()),
4291    }
4292}
4293
/// Plans a subscript expression (`expr[...]`), dispatching on the type of the
/// subscripted expression: arrays/`int2vector`, `jsonb`, or lists. All other
/// types are rejected.
fn plan_subscript(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    positions: &[SubscriptPosition<Aug>],
) -> Result<CoercibleScalarExpr, PlanError> {
    assert!(
        !positions.is_empty(),
        "subscript expression must contain at least one position"
    );

    let ecx = &ecx.with_name("subscripting");
    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
    let ty = ecx.scalar_type(&expr);
    match &ty {
        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
            ecx,
            expr,
            positions,
            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
            // track in its backing data).
            if ty == SqlScalarType::Int2Vector {
                1
            } else {
                0
            },
        ),
        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
        SqlScalarType::List { element_type, .. } => {
            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
            let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
            let n_layers = ty.unwrap_list_n_layers();
            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
        }
        ty => sql_bail!(
            "cannot subscript type {}",
            ecx.humanize_sql_scalar_type(ty, false)
        ),
    }
}
4334
4335// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4336// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4337// any were slices (i.e. included colon).
4338fn extract_scalar_subscript_from_positions<'a>(
4339    positions: &'a [SubscriptPosition<Aug>],
4340    expr_type_name: &str,
4341) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4342    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4343    for p in positions {
4344        if p.explicit_slice {
4345            sql_bail!("{} subscript does not support slices", expr_type_name);
4346        }
4347        assert!(
4348            p.end.is_none(),
4349            "index-appearing subscripts cannot have end value"
4350        );
4351        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4352    }
4353    Ok(scalar_subscripts)
4354}
4355
4356fn plan_subscript_array(
4357    ecx: &ExprContext,
4358    expr: HirScalarExpr,
4359    positions: &[SubscriptPosition<Aug>],
4360    offset: i64,
4361) -> Result<CoercibleScalarExpr, PlanError> {
4362    let mut exprs = Vec::with_capacity(positions.len() + 1);
4363    exprs.push(expr);
4364
4365    // Subscripting arrays doesn't yet support slicing, so we always want to
4366    // extract scalars or error.
4367    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4368
4369    for i in indexes {
4370        exprs.push(plan_expr(ecx, i)?.cast_to(
4371            ecx,
4372            CastContext::Explicit,
4373            &SqlScalarType::Int64,
4374        )?);
4375    }
4376
4377    Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4378}
4379
/// Plans subscripting on a list, which may freely mix index (`[i]`) and slice
/// (`[a:b]`) operations.
///
/// The positions are consumed as alternating runs: each run of contiguous
/// index operations is planned via `plan_index_list` (which reduces the
/// number of remaining list layers), and each run of contiguous slice
/// operations via `plan_slice_list` (which preserves the layer count).
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    let mut i = 0;

    while i < positions.len() {
        // Take all contiguous index operations, i.e. find next slice operation.
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Take all contiguous slice operations, i.e. find next index operation.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4428
4429fn plan_index_list(
4430    ecx: &ExprContext,
4431    expr: HirScalarExpr,
4432    indexes: &[&Expr<Aug>],
4433    n_layers: usize,
4434    elem_type_name: &str,
4435) -> Result<(usize, HirScalarExpr), PlanError> {
4436    let depth = indexes.len();
4437
4438    if depth > n_layers {
4439        if n_layers == 0 {
4440            sql_bail!("cannot subscript type {}", elem_type_name)
4441        } else {
4442            sql_bail!(
4443                "cannot index into {} layers; list only has {} layer{}",
4444                depth,
4445                n_layers,
4446                if n_layers == 1 { "" } else { "s" }
4447            )
4448        }
4449    }
4450
4451    let mut exprs = Vec::with_capacity(depth + 1);
4452    exprs.push(expr);
4453
4454    for i in indexes {
4455        exprs.push(plan_expr(ecx, i)?.cast_to(
4456            ecx,
4457            CastContext::Explicit,
4458            &SqlScalarType::Int64,
4459        )?);
4460    }
4461
4462    Ok((
4463        n_layers - depth,
4464        HirScalarExpr::call_variadic(ListIndex, exprs),
4465    ))
4466}
4467
4468fn plan_slice_list(
4469    ecx: &ExprContext,
4470    expr: HirScalarExpr,
4471    slices: &[SubscriptPosition<Aug>],
4472    n_layers: usize,
4473    elem_type_name: &str,
4474) -> Result<HirScalarExpr, PlanError> {
4475    if n_layers == 0 {
4476        sql_bail!("cannot subscript type {}", elem_type_name)
4477    }
4478
4479    // first arg will be list
4480    let mut exprs = Vec::with_capacity(slices.len() + 1);
4481    exprs.push(expr);
4482    // extract (start, end) parts from collected slices
4483    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4484        Ok(match position {
4485            Some(p) => {
4486                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4487            }
4488            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4489        })
4490    };
4491    for p in slices {
4492        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4493        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4494        exprs.push(start);
4495        exprs.push(end);
4496    }
4497
4498    Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4499}
4500
4501fn plan_like(
4502    ecx: &ExprContext,
4503    expr: &Expr<Aug>,
4504    pattern: &Expr<Aug>,
4505    escape: Option<&Expr<Aug>>,
4506    case_insensitive: bool,
4507    not: bool,
4508) -> Result<HirScalarExpr, PlanError> {
4509    use CastContext::Implicit;
4510    let ecx = ecx.with_name("LIKE argument");
4511    let expr = plan_expr(&ecx, expr)?;
4512    let haystack = match ecx.scalar_type(&expr) {
4513        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4514            .type_as(&ecx, ty)?
4515            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4516        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4517    };
4518    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4519    if let Some(escape) = escape {
4520        pattern = pattern.call_binary(
4521            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4522            expr_func::LikeEscape,
4523        );
4524    }
4525    let func: BinaryFunc = if case_insensitive {
4526        expr_func::IsLikeMatchCaseInsensitive.into()
4527    } else {
4528        expr_func::IsLikeMatchCaseSensitive.into()
4529    };
4530    let like = haystack.call_binary(pattern, func);
4531    if not {
4532        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4533    } else {
4534        Ok(like)
4535    }
4536}
4537
/// Plans subscripting on a `jsonb` value, e.g. `j[1]` or `j['field']`.
///
/// Each subscript is coerced to text — preferring a direct text coercion and
/// falling back to integer-then-stringify — and the whole chain is planned as
/// a single path lookup.
fn plan_subscript_jsonb(
    ecx: &ExprContext,
    expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
) -> Result<CoercibleScalarExpr, PlanError> {
    use CastContext::Implicit;
    use SqlScalarType::{Int64, String};

    // JSONB doesn't support the slicing syntax, so simply error if you
    // encounter any explicit slices.
    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;

    let mut exprs = Vec::with_capacity(subscripts.len());
    for s in subscripts {
        let subscript = plan_expr(ecx, s)?;
        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
            subscript
        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
            // Integers are converted to a string here and then re-parsed as an
            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
            // do it.
            typeconv::to_string(ecx, subscript)
        } else {
            sql_bail!("jsonb subscript type must be coercible to integer or text");
        };
        exprs.push(subscript);
    }

    // Subscripting works like `expr #> ARRAY[subscript]` rather than
    // `expr->subscript` as you might expect.
    let expr = expr.call_binary(
        HirScalarExpr::call_variadic(
            ArrayCreate {
                elem_type: SqlScalarType::String,
            },
            exprs,
        ),
        expr_func::JsonbGetPath,
    );
    Ok(expr.into())
}
4579
4580fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4581    if !ecx.allow_subqueries {
4582        sql_bail!("{} does not allow subqueries", ecx.name)
4583    }
4584    let mut qcx = ecx.derived_query_context();
4585    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4586    Ok(expr.exists().into())
4587}
4588
4589fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4590    if !ecx.allow_subqueries {
4591        sql_bail!("{} does not allow subqueries", ecx.name)
4592    }
4593    let mut qcx = ecx.derived_query_context();
4594    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4595    let column_types = qcx.relation_type(&expr).column_types;
4596    if column_types.len() != 1 {
4597        sql_bail!(
4598            "Expected subselect to return 1 column, got {} columns",
4599            column_types.len()
4600        );
4601    }
4602    Ok(expr.select().into())
4603}
4604
4605fn plan_list_subquery(
4606    ecx: &ExprContext,
4607    query: &Query<Aug>,
4608) -> Result<CoercibleScalarExpr, PlanError> {
4609    plan_vector_like_subquery(
4610        ecx,
4611        query,
4612        |_| false,
4613        |elem_type| ListCreate { elem_type }.into(),
4614        |order_by| AggregateFunc::ListConcat { order_by },
4615        expr_func::ListListConcat.into(),
4616        |elem_type| {
4617            HirScalarExpr::literal(
4618                Datum::empty_list(),
4619                SqlScalarType::List {
4620                    element_type: Box::new(elem_type),
4621                    custom_id: None,
4622                },
4623            )
4624        },
4625        "list",
4626    )
4627}
4628
4629fn plan_array_subquery(
4630    ecx: &ExprContext,
4631    query: &Query<Aug>,
4632) -> Result<CoercibleScalarExpr, PlanError> {
4633    plan_vector_like_subquery(
4634        ecx,
4635        query,
4636        |elem_type| {
4637            matches!(
4638                elem_type,
4639                SqlScalarType::Char { .. }
4640                    | SqlScalarType::Array { .. }
4641                    | SqlScalarType::List { .. }
4642                    | SqlScalarType::Map { .. }
4643            )
4644        },
4645        |elem_type| ArrayCreate { elem_type }.into(),
4646        |order_by| AggregateFunc::ArrayConcat { order_by },
4647        expr_func::ArrayArrayConcat.into(),
4648        |elem_type| {
4649            HirScalarExpr::literal(
4650                Datum::empty_array(),
4651                SqlScalarType::Array(Box::new(elem_type)),
4652            )
4653        },
4654        "[]",
4655    )
4656}
4657
/// Generic function used to plan both array subqueries and list subqueries
///
/// Plans `ARRAY(<subquery>)` / `LIST(<subquery>)`-style expressions by
/// reducing the subquery's single output column into one container value.
///
/// The container-specific behavior is injected by the caller:
/// * `is_unsupported_type` rejects element types the container cannot hold.
/// * `vector_create` builds the variadic function that wraps one element
///   into a singleton container.
/// * `aggregate_concat` builds the order-sensitive concatenation aggregate.
/// * `binary_concat` concatenates two containers (used to splice in the
///   empty literal at the end).
/// * `empty_literal` builds an empty container of the given element type.
/// * `vector_type_string` is the suffix used in the unsupported-type error
///   message.
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&SqlScalarType) -> bool,
    F2: Fn(SqlScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(SqlScalarType) -> HirScalarExpr,
{
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // Enforce LIMIT/OFFSET (when present and non-trivial) with a TopK before
    // aggregating, since the Reduce below consumes all input rows.
    if planned_query.limit.is_some()
        || !planned_query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }

    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }

    // The single projected column supplies the container's element type.
    let project_column = *planned_query.project.get(0).unwrap();
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();

    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_sql_scalar_type(&elem_type, false),
            vector_type_string
        ));
    }

    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
    // subquery above. The first expression wraps the element into a singleton
    // container; the remaining ones carry the ORDER BY key columns.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        vector_create(elem_type.clone()),
        vec![HirScalarExpr::column(project_column)],
    ))
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // However, column references for `aggregation_projection` and `aggregation_order_by`
    // are with reference to the `exprs` of the aggregation expression.  Here that is
    // `aggregation_exprs`.
    let aggregation_projection = vec![0];
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();

    // Reduce all rows to a single container value by concatenating the
    // singleton containers in the requested order.
    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);

    // If `expr` has no rows, return an empty array/list rather than NULL.
    Ok(reduced_expr
        .select()
        .call_binary(empty_literal(elem_type), binary_concat)
        .into())
}
4773
/// Plans a `MAP(<subquery>)` expression.
///
/// The subquery must return exactly two columns: the first (the key) must be
/// of type `text`; the second supplies the values. All rows are folded into a
/// single map value via the order-sensitive `MapAgg` aggregate, and an empty
/// map (rather than NULL) is produced when the subquery returns no rows.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // Enforce LIMIT/OFFSET (when present and non-trivial) with a TopK before
    // aggregating, since the Reduce below consumes all input rows.
    if query.limit.is_some()
        || !query
            .offset
            .clone()
            .try_into_literal_int64()
            .is_ok_and(|offset| offset == 0)
    {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    // Determine key and value types from the two projected columns.
    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    if key_type != SqlScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // The aggregate's input is a record of (key, value) followed by the
    // ORDER BY key columns. Column references here point at the columns
    // produced by planning the subquery above.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
        RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    ))
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // Reduce all rows to a single map value. The `ColumnOrder`s are
    // re-indexed by their position in the ORDER BY list, relative to the
    // aggregation's input expressions rather than the subquery's columns.
    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::call_variadic(
                    RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    aggregation_exprs,
                )),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // If `expr` has no rows, return an empty map rather than NULL.
    let expr = HirScalarExpr::call_variadic(
        Coalesce,
        vec![
            expr.select(),
            HirScalarExpr::literal(
                Datum::empty_map(),
                SqlScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    );

    Ok(expr.into())
}
4879
4880fn plan_collate(
4881    ecx: &ExprContext,
4882    expr: &Expr<Aug>,
4883    collation: &UnresolvedItemName,
4884) -> Result<CoercibleScalarExpr, PlanError> {
4885    if collation.0.len() == 2
4886        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4887        && collation.0[1] == ident!("default")
4888    {
4889        plan_expr(ecx, expr)
4890    } else {
4891        bail_unsupported!("COLLATE");
4892    }
4893}
4894
4895/// Plans a slice of expressions.
4896///
4897/// This function is a simple convenience function for mapping [`plan_expr`]
4898/// over a slice of expressions. The planned expressions are returned in the
4899/// same order as the input. If any of the expressions fail to plan, returns an
4900/// error instead.
4901fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4902where
4903    E: std::borrow::Borrow<Expr<Aug>>,
4904{
4905    let mut out = vec![];
4906    for expr in exprs {
4907        out.push(plan_expr(ecx, expr.borrow())?);
4908    }
4909    Ok(out)
4910}
4911
4912/// Plans an `ARRAY` expression.
4913fn plan_array(
4914    ecx: &ExprContext,
4915    exprs: &[Expr<Aug>],
4916    type_hint: Option<&SqlScalarType>,
4917) -> Result<CoercibleScalarExpr, PlanError> {
4918    // Plan each element expression.
4919    let mut out = vec![];
4920    for expr in exprs {
4921        out.push(match expr {
4922            // Special case nested ARRAY expressions so we can plumb
4923            // the type hint through.
4924            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4925            _ => plan_expr(ecx, expr)?,
4926        });
4927    }
4928
4929    // Attempt to make use of the type hint.
4930    let type_hint = match type_hint {
4931        // The user has provided an explicit cast to an array type. We know the
4932        // element type to coerce to. Need to be careful, though: if there's
4933        // evidence that any of the array elements are themselves arrays, we
4934        // want to coerce to the array type, not the element type.
4935        Some(SqlScalarType::Array(elem_type)) => {
4936            let multidimensional = out.iter().any(|e| {
4937                matches!(
4938                    ecx.scalar_type(e),
4939                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4940                )
4941            });
4942            if multidimensional {
4943                type_hint
4944            } else {
4945                Some(&**elem_type)
4946            }
4947        }
4948        // The user provided an explicit cast to a non-array type. We'll have to
4949        // guess what the correct type for the array. Our caller will then
4950        // handle converting that array type to the desired non-array type.
4951        Some(_) => None,
4952        // No type hint. We'll have to guess the correct type for the array.
4953        None => None,
4954    };
4955
4956    // Coerce all elements to the same type.
4957    let (elem_type, exprs) = if exprs.is_empty() {
4958        if let Some(elem_type) = type_hint {
4959            (elem_type.clone(), vec![])
4960        } else {
4961            sql_bail!("cannot determine type of empty array");
4962        }
4963    } else {
4964        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4965        (ecx.scalar_type(&out[0]), out)
4966    };
4967
4968    // Arrays of `char` type are disallowed due to a known limitation:
4969    // https://github.com/MaterializeInc/database-issues/issues/2360.
4970    //
4971    // Arrays of `list` and `map` types are disallowed due to mind-bending
4972    // semantics.
4973    if matches!(
4974        elem_type,
4975        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4976    ) {
4977        bail_unsupported!(format!(
4978            "{}[]",
4979            ecx.humanize_sql_scalar_type(&elem_type, false)
4980        ));
4981    }
4982
4983    Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4984}
4985
4986fn plan_list(
4987    ecx: &ExprContext,
4988    exprs: &[Expr<Aug>],
4989    type_hint: Option<&SqlScalarType>,
4990) -> Result<CoercibleScalarExpr, PlanError> {
4991    let (elem_type, exprs) = if exprs.is_empty() {
4992        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
4993            (element_type.without_modifiers(), vec![])
4994        } else {
4995            sql_bail!("cannot determine type of empty list");
4996        }
4997    } else {
4998        let type_hint = match type_hint {
4999            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5000            _ => None,
5001        };
5002
5003        let mut out = vec![];
5004        for expr in exprs {
5005            out.push(match expr {
5006                // Special case nested LIST expressions so we can plumb
5007                // the type hint through.
5008                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5009                _ => plan_expr(ecx, expr)?,
5010            });
5011        }
5012        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5013        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5014    };
5015
5016    if matches!(elem_type, SqlScalarType::Char { .. }) {
5017        bail_unsupported!("char list");
5018    }
5019
5020    Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5021}
5022
/// Plans a `MAP[...]` constructor expression.
///
/// Keys are always planned as `text`. `type_hint`, when it is a map type,
/// supplies the value type used to coerce the (possibly nested) entry values;
/// an empty map is only plannable when such a hint is available.
fn plan_map(
    ecx: &ExprContext,
    entries: &[MapEntry<Aug>],
    type_hint: Option<&SqlScalarType>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let (value_type, exprs) = if entries.is_empty() {
        // With no entries, the value type can only come from the hint.
        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
            (value_type.without_modifiers(), vec![])
        } else {
            sql_bail!("cannot determine type of empty map");
        }
    } else {
        // Extract the value-type hint, if any, to push down into the values.
        let type_hint = match type_hint {
            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
            _ => None,
        };

        let mut keys = vec![];
        let mut values = vec![];
        for MapEntry { key, value } in entries {
            // Map keys are always text.
            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
            let value = match value {
                // Special case nested MAP expressions so we can plumb
                // the type hint through.
                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
                _ => plan_expr(ecx, value)?,
            };
            keys.push(key);
            values.push(value);
        }
        // All values must share one type.
        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
        // Interleave into k1, v1, k2, v2, ... as the `MapBuild` arguments.
        let out = itertools::interleave(keys, values).collect();
        (value_type, out)
    };

    if matches!(value_type, SqlScalarType::Char { .. }) {
        bail_unsupported!("char map");
    }

    let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
    Ok(expr.into())
}
5066
5067/// Coerces a list of expressions such that all input expressions will be cast
5068/// to the same type. If successful, returns a new list of expressions in the
5069/// same order as the input, where each expression has the appropriate casts to
5070/// make them all of a uniform type.
5071///
5072/// If `force_type` is `Some`, the expressions are forced to the specified type
5073/// via an explicit cast. Otherwise the best common type is guessed via
5074/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5075/// implicit casts
5076///
5077/// Note that this is our implementation of Postgres' type conversion for
5078/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5079/// isn't yet used in all of those cases.
5080///
5081/// [union-type-conv]:
5082/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5083pub fn coerce_homogeneous_exprs(
5084    ecx: &ExprContext,
5085    exprs: Vec<CoercibleScalarExpr>,
5086    force_type: Option<&SqlScalarType>,
5087) -> Result<Vec<HirScalarExpr>, PlanError> {
5088    assert!(!exprs.is_empty());
5089
5090    let target_holder;
5091    let target = match force_type {
5092        Some(t) => t,
5093        None => {
5094            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5095            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5096            &target_holder
5097        }
5098    };
5099
5100    // Try to cast all expressions to `target`.
5101    let mut out = Vec::new();
5102    for expr in exprs {
5103        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5104        let ccx = match force_type {
5105            None => CastContext::Implicit,
5106            Some(_) => CastContext::Explicit,
5107        };
5108        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5109            Ok(expr) => out.push(expr),
5110            Err(_) => sql_bail!(
5111                "{} could not convert type {} to {}",
5112                ecx.name,
5113                ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5114                ecx.humanize_sql_scalar_type(target, false),
5115            ),
5116        }
5117    }
5118    Ok(out)
5119}
5120
5121/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5122/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5123pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5124    obe: &OrderByExpr<T>,
5125    column: usize,
5126) -> ColumnOrder {
5127    let desc = !obe.asc.unwrap_or(true);
5128    ColumnOrder {
5129        column,
5130        desc,
5131        // https://www.postgresql.org/docs/14/queries-order.html
5132        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5133        nulls_last: obe.nulls_last.unwrap_or(!desc),
5134    }
5135}
5136
5137/// Plans the ORDER BY clause of a window function.
5138///
5139/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5140/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5141/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5142/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5143/// `Vec<HirScalarExpr>` that we return.
5144fn plan_function_order_by(
5145    ecx: &ExprContext,
5146    order_by: &[OrderByExpr<Aug>],
5147) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5148    let mut order_by_exprs = vec![];
5149    let mut col_orders = vec![];
5150    {
5151        for (i, obe) in order_by.iter().enumerate() {
5152            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5153            // do not support ordinal references in PostgreSQL. So we use
5154            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5155            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5156            order_by_exprs.push(expr);
5157            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5158        }
5159    }
5160    Ok((order_by_exprs, col_orders))
5161}
5162
5163/// Returns a human-readable rendering of `name`, falling back to a debug
5164/// dump of the `ResolvedItemName` if humanization fails. Used to construct
5165/// user-facing error messages on already-failing paths, where we'd rather
5166/// surface the raw resolved name than emit a useless `<unknown>` placeholder.
5167fn humanize_or_debug(scx: &StatementContext, name: &ResolvedItemName) -> String {
5168    scx.humanize_resolved_name(name)
5169        .map(|n| n.to_string())
5170        .unwrap_or_else(|_| format!("<error when trying to humanize `{name:?}`>"))
5171}
5172
/// Common part of the planning of windowed and non-windowed aggregation functions.
///
/// Resolves the aggregate implementation for `name`, plans its arguments
/// (including the function-level `ORDER BY` and `FILTER` clauses), and
/// returns the resulting [`AggregateExpr`]. The `over` clause is deliberately
/// ignored here; windowing is handled by the caller.
fn plan_aggregate_common(
    ecx: &ExprContext,
    Function::<Aug> {
        name,
        args,
        filter,
        over: _,
        distinct,
    }: &Function<Aug>,
) -> Result<AggregateExpr, PlanError> {
    // Normal aggregate functions, like `sum`, expect as input a single expression
    // which yields the datum to aggregate. Order sensitive aggregate functions,
    // like `jsonb_agg`, are special, and instead expect a Record whose first
    // element yields the datum to aggregate and whose successive elements yield
    // keys to order by. This expectation is hard coded within the implementation
    // of each of the order-sensitive aggregates. The specification of how many
    // order by keys to consider, and in what order, is passed via the `order_by`
    // field on the `AggregateFunc` variant.

    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
    // most, so explicitly drop it if the function doesn't care about order. This
    // prevents the projection into Record below from triggering on unsupported
    // functions.

    let impls = match resolve_func(ecx, name, args)? {
        Func::Aggregate(impls) => impls,
        _ => bail_internal!("plan_aggregate_common called on non-aggregate function"),
    };

    // We follow PostgreSQL's rule here for mapping `count(*)` into the
    // generalized function selection framework. The rule is simple: the user
    // must type `count(*)`, but the function selection framework sees an empty
    // parameter list, as if the user had typed `count()`. But if the user types
    // `count()` directly, that is an error. Like PostgreSQL, we apply these
    // rules to all aggregates, not just `count`, since we may one day support
    // user-defined aggregates, including user-defined aggregates that take no
    // parameters.
    let (args, order_by) = match &args {
        FunctionArgs::Star => (vec![], vec![]),
        FunctionArgs::Args { args, order_by } => {
            if args.is_empty() {
                sql_bail!(
                    "{}(*) must be used to call a parameterless aggregate function",
                    humanize_or_debug(ecx.qcx.scx, name)
                );
            }
            let args = plan_exprs(ecx, args)?;
            (args, order_by.clone())
        }
    };

    // Plan the function-level ORDER BY; the returned `ColumnOrder`s index
    // into `order_by_exprs`, not into the input columns.
    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;

    // Select the concrete aggregate implementation for the planned arguments.
    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
    if let Some(filter) = &filter {
        // If a filter is present, as in
        //
        //     <agg>(<expr>) FILTER (WHERE <cond>)
        //
        // we plan it by essentially rewriting the expression to
        //
        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
        //
        // where <identity> is the identity input for <agg>.
        let cond =
            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
        let expr_typ = ecx.scalar_type(&expr);
        expr = HirScalarExpr::if_then_else(
            cond,
            expr,
            HirScalarExpr::literal(func.identity_datum(), expr_typ),
        );
    }

    // Determine whether the aggregate's input references columns of the local
    // query (`seen_inner`) and/or columns of an outer, correlated query
    // (`seen_outer`). Aggregating exclusively over outer columns is not yet
    // supported.
    let mut seen_outer = false;
    let mut seen_inner = false;
    #[allow(deprecated)]
    expr.visit_columns(0, &mut |depth, col| {
        if depth == 0 && col.level == 0 {
            seen_inner = true;
        } else if col.level > depth {
            seen_outer = true;
        }
    });
    if seen_outer && !seen_inner {
        bail_unsupported!(
            3720,
            "aggregate functions that refer exclusively to outer columns"
        );
    }

    // If a function supports ORDER BY (even if there was no ORDER BY specified),
    // map the needed expressions into the aggregate datum.
    if func.is_order_sensitive() {
        let field_names = iter::repeat(ColumnName::from(""))
            .take(1 + order_by_exprs.len())
            .collect();
        let mut exprs = vec![expr];
        exprs.extend(order_by_exprs);
        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
    }

    Ok(AggregateExpr {
        func,
        expr: Box::new(expr),
        distinct: *distinct,
    })
}
5282
/// Plans a (possibly qualified) identifier as a scalar expression.
///
/// Resolution proceeds in order:
/// 1. A qualified name (`t.c`) must resolve to a column of table `t`.
/// 2. An unqualified name is first tried as a column in scope.
/// 3. Otherwise it is tried as a whole-row reference to a table, producing a
///    record of all the table's columns (with special handling for
///    single-column table functions).
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    // The parser guarantees that an `Expr::Identifier` is constructed with a
    // non-empty list of name parts, so an empty list here is an internal bug.
    let Some(last) = names.pop() else {
        bail_internal!("empty identifier");
    };
    let col_name = normalize::column_name(last);

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let (i, i_name) = ecx.scope.resolve_table_column(
            &ecx.qcx.outer_scopes,
            &table_name,
            &col_name,
            &mut ecx.qcx.name_manager.borrow_mut(),
        )?;
        return Ok(HirScalarExpr::named_column(i, i_name));
    }

    // If the name is unqualified, first check if it refers to a column. Track any similar names
    // that might exist for a better error message.
    let similar_names = match ecx.scope.resolve_column(
        &ecx.qcx.outer_scopes,
        &col_name,
        &mut ecx.qcx.name_manager.borrow_mut(),
    ) {
        Ok((i, i_name)) => {
            return Ok(HirScalarExpr::named_column(i, i_name));
        }
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name doesn't refer to a column. Check if it is a whole-row reference
    // to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name doesn't refer to a table either. Return an error.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The name refers to a table that is the result of a function that
        // returned a single column. Per PostgreSQL, this is a special case
        // that returns the value directly.
        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
            *column,
            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
        )),
        // The name refers to a normal table. Return a record containing all the
        // columns of the table.
        _ => {
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    // Synthetic "exists" columns (added for table functions in
                    // the target list) are excluded from the record and used
                    // below to produce NULL when the function emitted no row.
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::named_column(
                            column,
                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
                        );
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // For the special case of a table function with a single column, the single column is instead not wrapped.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
            };
            if let Some(has_exists_column) = has_exists_column {
                // When the exists column is NULL, the table function produced
                // no row, so the whole-row value becomes NULL.
                Ok(HirScalarExpr::if_then_else(
                    HirScalarExpr::unnamed_column(has_exists_column)
                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
                    expr,
                ))
            } else {
                Ok(expr)
            }
        }
    }
}
5382
5383fn plan_op(
5384    ecx: &ExprContext,
5385    op: &str,
5386    expr1: &Expr<Aug>,
5387    expr2: Option<&Expr<Aug>>,
5388) -> Result<HirScalarExpr, PlanError> {
5389    let impls = func::resolve_op(op)?;
5390    let args = match expr2 {
5391        None => plan_exprs(ecx, &[expr1])?,
5392        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5393    };
5394    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5395}
5396
/// Plans a function call, dispatching on the kind of function that `name`
/// resolves to: plain scalar, scalar window, value window, or (windowed)
/// aggregate. Table functions and bare (non-windowed) aggregates are rejected
/// here with context-specific error messages.
///
/// The window-function cases return early with a `HirScalarExpr::windowing`
/// expression; only the plain scalar case falls through to the generic
/// argument planning and implementation selection at the bottom.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    // Dispatch on the resolved function kind. Window-function arms return
    // early; the scalar arm yields its candidate implementations for the
    // shared argument-planning path below.
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            // A plain scalar function must not carry an OVER clause.
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            // Plan the window specification (frame, PARTITION BY, ORDER BY)
            // and the scalar arguments. The frame is ignored for scalar
            // window functions (see the note below).
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
            // msg there is less informative.)
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
            // accept a window frame here without an error msg. (Postgres also does this.)
            // TODO: maybe we should give a notice

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            if ignore_nulls {
                // If we ever add a scalar window function that supports ignore, then don't forget
                // to also update HIR EXPLAIN.
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let window_plan = plan_window_function_non_aggr(ecx, f)?;
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
                window_plan;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;

            // IGNORE NULLS is supported only for LAG and LEAD among the value
            // window functions.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls, // (RESPECT NULLS is the default)
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // Not a window aggregate. Something is wrong.
                if ecx.allow_aggregates {
                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    // scope.resolve_expr didn't catch it because we have not yet planned it,
                    // because it was in an unsupported context.
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                // Window aggregate: plan the common window parts, then the
                // aggregate itself.
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // https://github.com/MaterializeInc/database-issues/issues/6720
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    (_, _) => {} // other cases are ok
                }

                if ignore_nulls {
                    // https://github.com/MaterializeInc/database-issues/issues/6722
                    // If we ever add support for ignore_nulls for a window aggregate, then don't
                    // forget to also update HIR EXPLAIN.
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    // https://github.com/MaterializeInc/database-issues/issues/6626
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // From here on we are planning a plain scalar function call; the window
    // cases above have all returned already.
    if over.is_some() {
        bail_internal!("OVER clause should have been handled by the window function path above");
    }

    // DISTINCT and FILTER are aggregate-only modifiers.
    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            humanize_or_debug(ecx.qcx.scx, name)
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            humanize_or_debug(ecx.qcx.scx, name)
        );
    }

    // `*` and ORDER BY arguments are likewise aggregate-only.
    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                humanize_or_debug(ecx.qcx.scx, name)
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    humanize_or_debug(ecx.qcx.scx, name)
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5606
/// Feature name passed to `bail_unsupported!` when IGNORE NULLS or RESPECT
/// NULLS is specified for a window function that doesn't support it (only LAG
/// and LEAD do).
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5609
5610/// Resolves the name to a set of function implementations.
5611///
5612/// If the name does not specify a known built-in function, returns an error.
5613pub fn resolve_func(
5614    ecx: &ExprContext,
5615    name: &ResolvedItemName,
5616    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5617) -> Result<&'static Func, PlanError> {
5618    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5619        if let Ok(f) = i.func() {
5620            return Ok(f);
5621        }
5622    }
5623
5624    // Couldn't resolve function with this name, so generate verbose error
5625    // message.
5626    let cexprs = match args {
5627        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5628        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5629            if !order_by.is_empty() {
5630                sql_bail!(
5631                    "ORDER BY specified, but {} is not an aggregate function",
5632                    name
5633                );
5634            }
5635            plan_exprs(ecx, args)?
5636        }
5637    };
5638
5639    let arg_types: Vec<_> = cexprs
5640        .into_iter()
5641        .map(|ty| match ecx.scalar_type(&ty) {
5642            CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5643            CoercibleScalarType::Record(_) => "record".to_string(),
5644            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5645        })
5646        .collect();
5647
5648    Err(PlanError::UnknownFunction {
5649        name: name.to_string(),
5650        arg_types,
5651    })
5652}
5653
/// Plans the `IS` family of constructs: `IS [NOT] NULL`, `IS [NOT] UNKNOWN`,
/// `IS [NOT] TRUE/FALSE`, and `IS [NOT] DISTINCT FROM`.
///
/// `not` indicates the `NOT` variant was specified; the planned expression is
/// negated at the end.
fn plan_is_expr<'a>(
    ecx: &ExprContext,
    expr: &'a Expr<Aug>,
    construct: &IsExprConstruct<Aug>,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    let expr_hir = plan_expr(ecx, expr)?;

    let mut result = match construct {
        IsExprConstruct::Null => {
            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
            // at odds with our type coercion rules, which treat `NULL` literals
            // and unconstrained parameters identically. Providing a type hint
            // of string means we wind up supporting both.
            expr_hir.type_as_any(ecx)?.call_is_null()
        }
        // `IS UNKNOWN` is the boolean-typed spelling of `IS NULL`: coerce the
        // operand to bool, then test for null.
        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
        IsExprConstruct::True => expr_hir
            .type_as(ecx, &SqlScalarType::Bool)?
            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
        IsExprConstruct::False => expr_hir
            .type_as(ecx, &SqlScalarType::Bool)?
            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
        IsExprConstruct::DistinctFrom(expr2) => {
            // There are three cases:
            // 1. Both terms are non-null, in which case the result should be `a != b`.
            // 2. Exactly one term is null, in which case the result should be true.
            // 3. Both terms are null, in which case the result should be false.
            //
            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)

            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
            // construct an AST expression for `expr != expr2` and plan it to get proper type
            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;

            let expr1_hir = expr_hir.type_as_any(ecx)?;
            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;

            // term1: (a != b OR a IS NULL OR b IS NULL)
            let term1 = HirScalarExpr::variadic_or(vec![
                ne_hir,
                expr1_hir.clone().call_is_null(),
                expr2_hir.clone().call_is_null(),
            ]);
            // term2: (a IS NOT NULL OR b IS NOT NULL)
            let term2 = HirScalarExpr::variadic_or(vec![
                expr1_hir.call_is_null().not(),
                expr2_hir.call_is_null().not(),
            ]);
            term1.and(term2)
        }
    };
    if not {
        result = result.not();
    }
    Ok(result)
}
5711
/// Plans a `CASE` expression into a chain of nested if-then-else expressions.
///
/// Handles both the "simple" form (`CASE operand WHEN v THEN r ...`, where
/// each condition is rewritten as `operand = v`) and the "searched" form
/// (`CASE WHEN cond THEN r ...`). A missing ELSE branch defaults to NULL.
///
/// NOTE(review): assumes the parser produced equally many conditions and
/// results — `zip_eq` panics otherwise.
fn plan_case<'a>(
    ecx: &ExprContext,
    operand: &'a Option<Box<Expr<Aug>>>,
    conditions: &'a [Expr<Aug>],
    results: &'a [Expr<Aug>],
    else_result: &'a Option<Box<Expr<Aug>>>,
) -> Result<HirScalarExpr, PlanError> {
    let mut cond_exprs = Vec::new();
    let mut result_exprs = Vec::new();
    for (c, r) in conditions.iter().zip_eq(results) {
        // In the simple form, each WHEN value is compared against the operand.
        let c = match operand {
            Some(operand) => operand.clone().equals(c.clone()),
            None => c.clone(),
        };
        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
        cond_exprs.push(cexpr);
        result_exprs.push(r);
    }
    // The ELSE branch (or NULL) becomes the last result, so all results can be
    // coerced to a common type together.
    result_exprs.push(match else_result {
        Some(else_result) => else_result,
        None => &Expr::Value(Value::Null),
    });
    let mut result_exprs = coerce_homogeneous_exprs(
        &ecx.with_name("CASE"),
        plan_exprs(ecx, &result_exprs)?,
        None,
    )?;
    // Build the if-then-else chain from the inside out: start with the ELSE
    // result and wrap each (condition, result) pair around it in reverse.
    let mut expr = result_exprs.pop().unwrap();
    assert_eq!(cond_exprs.len(), result_exprs.len());
    for (cexpr, rexpr) in cond_exprs
        .into_iter()
        .rev()
        .zip_eq(result_exprs.into_iter().rev())
    {
        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
    }
    Ok(expr)
}
5750
/// Plans a literal value into a coercible scalar expression.
///
/// String and NULL literals stay uncoerced (`LiteralString` / `LiteralNull`)
/// so that later type inference can choose their type; all other literals are
/// given a concrete type here. Numbers without a decimal point or exponent are
/// narrowed to the smallest of int4/int8/numeric that can represent them.
fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
    let (datum, scalar_type) = match l {
        Value::Number(s) => {
            let d = strconv::parse_numeric(s.as_str())?;
            // NOTE(review): this only checks uppercase 'E' — assumes the
            // lexer normalizes exponent markers to uppercase; confirm.
            if !s.contains(&['E', '.'][..]) {
                // Maybe representable as an int?
                // The `try_into` targets are inferred from the Datum
                // constructors: i32 first, then i64 as a fallback.
                if let Ok(n) = d.0.try_into() {
                    (Datum::Int32(n), SqlScalarType::Int32)
                } else if let Ok(n) = d.0.try_into() {
                    (Datum::Int64(n), SqlScalarType::Int64)
                } else {
                    (
                        Datum::Numeric(d),
                        SqlScalarType::Numeric { max_scale: None },
                    )
                }
            } else {
                // Has a decimal point or exponent: always numeric.
                (
                    Datum::Numeric(d),
                    SqlScalarType::Numeric { max_scale: None },
                )
            }
        }
        Value::HexString(_) => bail_unsupported!("hex string literals"),
        Value::Boolean(b) => match b {
            false => (Datum::False, SqlScalarType::Bool),
            true => (Datum::True, SqlScalarType::Bool),
        },
        Value::Interval(i) => {
            let i = literal::plan_interval(i)?;
            (Datum::Interval(i), SqlScalarType::Interval)
        }
        // Strings and NULL keep their type undetermined for later coercion.
        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
    };
    let expr = HirScalarExpr::literal(datum, scalar_type);
    Ok(expr.into())
}
5789
/// The common part of the planning of non-aggregate window functions, i.e.,
/// scalar window functions and value window functions.
///
/// Returns, in order:
/// - `ignore_nulls`: whether IGNORE NULLS was specified;
/// - the planned ORDER BY expressions;
/// - the corresponding column orderings;
/// - the planned window frame;
/// - the planned PARTITION BY expressions;
/// - the planned (still coercible) scalar arguments of the function itself.
fn plan_window_function_non_aggr<'a>(
    ecx: &ExprContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<
    (
        bool,
        Vec<HirScalarExpr>,
        Vec<ColumnOrder>,
        mz_expr::WindowFrame,
        Vec<HirScalarExpr>,
        Vec<CoercibleScalarExpr>,
    ),
    PlanError,
> {
    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
        plan_window_function_common(ecx, name, over)?;

    // DISTINCT, FILTER, `*`, and ORDER BY arguments are aggregate-only
    // features, so reject them for these (non-aggregate) window functions.
    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            name
        );
    }

    if filter.is_some() {
        bail_unsupported!("FILTER in non-aggregate window functions");
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!("* argument is invalid with non-aggregate function {}", name)
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    Ok((
        ignore_nulls,
        order_by_exprs,
        col_orders,
        window_frame,
        partition,
        scalar_args,
    ))
}
5850
5851/// The common part of the planning of all window functions.
5852fn plan_window_function_common(
5853    ecx: &ExprContext,
5854    name: &<Aug as AstInfo>::ItemName,
5855    over: &Option<WindowSpec<Aug>>,
5856) -> Result<
5857    (
5858        bool,
5859        Vec<HirScalarExpr>,
5860        Vec<ColumnOrder>,
5861        mz_expr::WindowFrame,
5862        Vec<HirScalarExpr>,
5863    ),
5864    PlanError,
5865> {
5866    if !ecx.allow_windows {
5867        sql_bail!(
5868            "window functions are not allowed in {} (function {})",
5869            ecx.name,
5870            name
5871        );
5872    }
5873
5874    let window_spec = match over.as_ref() {
5875        Some(over) => over,
5876        None => sql_bail!("window function {} requires an OVER clause", name),
5877    };
5878    if window_spec.ignore_nulls && window_spec.respect_nulls {
5879        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5880    }
5881    let window_frame = match window_spec.window_frame.as_ref() {
5882        Some(frame) => plan_window_frame(frame)?,
5883        None => mz_expr::WindowFrame::default(),
5884    };
5885    let mut partition = Vec::new();
5886    for expr in &window_spec.partition_by {
5887        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5888    }
5889
5890    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5891
5892    Ok((
5893        window_spec.ignore_nulls,
5894        order_by_exprs,
5895        col_orders,
5896        window_frame,
5897        partition,
5898    ))
5899}
5900
5901fn plan_window_frame(
5902    WindowFrame {
5903        units,
5904        start_bound,
5905        end_bound,
5906    }: &WindowFrame,
5907) -> Result<mz_expr::WindowFrame, PlanError> {
5908    use mz_expr::WindowFrameBound::*;
5909    let units = window_frame_unit_ast_to_expr(units)?;
5910    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5911    let end_bound = end_bound
5912        .as_ref()
5913        .map(window_frame_bound_ast_to_expr)
5914        .unwrap_or(CurrentRow);
5915
5916    // Validate bounds according to Postgres rules
5917    match (&start_bound, &end_bound) {
5918        // Start bound can't be UNBOUNDED FOLLOWING
5919        (UnboundedFollowing, _) => {
5920            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5921        }
5922        // End bound can't be UNBOUNDED PRECEDING
5923        (_, UnboundedPreceding) => {
5924            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5925        }
5926        // Start bound should come before end bound in the list of bound definitions
5927        (CurrentRow, OffsetPreceding(_)) => {
5928            sql_bail!("frame starting from current row cannot have preceding rows")
5929        }
5930        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5931            sql_bail!("frame starting from following row cannot have preceding rows")
5932        }
5933        // The above rules are adopted from Postgres.
5934        // The following rules are Materialize-specific.
5935        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5936            // Note that the only hard limit is that partition size + offset should fit in i64, so
5937            // in theory, we could support much larger offsets than this. But for our current
5938            // performance, even 1000000 is quite big.
5939            if *o1 > 1000000 || *o2 > 1000000 {
5940                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5941            }
5942        }
5943        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5944            if *o1 > 1000000 || *o2 > 1000000 {
5945                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5946            }
5947        }
5948        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5949            if *o1 > 1000000 || *o2 > 1000000 {
5950                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5951            }
5952        }
5953        (OffsetPreceding(o), CurrentRow) => {
5954            if *o > 1000000 {
5955                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5956            }
5957        }
5958        (CurrentRow, OffsetFollowing(o)) => {
5959            if *o > 1000000 {
5960                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5961            }
5962        }
5963        // Other bounds are valid
5964        (_, _) => (),
5965    }
5966
5967    // RANGE is only supported in the default frame
5968    // https://github.com/MaterializeInc/database-issues/issues/6585
5969    if units == mz_expr::WindowFrameUnits::Range
5970        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5971    {
5972        bail_unsupported!("RANGE in non-default window frames")
5973    }
5974
5975    let frame = mz_expr::WindowFrame {
5976        units,
5977        start_bound,
5978        end_bound,
5979    };
5980    Ok(frame)
5981}
5982
5983fn window_frame_unit_ast_to_expr(
5984    unit: &WindowFrameUnits,
5985) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5986    match unit {
5987        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5988        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5989        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5990    }
5991}
5992
5993fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5994    match bound {
5995        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5996        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5997        WindowFrameBound::Preceding(Some(offset)) => {
5998            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5999        }
6000        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6001        WindowFrameBound::Following(Some(offset)) => {
6002            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6003        }
6004    }
6005}
6006
6007pub fn scalar_type_from_sql(
6008    scx: &StatementContext,
6009    data_type: &ResolvedDataType,
6010) -> Result<SqlScalarType, PlanError> {
6011    match data_type {
6012        ResolvedDataType::AnonymousList(elem_type) => {
6013            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6014            if matches!(elem_type, SqlScalarType::Char { .. }) {
6015                bail_unsupported!("char list");
6016            }
6017            Ok(SqlScalarType::List {
6018                element_type: Box::new(elem_type),
6019                custom_id: None,
6020            })
6021        }
6022        ResolvedDataType::AnonymousMap {
6023            key_type,
6024            value_type,
6025        } => {
6026            match scalar_type_from_sql(scx, key_type)? {
6027                SqlScalarType::String => {}
6028                other => sql_bail!(
6029                    "map key type must be {}, got {}",
6030                    scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6031                    scx.humanize_sql_scalar_type(&other, false)
6032                ),
6033            }
6034            Ok(SqlScalarType::Map {
6035                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6036                custom_id: None,
6037            })
6038        }
6039        ResolvedDataType::Named { id, modifiers, .. } => {
6040            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6041        }
6042        ResolvedDataType::Error => bail_internal!("should have been caught in name resolution"),
6043    }
6044}
6045
6046pub fn scalar_type_from_catalog(
6047    catalog: &dyn SessionCatalog,
6048    id: CatalogItemId,
6049    modifiers: &[i64],
6050) -> Result<SqlScalarType, PlanError> {
6051    let entry = catalog.get_item(&id);
6052    let type_details = match entry.type_details() {
6053        Some(type_details) => type_details,
6054        None => {
6055            // Resolution should never produce a `ResolvedDataType::Named` with
6056            // an ID of a non-type, but we error gracefully just in case.
6057            sql_bail!(
6058                "internal error: {} does not refer to a type",
6059                catalog.resolve_full_name(entry.name()).to_string().quoted()
6060            );
6061        }
6062    };
6063    match &type_details.typ {
6064        CatalogType::Numeric => {
6065            let mut modifiers = modifiers.iter().fuse();
6066            let precision = match modifiers.next() {
6067                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6068                    sql_bail!(
6069                        "precision for type numeric must be between 1 and {}",
6070                        NUMERIC_DATUM_MAX_PRECISION,
6071                    );
6072                }
6073                Some(p) => Some(*p),
6074                None => None,
6075            };
6076            let scale = match modifiers.next() {
6077                Some(scale) => {
6078                    if let Some(precision) = precision {
6079                        if *scale > precision {
6080                            sql_bail!(
6081                                "scale for type numeric must be between 0 and precision {}",
6082                                precision
6083                            );
6084                        }
6085                    }
6086                    Some(NumericMaxScale::try_from(*scale)?)
6087                }
6088                None => None,
6089            };
6090            if modifiers.next().is_some() {
6091                sql_bail!("type numeric supports at most two type modifiers");
6092            }
6093            Ok(SqlScalarType::Numeric { max_scale: scale })
6094        }
6095        CatalogType::Char => {
6096            let mut modifiers = modifiers.iter().fuse();
6097            let length = match modifiers.next() {
6098                Some(l) => Some(CharLength::try_from(*l)?),
6099                None => Some(CharLength::ONE),
6100            };
6101            if modifiers.next().is_some() {
6102                sql_bail!("type character supports at most one type modifier");
6103            }
6104            Ok(SqlScalarType::Char { length })
6105        }
6106        CatalogType::VarChar => {
6107            let mut modifiers = modifiers.iter().fuse();
6108            let length = match modifiers.next() {
6109                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6110                None => None,
6111            };
6112            if modifiers.next().is_some() {
6113                sql_bail!("type character varying supports at most one type modifier");
6114            }
6115            Ok(SqlScalarType::VarChar { max_length: length })
6116        }
6117        CatalogType::Timestamp => {
6118            let mut modifiers = modifiers.iter().fuse();
6119            let precision = match modifiers.next() {
6120                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6121                None => None,
6122            };
6123            if modifiers.next().is_some() {
6124                sql_bail!("type timestamp supports at most one type modifier");
6125            }
6126            Ok(SqlScalarType::Timestamp { precision })
6127        }
6128        CatalogType::TimestampTz => {
6129            let mut modifiers = modifiers.iter().fuse();
6130            let precision = match modifiers.next() {
6131                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6132                None => None,
6133            };
6134            if modifiers.next().is_some() {
6135                sql_bail!("type timestamp with time zone supports at most one type modifier");
6136            }
6137            Ok(SqlScalarType::TimestampTz { precision })
6138        }
6139        t => {
6140            if !modifiers.is_empty() {
6141                sql_bail!(
6142                    "{} does not support type modifiers",
6143                    catalog.resolve_full_name(entry.name()).to_string()
6144                );
6145            }
6146            match t {
6147                CatalogType::Array {
6148                    element_reference: element_id,
6149                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6150                    catalog,
6151                    *element_id,
6152                    modifiers,
6153                )?))),
6154                CatalogType::List {
6155                    element_reference: element_id,
6156                    element_modifiers,
6157                } => Ok(SqlScalarType::List {
6158                    element_type: Box::new(scalar_type_from_catalog(
6159                        catalog,
6160                        *element_id,
6161                        element_modifiers,
6162                    )?),
6163                    custom_id: Some(id),
6164                }),
6165                CatalogType::Map {
6166                    key_reference: _,
6167                    key_modifiers: _,
6168                    value_reference: value_id,
6169                    value_modifiers,
6170                } => Ok(SqlScalarType::Map {
6171                    value_type: Box::new(scalar_type_from_catalog(
6172                        catalog,
6173                        *value_id,
6174                        value_modifiers,
6175                    )?),
6176                    custom_id: Some(id),
6177                }),
6178                CatalogType::Range {
6179                    element_reference: element_id,
6180                } => Ok(SqlScalarType::Range {
6181                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6182                }),
6183                CatalogType::Record { fields } => {
6184                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6185                        .iter()
6186                        .map(|f| {
6187                            let scalar_type = scalar_type_from_catalog(
6188                                catalog,
6189                                f.type_reference,
6190                                &f.type_modifiers,
6191                            )?;
6192                            Ok((
6193                                f.name.clone(),
6194                                SqlColumnType {
6195                                    scalar_type,
6196                                    nullable: true,
6197                                },
6198                            ))
6199                        })
6200                        .collect::<Result<Box<_>, PlanError>>()?;
6201                    Ok(SqlScalarType::Record {
6202                        fields: scalars,
6203                        custom_id: Some(id),
6204                    })
6205                }
6206                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6207                CatalogType::Bool => Ok(SqlScalarType::Bool),
6208                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6209                CatalogType::Date => Ok(SqlScalarType::Date),
6210                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6211                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6212                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6213                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6214                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6215                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6216                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6217                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6218                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6219                CatalogType::Interval => Ok(SqlScalarType::Interval),
6220                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6221                CatalogType::Oid => Ok(SqlScalarType::Oid),
6222                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6223                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6224                CatalogType::Pseudo => {
6225                    sql_bail!(
6226                        "cannot reference pseudo type {}",
6227                        catalog.resolve_full_name(entry.name()).to_string()
6228                    )
6229                }
6230                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6231                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6232                CatalogType::RegType => Ok(SqlScalarType::RegType),
6233                CatalogType::String => Ok(SqlScalarType::String),
6234                CatalogType::Time => Ok(SqlScalarType::Time),
6235                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6236                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6237                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6238                CatalogType::Numeric => unreachable!("handled above"),
6239                CatalogType::Char => unreachable!("handled above"),
6240                CatalogType::VarChar => unreachable!("handled above"),
6241                CatalogType::Timestamp => unreachable!("handled above"),
6242                CatalogType::TimestampTz => unreachable!("handled above"),
6243            }
6244        }
6245    }
6246}
6247
/// This is used to collect aggregates and table functions from within an `Expr`.
/// See the explanation of aggregate handling at the top of the file for more details.
struct AggregateTableFuncVisitor<'a> {
    /// Statement context, used to resolve function names against the catalog.
    scx: &'a StatementContext<'a>,
    /// Non-window aggregate function calls collected so far, in visit order.
    aggs: Vec<Function<Aug>>,
    /// True while visiting the arguments of an aggregate call; used to reject
    /// nested aggregate functions.
    within_aggregate: bool,
    /// Table function calls found in SELECT items, mapped to the synthetic
    /// identifier that replaces each call in the expression.
    tables: BTreeMap<Function<Aug>, String>,
    /// Stack of human-readable context names (e.g. "CASE") within which table
    /// functions are currently disallowed; used to build error messages.
    table_disallowed_context: Vec<&'static str>,
    /// True while visiting a SELECT item, where table functions are extracted
    /// and replaced by identifiers.
    in_select_item: bool,
    /// Generates unique ids for the synthetic table function identifiers.
    id_gen: IdGen,
    /// The first error encountered during the visit, if any.
    err: Option<PlanError>,
}
6260
6261impl<'a> AggregateTableFuncVisitor<'a> {
6262    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6263        AggregateTableFuncVisitor {
6264            scx,
6265            aggs: Vec::new(),
6266            within_aggregate: false,
6267            tables: BTreeMap::new(),
6268            table_disallowed_context: Vec::new(),
6269            in_select_item: false,
6270            id_gen: Default::default(),
6271            err: None,
6272        }
6273    }
6274
6275    fn into_result(
6276        self,
6277    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6278        match self.err {
6279            Some(err) => Err(err),
6280            None => {
6281                // Dedup while preserving the order. We don't care what the order is, but it
6282                // has to be reproducible so that EXPLAIN PLAN tests work.
6283                let mut seen = BTreeSet::new();
6284                let aggs = self
6285                    .aggs
6286                    .into_iter()
6287                    .filter(move |agg| seen.insert(agg.clone()))
6288                    .collect();
6289                Ok((aggs, self.tables))
6290            }
6291        }
6292    }
6293}
6294
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    /// Collects non-window aggregate calls and maintains the stack of contexts
    /// in which table functions are disallowed.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // NOTE(review): the FILTER expression is visited before
                // `within_aggregate` is set, so an aggregate inside FILTER is
                // not rejected as "nested" here — presumably intentional;
                // confirm.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                // Aggregate arguments may not contain nested aggregates or
                // table functions; restore the previous state afterwards.
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not be nested inside other table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    /// Tracks expression contexts that disallow table functions (CASE,
    /// COALESCE) and, within a SELECT item, replaces each supported table
    /// function call with a synthetic identifier, recording the call in
    /// `self.tables` so it can be planned separately.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a uuid identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated.
                let unique_id = self.id_gen.allocate_id();
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{unique_id}"));
                // We know this is okay because id is 11 characters + <=20 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    /// Marks that we are inside a SELECT item while visiting its contents, so
    /// `visit_expr_mut` knows to extract table functions.
    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6416
/// Collects window function calls (i.e., function calls that have an OVER
/// clause) from an expression tree. See `into_result` for the output order.
#[derive(Default)]
struct WindowFuncCollector {
    /// Window function call expressions collected so far, in visit order.
    window_funcs: Vec<Expr<Aug>>,
}
6421
impl WindowFuncCollector {
    /// Consumes the collector, returning the deduplicated window function
    /// calls, reversed relative to collection order.
    fn into_result(self) -> Vec<Expr<Aug>> {
        // Dedup while preserving the order.
        let mut seen = BTreeSet::new();
        let window_funcs_dedupped = self
            .window_funcs
            .into_iter()
            // NOTE(review): `filter` is lazy, so combined with the `rev` below
            // the dedup predicate actually runs back-to-front; with duplicates
            // present, the surviving position is the last occurrence rather
            // than the first. Equal duplicates make the surviving *value* the
            // same either way, but the output ordering depends on this —
            // confirm before restructuring.
            .filter(move |expr| seen.insert(expr.clone()))
            // Reverse the order, so that in case of a nested window function call, the
            // inner one is evaluated first.
            .rev()
            .collect();
        window_funcs_dedupped
    }
}
6437
6438impl Visit<'_, Aug> for WindowFuncCollector {
6439    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6440        match expr {
6441            Expr::Function(func) => {
6442                if func.over.is_some() {
6443                    self.window_funcs.push(expr.clone());
6444                }
6445            }
6446            _ => (),
6447        }
6448        visit::visit_expr(self, expr);
6449    }
6450
6451    fn visit_query(&mut self, _query: &Query<Aug>) {
6452        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6453    }
6454}
6455
/// Specifies how long a query will live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The query's (or the expression's) result will be computed at one point in time.
    OneShot,
    /// The query (or expression) is used in a dataflow that maintains an index.
    Index,
    /// The query (or expression) is used in a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) is part of a (non-materialized) view.
    View,
    /// The expression is part of a source definition.
    Source,
}

impl QueryLifetime {
    /// Reports whether the result is computed just once, as opposed to being
    /// maintained by a dataflow.
    ///
    /// (This used to impact whether the query is allowed to reason about the time at which it is
    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
    /// mechanism, see `ExprPrepStyle`.)
    pub fn is_one_shot(&self) -> bool {
        // Exhaustive match, so that adding a variant forces a decision here.
        let one_shot = match self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        // A query is one-shot exactly when it is not maintained.
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
    /// turned into a `TopK`.
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
    pub fn allow_show(&self) -> bool {
        match self {
            // One-shot queries and SUBSCRIBE may contain SHOW commands.
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6515
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The CTE's name.
    pub name: String,
    /// Description of the CTE's relation, supplying the type and column names
    /// used when the CTE is referenced.
    pub desc: RelationDesc,
}
6522
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    pub outer_relation_types: Vec<SqlRelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// A name manager, for interning column names that will be stored in HIR and MIR.
    pub name_manager: Rc<RefCell<NameManager>>,
    /// Bounds planning recursion depth to avoid stack overflow on deeply
    /// nested queries.
    pub recursion_guard: RecursionGuard,
}
6540
impl CheckedRecursion for QueryContext<'_> {
    /// Returns the guard that bounds planning recursion depth.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6546
6547impl<'a> QueryContext<'a> {
6548    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6549        QueryContext {
6550            scx,
6551            lifetime,
6552            outer_scopes: vec![],
6553            outer_relation_types: vec![],
6554            ctes: BTreeMap::new(),
6555            name_manager: Rc::new(RefCell::new(NameManager::new())),
6556            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6557        }
6558    }
6559
6560    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6561        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6562    }
6563
6564    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6565    /// `self`.
6566    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6567        let ctes = self.ctes.clone();
6568        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6569        let outer_relation_types = iter::once(relation_type)
6570            .chain(self.outer_relation_types.clone())
6571            .collect();
6572        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6573        let name_manager = Rc::clone(&self.name_manager);
6574
6575        QueryContext {
6576            scx: self.scx,
6577            lifetime: self.lifetime,
6578            outer_scopes,
6579            outer_relation_types,
6580            ctes,
6581            name_manager,
6582            recursion_guard: self.recursion_guard.clone(),
6583        }
6584    }
6585
6586    /// Derives a `QueryContext` for a scope that contains no columns.
6587    fn empty_derived_context(&self) -> QueryContext<'a> {
6588        let scope = Scope::empty();
6589        let ty = SqlRelationType::empty();
6590        self.derived_context(scope, ty)
6591    }
6592
6593    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6594    /// CTE.
6595    pub fn resolve_table_name(
6596        &self,
6597        object: ResolvedItemName,
6598    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6599        match object {
6600            ResolvedItemName::Item {
6601                id,
6602                full_name,
6603                version,
6604                ..
6605            } => {
6606                let item = self.scx.get_item(&id).at_version(version);
6607                let desc = match item.relation_desc() {
6608                    Some(desc) => desc.clone(),
6609                    None => {
6610                        return Err(PlanError::InvalidDependency {
6611                            name: full_name.to_string(),
6612                            item_type: item.item_type().to_string(),
6613                        });
6614                    }
6615                };
6616                let expr = HirRelationExpr::Get {
6617                    id: Id::Global(item.global_id()),
6618                    typ: desc.typ().clone(),
6619                };
6620
6621                let name = full_name.into();
6622                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6623
6624                Ok((expr, scope))
6625            }
6626            ResolvedItemName::Cte { id, name } => {
6627                let name = name.into();
6628                let cte = self.ctes.get(&id).unwrap();
6629                let expr = HirRelationExpr::Get {
6630                    id: Id::Local(id),
6631                    typ: cte.desc.typ().clone(),
6632                };
6633
6634                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6635
6636                Ok((expr, scope))
6637            }
6638            ResolvedItemName::Error => bail_internal!("should have been caught in name resolution"),
6639        }
6640    }
6641
6642    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6643    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6644    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6645        self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6646    }
6647}
6648
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a SqlRelationType,
    /// Are aggregate functions allowed in this context
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context.
    pub allow_parameters: bool,
    /// Are window functions allowed in this context
    pub allow_windows: bool,
}
6670
impl CheckedRecursion for ExprContext<'_> {
    /// Returns the recursion guard of the enclosing query context.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6676
6677impl<'a> ExprContext<'a> {
6678    pub fn catalog(&self) -> &dyn SessionCatalog {
6679        self.qcx.scx.catalog
6680    }
6681
6682    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6683        let mut ecx = self.clone();
6684        ecx.name = name;
6685        ecx
6686    }
6687
6688    pub fn column_type<E>(&self, expr: &E) -> E::Type
6689    where
6690        E: AbstractExpr,
6691    {
6692        expr.typ(
6693            &self.qcx.outer_relation_types,
6694            self.relation_type,
6695            &self.qcx.scx.param_types.borrow(),
6696        )
6697    }
6698
6699    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6700    where
6701        E: AbstractExpr,
6702    {
6703        self.column_type(expr).scalar_type()
6704    }
6705
6706    fn derived_query_context(&self) -> QueryContext<'_> {
6707        let mut scope = self.scope.clone();
6708        scope.lateral_barrier = true;
6709        self.qcx.derived_context(scope, self.relation_type.clone())
6710    }
6711
6712    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6713        self.qcx.scx.require_feature_flag(flag)
6714    }
6715
6716    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6717        &self.qcx.scx.param_types
6718    }
6719
6720    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6721    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6722    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6723        self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6724    }
6725
6726    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6727        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6728    }
6729}
6730
/// Manages column names, doing lightweight string internment.
///
/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
/// to ensure maximal sharing.
///
/// Internally this is an ordered set of reference-counted strings; interning
/// returns a clone of the stored `Arc`, so equal names share one allocation.
#[derive(Debug, Clone)]
pub struct NameManager(BTreeSet<Arc<str>>);
6738
6739impl NameManager {
6740    /// Creates a new `NameManager`, with no interned names
6741    pub fn new() -> Self {
6742        Self(BTreeSet::new())
6743    }
6744
6745    /// Interns a string, returning a reference-counted pointer to the interned
6746    /// string.
6747    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6748        let s = s.as_ref();
6749        if let Some(interned) = self.0.get(s) {
6750            Arc::clone(interned)
6751        } else {
6752            let interned: Arc<str> = Arc::from(s);
6753            self.0.insert(Arc::clone(&interned));
6754            interned
6755        }
6756    }
6757
6758    /// Interns a string representing a reference to a `ScopeItem`, returning a
6759    /// reference-counted pointer to the interned string.
6760    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6761        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6762        //
6763        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6764        // name information. Note that as of 2025-04-09, we don't support column
6765        // renames.
6766        //
6767        // A few bad alternatives:
6768        //
6769        // (1) Store it but don't write it down. This fails because the expression
6770        //     cache will erase our names on restart.
6771        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6772        //     get the right names. But the world now and the world when we made
6773        //     those objects may be different.
6774        // (3) Just don't write down the table name. Nothing fails... for now.
6775
6776        self.intern(item.column_name.as_str())
6777    }
6778}
6779
#[cfg(test)]
mod test {
    use super::*;

    /// Ensure that `NameManager`'s string interning works as expected.
    ///
    /// In particular, structurally but not referentially identical strings should
    /// be interned to the same `Arc`ed pointer.
    #[mz_ore::test]
    pub fn test_name_manager_string_interning() {
        let mut nm = NameManager::new();

        let orig_hi = "hi";
        let hi = nm.intern(orig_hi);
        let hello = nm.intern("hello");

        // Distinct strings must intern to distinct allocations.
        assert_ne!(hi.as_ptr(), hello.as_ptr());

        // This static string is _likely_ the same allocation as `orig_hi`;
        // either way, interning it must return the pointer stored for "hi".
        let hi2 = nm.intern("hi");
        assert_eq!(hi.as_ptr(), hi2.as_ptr());

        // generate a "hi" string that doesn't get optimized to the same static string
        let s = format!(
            "{}{}",
            hi.chars().nth(0).unwrap(),
            hi2.chars().nth(1).unwrap()
        );
        // make sure that we're testing with a fresh string!
        assert_ne!(orig_hi.as_ptr(), s.as_ptr());

        // A freshly allocated, structurally equal string still interns to the
        // original shared allocation.
        let hi3 = nm.intern(s);
        assert_eq!(hi.as_ptr(), hi3.as_ptr());
    }
}