Skip to main content

mz_sql/plan/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `SqlScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::rc::Rc;
45use std::sync::{Arc, LazyLock};
46use std::{iter, mem};
47
48use itertools::Itertools;
49use mz_expr::func::variadic::{
50    ArrayCreate, ArrayIndex, Coalesce, Greatest, Least, ListCreate, ListIndex, ListSliceLinear,
51    MapBuild, RecordCreate,
52};
53use mz_expr::virtual_syntax::AlgExcept;
54use mz_expr::{
55    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, REPEAT_ROW_NAME, RowSetFinishing,
56    TableFunc, func as expr_func,
57};
58use mz_ore::assert_none;
59use mz_ore::collections::CollectionExt;
60use mz_ore::error::ErrorExt;
61use mz_ore::id_gen::IdGen;
62use mz_ore::option::FallibleMapExt;
63use mz_ore::stack::{CheckedRecursion, RecursionGuard};
64use mz_ore::str::StrExt;
65use mz_repr::adt::char::CharLength;
66use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
67use mz_repr::adt::timestamp::TimestampPrecision;
68use mz_repr::adt::varchar::VarCharMaxLength;
69use mz_repr::namespaces::MZ_CATALOG_SCHEMA;
70use mz_repr::{
71    CatalogItemId, ColumnIndex, ColumnName, Datum, RelationDesc, RelationVersionSelector,
72    ReprColumnType, Row, RowArena, SqlColumnType, SqlRelationType, SqlScalarType,
73    UNKNOWN_COLUMN_NAME, strconv,
74};
75use mz_sql_parser::ast::display::AstDisplay;
76use mz_sql_parser::ast::visit::Visit;
77use mz_sql_parser::ast::visit_mut::{self, VisitMut};
78use mz_sql_parser::ast::{
79    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
80    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
81    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
82    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
83    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
84    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
85    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
86    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
87};
88use mz_sql_parser::ident;
89
90use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
91use crate::func::{self, Func, FuncSpec, TableFuncImpl};
92use crate::names::{
93    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
94};
95use crate::plan::PlanError::InvalidWmrRecursionLimit;
96use crate::plan::error::PlanError;
97use crate::plan::hir::{
98    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
99    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
100    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
101    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
102};
103use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
104use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
105use crate::plan::statement::{StatementContext, StatementDesc, show};
106use crate::plan::typeconv::{self, CastContext, plan_hypothetical_cast};
107use crate::plan::{
108    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
109    literal, transform_ast,
110};
111use crate::session::vars::ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK;
112use crate::session::vars::{self, FeatureFlag};
113use crate::{ORDINALITY_COL_NAME, normalize};
114
115#[derive(Debug)]
116pub struct PlannedRootQuery<E> {
117    pub expr: E,
118    pub desc: RelationDesc,
119    pub finishing: RowSetFinishing<HirScalarExpr, HirScalarExpr>,
120    pub scope: Scope,
121}
122
123/// Plans a top-level query, returning the `HirRelationExpr` describing the query
124/// plan, the `RelationDesc` describing the shape of the result set, a
125/// `RowSetFinishing` describing post-processing that must occur before results
126/// are sent to the client, and the types of the parameters in the query, if any
127/// were present.
128///
129/// Note that the returned `RelationDesc` describes the expression after
130/// applying the returned `RowSetFinishing`.
131#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
132pub fn plan_root_query(
133    scx: &StatementContext,
134    mut query: Query<Aug>,
135    lifetime: QueryLifetime,
136) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
137    transform_ast::transform(scx, &mut query)?;
138    let mut qcx = QueryContext::root(scx, lifetime);
139    let PlannedQuery {
140        mut expr,
141        scope,
142        order_by,
143        limit,
144        offset,
145        project,
146        group_size_hints,
147    } = plan_query(&mut qcx, &query)?;
148
149    let mut finishing = RowSetFinishing {
150        limit,
151        offset,
152        project,
153        order_by,
154    };
155
156    // Attempt to push the finishing's ordering past its projection. This allows
157    // data to be projected down on the workers rather than the coordinator. It
158    // also improves the optimizer's demand analysis, as the optimizer can only
159    // reason about demand information in `expr` (i.e., it can't see
160    // `finishing.project`).
161    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
162
163    if lifetime.is_maintained() {
164        expr.finish_maintained(&mut finishing, group_size_hints);
165    }
166
167    let typ = qcx.relation_type(&expr);
168    let typ = SqlRelationType::new(
169        finishing
170            .project
171            .iter()
172            .map(|i| typ.column_types[*i].clone())
173            .collect(),
174    );
175    let desc = RelationDesc::new(typ, scope.column_names());
176
177    Ok(PlannedRootQuery {
178        expr,
179        desc,
180        finishing,
181        scope,
182    })
183}
184
185/// Attempts to push a projection through an order by.
186///
187/// The returned bool indicates whether the pushdown was successful or not.
188/// Successful pushdown requires that all the columns referenced in `order_by`
189/// are included in `project`.
190///
191/// When successful, `expr` is wrapped in a projection node, `order_by` is
192/// rewritten to account for the pushed-down projection, and `project` is
193/// replaced with the trivial projection. When unsuccessful, no changes are made
194/// to any of the inputs.
195fn try_push_projection_order_by(
196    expr: &mut HirRelationExpr,
197    project: &mut Vec<usize>,
198    order_by: &mut Vec<ColumnOrder>,
199) -> bool {
200    let mut unproject = vec![None; expr.arity()];
201    for (out_i, in_i) in project.iter().copied().enumerate() {
202        unproject[in_i] = Some(out_i);
203    }
204    if order_by
205        .iter()
206        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
207    {
208        let trivial_project = (0..project.len()).collect();
209        *expr = expr.take().project(mem::replace(project, trivial_project));
210        for ob in order_by {
211            ob.column = unproject[ob.column].unwrap();
212        }
213        true
214    } else {
215        false
216    }
217}
218
219pub fn plan_insert_query(
220    scx: &StatementContext,
221    table_name: ResolvedItemName,
222    columns: Vec<Ident>,
223    source: InsertSource<Aug>,
224    returning: Vec<SelectItem<Aug>>,
225) -> Result<
226    (
227        CatalogItemId,
228        HirRelationExpr,
229        PlannedRootQuery<Vec<HirScalarExpr>>,
230    ),
231    PlanError,
232> {
233    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
234    let table = scx.get_item_by_resolved_name(&table_name)?;
235
236    // Validate the target of the insert.
237    if table.item_type() != CatalogItemType::Table {
238        sql_bail!(
239            "cannot insert into {} '{}'",
240            table.item_type(),
241            table_name.full_name_str()
242        );
243    }
244    let desc = table
245        .relation_desc()
246        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
247    let mut defaults = table
248        .writable_table_details()
249        .ok_or_else(|| {
250            sql_err!(
251                "cannot insert into non-writeable table '{}'",
252                table_name.full_name_str()
253            )
254        })?
255        .to_vec();
256
257    for default in &mut defaults {
258        transform_ast::transform(scx, default)?;
259    }
260
261    if table.id().is_system() {
262        sql_bail!(
263            "cannot insert into system table '{}'",
264            table_name.full_name_str()
265        );
266    }
267
268    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
269
270    // Validate target column order.
271    let mut source_types = Vec::with_capacity(columns.len());
272    let mut ordering = Vec::with_capacity(columns.len());
273
274    if columns.is_empty() {
275        // Columns in source query must be in order. Let's guess the full shape and truncate to the
276        // right size later after planning the source query
277        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
278        ordering.extend(0..desc.arity());
279    } else {
280        let column_by_name: BTreeMap<&ColumnName, (usize, &SqlColumnType)> = desc
281            .iter()
282            .enumerate()
283            .map(|(idx, (name, typ))| (name, (idx, typ)))
284            .collect();
285
286        for c in &columns {
287            if let Some((idx, typ)) = column_by_name.get(c) {
288                ordering.push(*idx);
289                source_types.push(&typ.scalar_type);
290            } else {
291                sql_bail!(
292                    "column {} of relation {} does not exist",
293                    c.quoted(),
294                    table_name.full_name_str().quoted()
295                );
296            }
297        }
298        if let Some(dup) = columns.iter().duplicates().next() {
299            sql_bail!("column {} specified more than once", dup.quoted());
300        }
301    };
302
303    // Plan the source.
304    let expr = match source {
305        InsertSource::Query(mut query) => {
306            transform_ast::transform(scx, &mut query)?;
307
308            match query {
309                // Special-case simple VALUES clauses as PostgreSQL does.
310                Query {
311                    body: SetExpr::Values(Values(values)),
312                    ctes,
313                    order_by,
314                    limit: None,
315                    offset: None,
316                } if ctes.is_empty() && order_by.is_empty() => {
317                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
318                    plan_values_insert(&qcx, &names, &source_types, &values)?
319                }
320                _ => {
321                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
322                    expr
323                }
324            }
325        }
326        InsertSource::DefaultValues => {
327            HirRelationExpr::constant(vec![vec![]], SqlRelationType::empty())
328        }
329    };
330
331    let expr_arity = expr.arity();
332
333    // Validate that the arity of the source query is at most the size of declared columns or the
334    // size of the table if none are declared
335    let max_columns = if columns.is_empty() {
336        desc.arity()
337    } else {
338        columns.len()
339    };
340    if expr_arity > max_columns {
341        sql_bail!("INSERT has more expressions than target columns");
342    }
343    // But it should never have less than the declared columns (or zero)
344    if expr_arity < columns.len() {
345        sql_bail!("INSERT has more target columns than expressions");
346    }
347
348    // Trim now that we know for sure the correct arity of the source query
349    source_types.truncate(expr_arity);
350    ordering.truncate(expr_arity);
351
352    // Ensure the types of the source query match the types of the target table,
353    // installing assignment casts where necessary and possible.
354    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
355        sql_err!(
356            "column {} is of type {} but expression is of type {}",
357            desc.get_name(ordering[e.column]).quoted(),
358            qcx.humanize_sql_scalar_type(&e.target_type, false),
359            qcx.humanize_sql_scalar_type(&e.source_type, false),
360        )
361    })?;
362
363    // Fill in any omitted columns and rearrange into correct order
364    let mut map_exprs = vec![];
365    let mut project_key = Vec::with_capacity(desc.arity());
366
367    // Maps from table column index to position in the source query
368    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();
369
370    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
371    for (col_idx, (col_typ, default)) in column_details {
372        if let Some(src_idx) = col_to_source.get(&col_idx) {
373            project_key.push(*src_idx);
374        } else {
375            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
376            project_key.push(expr_arity + map_exprs.len());
377            map_exprs.push(hir);
378        }
379    }
380
381    let returning = {
382        let (scope, typ) = if let ResolvedItemName::Item {
383            full_name,
384            version: _,
385            ..
386        } = table_name
387        {
388            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
389            let typ = desc.typ().clone();
390            (scope, typ)
391        } else {
392            (Scope::empty(), SqlRelationType::empty())
393        };
394        let ecx = &ExprContext {
395            qcx: &qcx,
396            name: "RETURNING clause",
397            scope: &scope,
398            relation_type: &typ,
399            allow_aggregates: false,
400            allow_subqueries: false,
401            allow_parameters: true,
402            allow_windows: false,
403        };
404        let table_func_names = BTreeMap::new();
405        let mut output_columns = vec![];
406        let mut new_exprs = vec![];
407        let mut new_type = SqlRelationType::empty();
408        for mut si in returning {
409            transform_ast::transform(scx, &mut si)?;
410            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
411                let expr = match &select_item {
412                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
413                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
414                };
415                output_columns.push(column_name);
416                let typ = ecx.column_type(&expr);
417                new_type.column_types.push(typ);
418                new_exprs.push(expr);
419            }
420        }
421        let desc = RelationDesc::new(new_type, output_columns);
422        let desc_arity = desc.arity();
423        PlannedRootQuery {
424            expr: new_exprs,
425            desc,
426            finishing: HirRelationExpr::trivial_row_set_finishing_hir(desc_arity),
427            scope,
428        }
429    };
430
431    Ok((
432        table.id(),
433        expr.map(map_exprs).project(project_key),
434        returning,
435    ))
436}
437
438/// Determines the mapping between some external data and a Materialize relation.
439///
440/// Returns the following:
441/// * [`CatalogItemId`] for the destination table.
442/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
443/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
444///   since we now return a [`MapFilterProject`].
445/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
446///   destination table.
447///
448pub fn plan_copy_item(
449    scx: &StatementContext,
450    item_name: ResolvedItemName,
451    columns: Vec<Ident>,
452) -> Result<
453    (
454        CatalogItemId,
455        RelationDesc,
456        Vec<ColumnIndex>,
457        Option<MapFilterProject>,
458    ),
459    PlanError,
460> {
461    let item = scx.get_item_by_resolved_name(&item_name)?;
462    let fullname = scx.catalog.resolve_full_name(item.name());
463    let table_desc = match item.relation_desc() {
464        Some(desc) => desc.into_owned(),
465        None => {
466            return Err(PlanError::InvalidDependency {
467                name: fullname.to_string(),
468                item_type: item.item_type().to_string(),
469            });
470        }
471    };
472    let mut ordering = Vec::with_capacity(columns.len());
473
474    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
475    // should be simplified. The reason they are currently separate code paths is so we can roll
476    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.
477
478    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
479    // FROM SOURCE ...`), then we generate an MFP.
480    //
481    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
482    // so it's not always guaranteed that our `item` is a table.
483    let mfp = if let Some(table_defaults) = item.writable_table_details() {
484        let mut table_defaults = table_defaults.to_vec();
485
486        for default in &mut table_defaults {
487            transform_ast::transform(scx, default)?;
488        }
489
490        // Fill in any omitted columns and rearrange into correct order
491        let source_column_names: Vec<_> = columns
492            .iter()
493            .cloned()
494            .map(normalize::column_name)
495            .collect();
496
497        let mut default_exprs = Vec::new();
498        let mut project_keys = Vec::with_capacity(table_desc.arity());
499
500        // For each column in the destination table, either project it from the source data, or provide
501        // an expression to fill in a default value.
502        let column_details = table_desc.iter().zip_eq(table_defaults);
503        for ((col_name, col_type), col_default) in column_details {
504            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
505            if let Some(src_idx) = maybe_src_idx {
506                project_keys.push(src_idx);
507            } else {
508                // If one a column from the table does not exist in the source data, then a default
509                // value will get appended to the end of the input Row from the source data.
510                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
511                let mir = hir.lower_uncorrelated(scx.catalog.system_vars())?;
512                project_keys.push(source_column_names.len() + default_exprs.len());
513                default_exprs.push(mir);
514            }
515        }
516
517        let mfp = MapFilterProject::new(source_column_names.len())
518            .map(default_exprs)
519            .project(project_keys);
520        Some(mfp)
521    } else {
522        None
523    };
524
525    // Create a mapping from input data to the table we're copying into.
526    let source_desc = if columns.is_empty() {
527        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
528        ordering.extend(indexes);
529
530        // The source data should be in the same order as the table.
531        table_desc
532    } else {
533        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
534        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &SqlColumnType)> = table_desc
535            .iter_all()
536            .map(|(idx, name, typ)| (name, (*idx, typ)))
537            .collect();
538
539        let mut names = Vec::with_capacity(columns.len());
540        let mut source_types = Vec::with_capacity(columns.len());
541
542        for c in &columns {
543            if let Some((idx, typ)) = column_by_name.get(c) {
544                ordering.push(*idx);
545                source_types.push((*typ).clone());
546                names.push(c.clone());
547            } else {
548                sql_bail!(
549                    "column {} of relation {} does not exist",
550                    c.quoted(),
551                    item_name.full_name_str().quoted()
552                );
553            }
554        }
555        if let Some(dup) = columns.iter().duplicates().next() {
556            sql_bail!("column {} specified more than once", dup.quoted());
557        }
558
559        // The source data is a different shape than the destination table.
560        RelationDesc::new(SqlRelationType::new(source_types), names)
561    };
562
563    Ok((item.id(), source_desc, ordering, mfp))
564}
565
566/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
567///
568/// TODO(cf3): Merge this method with [`plan_copy_item`].
569pub fn plan_copy_from(
570    scx: &StatementContext,
571    table_name: ResolvedItemName,
572    columns: Vec<Ident>,
573) -> Result<
574    (
575        CatalogItemId,
576        RelationDesc,
577        Vec<ColumnIndex>,
578        Option<MapFilterProject>,
579    ),
580    PlanError,
581> {
582    let table = scx.get_item_by_resolved_name(&table_name)?;
583
584    // Validate the target of the insert.
585    if table.item_type() != CatalogItemType::Table {
586        sql_bail!(
587            "cannot insert into {} '{}'",
588            table.item_type(),
589            table_name.full_name_str()
590        );
591    }
592
593    let _ = table.writable_table_details().ok_or_else(|| {
594        sql_err!(
595            "cannot insert into non-writeable table '{}'",
596            table_name.full_name_str()
597        )
598    })?;
599
600    if table.id().is_system() {
601        sql_bail!(
602            "cannot insert into system table '{}'",
603            table_name.full_name_str()
604        );
605    }
606    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
607
608    Ok((id, desc, ordering, mfp))
609}
610
611/// Builds a plan that adds the default values for the missing columns and re-orders
612/// the datums in the given rows to match the order in the target table.
613pub fn plan_copy_from_rows(
614    pcx: &PlanContext,
615    catalog: &dyn SessionCatalog,
616    target_id: CatalogItemId,
617    target_name: String,
618    columns: Vec<ColumnIndex>,
619    rows: Vec<mz_repr::Row>,
620) -> Result<HirRelationExpr, PlanError> {
621    let scx = StatementContext::new(Some(pcx), catalog);
622
623    // Always copy at the latest version of the table.
624    let table = catalog
625        .try_get_item(&target_id)
626        .ok_or_else(|| PlanError::CopyFromTargetTableDropped { target_name })?
627        .at_version(RelationVersionSelector::Latest);
628
629    let mut defaults = table
630        .writable_table_details()
631        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
632        .to_vec();
633
634    for default in &mut defaults {
635        transform_ast::transform(&scx, default)?;
636    }
637
638    let desc = table
639        .relation_desc()
640        .ok_or_else(|| sql_err!("item does not have a relation description"))?;
641    let column_types = columns
642        .iter()
643        .map(|x| desc.get_type(x).clone())
644        .map(|mut x| {
645            // Null constraint is enforced later, when inserting the row in the table.
646            // Without this, an assert is hit during lowering.
647            x.nullable = true;
648            x
649        })
650        .collect();
651    let typ = SqlRelationType::new(column_types);
652    let expr = HirRelationExpr::Constant {
653        rows,
654        typ: typ.clone(),
655    };
656
657    // Exit early with just the raw constant if we know that all columns are present
658    // and in the correct order. This lets us bypass expensive downstream optimizations
659    // more easily, as at every stage we know this expression is nothing more than
660    // a constant (as opposed to e.g. a constant with with an identity map and identity
661    // projection).
662    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
663    if columns == default {
664        return Ok(expr);
665    }
666
667    // Fill in any omitted columns and rearrange into correct order
668    let mut map_exprs = vec![];
669    let mut project_key = Vec::with_capacity(desc.arity());
670
671    // Maps from table column index to position in the source query
672    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();
673
674    let column_details = desc.iter_all().zip_eq(defaults);
675    for ((col_idx, _col_name, col_typ), default) in column_details {
676        if let Some(src_idx) = col_to_source.get(&col_idx) {
677            project_key.push(*src_idx);
678        } else {
679            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
680            project_key.push(typ.arity() + map_exprs.len());
681            map_exprs.push(hir);
682        }
683    }
684
685    Ok(expr.map(map_exprs).project(project_key))
686}
687
688/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
689pub struct ReadThenWritePlan {
690    pub id: CatalogItemId,
691    /// Read portion of query.
692    ///
693    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
694    /// retractions.
695    pub selection: HirRelationExpr,
696    /// Map from column index to SET expression. Empty for DELETE statements.
697    pub assignments: BTreeMap<usize, HirScalarExpr>,
698    pub finishing: RowSetFinishing,
699}
700
701pub fn plan_delete_query(
702    scx: &StatementContext,
703    mut delete_stmt: DeleteStatement<Aug>,
704) -> Result<ReadThenWritePlan, PlanError> {
705    transform_ast::transform(scx, &mut delete_stmt)?;
706
707    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
708    plan_mutation_query_inner(
709        qcx,
710        delete_stmt.table_name,
711        delete_stmt.alias,
712        delete_stmt.using,
713        vec![],
714        delete_stmt.selection,
715    )
716}
717
718pub fn plan_update_query(
719    scx: &StatementContext,
720    mut update_stmt: UpdateStatement<Aug>,
721) -> Result<ReadThenWritePlan, PlanError> {
722    transform_ast::transform(scx, &mut update_stmt)?;
723
724    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
725
726    plan_mutation_query_inner(
727        qcx,
728        update_stmt.table_name,
729        update_stmt.alias,
730        vec![],
731        update_stmt.assignments,
732        update_stmt.selection,
733    )
734}
735
736pub fn plan_mutation_query_inner(
737    qcx: QueryContext,
738    table_name: ResolvedItemName,
739    alias: Option<TableAlias>,
740    using: Vec<TableWithJoins<Aug>>,
741    assignments: Vec<Assignment<Aug>>,
742    selection: Option<Expr<Aug>>,
743) -> Result<ReadThenWritePlan, PlanError> {
744    // Get ID and version of the relation desc.
745    let (id, version) = match table_name {
746        ResolvedItemName::Item { id, version, .. } => (id, version),
747        _ => sql_bail!("cannot mutate non-user table"),
748    };
749
750    // Perform checks on item with given ID.
751    let item = qcx.scx.get_item(&id).at_version(version);
752    if item.item_type() != CatalogItemType::Table {
753        sql_bail!(
754            "cannot mutate {} '{}'",
755            item.item_type(),
756            table_name.full_name_str()
757        );
758    }
759    let _ = item.writable_table_details().ok_or_else(|| {
760        sql_err!(
761            "cannot mutate non-writeable table '{}'",
762            table_name.full_name_str()
763        )
764    })?;
765    if id.is_system() {
766        sql_bail!(
767            "cannot mutate system table '{}'",
768            table_name.full_name_str()
769        );
770    }
771
772    // Derive structs for operation from validated table
773    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
774    let scope = plan_table_alias(scope, alias.as_ref())?;
775    let desc = item.relation_desc().expect("table has desc");
776    let relation_type = qcx.relation_type(&get);
777
778    if using.is_empty() {
779        if let Some(expr) = selection {
780            let ecx = &ExprContext {
781                qcx: &qcx,
782                name: "WHERE clause",
783                scope: &scope,
784                relation_type: &relation_type,
785                allow_aggregates: false,
786                allow_subqueries: true,
787                allow_parameters: true,
788                allow_windows: false,
789            };
790            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
791            get = get.filter(vec![expr]);
792        }
793    } else {
794        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
795    }
796
797    let mut sets = BTreeMap::new();
798    for Assignment { id, value } in assignments {
799        // Get the index and type of the column.
800        let name = normalize::column_name(id);
801        match desc.get_by_name(&name) {
802            Some((idx, typ)) => {
803                let ecx = &ExprContext {
804                    qcx: &qcx,
805                    name: "SET clause",
806                    scope: &scope,
807                    relation_type: &relation_type,
808                    allow_aggregates: false,
809                    allow_subqueries: false,
810                    allow_parameters: true,
811                    allow_windows: false,
812                };
813                let expr = plan_expr(ecx, &value)?.cast_to(
814                    ecx,
815                    CastContext::Assignment,
816                    &typ.scalar_type,
817                )?;
818
819                if sets.insert(idx, expr).is_some() {
820                    sql_bail!("column {} set twice", name)
821                }
822            }
823            None => sql_bail!("unknown column {}", name),
824        };
825    }
826
827    let finishing = RowSetFinishing {
828        order_by: vec![],
829        limit: None,
830        offset: 0,
831        project: (0..desc.arity()).collect(),
832    };
833
834    Ok(ReadThenWritePlan {
835        id,
836        selection: get,
837        finishing,
838        assignments: sets,
839    })
840}
841
842// Adjust `get` to perform an existential subquery on `using` accounting for
843// `selection`.
844//
845// If `USING`, we essentially want to rewrite the query as a correlated
846// existential subquery, i.e.
847// ```
848// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
849// ```
850// However, we can't do that directly because of esoteric rules w/r/t `lateral`
851// subqueries.
852// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
853fn handle_mutation_using_clause(
854    qcx: &QueryContext,
855    selection: Option<Expr<Aug>>,
856    using: Vec<TableWithJoins<Aug>>,
857    get: HirRelationExpr,
858    outer_scope: Scope,
859) -> Result<HirRelationExpr, PlanError> {
860    // Plan `USING` as a cross-joined `FROM` without knowledge of the
861    // statement's `FROM` target. This prevents `lateral` subqueries from
862    // "seeing" the `FROM` target.
863    let (mut using_rel_expr, using_scope) =
864        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
865            let (left, left_scope) = l;
866            plan_join(
867                qcx,
868                left,
869                left_scope,
870                &Join {
871                    relation: TableFactor::NestedJoin {
872                        join: Box::new(twj),
873                        alias: None,
874                    },
875                    join_operator: JoinOperator::CrossJoin,
876                },
877            )
878        })?;
879
880    if let Some(expr) = selection {
881        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
882        // PG-like semantics e.g. expressing ambiguous column references. We put
883        // `USING...` first for no real reason, but making a different decision
884        // would require adjusting the column references on this relation
885        // differently.
886        let on = HirScalarExpr::literal_true();
887        let joined = using_rel_expr
888            .clone()
889            .join(get.clone(), on, JoinKind::Inner);
890        let joined_scope = using_scope.product(outer_scope)?;
891        let joined_relation_type = qcx.relation_type(&joined);
892
893        let ecx = &ExprContext {
894            qcx,
895            name: "WHERE clause",
896            scope: &joined_scope,
897            relation_type: &joined_relation_type,
898            allow_aggregates: false,
899            allow_subqueries: true,
900            allow_parameters: true,
901            allow_windows: false,
902        };
903
904        // Plan the filter expression on `FROM, USING...`.
905        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &SqlScalarType::Bool)?;
906
907        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
908        // those to the right of `using_rel_expr`) to instead be correlated to
909        // the outer relation, i.e. `get`.
910        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
911        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
912        use mz_expr::visit::Visit;
913        expr.visit_mut_post(&mut |e| {
914            if let HirScalarExpr::Column(c, _name) = e {
915                if c.column >= using_rel_arity {
916                    c.level += 1;
917                    c.column -= using_rel_arity;
918                };
919            }
920        })?;
921
922        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
923        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
924        // relation.
925        using_rel_expr = using_rel_expr.filter(vec![expr]);
926    } else {
927        // Check that scopes are at compatible (i.e. do not double-reference
928        // same table), despite lack of selection
929        let _joined_scope = using_scope.product(outer_scope)?;
930    }
931    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
932    // whether any rows are returned, and not on the contents of those rows,
933    // the output list of the subquery is normally unimportant.
934    //
935    // This means we don't need to worry about projecting/mapping any
936    // additional expressions here.
937    //
938    // https://www.postgresql.org/docs/14/functions-subquery.html
939
940    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
941    Ok(get.filter(vec![using_rel_expr.exists()]))
942}
943
944#[derive(Debug)]
945pub(crate) struct CastRelationError {
946    pub(crate) column: usize,
947    pub(crate) source_type: SqlScalarType,
948    pub(crate) target_type: SqlScalarType,
949}
950
951/// Cast a relation from one type to another using the specified type of cast.
952///
953/// The length of `target_types` must match the arity of `expr`.
954pub(crate) fn cast_relation<'a, I>(
955    qcx: &QueryContext,
956    ccx: CastContext,
957    expr: HirRelationExpr,
958    target_types: I,
959) -> Result<HirRelationExpr, CastRelationError>
960where
961    I: IntoIterator<Item = &'a SqlScalarType>,
962{
963    let ecx = &ExprContext {
964        qcx,
965        name: "values",
966        scope: &Scope::empty(),
967        relation_type: &qcx.relation_type(&expr),
968        allow_aggregates: false,
969        allow_subqueries: true,
970        allow_parameters: true,
971        allow_windows: false,
972    };
973    let mut map_exprs = vec![];
974    let mut project_key = vec![];
975    for (i, target_typ) in target_types.into_iter().enumerate() {
976        let expr = HirScalarExpr::column(i);
977        // We plan every cast and check the evaluated expressions rather than
978        // checking the types directly because of some complex casting rules
979        // between types not expressed in `SqlScalarType` equality.
980        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
981            Ok(cast_expr) => {
982                if expr == cast_expr {
983                    // Cast between types was unnecessary
984                    project_key.push(i);
985                } else {
986                    // Cast between types required
987                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
988                    map_exprs.push(cast_expr);
989                }
990            }
991            Err(_) => {
992                return Err(CastRelationError {
993                    column: i,
994                    source_type: ecx.scalar_type(&expr),
995                    target_type: target_typ.clone(),
996                });
997            }
998        }
999    }
1000    Ok(expr.map(map_exprs).project(project_key))
1001}
1002
1003/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1004/// VIEW` statement.
1005pub fn plan_as_of(
1006    scx: &StatementContext,
1007    as_of: Option<AsOf<Aug>>,
1008) -> Result<QueryWhen, PlanError> {
1009    match as_of {
1010        None => Ok(QueryWhen::Immediately),
1011        Some(as_of) => match as_of {
1012            AsOf::At(expr) => Ok(QueryWhen::AtTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1013            AsOf::AtLeast(expr) => Ok(QueryWhen::AtLeastTimestamp(plan_as_of_or_up_to(scx, expr)?)),
1014        },
1015    }
1016}
1017
1018/// Plans and evaluates a scalar expression in a OneShot context to a non-null MzTimestamp.
1019///
1020/// Produces [`PlanError::InvalidAsOfUpTo`] if the expression is
1021/// - not a constant,
1022/// - not castable to MzTimestamp,
1023/// - is null,
1024/// - contains an unmaterializable function,
1025/// - some other evaluation error occurs, e.g., a division by 0,
1026/// - contains aggregates, subqueries, parameters, or window function calls.
1027pub fn plan_as_of_or_up_to(
1028    scx: &StatementContext,
1029    mut expr: Expr<Aug>,
1030) -> Result<mz_repr::Timestamp, PlanError> {
1031    let scope = Scope::empty();
1032    let desc = RelationDesc::empty();
1033    // (Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF or UP TO is
1034    // evaluated only once.)
1035    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1036    transform_ast::transform(scx, &mut expr)?;
1037    let ecx = &ExprContext {
1038        qcx: &qcx,
1039        name: "AS OF or UP TO",
1040        scope: &scope,
1041        relation_type: desc.typ(),
1042        allow_aggregates: false,
1043        allow_subqueries: false,
1044        allow_parameters: false,
1045        allow_windows: false,
1046    };
1047    let hir = plan_expr(ecx, &expr)?.cast_to(
1048        ecx,
1049        CastContext::Assignment,
1050        &SqlScalarType::MzTimestamp,
1051    )?;
1052    if hir.contains_unmaterializable() {
1053        bail_unsupported!("calling an unmaterializable function in AS OF or UP TO");
1054    }
1055    // At this point, we definitely have a constant expression:
1056    // - it can't contain any unmaterializable functions;
1057    // - it can't refer to any columns.
1058    // But the following can still fail due to a variety of reasons: most commonly, the cast can
1059    // fail, but also a null might appear, or some other evaluation error can happen, e.g., a
1060    // division by 0.
1061    let timestamp = hir
1062        .into_literal_mz_timestamp()
1063        .ok_or_else(|| PlanError::InvalidAsOfUpTo)?;
1064    Ok(timestamp)
1065}
1066
1067/// Plans an expression in the AS position of a `CREATE SECRET`.
1068pub fn plan_secret_as(
1069    scx: &StatementContext,
1070    mut expr: Expr<Aug>,
1071) -> Result<MirScalarExpr, PlanError> {
1072    let scope = Scope::empty();
1073    let desc = RelationDesc::empty();
1074    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1075
1076    transform_ast::transform(scx, &mut expr)?;
1077
1078    let ecx = &ExprContext {
1079        qcx: &qcx,
1080        name: "AS",
1081        scope: &scope,
1082        relation_type: desc.typ(),
1083        allow_aggregates: false,
1084        allow_subqueries: false,
1085        allow_parameters: false,
1086        allow_windows: false,
1087    };
1088    let expr = plan_expr(ecx, &expr)?
1089        .type_as(ecx, &SqlScalarType::Bytes)?
1090        .lower_uncorrelated(scx.catalog.system_vars())?;
1091    Ok(expr)
1092}
1093
1094/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1095pub fn plan_webhook_validate_using(
1096    scx: &StatementContext,
1097    validate_using: CreateWebhookSourceCheck<Aug>,
1098) -> Result<WebhookValidation, PlanError> {
1099    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1100
1101    let CreateWebhookSourceCheck {
1102        options,
1103        using: mut expr,
1104    } = validate_using;
1105
1106    let mut column_typs = vec![];
1107    let mut column_names = vec![];
1108
1109    let (bodies, headers, secrets) = options
1110        .map(|o| (o.bodies, o.headers, o.secrets))
1111        .unwrap_or_default();
1112
1113    // Append all of the bodies so they can be used in the expression.
1114    let mut body_tuples = vec![];
1115    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1116        let scalar_type = use_bytes
1117            .then_some(SqlScalarType::Bytes)
1118            .unwrap_or(SqlScalarType::String);
1119        let name = alias
1120            .map(|a| a.into_string())
1121            .unwrap_or_else(|| "body".to_string());
1122
1123        column_typs.push(SqlColumnType {
1124            scalar_type,
1125            nullable: false,
1126        });
1127        column_names.push(name);
1128
1129        // Store the column index so we can be sure to provide this body correctly.
1130        let column_idx = column_typs.len() - 1;
1131        // Double check we're consistent with column names.
1132        assert_eq!(
1133            column_idx,
1134            column_names.len() - 1,
1135            "body column names and types don't match"
1136        );
1137        body_tuples.push((column_idx, use_bytes));
1138    }
1139
1140    // Append all of the headers so they can be used in the expression.
1141    let mut header_tuples = vec![];
1142
1143    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1144        let value_type = use_bytes
1145            .then_some(SqlScalarType::Bytes)
1146            .unwrap_or(SqlScalarType::String);
1147        let name = alias
1148            .map(|a| a.into_string())
1149            .unwrap_or_else(|| "headers".to_string());
1150
1151        column_typs.push(SqlColumnType {
1152            scalar_type: SqlScalarType::Map {
1153                value_type: Box::new(value_type),
1154                custom_id: None,
1155            },
1156            nullable: false,
1157        });
1158        column_names.push(name);
1159
1160        // Store the column index so we can be sure to provide this body correctly.
1161        let column_idx = column_typs.len() - 1;
1162        // Double check we're consistent with column names.
1163        assert_eq!(
1164            column_idx,
1165            column_names.len() - 1,
1166            "header column names and types don't match"
1167        );
1168        header_tuples.push((column_idx, use_bytes));
1169    }
1170
1171    // Append all secrets so they can be used in the expression.
1172    let mut validation_secrets = vec![];
1173
1174    for CreateWebhookSourceSecret {
1175        secret,
1176        alias,
1177        use_bytes,
1178    } in secrets
1179    {
1180        // Either provide the secret to the validation expression as Bytes or a String.
1181        let scalar_type = use_bytes
1182            .then_some(SqlScalarType::Bytes)
1183            .unwrap_or(SqlScalarType::String);
1184
1185        column_typs.push(SqlColumnType {
1186            scalar_type,
1187            nullable: false,
1188        });
1189        let ResolvedItemName::Item {
1190            id,
1191            full_name: FullItemName { item, .. },
1192            ..
1193        } = secret
1194        else {
1195            return Err(PlanError::InvalidSecret(Box::new(secret)));
1196        };
1197
1198        // Plan the expression using the secret's alias, if one is provided.
1199        let name = if let Some(alias) = alias {
1200            alias.into_string()
1201        } else {
1202            item
1203        };
1204        column_names.push(name);
1205
1206        // Get the column index that corresponds for this secret, so we can make sure to provide the
1207        // secrets in the correct order during evaluation.
1208        let column_idx = column_typs.len() - 1;
1209        // Double check that our column names and types match.
1210        assert_eq!(
1211            column_idx,
1212            column_names.len() - 1,
1213            "column names and types don't match"
1214        );
1215
1216        validation_secrets.push(WebhookValidationSecret {
1217            id,
1218            column_idx,
1219            use_bytes,
1220        });
1221    }
1222
1223    let relation_typ = SqlRelationType::new(column_typs);
1224    let desc = RelationDesc::new(relation_typ, column_names.clone());
1225    let scope = Scope::from_source(None, column_names);
1226
1227    transform_ast::transform(scx, &mut expr)?;
1228
1229    let ecx = &ExprContext {
1230        qcx: &qcx,
1231        name: "CHECK",
1232        scope: &scope,
1233        relation_type: desc.typ(),
1234        allow_aggregates: false,
1235        allow_subqueries: false,
1236        allow_parameters: false,
1237        allow_windows: false,
1238    };
1239    let expr = plan_expr(ecx, &expr)?
1240        .type_as(ecx, &SqlScalarType::Bool)?
1241        .lower_uncorrelated(scx.catalog.system_vars())?;
1242    let validation = WebhookValidation {
1243        expression: expr,
1244        relation_desc: desc,
1245        bodies: body_tuples,
1246        headers: header_tuples,
1247        secrets: validation_secrets,
1248    };
1249    Ok(validation)
1250}
1251
1252pub fn plan_default_expr(
1253    scx: &StatementContext,
1254    expr: &Expr<Aug>,
1255    target_ty: &SqlScalarType,
1256) -> Result<HirScalarExpr, PlanError> {
1257    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1258    let ecx = &ExprContext {
1259        qcx: &qcx,
1260        name: "DEFAULT expression",
1261        scope: &Scope::empty(),
1262        relation_type: &SqlRelationType::empty(),
1263        allow_aggregates: false,
1264        allow_subqueries: false,
1265        allow_parameters: false,
1266        allow_windows: false,
1267    };
1268    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1269    Ok(hir)
1270}
1271
1272pub fn plan_params<'a>(
1273    scx: &'a StatementContext,
1274    params: Vec<Expr<Aug>>,
1275    desc: &StatementDesc,
1276) -> Result<Params, PlanError> {
1277    if params.len() != desc.param_types.len() {
1278        sql_bail!(
1279            "expected {} params, got {}",
1280            desc.param_types.len(),
1281            params.len()
1282        );
1283    }
1284
1285    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1286
1287    let mut datums = Row::default();
1288    let mut packer = datums.packer();
1289    let mut actual_types = Vec::new();
1290    let temp_storage = &RowArena::new();
1291    for (i, (mut expr, expected_ty)) in params.into_iter().zip_eq(&desc.param_types).enumerate() {
1292        transform_ast::transform(scx, &mut expr)?;
1293
1294        let ecx = execute_expr_context(&qcx);
1295        let ex = plan_expr(&ecx, &expr)?.type_as_any(&ecx)?;
1296        let actual_ty = ecx.scalar_type(&ex);
1297        if plan_hypothetical_cast(&ecx, *EXECUTE_CAST_CONTEXT, &actual_ty, expected_ty).is_none() {
1298            return Err(PlanError::WrongParameterType(
1299                i + 1,
1300                ecx.humanize_sql_scalar_type(expected_ty, false),
1301                ecx.humanize_sql_scalar_type(&actual_ty, false),
1302            ));
1303        }
1304        let ex = ex.lower_uncorrelated(scx.catalog.system_vars())?;
1305        let evaled = ex.eval(&[], temp_storage)?;
1306        packer.push(evaled);
1307        actual_types.push(actual_ty);
1308    }
1309    Ok(Params {
1310        datums,
1311        execute_types: actual_types,
1312        expected_types: desc.param_types.clone(),
1313    })
1314}
1315
1316static EXECUTE_CONTEXT_SCOPE: LazyLock<Scope> = LazyLock::new(Scope::empty);
1317static EXECUTE_CONTEXT_REL_TYPE: LazyLock<SqlRelationType> = LazyLock::new(SqlRelationType::empty);
1318
1319/// Returns an `ExprContext` for the expressions in the parameters of an EXECUTE statement.
1320pub(crate) fn execute_expr_context<'a>(qcx: &'a QueryContext<'a>) -> ExprContext<'a> {
1321    ExprContext {
1322        qcx,
1323        name: "EXECUTE",
1324        scope: &EXECUTE_CONTEXT_SCOPE,
1325        relation_type: &EXECUTE_CONTEXT_REL_TYPE,
1326        allow_aggregates: false,
1327        allow_subqueries: false,
1328        allow_parameters: false,
1329        allow_windows: false,
1330    }
1331}
1332
1333/// The CastContext used when matching up the types of parameters passed to EXECUTE.
1334///
1335/// This is an assignment cast also in Postgres, see
1336/// <https://github.com/MaterializeInc/database-issues/issues/9266>
1337pub(crate) static EXECUTE_CAST_CONTEXT: LazyLock<CastContext> =
1338    LazyLock::new(|| CastContext::Assignment);
1339
1340pub fn plan_index_exprs<'a>(
1341    scx: &'a StatementContext,
1342    on_desc: &RelationDesc,
1343    exprs: Vec<Expr<Aug>>,
1344) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1345    let scope = Scope::from_source(None, on_desc.iter_names());
1346    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1347
1348    let ecx = &ExprContext {
1349        qcx: &qcx,
1350        name: "CREATE INDEX",
1351        scope: &scope,
1352        relation_type: on_desc.typ(),
1353        allow_aggregates: false,
1354        allow_subqueries: false,
1355        allow_parameters: false,
1356        allow_windows: false,
1357    };
1358    let repr_col_types: Vec<ReprColumnType> = on_desc
1359        .typ()
1360        .column_types
1361        .iter()
1362        .map(ReprColumnType::from)
1363        .collect();
1364    let mut out = vec![];
1365    for mut expr in exprs {
1366        transform_ast::transform(scx, &mut expr)?;
1367        let expr = plan_expr_or_col_index(ecx, &expr)?;
1368        let mut expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;
1369        expr.reduce(&repr_col_types);
1370        out.push(expr);
1371    }
1372    Ok(out)
1373}
1374
1375fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1376    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1377        Some(column) => Ok(HirScalarExpr::column(column)),
1378        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1379    }
1380}
1381
1382fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1383    match e {
1384        Expr::Value(Value::Number(n)) => {
1385            let n = n.parse::<usize>().map_err(|e| {
1386                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1387            })?;
1388            if n < 1 || n > max {
1389                sql_bail!(
1390                    "column reference {} in {} is out of range (1 - {})",
1391                    n,
1392                    name,
1393                    max
1394                );
1395            }
1396            Ok(Some(n - 1))
1397        }
1398        _ => Ok(None),
1399    }
1400}
1401
1402struct PlannedQuery {
1403    expr: HirRelationExpr,
1404    scope: Scope,
1405    order_by: Vec<ColumnOrder>,
1406    limit: Option<HirScalarExpr>,
1407    /// `offset` is either
1408    /// - an Int64 literal
1409    /// - or contains parameters. (If it contains parameters, then after parameter substitution it
1410    ///   should also be `is_constant` and reduce to an Int64 literal, but we check this only
1411    ///   later.)
1412    offset: HirScalarExpr,
1413    project: Vec<usize>,
1414    group_size_hints: GroupSizeHints,
1415}
1416
1417fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1418    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
1419}
1420
1421fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
1422    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
1423    // for the identifiers, so that they can be re-installed before returning.
1424    let cte_bindings = plan_ctes(qcx, q)?;
1425
1426    let limit = match &q.limit {
1427        None => None,
1428        Some(Limit {
1429            quantity,
1430            with_ties: false,
1431        }) => {
1432            let ecx = &ExprContext {
1433                qcx,
1434                name: "LIMIT",
1435                scope: &Scope::empty(),
1436                relation_type: &SqlRelationType::empty(),
1437                allow_aggregates: false,
1438                allow_subqueries: true,
1439                allow_parameters: true,
1440                allow_windows: false,
1441            };
1442            let limit = plan_expr(ecx, quantity)?;
1443            let limit = limit.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1444
1445            let limit = if limit.is_constant() {
1446                let arena = RowArena::new();
1447                let limit = limit.lower_uncorrelated(qcx.scx.catalog.system_vars())?;
1448
1449                // TODO: Don't use ? on eval, but instead wrap the error and add the information
1450                // that the error happened in a LIMIT clause, so that we have better error msg for
1451                // something like `SELECT 5 LIMIT 'aaa'`.
1452                match limit.eval(&[], &arena)? {
1453                    d @ Datum::Int64(v) if v >= 0 => {
1454                        HirScalarExpr::literal(d, SqlScalarType::Int64)
1455                    }
1456                    d @ Datum::Null => HirScalarExpr::literal(d, SqlScalarType::Int64),
1457                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
1458                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
1459                }
1460            } else {
1461                // Gate non-constant LIMIT expressions behind a feature flag
1462                qcx.scx
1463                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
1464                limit
1465            };
1466
1467            Some(limit)
1468        }
1469        Some(Limit {
1470            quantity: _,
1471            with_ties: true,
1472        }) => bail_unsupported!("FETCH ... WITH TIES"),
1473    };
1474
1475    let offset = match &q.offset {
1476        None => HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
1477        Some(offset) => {
1478            let ecx = &ExprContext {
1479                qcx,
1480                name: "OFFSET",
1481                scope: &Scope::empty(),
1482                relation_type: &SqlRelationType::empty(),
1483                allow_aggregates: false,
1484                allow_subqueries: false,
1485                allow_parameters: true,
1486                allow_windows: false,
1487            };
1488            let offset = plan_expr(ecx, offset)?;
1489            let offset = offset.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?;
1490
1491            let offset = if offset.is_constant() {
1492                // Simplify it to a literal or error out. (E.g., the cast inserted above may fail.)
1493                let offset_value = offset_into_value(offset)?;
1494                HirScalarExpr::literal(Datum::Int64(offset_value), SqlScalarType::Int64)
1495            } else {
1496                // The only case when this is allowed to not be a constant is if it contains
1497                // parameters. (In which case, we'll later check that it's a constant after
1498                // parameter binding.)
1499                if !offset.contains_parameters() {
1500                    return Err(PlanError::InvalidOffset(format!(
1501                        "must be simplifiable to a constant, possibly after parameter binding, got {}",
1502                        offset
1503                    )));
1504                }
1505                offset
1506            };
1507            offset
1508        }
1509    };
1510
1511    let mut planned_query = match &q.body {
1512        SetExpr::Select(s) => {
1513            // Extract query options.
1514            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
1515            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
1516
1517            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
1518            PlannedQuery {
1519                expr: plan.expr,
1520                scope: plan.scope,
1521                order_by: plan.order_by,
1522                project: plan.project,
1523                limit,
1524                offset,
1525                group_size_hints,
1526            }
1527        }
1528        _ => {
1529            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
1530            let ecx = &ExprContext {
1531                qcx,
1532                name: "ORDER BY clause of a set expression",
1533                scope: &scope,
1534                relation_type: &qcx.relation_type(&expr),
1535                allow_aggregates: false,
1536                allow_subqueries: true,
1537                allow_parameters: true,
1538                allow_windows: false,
1539            };
1540            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
1541            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
1542            let project = (0..ecx.relation_type.arity()).collect();
1543            PlannedQuery {
1544                expr: expr.map(map_exprs),
1545                scope,
1546                order_by,
1547                limit,
1548                project,
1549                offset,
1550                group_size_hints: GroupSizeHints::default(),
1551            }
1552        }
1553    };
1554
1555    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
1556    match &q.ctes {
1557        CteBlock::Simple(_) => {
1558            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
1559                if let Some(cte) = qcx.ctes.remove(&id) {
1560                    planned_query.expr = HirRelationExpr::Let {
1561                        name: cte.name,
1562                        id: id.clone(),
1563                        value: Box::new(value),
1564                        body: Box::new(planned_query.expr),
1565                    };
1566                }
1567                if let Some(shadowed_val) = shadowed_val {
1568                    qcx.ctes.insert(id, shadowed_val);
1569                }
1570            }
1571        }
1572        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
1573            let MutRecBlockOptionExtracted {
1574                recursion_limit,
1575                return_at_recursion_limit,
1576                error_at_recursion_limit,
1577                seen: _,
1578            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
1579            let limit = match (
1580                recursion_limit,
1581                return_at_recursion_limit,
1582                error_at_recursion_limit,
1583            ) {
1584                (None, None, None) => None,
1585                (Some(max_iters), None, None) => {
1586                    Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT))
1587                }
1588                (None, Some(max_iters), None) => Some((max_iters, true)),
1589                (None, None, Some(max_iters)) => Some((max_iters, false)),
1590                _ => {
1591                    return Err(InvalidWmrRecursionLimit(
1592                        "More than one recursion limit given. \
1593                         Please give at most one of RECURSION LIMIT, \
1594                         ERROR AT RECURSION LIMIT, \
1595                         RETURN AT RECURSION LIMIT."
1596                            .to_owned(),
1597                    ));
1598                }
1599            }
1600            .try_map(|(max_iters, return_at_limit)| {
1601                Ok::<LetRecLimit, PlanError>(LetRecLimit {
1602                    max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit(
1603                        "Recursion limit has to be greater than 0.".to_owned(),
1604                    ))?,
1605                    return_at_limit: *return_at_limit,
1606                })
1607            })?;
1608
1609            let mut bindings = Vec::new();
1610            for (id, value, shadowed_val) in cte_bindings.into_iter() {
1611                if let Some(cte) = qcx.ctes.remove(&id) {
1612                    bindings.push((cte.name, id, value, cte.desc.into_typ()));
1613                }
1614                if let Some(shadowed_val) = shadowed_val {
1615                    qcx.ctes.insert(id, shadowed_val);
1616                }
1617            }
1618            if !bindings.is_empty() {
1619                planned_query.expr = HirRelationExpr::LetRec {
1620                    limit,
1621                    bindings,
1622                    body: Box::new(planned_query.expr),
1623                }
1624            }
1625        }
1626    }
1627
1628    Ok(planned_query)
1629}
1630
1631/// Converts an OFFSET expression into a value.
1632pub(crate) fn offset_into_value(offset: HirScalarExpr) -> Result<i64, PlanError> {
1633    let offset = offset
1634        .try_into_literal_int64()
1635        .map_err(|err| PlanError::InvalidOffset(err.to_string_with_causes()))?;
1636    if offset < 0 {
1637        return Err(negative_offset_error(offset));
1638    }
1639    Ok(offset)
1640}
1641
1642pub(crate) fn negative_offset_error(offset: i64) -> PlanError {
1643    PlanError::InvalidOffset(format!("must not be negative, got {}", offset))
1644}
1645
1646generate_extracted_config!(
1647    MutRecBlockOption,
1648    (RecursionLimit, u64),
1649    (ReturnAtRecursionLimit, u64),
1650    (ErrorAtRecursionLimit, u64)
1651);
1652
1653/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1654///
1655/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1656/// shadowed value that can be reinstalled once the planning has completed.
1657pub fn plan_ctes(
1658    qcx: &mut QueryContext,
1659    q: &Query<Aug>,
1660) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1661    // Accumulate planned expressions and shadowed descriptions.
1662    let mut result = Vec::new();
1663    // Retain the old descriptions of CTE bindings so that we can restore them
1664    // after we're done planning this SELECT.
1665    let mut shadowed_descs = BTreeMap::new();
1666
1667    // A reused identifier indicates a reused name.
1668    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1669        sql_bail!(
1670            "WITH query name {} specified more than once",
1671            normalize::ident_ref(ident).quoted()
1672        )
1673    }
1674
1675    match &q.ctes {
1676        CteBlock::Simple(ctes) => {
1677            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1678            for cte in ctes.iter() {
1679                let cte_name = normalize::ident(cte.alias.name.clone());
1680                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1681                let typ = qcx.relation_type(&val);
1682                let mut desc = RelationDesc::new(typ, scope.column_names());
1683                plan_utils::maybe_rename_columns(
1684                    format!("CTE {}", cte.alias.name),
1685                    &mut desc,
1686                    &cte.alias.columns,
1687                )?;
1688                // Capture the prior value if it exists, so that it can be re-installed.
1689                let shadowed = qcx.ctes.insert(
1690                    cte.id,
1691                    CteDesc {
1692                        name: cte_name,
1693                        desc,
1694                    },
1695                );
1696
1697                result.push((cte.id, val, shadowed));
1698            }
1699        }
1700        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1701            // Insert column types into `qcx.ctes` first for recursive bindings.
1702            for cte in ctes.iter() {
1703                let cte_name = normalize::ident(cte.name.clone());
1704                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1705                for column in cte.columns.iter() {
1706                    desc_columns.push((
1707                        normalize::column_name(column.name.clone()),
1708                        SqlColumnType {
1709                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1710                            nullable: true,
1711                        },
1712                    ));
1713                }
1714                let desc = RelationDesc::from_names_and_types(desc_columns);
1715                let shadowed = qcx.ctes.insert(
1716                    cte.id,
1717                    CteDesc {
1718                        name: cte_name,
1719                        desc,
1720                    },
1721                );
1722                // Capture the prior value if it exists, so that it can be re-installed.
1723                if let Some(shadowed) = shadowed {
1724                    shadowed_descs.insert(cte.id, shadowed);
1725                }
1726            }
1727
1728            // Plan all CTEs and validate the proposed types.
1729            for cte in ctes.iter() {
1730                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1731
1732                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1733
1734                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1735                    // Once WMR CTEs support NOT NULL constraints, check that
1736                    // nullability of derived column types are compatible.
1737                    sql_bail!(
1738                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1739                    );
1740                }
1741
1742                if !proposed_typ.keys.is_empty() {
1743                    // Once WMR CTEs support keys, check that keys exactly
1744                    // overlap.
1745                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1746                }
1747
1748                // Validate that the derived and proposed types are the same.
1749                let derived_typ = qcx.relation_type(&val);
1750
1751                let type_err = |proposed_typ: &SqlRelationType, derived_typ: SqlRelationType| {
1752                    let cte_name = normalize::ident(cte.name.clone());
1753                    let proposed_typ = proposed_typ
1754                        .column_types
1755                        .iter()
1756                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1757                        .collect::<Vec<_>>();
1758                    let inferred_typ = derived_typ
1759                        .column_types
1760                        .iter()
1761                        .map(|ty| qcx.humanize_sql_scalar_type(&ty.scalar_type, false))
1762                        .collect::<Vec<_>>();
1763                    Err(PlanError::RecursiveTypeMismatch(
1764                        cte_name,
1765                        proposed_typ,
1766                        inferred_typ,
1767                    ))
1768                };
1769
1770                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1771                    return type_err(proposed_typ, derived_typ);
1772                }
1773
1774                // Cast derived types to proposed types or error.
1775                let val = match cast_relation(
1776                    qcx,
1777                    // Choose `CastContext::Assignment`` because the user has
1778                    // been explicit about the types they expect. Choosing
1779                    // `CastContext::Implicit` is not "strong" enough to impose
1780                    // typmods from proposed types onto values.
1781                    CastContext::Assignment,
1782                    val,
1783                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1784                ) {
1785                    Ok(val) => val,
1786                    Err(_) => return type_err(proposed_typ, derived_typ),
1787                };
1788
1789                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1790            }
1791        }
1792    }
1793
1794    Ok(result)
1795}
1796
1797pub fn plan_nested_query(
1798    qcx: &mut QueryContext,
1799    q: &Query<Aug>,
1800) -> Result<(HirRelationExpr, Scope), PlanError> {
1801    let PlannedQuery {
1802        mut expr,
1803        scope,
1804        order_by,
1805        limit,
1806        offset,
1807        project,
1808        group_size_hints,
1809    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1810    if limit.is_some()
1811        || !offset
1812            .clone()
1813            .try_into_literal_int64()
1814            .is_ok_and(|offset| offset == 0)
1815    {
1816        expr = HirRelationExpr::top_k(
1817            expr,
1818            vec![],
1819            order_by,
1820            limit,
1821            offset,
1822            group_size_hints.limit_input_group_size,
1823        );
1824    }
1825    Ok((expr.project(project), scope))
1826}
1827
1828fn plan_set_expr(
1829    qcx: &mut QueryContext,
1830    q: &SetExpr<Aug>,
1831) -> Result<(HirRelationExpr, Scope), PlanError> {
1832    match q {
1833        SetExpr::Select(select) => {
1834            let order_by_exprs = Vec::new();
1835            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
1836            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
1837            // should not have planned any ordering.
1838            assert!(plan.order_by.is_empty());
1839            Ok((plan.expr.project(plan.project), plan.scope))
1840        }
1841        SetExpr::SetOperation {
1842            op,
1843            all,
1844            left,
1845            right,
1846        } => {
1847            // Plan the LHS and RHS.
1848            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
1849            let (right_expr, right_scope) =
1850                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;
1851
1852            // Validate that the LHS and RHS are the same width.
1853            let left_type = qcx.relation_type(&left_expr);
1854            let right_type = qcx.relation_type(&right_expr);
1855            if left_type.arity() != right_type.arity() {
1856                sql_bail!(
1857                    "each {} query must have the same number of columns: {} vs {}",
1858                    op,
1859                    left_type.arity(),
1860                    right_type.arity(),
1861                );
1862            }
1863
1864            // Match the types of the corresponding columns on the LHS and RHS
1865            // using the normal type coercion rules. This is equivalent to
1866            // `coerce_homogeneous_exprs`, but implemented in terms of
1867            // `HirRelationExpr` rather than `HirScalarExpr`.
1868            let left_ecx = &ExprContext {
1869                qcx,
1870                name: &op.to_string(),
1871                scope: &left_scope,
1872                relation_type: &left_type,
1873                allow_aggregates: false,
1874                allow_subqueries: false,
1875                allow_parameters: false,
1876                allow_windows: false,
1877            };
1878            let right_ecx = &ExprContext {
1879                qcx,
1880                name: &op.to_string(),
1881                scope: &right_scope,
1882                relation_type: &right_type,
1883                allow_aggregates: false,
1884                allow_subqueries: false,
1885                allow_parameters: false,
1886                allow_windows: false,
1887            };
1888            let mut left_casts = vec![];
1889            let mut right_casts = vec![];
1890            for (i, (left_type, right_type)) in left_type
1891                .column_types
1892                .iter()
1893                .zip_eq(right_type.column_types.iter())
1894                .enumerate()
1895            {
1896                let types = &[
1897                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
1898                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
1899                ];
1900                let target =
1901                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
1902                match typeconv::plan_cast(
1903                    left_ecx,
1904                    CastContext::Implicit,
1905                    HirScalarExpr::column(i),
1906                    &target,
1907                ) {
1908                    Ok(expr) => left_casts.push(expr),
1909                    Err(_) => sql_bail!(
1910                        "{} types {} and {} cannot be matched",
1911                        op,
1912                        qcx.humanize_sql_scalar_type(&left_type.scalar_type, false),
1913                        qcx.humanize_sql_scalar_type(&target, false),
1914                    ),
1915                }
1916                match typeconv::plan_cast(
1917                    right_ecx,
1918                    CastContext::Implicit,
1919                    HirScalarExpr::column(i),
1920                    &target,
1921                ) {
1922                    Ok(expr) => right_casts.push(expr),
1923                    Err(_) => sql_bail!(
1924                        "{} types {} and {} cannot be matched",
1925                        op,
1926                        qcx.humanize_sql_scalar_type(&target, false),
1927                        qcx.humanize_sql_scalar_type(&right_type.scalar_type, false),
1928                    ),
1929                }
1930            }
1931            let lhs = if left_casts
1932                .iter()
1933                .enumerate()
1934                .any(|(i, e)| e != &HirScalarExpr::column(i))
1935            {
1936                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
1937                left_expr.map(left_casts).project(project_key)
1938            } else {
1939                left_expr
1940            };
1941            let rhs = if right_casts
1942                .iter()
1943                .enumerate()
1944                .any(|(i, e)| e != &HirScalarExpr::column(i))
1945            {
1946                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
1947                right_expr.map(right_casts).project(project_key)
1948            } else {
1949                right_expr
1950            };
1951
1952            let relation_expr = match op {
1953                SetOperator::Union => {
1954                    if *all {
1955                        lhs.union(rhs)
1956                    } else {
1957                        lhs.union(rhs).distinct()
1958                    }
1959                }
1960                SetOperator::Except => Hir::except(all, lhs, rhs),
1961                SetOperator::Intersect => {
1962                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
1963                    // Though we believe that render() does The Right Thing (TM)
1964                    // Also note that we do *not* need another threshold() at the end of the method chain
1965                    // because the right-hand side of the outer union only produces existing records,
1966                    // i.e., the record counts for differential data flow definitely remain non-negative.
1967                    let left_clone = lhs.clone();
1968                    if *all {
1969                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1970                    } else {
1971                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
1972                            .distinct()
1973                    }
1974                }
1975            };
1976            let scope = Scope::from_source(
1977                None,
1978                // Column names are taken from the left, as in Postgres.
1979                left_scope.column_names(),
1980            );
1981
1982            Ok((relation_expr, scope))
1983        }
1984        SetExpr::Values(Values(values)) => plan_values(qcx, values),
1985        SetExpr::Table(name) => {
1986            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
1987            Ok((expr, scope))
1988        }
1989        SetExpr::Query(query) => {
1990            let (expr, scope) = plan_nested_query(qcx, query)?;
1991            Ok((expr, scope))
1992        }
1993        SetExpr::Show(stmt) => {
1994            // The create SQL definition of involving this query, will have the explicit `SHOW`
1995            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
1996            // current schema of the executing user. When Materialize restarts and tries to re-plan
1997            // these queries, it will only have access to the raw `SHOW` command and have no idea
1998            // what schema to use. As a result Materialize will fail to boot.
1999            //
2000            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
2001            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
2002            // However, banning show commands in views gives us more flexibility to change their
2003            // output.
2004            //
2005            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
2006            // with all show commands expanded into their equivalent SELECT statements.
2007            if !qcx.lifetime.allow_show() {
2008                return Err(PlanError::ShowCommandInView);
2009            }
2010
2011            // Some SHOW statements are a SELECT query. Others produces Rows
2012            // directly. Convert both of these to the needed Hir and Scope.
2013            fn to_hirscope(
2014                plan: ShowCreatePlan,
2015                desc: StatementDesc,
2016            ) -> Result<(HirRelationExpr, Scope), PlanError> {
2017                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
2018                let desc = desc.relation_desc.ok_or_else(|| {
2019                    internal_err!("statement description missing relation descriptor")
2020                })?;
2021                let scope = Scope::from_source(None, desc.iter_names());
2022                let expr = HirRelationExpr::constant(rows, desc.into_typ());
2023                Ok((expr, scope))
2024            }
2025
2026            match stmt.clone() {
2027                ShowStatement::ShowColumns(stmt) => {
2028                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
2029                }
2030                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
2031                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
2032                    show::describe_show_create_connection(qcx.scx, stmt)?,
2033                ),
2034                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
2035                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
2036                    show::describe_show_create_cluster(qcx.scx, stmt)?,
2037                ),
2038                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
2039                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
2040                    show::describe_show_create_index(qcx.scx, stmt)?,
2041                ),
2042                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
2043                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
2044                    show::describe_show_create_sink(qcx.scx, stmt)?,
2045                ),
2046                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
2047                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
2048                    show::describe_show_create_source(qcx.scx, stmt)?,
2049                ),
2050                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
2051                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
2052                    show::describe_show_create_table(qcx.scx, stmt)?,
2053                ),
2054                ShowStatement::ShowCreateView(stmt) => to_hirscope(
2055                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
2056                    show::describe_show_create_view(qcx.scx, stmt)?,
2057                ),
2058                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
2059                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
2060                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
2061                ),
2062                ShowStatement::ShowCreateType(stmt) => to_hirscope(
2063                    show::plan_show_create_type(qcx.scx, stmt.clone())?,
2064                    show::describe_show_create_type(qcx.scx, stmt)?,
2065                ),
2066                ShowStatement::ShowObjects(stmt) => {
2067                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
2068                }
2069                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
2070                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
2071            }
2072        }
2073    }
2074}
2075
2076/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2077fn plan_values(
2078    qcx: &QueryContext,
2079    values: &[Vec<Expr<Aug>>],
2080) -> Result<(HirRelationExpr, Scope), PlanError> {
2081    assert!(!values.is_empty());
2082
2083    let ecx = &ExprContext {
2084        qcx,
2085        name: "VALUES",
2086        scope: &Scope::empty(),
2087        relation_type: &SqlRelationType::empty(),
2088        allow_aggregates: false,
2089        allow_subqueries: true,
2090        allow_parameters: true,
2091        allow_windows: false,
2092    };
2093
2094    let ncols = values[0].len();
2095    let nrows = values.len();
2096
2097    // Arrange input expressions by columns, not rows, so that we can
2098    // call `coerce_homogeneous_exprs` on each column.
2099    let mut cols = vec![vec![]; ncols];
2100    for row in values {
2101        if row.len() != ncols {
2102            sql_bail!(
2103                "VALUES expression has varying number of columns: {} vs {}",
2104                row.len(),
2105                ncols
2106            );
2107        }
2108        for (i, v) in row.iter().enumerate() {
2109            cols[i].push(v);
2110        }
2111    }
2112
2113    // Plan each column.
2114    let mut col_iters = Vec::with_capacity(ncols);
2115    let mut col_types = Vec::with_capacity(ncols);
2116    for col in &cols {
2117        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2118        let mut col_type = ecx.column_type(&col[0]);
2119        for val in &col[1..] {
2120            col_type = col_type.sql_union(&ecx.column_type(val))?; // HIR deliberately not using `union`
2121        }
2122        col_types.push(col_type);
2123        col_iters.push(col.into_iter());
2124    }
2125
2126    // Build constant relation.
2127    let mut exprs = vec![];
2128    for _ in 0..nrows {
2129        for i in 0..ncols {
2130            exprs.push(col_iters[i].next().unwrap());
2131        }
2132    }
2133    let out = HirRelationExpr::CallTable {
2134        func: TableFunc::Wrap {
2135            width: ncols,
2136            types: col_types,
2137        },
2138        exprs,
2139    };
2140
2141    // Build column names.
2142    let mut scope = Scope::empty();
2143    for i in 0..ncols {
2144        let name = format!("column{}", i + 1);
2145        scope.items.push(ScopeItem::from_column_name(name));
2146    }
2147
2148    Ok((out, scope))
2149}
2150
2151/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2152/// statement.
2153///
2154/// This is special-cased in PostgreSQL and different enough from `plan_values`
2155/// that it is easier to use a separate function entirely. Unlike a normal
2156/// `VALUES` clause, each value is coerced to the type of the target table
2157/// via an assignment cast.
2158///
2159/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2160fn plan_values_insert(
2161    qcx: &QueryContext,
2162    target_names: &[&ColumnName],
2163    target_types: &[&SqlScalarType],
2164    values: &[Vec<Expr<Aug>>],
2165) -> Result<HirRelationExpr, PlanError> {
2166    assert!(!values.is_empty());
2167
2168    if !values.iter().map(|row| row.len()).all_equal() {
2169        sql_bail!("VALUES lists must all be the same length");
2170    }
2171
2172    let ecx = &ExprContext {
2173        qcx,
2174        name: "VALUES",
2175        scope: &Scope::empty(),
2176        relation_type: &SqlRelationType::empty(),
2177        allow_aggregates: false,
2178        allow_subqueries: true,
2179        allow_parameters: true,
2180        allow_windows: false,
2181    };
2182
2183    let mut exprs = vec![];
2184    let mut types = vec![];
2185    for row in values {
2186        if row.len() > target_names.len() {
2187            sql_bail!("INSERT has more expressions than target columns");
2188        }
2189        for (column, val) in row.into_iter().enumerate() {
2190            let target_type = &target_types[column];
2191            let val = plan_expr(ecx, val)?;
2192            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2193            let source_type = &ecx.scalar_type(&val);
2194            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2195                Ok(val) => val,
2196                Err(_) => sql_bail!(
2197                    "column {} is of type {} but expression is of type {}",
2198                    target_names[column].quoted(),
2199                    qcx.humanize_sql_scalar_type(target_type, false),
2200                    qcx.humanize_sql_scalar_type(source_type, false),
2201                ),
2202            };
2203            if column >= types.len() {
2204                types.push(ecx.column_type(&val));
2205            } else {
2206                types[column] = types[column].sql_union(&ecx.column_type(&val))?; // HIR deliberately not using `union`
2207            }
2208            exprs.push(val);
2209        }
2210    }
2211
2212    Ok(HirRelationExpr::CallTable {
2213        func: TableFunc::Wrap {
2214            width: values[0].len(),
2215            types,
2216        },
2217        exprs,
2218    })
2219}
2220
2221fn plan_join_identity() -> (HirRelationExpr, Scope) {
2222    let typ = SqlRelationType::new(vec![]);
2223    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2224    let scope = Scope::empty();
2225    (expr, scope)
2226}
2227
2228/// Describes how to execute a SELECT query.
2229///
2230/// `order_by` describes how to order the rows in `expr` *before* applying the
2231/// projection. The `scope` describes the columns in `expr` *after* the
2232/// projection has been applied.
2233#[derive(Debug)]
2234struct SelectPlan {
2235    expr: HirRelationExpr,
2236    scope: Scope,
2237    order_by: Vec<ColumnOrder>,
2238    project: Vec<usize>,
2239}
2240
2241generate_extracted_config!(
2242    SelectOption,
2243    (ExpectedGroupSize, u64),
2244    (AggregateInputGroupSize, u64),
2245    (DistinctOnInputGroupSize, u64),
2246    (LimitInputGroupSize, u64)
2247);
2248
2249/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2250///
2251/// Normally, the ORDER BY clause occurs after the columns specified in the
2252/// SELECT list have been projected. In a query like
2253///
2254///   CREATE TABLE (a int, b int)
2255///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2256///
2257/// it is valid to refer to `a`, because it is explicitly selected, but it would
2258/// not be valid to refer to unselected column `b`.
2259///
2260/// But PostgreSQL extends the standard to permit queries like
2261///
2262///   SELECT a FROM t ORDER BY b
2263///
2264/// where expressions in the ORDER BY clause can refer to *both* input columns
2265/// and output columns.
2266fn plan_select_from_where(
2267    qcx: &QueryContext,
2268    mut s: Select<Aug>,
2269    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2270) -> Result<SelectPlan, PlanError> {
2271    // TODO: Both `s` and `order_by_exprs` are not references because the
2272    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2273    // table function support (the UUID mapping). Attempt to change this so callers
2274    // don't need to clone the Select.
2275
2276    // Extract query options.
2277    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2278    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2279
2280    // Step 1. Handle FROM clause, including joins.
2281    let (mut relation_expr, mut from_scope) =
2282        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2283            let (left, left_scope) = l;
2284            plan_join(
2285                qcx,
2286                left,
2287                left_scope,
2288                &Join {
2289                    relation: TableFactor::NestedJoin {
2290                        join: Box::new(twj.clone()),
2291                        alias: None,
2292                    },
2293                    join_operator: JoinOperator::CrossJoin,
2294                },
2295            )
2296        })?;
2297
2298    // Step 2. Handle WHERE clause.
2299    if let Some(selection) = &s.selection {
2300        let ecx = &ExprContext {
2301            qcx,
2302            name: "WHERE clause",
2303            scope: &from_scope,
2304            relation_type: &qcx.relation_type(&relation_expr),
2305            allow_aggregates: false,
2306            allow_subqueries: true,
2307            allow_parameters: true,
2308            allow_windows: false,
2309        };
2310        let expr = plan_expr(ecx, selection)
2311            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2312            .type_as(ecx, &SqlScalarType::Bool)?;
2313        relation_expr = relation_expr.filter(vec![expr]);
2314    }
2315
2316    // Step 3. Gather aggregates and table functions.
2317    // (But skip window aggregates.)
2318    let (aggregates, table_funcs) = {
2319        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2320        visitor.visit_select_mut(&mut s);
2321        for o in order_by_exprs.iter_mut() {
2322            visitor.visit_order_by_expr_mut(o);
2323        }
2324        visitor.into_result()?
2325    };
2326    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2327    if !table_funcs.is_empty() {
2328        let (expr, scope) = plan_scalar_table_funcs(
2329            qcx,
2330            table_funcs,
2331            &mut table_func_names,
2332            &relation_expr,
2333            &from_scope,
2334        )?;
2335        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2336        from_scope = from_scope.product(scope)?;
2337    }
2338
2339    // Step 4. Expand SELECT clause.
2340    let projection = {
2341        let ecx = &ExprContext {
2342            qcx,
2343            name: "SELECT clause",
2344            scope: &from_scope,
2345            relation_type: &qcx.relation_type(&relation_expr),
2346            allow_aggregates: true,
2347            allow_subqueries: true,
2348            allow_parameters: true,
2349            allow_windows: true,
2350        };
2351        let mut out = vec![];
2352        for si in &s.projection {
2353            if *si == SelectItem::Wildcard && s.from.is_empty() {
2354                sql_bail!("SELECT * with no tables specified is not valid");
2355            }
2356            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2357        }
2358        out
2359    };
2360
2361    // Step 5. Handle GROUP BY clause.
2362    // This will also plan the aggregates gathered in Step 3.
2363    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2364    let (mut group_scope, select_all_mapping) = {
2365        // Compute GROUP BY expressions.
2366        let ecx = &ExprContext {
2367            qcx,
2368            name: "GROUP BY clause",
2369            scope: &from_scope,
2370            relation_type: &qcx.relation_type(&relation_expr),
2371            allow_aggregates: false,
2372            allow_subqueries: true,
2373            allow_parameters: true,
2374            allow_windows: false,
2375        };
2376        let mut group_key = vec![];
2377        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2378        let mut group_hir_exprs = vec![];
2379        let mut group_scope = Scope::empty();
2380        let mut select_all_mapping = BTreeMap::new();
2381
2382        for group_expr in &s.group_by {
2383            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2384            let new_column = group_key.len();
2385
2386            if let Some(group_expr) = group_expr {
2387                // Multiple AST expressions can map to the same HIR expression.
2388                // If we already have a ScopeItem for this HIR, we can add this
2389                // next AST expression to its set
2390                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2391                    existing_scope_item.exprs.insert(group_expr.clone());
2392                    continue;
2393                }
2394            }
2395
2396            let mut scope_item = if let HirScalarExpr::Column(
2397                ColumnRef {
2398                    level: 0,
2399                    column: old_column,
2400                },
2401                _name,
2402            ) = &expr
2403            {
2404                // If we later have `SELECT foo.*` then we have to find all
2405                // the `foo` items in `from_scope` and figure out where they
2406                // ended up in `group_scope`. This is really hard to do
2407                // right using SQL name resolution, so instead we just track
2408                // the movement here.
2409                select_all_mapping.insert(*old_column, new_column);
2410                let scope_item = ecx.scope.items[*old_column].clone();
2411                scope_item
2412            } else {
2413                ScopeItem::empty()
2414            };
2415
2416            if let Some(group_expr) = group_expr.cloned() {
2417                scope_item.exprs.insert(group_expr);
2418            }
2419
2420            group_key.push(from_scope.len() + group_exprs.len());
2421            group_hir_exprs.push(expr.clone());
2422            group_exprs.insert(expr, scope_item);
2423        }
2424
2425        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2426        for expr in &group_hir_exprs {
2427            if let Some(scope_item) = group_exprs.remove(expr) {
2428                group_scope.items.push(scope_item);
2429            }
2430        }
2431
2432        // Plan aggregates.
2433        let ecx = &ExprContext {
2434            qcx,
2435            name: "aggregate function",
2436            scope: &from_scope,
2437            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2438            allow_aggregates: false,
2439            allow_subqueries: true,
2440            allow_parameters: true,
2441            allow_windows: false,
2442        };
2443        let mut agg_exprs = vec![];
2444        for sql_function in aggregates {
2445            if sql_function.over.is_some() {
2446                unreachable!(
2447                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2448                );
2449            }
2450            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2451            group_scope
2452                .items
2453                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2454        }
2455        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2456            // apply GROUP BY / aggregates
2457            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2458                group_key,
2459                agg_exprs,
2460                group_size_hints.aggregate_input_group_size,
2461            );
2462
2463            // For every old column that wasn't a group key, add a scope item
2464            // that errors when referenced. We can't simply drop these items
2465            // from scope. These items need to *exist* because they might shadow
2466            // variables in outer scopes that would otherwise be valid to
2467            // reference, but accessing them needs to produce an error.
2468            for i in 0..from_scope.len() {
2469                if !select_all_mapping.contains_key(&i) {
2470                    let scope_item = &ecx.scope.items[i];
2471                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2472                        table_name: scope_item.table_name.clone(),
2473                        column_name: scope_item.column_name.clone(),
2474                        allow_unqualified_references: scope_item.allow_unqualified_references,
2475                    });
2476                }
2477            }
2478
2479            (group_scope, select_all_mapping)
2480        } else {
2481            // if no GROUP BY, aggregates or having then all columns remain in scope
2482            (
2483                from_scope.clone(),
2484                (0..from_scope.len()).map(|i| (i, i)).collect(),
2485            )
2486        }
2487    };
2488
2489    // Step 6. Handle HAVING clause.
2490    if let Some(ref having) = s.having {
2491        let ecx = &ExprContext {
2492            qcx,
2493            name: "HAVING clause",
2494            scope: &group_scope,
2495            relation_type: &qcx.relation_type(&relation_expr),
2496            allow_aggregates: true,
2497            allow_subqueries: true,
2498            allow_parameters: true,
2499            allow_windows: false,
2500        };
2501        let expr = plan_expr(ecx, having)?.type_as(ecx, &SqlScalarType::Bool)?;
2502        relation_expr = relation_expr.filter(vec![expr]);
2503    }
2504
2505    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2506    // (This includes window aggregations.)
2507    //
2508    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2509    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2510    //
2511    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2512    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2513    // and can't be part of a bigger expression.
2514    // See https://www.postgresql.org/docs/current/queries-order.html:
2515    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2516    // expression"
2517    let window_funcs = {
2518        let mut visitor = WindowFuncCollector::default();
2519        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2520        // window functions are excluded from other things by `allow_windows` being false when
2521        // planning those before this code).
2522        visitor.visit_select(&s);
2523        for o in order_by_exprs.iter() {
2524            visitor.visit_order_by_expr(o);
2525        }
2526        visitor.into_result()
2527    };
2528    for window_func in window_funcs {
2529        let ecx = &ExprContext {
2530            qcx,
2531            name: "window function",
2532            scope: &group_scope,
2533            relation_type: &qcx.relation_type(&relation_expr),
2534            allow_aggregates: true,
2535            allow_subqueries: true,
2536            allow_parameters: true,
2537            allow_windows: true,
2538        };
2539        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2540        group_scope.items.push(ScopeItem::from_expr(window_func));
2541    }
2542    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2543    // been already planned now. However, we should still set `allow_windows: true` for the
2544    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2545    // msg if an OVER clause is missing from a window function.
2546
2547    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2548    if let Some(ref qualify) = s.qualify {
2549        let ecx = &ExprContext {
2550            qcx,
2551            name: "QUALIFY clause",
2552            scope: &group_scope,
2553            relation_type: &qcx.relation_type(&relation_expr),
2554            allow_aggregates: true,
2555            allow_subqueries: true,
2556            allow_parameters: true,
2557            allow_windows: true,
2558        };
2559        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &SqlScalarType::Bool)?;
2560        relation_expr = relation_expr.filter(vec![expr]);
2561    }
2562
2563    // Step 9. Handle SELECT clause.
2564    let output_columns = {
2565        let mut new_exprs = vec![];
2566        let mut new_type = qcx.relation_type(&relation_expr);
2567        let mut output_columns = vec![];
2568        for (select_item, column_name) in &projection {
2569            let ecx = &ExprContext {
2570                qcx,
2571                name: "SELECT clause",
2572                scope: &group_scope,
2573                relation_type: &new_type,
2574                allow_aggregates: true,
2575                allow_subqueries: true,
2576                allow_parameters: true,
2577                allow_windows: true,
2578            };
2579            let expr = match select_item {
2580                ExpandedSelectItem::InputOrdinal(i) => {
2581                    if let Some(column) = select_all_mapping.get(i).copied() {
2582                        HirScalarExpr::column(column)
2583                    } else {
2584                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2585                    }
2586                }
2587                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2588            };
2589            if let HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) = expr {
2590                // Simple column reference; no need to map on a new expression.
2591                output_columns.push((column, column_name));
2592            } else {
2593                // Complicated expression that requires a map expression. We
2594                // update `group_scope` as we go so that future expressions that
2595                // are textually identical to this one can reuse it. This
2596                // duplicate detection is required for proper determination of
2597                // ambiguous column references with SQL92-style `ORDER BY`
2598                // items. See `plan_order_by_or_distinct_expr` for more.
2599                let typ = ecx.column_type(&expr);
2600                new_type.column_types.push(typ);
2601                new_exprs.push(expr);
2602                output_columns.push((group_scope.len(), column_name));
2603                group_scope
2604                    .items
2605                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2606            }
2607        }
2608        relation_expr = relation_expr.map(new_exprs);
2609        output_columns
2610    };
2611    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2612
2613    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2614    let order_by = {
2615        let relation_type = qcx.relation_type(&relation_expr);
2616        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2617            &ExprContext {
2618                qcx,
2619                name: "ORDER BY clause",
2620                scope: &group_scope,
2621                relation_type: &relation_type,
2622                allow_aggregates: true,
2623                allow_subqueries: true,
2624                allow_parameters: true,
2625                allow_windows: true,
2626            },
2627            &order_by_exprs,
2628            &output_columns,
2629        )?;
2630
2631        match s.distinct {
2632            None => relation_expr = relation_expr.map(map_exprs),
2633            Some(Distinct::EntireRow) => {
2634                if relation_type.arity() == 0 {
2635                    sql_bail!("SELECT DISTINCT must have at least one column");
2636                }
2637                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2638                // list, so we can't proceed if `ORDER BY` has introduced any
2639                // columns for arbitrary expressions. This matches PostgreSQL.
2640                if !try_push_projection_order_by(
2641                    &mut relation_expr,
2642                    &mut project_key,
2643                    &mut order_by,
2644                ) {
2645                    sql_bail!(
2646                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2647                    );
2648                }
2649                assert!(map_exprs.is_empty());
2650                relation_expr = relation_expr.distinct();
2651            }
2652            Some(Distinct::On(exprs)) => {
2653                let ecx = &ExprContext {
2654                    qcx,
2655                    name: "DISTINCT ON clause",
2656                    scope: &group_scope,
2657                    relation_type: &qcx.relation_type(&relation_expr),
2658                    allow_aggregates: true,
2659                    allow_subqueries: true,
2660                    allow_parameters: true,
2661                    allow_windows: true,
2662                };
2663
2664                let mut distinct_exprs = vec![];
2665                for expr in &exprs {
2666                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2667                    distinct_exprs.push(expr);
2668                }
2669
2670                let mut distinct_key = vec![];
2671
2672                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2673                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2674                // expressions, though the order of `DISTINCT ON` expressions
2675                // does not matter. This matches PostgreSQL and leaves the door
2676                // open to a future optimization where the `DISTINCT ON` and
2677                // `ORDER BY` operations happen in one pass.
2678                //
2679                // On the bright side, any columns that have already been
2680                // computed by `ORDER BY` can be reused in the distinct key.
2681                let arity = relation_type.arity();
2682                for ord in order_by.iter().take(distinct_exprs.len()) {
2683                    // The unusual construction of `expr` here is to ensure the
2684                    // temporary column expression lives long enough.
2685                    let mut expr = &HirScalarExpr::column(ord.column);
2686                    if ord.column >= arity {
2687                        expr = &map_exprs[ord.column - arity];
2688                    };
2689                    match distinct_exprs.iter().position(move |e| e == expr) {
2690                        None => sql_bail!(
2691                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2692                        ),
2693                        Some(pos) => {
2694                            distinct_exprs.remove(pos);
2695                        }
2696                    }
2697                    distinct_key.push(ord.column);
2698                }
2699
2700                // Add any remaining `DISTINCT ON` expressions to the key.
2701                for expr in distinct_exprs {
2702                    // If the expression is a reference to an existing column,
2703                    // do not introduce a new column to support it.
2704                    let column = match expr {
2705                        HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2706                        _ => {
2707                            map_exprs.push(expr);
2708                            arity + map_exprs.len() - 1
2709                        }
2710                    };
2711                    distinct_key.push(column);
2712                }
2713
2714                // `DISTINCT ON` is semantically a TopK with limit 1. The
2715                // columns in `ORDER BY` that are not part of the distinct key,
2716                // if there are any, determine the ordering within each group,
2717                // per PostgreSQL semantics.
2718                let distinct_len = distinct_key.len();
2719                relation_expr = HirRelationExpr::top_k(
2720                    relation_expr.map(map_exprs),
2721                    distinct_key,
2722                    order_by.iter().skip(distinct_len).cloned().collect(),
2723                    Some(HirScalarExpr::literal(
2724                        Datum::Int64(1),
2725                        SqlScalarType::Int64,
2726                    )),
2727                    HirScalarExpr::literal(Datum::Int64(0), SqlScalarType::Int64),
2728                    group_size_hints.distinct_on_input_group_size,
2729                );
2730            }
2731        }
2732
2733        order_by
2734    };
2735
2736    // Construct a clean scope to expose outwards, where all of the state that
2737    // accumulated in the scope during planning of this SELECT is erased. The
2738    // clean scope has at most one name for each column, and the names are not
2739    // associated with any table.
2740    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2741
2742    Ok(SelectPlan {
2743        expr: relation_expr,
2744        scope,
2745        order_by,
2746        project: project_key,
2747    })
2748}
2749
2750fn plan_scalar_table_funcs(
2751    qcx: &QueryContext,
2752    table_funcs: BTreeMap<Function<Aug>, String>,
2753    table_func_names: &mut BTreeMap<String, Ident>,
2754    relation_expr: &HirRelationExpr,
2755    from_scope: &Scope,
2756) -> Result<(HirRelationExpr, Scope), PlanError> {
2757    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));
2758
2759    for (table_func, id) in table_funcs.iter() {
2760        table_func_names.insert(
2761            id.clone(),
2762            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
2763            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
2764        );
2765    }
2766    // If there's only a single table function, we can skip generating
2767    // ordinality columns.
2768    if table_funcs.len() == 1 {
2769        let (table_func, id) = table_funcs.iter().next().unwrap();
2770        let (expr, mut scope) =
2771            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;
2772
2773        // A single table-function might return several columns as a record
2774        let num_cols = scope.len();
2775        for i in 0..scope.len() {
2776            scope.items[i].table_name = Some(PartialItemName {
2777                database: None,
2778                schema: None,
2779                item: id.clone(),
2780            });
2781            scope.items[i].from_single_column_function = num_cols == 1;
2782            scope.items[i].allow_unqualified_references = false;
2783        }
2784        return Ok((expr, scope));
2785    }
2786    if table_funcs.keys().any(is_repeat_row) {
2787        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
2788        // error message would be misleading, because it would refer to WITH ORDINALITY.
2789        bail_unsupported!(format!(
2790            "{} in a SELECT clause with multiple table functions",
2791            REPEAT_ROW_NAME
2792        ));
2793    }
2794    // Otherwise, plan as usual, emulating the ROWS FROM behavior
2795    let (expr, mut scope, num_cols) =
2796        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;
2797
2798    // Munge the scope so table names match with the generated ids.
2799    let mut i = 0;
2800    for (id, num_cols) in table_funcs.values().zip_eq(num_cols) {
2801        for _ in 0..num_cols {
2802            scope.items[i].table_name = Some(PartialItemName {
2803                database: None,
2804                schema: None,
2805                item: id.clone(),
2806            });
2807            scope.items[i].from_single_column_function = num_cols == 1;
2808            scope.items[i].allow_unqualified_references = false;
2809            i += 1;
2810        }
2811        // Ordinality column. This doubles as the
2812        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
2813        // because it only needs to be NULL or not.
2814        scope.items[i].table_name = Some(PartialItemName {
2815            database: None,
2816            schema: None,
2817            item: id.clone(),
2818        });
2819        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
2820        scope.items[i].allow_unqualified_references = false;
2821        i += 1;
2822    }
2823    // Coalesced ordinality column.
2824    scope.items[i].allow_unqualified_references = false;
2825    Ok((expr, scope))
2826}
2827
2828/// Plans an expression in a `GROUP BY` clause.
2829///
2830/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2831/// names/expressions defined in the `SELECT` clause. These special cases are
2832/// handled by this function; see comments within the implementation for
2833/// details.
2834fn plan_group_by_expr<'a>(
2835    ecx: &ExprContext,
2836    group_expr: &'a Expr<Aug>,
2837    projection: &'a [(ExpandedSelectItem, ColumnName)],
2838) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2839    let plan_projection = |column: usize| match &projection[column].0 {
2840        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2841        ExpandedSelectItem::Expr(expr) => {
2842            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2843        }
2844    };
2845
2846    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2847    // a special case that means to use the ith item in the SELECT clause.
2848    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2849        return plan_projection(column);
2850    }
2851
2852    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2853    // The `foo` can refer to *either* an input column or an output column. If
2854    // both exist, the input column is preferred.
2855    match group_expr {
2856        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2857            Err(PlanError::UnknownColumn {
2858                table: None,
2859                column,
2860                similar,
2861            }) => {
2862                // The expression was a simple identifier that did not match an
2863                // input column. See if it matches an output column.
2864                let mut iter = projection.iter().map(|(_expr, name)| name);
2865                if let Some(i) = iter.position(|n| *n == column) {
2866                    if iter.any(|n| *n == column) {
2867                        Err(PlanError::AmbiguousColumn(column))
2868                    } else {
2869                        plan_projection(i)
2870                    }
2871                } else {
2872                    // The name didn't match an output column either. Return the
2873                    // "unknown column" error.
2874                    Err(PlanError::UnknownColumn {
2875                        table: None,
2876                        column,
2877                        similar,
2878                    })
2879                }
2880            }
2881            res => Ok((Some(group_expr), res?)),
2882        },
2883        _ => Ok((
2884            Some(group_expr),
2885            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2886        )),
2887    }
2888}
2889
2890/// Plans a slice of `ORDER BY` expressions.
2891///
2892/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2893/// parameter.
2894///
2895/// Returns the determined column orderings and a list of scalar expressions
2896/// that must be mapped onto the underlying relation expression.
2897pub(crate) fn plan_order_by_exprs(
2898    ecx: &ExprContext,
2899    order_by_exprs: &[OrderByExpr<Aug>],
2900    output_columns: &[(usize, &ColumnName)],
2901) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2902    let mut order_by = vec![];
2903    let mut map_exprs = vec![];
2904    for obe in order_by_exprs {
2905        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2906        // If the expression is a reference to an existing column,
2907        // do not introduce a new column to support it.
2908        let column = match expr {
2909            HirScalarExpr::Column(ColumnRef { level: 0, column }, _name) => column,
2910            _ => {
2911                map_exprs.push(expr);
2912                ecx.relation_type.arity() + map_exprs.len() - 1
2913            }
2914        };
2915        order_by.push(resolve_desc_and_nulls_last(obe, column));
2916    }
2917    Ok((order_by, map_exprs))
2918}
2919
2920/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2921///
2922/// The `output_columns` parameter describes, in order, the physical index and
2923/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2924/// corresponds to a `SELECT` list with a single entry named "a" that can be
2925/// found at index 3 in the underlying relation expression.
2926///
2927/// There are three cases to handle.
2928///
2929///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2930///       reference to the specified output column.
2931///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2932///       an output column, if it exists; otherwise it is a reference to an
2933///       input column.
2934///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2935///       arbitrary expressions exclusively refer to input columns, never output
2936///       columns.
2937fn plan_order_by_or_distinct_expr(
2938    ecx: &ExprContext,
2939    expr: &Expr<Aug>,
2940    output_columns: &[(usize, &ColumnName)],
2941) -> Result<HirScalarExpr, PlanError> {
2942    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2943        return Ok(HirScalarExpr::column(output_columns[i].0));
2944    }
2945
2946    if let Expr::Identifier(names) = expr {
2947        if let [name] = &names[..] {
2948            let name = normalize::column_name(name.clone());
2949            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2950            if let Some((i, _)) = iter.next() {
2951                match iter.next() {
2952                    // Per SQL92, names are not considered ambiguous if they
2953                    // refer to identical target list expressions, as in
2954                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2955                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2956                    _ => return Ok(HirScalarExpr::column(*i)),
2957                }
2958            }
2959        }
2960    }
2961
2962    plan_expr(ecx, expr)?.type_as_any(ecx)
2963}
2964
2965fn plan_table_with_joins(
2966    qcx: &QueryContext,
2967    table_with_joins: &TableWithJoins<Aug>,
2968) -> Result<(HirRelationExpr, Scope), PlanError> {
2969    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2970    for join in &table_with_joins.joins {
2971        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2972        expr = new_expr;
2973        scope = new_scope;
2974    }
2975    Ok((expr, scope))
2976}
2977
2978fn plan_table_factor(
2979    qcx: &QueryContext,
2980    table_factor: &TableFactor<Aug>,
2981) -> Result<(HirRelationExpr, Scope), PlanError> {
2982    match table_factor {
2983        TableFactor::Table { name, alias } => {
2984            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2985            let scope = plan_table_alias(scope, alias.as_ref())?;
2986            Ok((expr, scope))
2987        }
2988
2989        TableFactor::Function {
2990            function,
2991            alias,
2992            with_ordinality,
2993        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2994
2995        TableFactor::RowsFrom {
2996            functions,
2997            alias,
2998            with_ordinality,
2999        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
3000
3001        TableFactor::Derived {
3002            lateral,
3003            subquery,
3004            alias,
3005        } => {
3006            let mut qcx = (*qcx).clone();
3007            if !lateral {
3008                // Since this derived table was not marked as `LATERAL`,
3009                // make elements in outer scopes invisible until we reach the
3010                // next lateral barrier.
3011                for scope in &mut qcx.outer_scopes {
3012                    if scope.lateral_barrier {
3013                        break;
3014                    }
3015                    scope.items.clear();
3016                }
3017            }
3018            qcx.outer_scopes[0].lateral_barrier = true;
3019            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
3020            let scope = plan_table_alias(scope, alias.as_ref())?;
3021            Ok((expr, scope))
3022        }
3023
3024        TableFactor::NestedJoin { join, alias } => {
3025            let (expr, scope) = plan_table_with_joins(qcx, join)?;
3026            let scope = plan_table_alias(scope, alias.as_ref())?;
3027            Ok((expr, scope))
3028        }
3029    }
3030}
3031
3032/// Plans a `ROWS FROM` expression.
3033///
3034/// `ROWS FROM` concatenates table functions into a single table, filling in
3035/// `NULL`s in places where one table function has fewer rows than another. We
3036/// can achieve this by augmenting each table function with a row number, doing
3037/// a `FULL JOIN` between each table function on the row number and eventually
3038/// projecting away the row number columns. Concretely, the following query
3039/// using `ROWS FROM`
3040///
3041/// ```sql
3042/// SELECT
3043///     *
3044/// FROM
3045///     ROWS FROM (
3046///         generate_series(1, 2),
3047///         information_schema._pg_expandarray(ARRAY[9]),
3048///         generate_series(3, 6)
3049///     );
3050/// ```
3051///
3052/// is equivalent to the following query that does not use `ROWS FROM`:
3053///
3054/// ```sql
3055/// SELECT
3056///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
3057/// FROM
3058///     generate_series(1, 2) WITH ORDINALITY AS gs1
3059///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
3060///         ON gs1.ordinality = expand.ordinality
3061///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
3062///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
3063/// ```
3064///
3065/// Note the call to `coalesce` in the last join condition, which ensures that
3066/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
3067///
3068/// This function creates a HirRelationExpr that follows the structure of the
3069/// latter query.
3070///
3071/// `with_ordinality` can be used to have the output expression contain a
3072/// single coalesced ordinality column at the end of the entire expression.
3073fn plan_rows_from(
3074    qcx: &QueryContext,
3075    functions: &[Function<Aug>],
3076    alias: Option<&TableAlias>,
3077    with_ordinality: bool,
3078) -> Result<(HirRelationExpr, Scope), PlanError> {
3079    // The `repeat_row` function is not supported in ROWS FROM.
3080    if functions.iter().any(is_repeat_row) {
3081        // Note: Would be also caught by WITH ORDINALITY checking for `repeat_row`, but then the
3082        // error message would be misleading, because it would refer to WITH ORDINALITY instead of
3083        // ROWS FROM.
3084        bail_unsupported!(format!("{} in ROWS FROM", REPEAT_ROW_NAME));
3085    }
3086
3087    // If there's only a single table function, planning proceeds as if `ROWS
3088    // FROM` hadn't been written at all.
3089    if let [function] = functions {
3090        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
3091    }
3092
3093    // Per PostgreSQL, all scope items take the name of the first function
3094    // (unless aliased).
3095    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
3096    let (expr, mut scope, num_cols) = plan_rows_from_internal(
3097        qcx,
3098        functions,
3099        Some(functions[0].name.full_item_name().clone()),
3100    )?;
3101
3102    // Columns tracks the set of columns we will keep in the projection.
3103    let mut columns = Vec::new();
3104    let mut offset = 0;
3105    // Retain table function's non-ordinality columns.
3106    for (idx, cols) in num_cols.into_iter().enumerate() {
3107        for i in 0..cols {
3108            columns.push(offset + i);
3109        }
3110        offset += cols + 1;
3111
3112        // Remove the ordinality column from the scope, accounting for previous scope
3113        // changes from this loop.
3114        scope.items.remove(offset - idx - 1);
3115    }
3116
3117    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3118    // column. Otherwise remove it from the scope.
3119    if with_ordinality {
3120        columns.push(offset);
3121    } else {
3122        scope.items.pop();
3123    }
3124
3125    let expr = expr.project(columns);
3126
3127    let scope = plan_table_alias(scope, alias)?;
3128    Ok((expr, scope))
3129}
3130
3131fn is_repeat_row(f: &Function<Aug>) -> bool {
3132    f.name.full_name_str().as_str() == format!("{}.{}", MZ_CATALOG_SCHEMA, REPEAT_ROW_NAME)
3133}
3134
3135/// Plans an expression coalescing multiple table functions. Each table
3136/// function is followed by its row ordinality. The entire expression is
3137/// followed by the coalesced row ordinality.
3138///
3139/// The returned Scope will set all item's table_name's to the `table_name`
3140/// parameter if it is `Some`. If `None`, they will be the name of each table
3141/// function.
3142///
3143/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
3144/// each table function.
3145///
3146/// For example, with table functions tf1 returning 1 column (a) and tf2
3147/// returning 2 columns (b, c), this function will return an expr 6 columns:
3148///
3149/// - tf1.a
3150/// - tf1.ordinality
3151/// - tf2.b
3152/// - tf2.c
3153/// - tf2.ordinality
3154/// - coalesced_ordinality
3155///
3156/// And a `Vec<usize>` of `[1, 2]`.
3157fn plan_rows_from_internal<'a>(
3158    qcx: &QueryContext,
3159    functions: impl IntoIterator<Item = &'a Function<Aug>>,
3160    table_name: Option<FullItemName>,
3161) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
3162    let mut functions = functions.into_iter();
3163    let mut num_cols = Vec::new();
3164
3165    // Join together each of the table functions in turn. The last column is
3166    // always the column to join against and is maintained to be the coalescence
3167    // of the row number column for all prior functions.
3168    let (mut left_expr, mut left_scope) =
3169        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
3170    num_cols.push(left_scope.len() - 1);
3171    // Create the coalesced ordinality column.
3172    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
3173    left_scope
3174        .items
3175        .push(ScopeItem::from_column_name(ORDINALITY_COL_NAME));
3176
3177    for function in functions {
3178        // The right hand side of a join must be planned in a new scope.
3179        let qcx = qcx.empty_derived_context();
3180        let (right_expr, mut right_scope) =
3181            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
3182        num_cols.push(right_scope.len() - 1);
3183        let left_col = left_scope.len() - 1;
3184        let right_col = left_scope.len() + right_scope.len() - 1;
3185        let on = HirScalarExpr::call_binary(
3186            HirScalarExpr::column(left_col),
3187            HirScalarExpr::column(right_col),
3188            expr_func::Eq,
3189        );
3190        left_expr = left_expr
3191            .join(right_expr, on, JoinKind::FullOuter)
3192            .map(vec![HirScalarExpr::call_variadic(
3193                Coalesce,
3194                vec![
3195                    HirScalarExpr::column(left_col),
3196                    HirScalarExpr::column(right_col),
3197                ],
3198            )]);
3199
3200        // Project off the previous iteration's coalesced column, but keep both of this
3201        // iteration's ordinality columns.
3202        left_expr = left_expr.project(
3203            (0..left_col) // non-coalesced ordinality columns from left function
3204                .chain(left_col + 1..right_col + 2) // non-ordinality columns from right function
3205                .collect(),
3206        );
3207        // Move the coalesced ordinality column.
3208        right_scope.items.push(left_scope.items.pop().unwrap());
3209
3210        left_scope.items.extend(right_scope.items);
3211    }
3212
3213    Ok((left_expr, left_scope, num_cols))
3214}
3215
3216/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3217/// FROM` clause that contains other table functions. Special aliasing rules
3218/// apply.
3219fn plan_solitary_table_function(
3220    qcx: &QueryContext,
3221    function: &Function<Aug>,
3222    alias: Option<&TableAlias>,
3223    with_ordinality: bool,
3224) -> Result<(HirRelationExpr, Scope), PlanError> {
3225    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3226
3227    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3228    if single_column_function {
3229        let item = &mut scope.items[0];
3230
3231        // Mark that the function only produced a single column. This impacts
3232        // whole-row references.
3233        item.from_single_column_function = true;
3234
3235        // Strange special case for solitary table functions that output one
3236        // column whose name matches the name of the table function. If a table
3237        // alias is provided, the column name is changed to the table alias's
3238        // name. Concretely, the following query returns a column named `x`
3239        // rather than a column named `generate_series`:
3240        //
3241        //     SELECT * FROM generate_series(1, 5) AS x
3242        //
3243        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3244        // since its output column is explicitly named `value`, not
3245        // `jsonb_array_elements`.
3246        //
3247        // Note also that we may (correctly) change the column name again when
3248        // we plan the table alias below if the `alias.columns` is non-empty.
3249        if let Some(alias) = alias {
3250            if let ScopeItem {
3251                table_name: Some(table_name),
3252                column_name,
3253                ..
3254            } = item
3255            {
3256                if table_name.item.as_str() == column_name.as_str() {
3257                    *column_name = normalize::column_name(alias.name.clone());
3258                }
3259            }
3260        }
3261    }
3262
3263    let scope = plan_table_alias(scope, alias)?;
3264    Ok((expr, scope))
3265}
3266
3267/// Plans a table function.
3268///
3269/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
3270/// instead to get the appropriate aliasing behavior.
3271fn plan_table_function_internal(
3272    qcx: &QueryContext,
3273    Function {
3274        name,
3275        args,
3276        filter,
3277        over,
3278        distinct,
3279    }: &Function<Aug>,
3280    with_ordinality: bool,
3281    table_name: Option<FullItemName>,
3282) -> Result<(HirRelationExpr, Scope), PlanError> {
3283    assert_none!(filter, "cannot parse table function with FILTER");
3284    assert_none!(over, "cannot parse table function with OVER");
3285    assert!(!*distinct, "cannot parse table function with DISTINCT");
3286
3287    let ecx = &ExprContext {
3288        qcx,
3289        name: "table function arguments",
3290        scope: &Scope::empty(),
3291        relation_type: &SqlRelationType::empty(),
3292        allow_aggregates: false,
3293        allow_subqueries: true,
3294        allow_parameters: true,
3295        allow_windows: false,
3296    };
3297
3298    let scalar_args = match args {
3299        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
3300        FunctionArgs::Args { args, order_by } => {
3301            if !order_by.is_empty() {
3302                sql_bail!(
3303                    "ORDER BY specified, but {} is not an aggregate function",
3304                    name
3305                );
3306            }
3307            plan_exprs(ecx, args)?
3308        }
3309    };
3310
3311    let table_name = match table_name {
3312        Some(table_name) => table_name.item,
3313        None => name.full_item_name().item.clone(),
3314    };
3315
3316    let scope_name = Some(PartialItemName {
3317        database: None,
3318        schema: None,
3319        item: table_name,
3320    });
3321
3322    let (expr, mut scope) = match resolve_func(ecx, name, args)? {
3323        Func::Table(impls) => {
3324            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3325            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
3326            let expr = match tf.imp {
3327                TableFuncImpl::CallTable { mut func, exprs } => {
3328                    if with_ordinality {
3329                        func = TableFunc::with_ordinality(func.clone()).ok_or(
3330                            PlanError::Unsupported {
3331                                feature: format!("WITH ORDINALITY on {}", func),
3332                                discussion_no: None,
3333                            },
3334                        )?;
3335                    }
3336                    HirRelationExpr::CallTable { func, exprs }
3337                }
3338                TableFuncImpl::Expr(expr) => {
3339                    if !with_ordinality {
3340                        expr
3341                    } else {
3342                        // The table function is defined by a SQL query (i.e., TableFuncImpl::Expr),
3343                        // so we can't use the new `WITH ORDINALITY` implementation. We can fall
3344                        // back to the legacy implementation or error out the query.
3345                        if qcx
3346                            .scx
3347                            .is_feature_flag_enabled(&ENABLE_WITH_ORDINALITY_LEGACY_FALLBACK)
3348                        {
3349                            // Note that this can give an incorrect ordering, and also has an extreme
3350                            // performance problem in some cases. See the doc comment of
3351                            // `TableFuncImpl`.
3352                            tracing::error!(
3353                                %name,
3354                                "Using the legacy WITH ORDINALITY / ROWS FROM implementation for a table function",
3355                            );
3356                            expr.map(vec![HirScalarExpr::windowing(WindowExpr {
3357                                func: WindowExprType::Scalar(ScalarWindowExpr {
3358                                    func: ScalarWindowFunc::RowNumber,
3359                                    order_by: vec![],
3360                                }),
3361                                partition_by: vec![],
3362                                order_by: vec![],
3363                            })])
3364                        } else {
3365                            bail_unsupported!(format!(
3366                                "WITH ORDINALITY or ROWS FROM with {}",
3367                                name
3368                            ));
3369                        }
3370                    }
3371                }
3372            };
3373            (expr, scope)
3374        }
3375        Func::Scalar(impls) => {
3376            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
3377            let output = expr.typ(
3378                &qcx.outer_relation_types,
3379                &SqlRelationType::new(vec![]),
3380                &qcx.scx.param_types.borrow(),
3381            );
3382
3383            let relation = SqlRelationType::new(vec![output]);
3384
3385            let function_ident = Ident::new(name.full_item_name().item.clone())?;
3386            let column_name = normalize::column_name(function_ident);
3387            let name = column_name.to_string();
3388
3389            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);
3390
3391            let mut func = TableFunc::TabletizedScalar { relation, name };
3392            if with_ordinality {
3393                func = TableFunc::with_ordinality(func.clone()).ok_or(PlanError::Unsupported {
3394                    feature: format!("WITH ORDINALITY on {}", func),
3395                    discussion_no: None,
3396                })?;
3397            }
3398            (
3399                HirRelationExpr::CallTable {
3400                    func,
3401                    exprs: vec![expr],
3402                },
3403                scope,
3404            )
3405        }
3406        o => sql_bail!(
3407            "{} functions are not supported in functions in FROM",
3408            o.class()
3409        ),
3410    };
3411
3412    if with_ordinality {
3413        scope
3414            .items
3415            .push(ScopeItem::from_name(scope_name, "ordinality"));
3416    }
3417
3418    Ok((expr, scope))
3419}
3420
3421fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3422    if let Some(TableAlias {
3423        name,
3424        columns,
3425        strict,
3426    }) = alias
3427    {
3428        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3429            sql_bail!(
3430                "{} has {} columns available but {} columns specified",
3431                name,
3432                scope.items.len(),
3433                columns.len()
3434            );
3435        }
3436
3437        let table_name = normalize::ident(name.to_owned());
3438        for (i, item) in scope.items.iter_mut().enumerate() {
3439            item.table_name = if item.allow_unqualified_references {
3440                Some(PartialItemName {
3441                    database: None,
3442                    schema: None,
3443                    item: table_name.clone(),
3444                })
3445            } else {
3446                // Columns that prohibit unqualified references are special
3447                // columns from the output of a NATURAL or USING join that can
3448                // only be referenced by their full, pre-join name. Applying an
3449                // alias to the output of that join renders those columns
3450                // inaccessible, which we accomplish here by setting the
3451                // table name to `None`.
3452                //
3453                // Concretely, consider:
3454                //
3455                //      CREATE TABLE t1 (a int);
3456                //      CREATE TABLE t2 (a int);
3457                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3458                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3459                //
3460                // In (1), the join has no alias. The underlying columns from
3461                // either side of the join can be referenced as `t1.a` and
3462                // `t2.a`, respectively, and the unqualified name `a` refers to
3463                // a column whose value is `coalesce(t1.a, t2.a)`.
3464                //
3465                // In (2), the join is aliased as `t`. The columns from either
3466                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3467                // the coalesced column can be named as either `a` or `t.a`.
3468                //
3469                // We previously had a bug [0] that mishandled this subtle
3470                // logic.
3471                //
3472                // NOTE(benesch): We could in theory choose to project away
3473                // those inaccessible columns and drop them from the scope
3474                // entirely, but that would require that this function also
3475                // take and return the `HirRelationExpr` that is being aliased,
3476                // which is a rather large refactor.
3477                //
3478                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3479                None
3480            };
3481            item.column_name = columns
3482                .get(i)
3483                .map(|a| normalize::column_name(a.clone()))
3484                .unwrap_or_else(|| item.column_name.clone());
3485        }
3486    }
3487    Ok(scope)
3488}
3489
3490// `table_func_names` is a mapping from a UUID to the original function
3491// name. The UUIDs are identifiers that have been rewritten from some table
3492// function expression, and this mapping restores the original names.
3493fn invent_column_name(
3494    ecx: &ExprContext,
3495    expr: &Expr<Aug>,
3496    table_func_names: &BTreeMap<String, Ident>,
3497) -> Result<Option<ColumnName>, PlanError> {
3498    // We follow PostgreSQL exactly here, which has some complicated rules
3499    // around "high" and "low" quality names. Low quality names override other
3500    // low quality names but not high quality names.
3501    //
3502    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728
3503
3504    #[derive(Debug)]
3505    enum NameQuality {
3506        Low,
3507        High,
3508    }
3509
3510    fn invent(
3511        ecx: &ExprContext,
3512        expr: &Expr<Aug>,
3513        table_func_names: &BTreeMap<String, Ident>,
3514    ) -> Result<Option<(ColumnName, NameQuality)>, PlanError> {
3515        Ok(match expr {
3516            Expr::Identifier(names) => {
3517                if let [name] = names.as_slice() {
3518                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
3519                        return Ok(Some((
3520                            normalize::column_name(table_func_name.clone()),
3521                            NameQuality::High,
3522                        )));
3523                    }
3524                }
3525                names
3526                    .last()
3527                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
3528            }
3529            Expr::Value(v) => match v {
3530                // Per PostgreSQL, `bool` and `interval` literals take on the name
3531                // of their type, but not other literal types.
3532                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
3533                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
3534                _ => None,
3535            },
3536            Expr::Function(func) => {
3537                let (schema, item) = match &func.name {
3538                    ResolvedItemName::Item {
3539                        qualifiers,
3540                        full_name,
3541                        ..
3542                    } => (&qualifiers.schema_spec, full_name.item.clone()),
3543                    // Name resolution should have rejected anything other than
3544                    // `Item` for a function call.
3545                    _ => {
3546                        bail_internal!("function name did not resolve to an item: {:?}", func.name)
3547                    }
3548                };
3549
3550                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
3551                    || schema
3552                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
3553                {
3554                    None
3555                } else {
3556                    Some((item.into(), NameQuality::High))
3557                }
3558            }
3559            Expr::HomogenizingFunction { function, .. } => Some((
3560                function.to_string().to_lowercase().into(),
3561                NameQuality::High,
3562            )),
3563            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
3564            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
3565            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
3566            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
3567            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names)? {
3568                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3569                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
3570            },
3571            Expr::Case { else_result, .. } => {
3572                let inner = match else_result.as_ref() {
3573                    Some(else_result) => invent(ecx, else_result, table_func_names)?,
3574                    None => None,
3575                };
3576                match inner {
3577                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
3578                    _ => Some(("case".into(), NameQuality::Low)),
3579                }
3580            }
3581            Expr::FieldAccess { field, .. } => {
3582                Some((normalize::column_name(field.clone()), NameQuality::High))
3583            }
3584            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
3585            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names)?,
3586            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
3587                // A bit silly to have to plan the query here just to get its column
3588                // name, since we throw away the planned expression, but fixing this
3589                // requires a separate semantic analysis phase.
3590                //
3591                // We deliberately swallow planning errors here: if the subquery
3592                // doesn't plan, we just don't invent a name for it; the real
3593                // planning attempt elsewhere will surface the error.
3594                let Ok((_expr, scope)) = plan_nested_query(&mut ecx.derived_query_context(), query)
3595                else {
3596                    return Ok(None);
3597                };
3598                scope
3599                    .items
3600                    .first()
3601                    .map(|name| (name.column_name.clone(), NameQuality::High))
3602            }
3603            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
3604            _ => None,
3605        })
3606    }
3607
3608    Ok(invent(ecx, expr, table_func_names)?.map(|(name, _quality)| name))
3609}
3610
3611#[derive(Debug)]
3612enum ExpandedSelectItem<'a> {
3613    InputOrdinal(usize),
3614    Expr(Cow<'a, Expr<Aug>>),
3615}
3616
3617impl ExpandedSelectItem<'_> {
3618    fn as_expr(&self) -> Option<&Expr<Aug>> {
3619        match self {
3620            ExpandedSelectItem::InputOrdinal(_) => None,
3621            ExpandedSelectItem::Expr(expr) => Some(expr),
3622        }
3623    }
3624}
3625
3626fn expand_select_item<'a>(
3627    ecx: &ExprContext,
3628    s: &'a SelectItem<Aug>,
3629    table_func_names: &BTreeMap<String, Ident>,
3630) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
3631    match s {
3632        SelectItem::Expr {
3633            expr: Expr::QualifiedWildcard(table_name),
3634            alias: _,
3635        } => {
3636            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3637            let table_name =
3638                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
3639            let out: Vec<_> = ecx
3640                .scope
3641                .items
3642                .iter()
3643                .enumerate()
3644                .filter(|(_i, item)| item.is_from_table(&table_name))
3645                .map(|(i, item)| {
3646                    let name = item.column_name.clone();
3647                    (ExpandedSelectItem::InputOrdinal(i), name)
3648                })
3649                .collect();
3650            if out.is_empty() {
3651                sql_bail!("no table named '{}' in scope", table_name);
3652            }
3653            Ok(out)
3654        }
3655        SelectItem::Expr {
3656            expr: Expr::WildcardAccess(sql_expr),
3657            alias: _,
3658        } => {
3659            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3660            // A bit silly to have to plan the expression here just to get its
3661            // type, since we throw away the planned expression, but fixing this
3662            // requires a separate semantic analysis phase. Luckily this is an
3663            // uncommon operation and the PostgreSQL docs have a warning that
3664            // this operation is slow in Postgres too.
3665            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
3666            let fields = match ecx.scalar_type(&expr) {
3667                SqlScalarType::Record { fields, .. } => fields,
3668                ty => sql_bail!(
3669                    "type {} is not composite",
3670                    ecx.humanize_sql_scalar_type(&ty, false)
3671                ),
3672            };
3673            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
3674            if let Expr::Identifier(ident) = sql_expr.as_ref() {
3675                if let [name] = ident.as_slice() {
3676                    if let Ok(items) = ecx.scope.items_from_table(
3677                        &[],
3678                        &PartialItemName {
3679                            database: None,
3680                            schema: None,
3681                            item: name.as_str().to_string(),
3682                        },
3683                    ) {
3684                        for (_, item) in items {
3685                            if item
3686                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
3687                            {
3688                                skip_cols.insert(item.column_name.clone());
3689                            }
3690                        }
3691                    }
3692                }
3693            }
3694            let items = fields
3695                .iter()
3696                .filter_map(|(name, _ty)| {
3697                    if skip_cols.contains(name) {
3698                        None
3699                    } else {
3700                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
3701                            expr: sql_expr.clone(),
3702                            field: name.clone().into(),
3703                        }));
3704                        Some((item, name.clone()))
3705                    }
3706                })
3707                .collect();
3708            Ok(items)
3709        }
3710        SelectItem::Wildcard => {
3711            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
3712            let items: Vec<_> = ecx
3713                .scope
3714                .items
3715                .iter()
3716                .enumerate()
3717                .filter(|(_i, item)| item.allow_unqualified_references)
3718                .map(|(i, item)| {
3719                    let name = item.column_name.clone();
3720                    (ExpandedSelectItem::InputOrdinal(i), name)
3721                })
3722                .collect();
3723
3724            Ok(items)
3725        }
3726        SelectItem::Expr { expr, alias } => {
3727            let name = match alias.clone().map(normalize::column_name) {
3728                Some(name) => name,
3729                None => invent_column_name(ecx, expr, table_func_names)?
3730                    .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into()),
3731            };
3732            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
3733        }
3734    }
3735}
3736
3737fn plan_join(
3738    left_qcx: &QueryContext,
3739    left: HirRelationExpr,
3740    left_scope: Scope,
3741    join: &Join<Aug>,
3742) -> Result<(HirRelationExpr, Scope), PlanError> {
3743    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
3744    let (kind, constraint) = match &join.join_operator {
3745        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
3746        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
3747        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
3748        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
3749        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
3750    };
3751
3752    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
3753    if !kind.can_be_correlated() {
3754        for item in &mut right_qcx.outer_scopes[0].items {
3755            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
3756            // these items from scope. These items need to *exist* because they
3757            // might shadow variables in outer scopes that would otherwise be
3758            // valid to reference, but accessing them needs to produce an error.
3759            item.error_if_referenced =
3760                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
3761                    table: table.cloned(),
3762                    column: column.clone(),
3763                });
3764        }
3765    }
3766    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;
3767
3768    let (expr, scope) = match constraint {
3769        JoinConstraint::On(expr) => {
3770            let product_scope = left_scope.product(right_scope)?;
3771            let ecx = &ExprContext {
3772                qcx: left_qcx,
3773                name: "ON clause",
3774                scope: &product_scope,
3775                relation_type: &SqlRelationType::new(
3776                    left_qcx
3777                        .relation_type(&left)
3778                        .column_types
3779                        .into_iter()
3780                        .chain(right_qcx.relation_type(&right).column_types)
3781                        .collect(),
3782                ),
3783                allow_aggregates: false,
3784                allow_subqueries: true,
3785                allow_parameters: true,
3786                allow_windows: false,
3787            };
3788            let on = plan_expr(ecx, expr)?.type_as(ecx, &SqlScalarType::Bool)?;
3789            let joined = left.join(right, on, kind);
3790            (joined, product_scope)
3791        }
3792        JoinConstraint::Using { columns, alias } => {
3793            let column_names = columns
3794                .iter()
3795                .map(|ident| normalize::column_name(ident.clone()))
3796                .collect::<Vec<_>>();
3797
3798            plan_using_constraint(
3799                &column_names,
3800                left_qcx,
3801                left,
3802                left_scope,
3803                &right_qcx,
3804                right,
3805                right_scope,
3806                kind,
3807                alias.as_ref(),
3808            )?
3809        }
3810        JoinConstraint::Natural => {
3811            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
3812            // have the same scx. However, it doesn't hurt to be safe.
3813            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
3814            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
3815            let left_column_names = left_scope.column_names();
3816            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
3817            let column_names: Vec<_> = left_column_names
3818                .filter(|col| right_column_names.contains(col))
3819                .cloned()
3820                .collect();
3821            plan_using_constraint(
3822                &column_names,
3823                left_qcx,
3824                left,
3825                left_scope,
3826                &right_qcx,
3827                right,
3828                right_scope,
3829                kind,
3830                None,
3831            )?
3832        }
3833    };
3834    Ok((expr, scope))
3835}
3836
3837// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins
3838#[allow(clippy::too_many_arguments)]
3839fn plan_using_constraint(
3840    column_names: &[ColumnName],
3841    left_qcx: &QueryContext,
3842    left: HirRelationExpr,
3843    left_scope: Scope,
3844    right_qcx: &QueryContext,
3845    right: HirRelationExpr,
3846    right_scope: Scope,
3847    kind: JoinKind,
3848    alias: Option<&Ident>,
3849) -> Result<(HirRelationExpr, Scope), PlanError> {
3850    let mut both_scope = left_scope.clone().product(right_scope.clone())?;
3851
3852    // Cargo culting PG here; no discernable reason this must fail, but PG does
3853    // so we do, as well.
3854    let mut unique_column_names = BTreeSet::new();
3855    for c in column_names {
3856        if !unique_column_names.insert(c) {
3857            return Err(PlanError::Unsupported {
3858                feature: format!(
3859                    "column name {} appears more than once in USING clause",
3860                    c.quoted()
3861                ),
3862                discussion_no: None,
3863            });
3864        }
3865    }
3866
3867    let alias_item_name = alias.map(|alias| PartialItemName {
3868        database: None,
3869        schema: None,
3870        item: alias.clone().to_string(),
3871    });
3872
3873    if let Some(alias_item_name) = &alias_item_name {
3874        for partial_item_name in both_scope.table_names() {
3875            if partial_item_name.matches(alias_item_name) {
3876                sql_bail!(
3877                    "table name \"{}\" specified more than once",
3878                    alias_item_name
3879                )
3880            }
3881        }
3882    }
3883
3884    let ecx = &ExprContext {
3885        qcx: right_qcx,
3886        name: "USING clause",
3887        scope: &both_scope,
3888        relation_type: &SqlRelationType::new(
3889            left_qcx
3890                .relation_type(&left)
3891                .column_types
3892                .into_iter()
3893                .chain(right_qcx.relation_type(&right).column_types)
3894                .collect(),
3895        ),
3896        allow_aggregates: false,
3897        allow_subqueries: false,
3898        allow_parameters: false,
3899        allow_windows: false,
3900    };
3901
3902    let mut join_exprs = vec![];
3903    let mut map_exprs = vec![];
3904    let mut new_items = vec![];
3905    let mut join_cols = vec![];
3906    let mut hidden_cols = vec![];
3907
3908    for column_name in column_names {
3909        // the two sides will have different names (e.g., `t1.a` and `t2.a`)
3910        let (lhs, lhs_name) = left_scope.resolve_using_column(
3911            column_name,
3912            JoinSide::Left,
3913            &mut left_qcx.name_manager.borrow_mut(),
3914        )?;
3915        let (mut rhs, rhs_name) = right_scope.resolve_using_column(
3916            column_name,
3917            JoinSide::Right,
3918            &mut right_qcx.name_manager.borrow_mut(),
3919        )?;
3920
3921        // Adjust the RHS reference to its post-join location.
3922        rhs.column += left_scope.len();
3923
3924        // Join keys must be resolved to same type.
3925        let mut exprs = coerce_homogeneous_exprs(
3926            &ecx.with_name(&format!(
3927                "NATURAL/USING join column {}",
3928                column_name.quoted()
3929            )),
3930            vec![
3931                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3932                    lhs,
3933                    Arc::clone(&lhs_name),
3934                )),
3935                CoercibleScalarExpr::Coerced(HirScalarExpr::named_column(
3936                    rhs,
3937                    Arc::clone(&rhs_name),
3938                )),
3939            ],
3940            None,
3941        )?;
3942        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));
3943
3944        match kind {
3945            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3946                join_cols.push(lhs.column);
3947                hidden_cols.push(rhs.column);
3948            }
3949            JoinKind::RightOuter => {
3950                join_cols.push(rhs.column);
3951                hidden_cols.push(lhs.column);
3952            }
3953            JoinKind::FullOuter => {
3954                // Create a new column that will be the coalesced value of left
3955                // and right.
3956                join_cols.push(both_scope.items.len() + map_exprs.len());
3957                hidden_cols.push(lhs.column);
3958                hidden_cols.push(rhs.column);
3959                map_exprs.push(HirScalarExpr::call_variadic(
3960                    Coalesce,
3961                    vec![expr1.clone(), expr2.clone()],
3962                ));
3963                new_items.push(ScopeItem::from_column_name(column_name));
3964            }
3965        }
3966
3967        // If a `join_using_alias` is present, add a new scope item that accepts
3968        // only table-qualified references for each specified join column.
3969        // Unlike regular table aliases, a `join_using_alias` should not hide the
3970        // names of the joined relations.
3971        if alias_item_name.is_some() {
3972            let new_item_col = both_scope.items.len() + new_items.len();
3973            join_cols.push(new_item_col);
3974            hidden_cols.push(new_item_col);
3975
3976            new_items.push(ScopeItem::from_name(
3977                alias_item_name.clone(),
3978                column_name.clone().to_string(),
3979            ));
3980
3981            // The aliased column `alias.col` must take the same value as the
3982            // unqualified join output column `col`. For INNER and LEFT joins
3983            // that's the LHS value, for RIGHT joins it's the RHS value, and
3984            // for FULL OUTER joins it's COALESCE(lhs, rhs). Using `lhs`
3985            // unconditionally produces wrong results for RIGHT/FULL joins on
3986            // rows where the LHS side is NULL.
3987            let alias_expr = match kind {
3988                JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
3989                    HirScalarExpr::named_column(lhs, Arc::clone(&lhs_name))
3990                }
3991                JoinKind::RightOuter => HirScalarExpr::named_column(rhs, Arc::clone(&rhs_name)),
3992                JoinKind::FullOuter => {
3993                    HirScalarExpr::call_variadic(Coalesce, vec![expr1.clone(), expr2.clone()])
3994                }
3995            };
3996            map_exprs.push(alias_expr);
3997        }
3998
3999        join_exprs.push(expr1.call_binary(expr2, expr_func::Eq));
4000    }
4001    both_scope.items.extend(new_items);
4002
4003    // The columns from the secondary side of the join remain accessible by
4004    // their table-qualified name, but not by their column name alone. They are
4005    // also excluded from `SELECT *`.
4006    for c in hidden_cols {
4007        both_scope.items[c].allow_unqualified_references = false;
4008    }
4009
4010    // Reproject all returned elements to the front of the list.
4011    let project_key = join_cols
4012        .into_iter()
4013        .chain(0..both_scope.items.len())
4014        .unique()
4015        .collect::<Vec<_>>();
4016
4017    both_scope = both_scope.project(&project_key);
4018
4019    let on = HirScalarExpr::variadic_and(join_exprs);
4020
4021    let both = left
4022        .join(right, on, kind)
4023        .map(map_exprs)
4024        .project(project_key);
4025    Ok((both, both_scope))
4026}
4027
4028pub fn plan_expr<'a>(
4029    ecx: &'a ExprContext,
4030    e: &Expr<Aug>,
4031) -> Result<CoercibleScalarExpr, PlanError> {
4032    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
4033}
4034
4035fn plan_expr_inner<'a>(
4036    ecx: &'a ExprContext,
4037    e: &Expr<Aug>,
4038) -> Result<CoercibleScalarExpr, PlanError> {
4039    if let Some((i, item)) = ecx.scope.resolve_expr(e) {
4040        // We've already calculated this expression.
4041        return Ok(HirScalarExpr::named_column(
4042            i,
4043            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
4044        )
4045        .into());
4046    }
4047
4048    match e {
4049        // Names.
4050        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
4051            Ok(plan_identifier(ecx, names)?.into())
4052        }
4053
4054        // Literals.
4055        Expr::Value(val) => plan_literal(val),
4056        Expr::Parameter(n) => plan_parameter(ecx, *n),
4057        Expr::Array(exprs) => plan_array(ecx, exprs, None),
4058        Expr::List(exprs) => plan_list(ecx, exprs, None),
4059        Expr::Map(exprs) => plan_map(ecx, exprs, None),
4060        Expr::Row { exprs } => plan_row(ecx, exprs),
4061
4062        // Generalized functions, operators, and casts.
4063        Expr::Op { op, expr1, expr2 } => {
4064            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
4065        }
4066        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
4067        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),
4068
4069        // Special functions and operators.
4070        Expr::Not { expr } => plan_not(ecx, expr),
4071        Expr::And { left, right } => plan_and(ecx, left, right),
4072        Expr::Or { left, right } => plan_or(ecx, left, right),
4073        Expr::IsExpr {
4074            expr,
4075            construct,
4076            negated,
4077        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
4078        Expr::Case {
4079            operand,
4080            conditions,
4081            results,
4082            else_result,
4083        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
4084        Expr::HomogenizingFunction { function, exprs } => {
4085            plan_homogenizing_function(ecx, function, exprs)
4086        }
4087        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
4088            ecx,
4089            &None,
4090            &[l_expr.clone().equals(*r_expr.clone())],
4091            &[Expr::null()],
4092            &Some(Box::new(*l_expr.clone())),
4093        )?
4094        .into()),
4095        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
4096        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
4097        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
4098        Expr::Like {
4099            expr,
4100            pattern,
4101            escape,
4102            case_insensitive,
4103            negated,
4104        } => Ok(plan_like(
4105            ecx,
4106            expr,
4107            pattern,
4108            escape.as_deref(),
4109            *case_insensitive,
4110            *negated,
4111        )?
4112        .into()),
4113
4114        Expr::InList {
4115            expr,
4116            list,
4117            negated,
4118        } => plan_in_list(ecx, expr, list, negated),
4119
4120        // Subqueries.
4121        Expr::Exists(query) => plan_exists(ecx, query),
4122        Expr::Subquery(query) => plan_subquery(ecx, query),
4123        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
4124        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
4125        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
4126        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
4127        Expr::Nested(_) => bail_internal!("Expr::Nested should have been desugared"),
4128        Expr::InSubquery { .. } => {
4129            bail_internal!("Expr::InSubquery should have been desugared")
4130        }
4131        Expr::AnyExpr { .. } => {
4132            bail_internal!("Expr::AnyExpr should have been desugared")
4133        }
4134        Expr::AllExpr { .. } => {
4135            bail_internal!("Expr::AllExpr should have been desugared")
4136        }
4137        Expr::AnySubquery { .. } => {
4138            bail_internal!("Expr::AnySubquery should have been desugared")
4139        }
4140        Expr::AllSubquery { .. } => {
4141            bail_internal!("Expr::AllSubquery should have been desugared")
4142        }
4143        Expr::Between { .. } => {
4144            bail_internal!("Expr::Between should have been desugared")
4145        }
4146    }
4147}
4148
4149fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
4150    if !ecx.allow_parameters {
4151        // It might be clearer to return an error like "cannot use parameter
4152        // here", but this is how PostgreSQL does it, and so for now we follow
4153        // PostgreSQL.
4154        return Err(PlanError::UnknownParameter(n));
4155    }
4156    if n == 0 || n > 65536 {
4157        return Err(PlanError::UnknownParameter(n));
4158    }
4159    if ecx.param_types().borrow().contains_key(&n) {
4160        Ok(HirScalarExpr::parameter(n).into())
4161    } else {
4162        Ok(CoercibleScalarExpr::Parameter(n))
4163    }
4164}
4165
4166fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
4167    let mut out = vec![];
4168    for e in exprs {
4169        out.push(plan_expr(ecx, e)?);
4170    }
4171    Ok(CoercibleScalarExpr::LiteralRecord(out))
4172}
4173
4174fn plan_cast(
4175    ecx: &ExprContext,
4176    expr: &Expr<Aug>,
4177    data_type: &ResolvedDataType,
4178) -> Result<CoercibleScalarExpr, PlanError> {
4179    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
4180    let expr = match expr {
4181        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
4182        // we can pass in the target type as a type hint. This is
4183        // a limited form of the coercion that we do for string literals
4184        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
4185        // handle ARRAY/LIST/MAP coercion too, but doing so causes
4186        // PostgreSQL compatibility trouble.
4187        //
4188        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
4189        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
4190        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
4191        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
4192        _ => plan_expr(ecx, expr)?,
4193    };
4194    let ecx = &ecx.with_name("CAST");
4195    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
4196    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
4197    Ok(expr.into())
4198}
4199
4200fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4201    let ecx = ecx.with_name("NOT argument");
4202    Ok(plan_expr(&ecx, expr)?
4203        .type_as(&ecx, &SqlScalarType::Bool)?
4204        .call_unary(UnaryFunc::Not(expr_func::Not))
4205        .into())
4206}
4207
4208fn plan_and(
4209    ecx: &ExprContext,
4210    left: &Expr<Aug>,
4211    right: &Expr<Aug>,
4212) -> Result<CoercibleScalarExpr, PlanError> {
4213    let ecx = ecx.with_name("AND argument");
4214    Ok(HirScalarExpr::variadic_and(vec![
4215        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4216        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4217    ])
4218    .into())
4219}
4220
4221fn plan_or(
4222    ecx: &ExprContext,
4223    left: &Expr<Aug>,
4224    right: &Expr<Aug>,
4225) -> Result<CoercibleScalarExpr, PlanError> {
4226    let ecx = ecx.with_name("OR argument");
4227    Ok(HirScalarExpr::variadic_or(vec![
4228        plan_expr(&ecx, left)?.type_as(&ecx, &SqlScalarType::Bool)?,
4229        plan_expr(&ecx, right)?.type_as(&ecx, &SqlScalarType::Bool)?,
4230    ])
4231    .into())
4232}
4233
4234fn plan_in_list(
4235    ecx: &ExprContext,
4236    lhs: &Expr<Aug>,
4237    list: &Vec<Expr<Aug>>,
4238    negated: &bool,
4239) -> Result<CoercibleScalarExpr, PlanError> {
4240    let ecx = ecx.with_name("IN list");
4241    let or = HirScalarExpr::variadic_or(
4242        list.into_iter()
4243            .map(|e| {
4244                let eq = lhs.clone().equals(e.clone());
4245                plan_expr(&ecx, &eq)?.type_as(&ecx, &SqlScalarType::Bool)
4246            })
4247            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4248    );
4249    Ok(if *negated {
4250        or.call_unary(UnaryFunc::Not(expr_func::Not))
4251    } else {
4252        or
4253    }
4254    .into())
4255}
4256
4257fn plan_homogenizing_function(
4258    ecx: &ExprContext,
4259    function: &HomogenizingFunction,
4260    exprs: &[Expr<Aug>],
4261) -> Result<CoercibleScalarExpr, PlanError> {
4262    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4263    let expr = HirScalarExpr::call_variadic(
4264        match function {
4265            HomogenizingFunction::Coalesce => VariadicFunc::from(Coalesce),
4266            HomogenizingFunction::Greatest => VariadicFunc::from(Greatest),
4267            HomogenizingFunction::Least => VariadicFunc::from(Least),
4268        },
4269        coerce_homogeneous_exprs(
4270            &ecx.with_name(&function.to_string().to_lowercase()),
4271            plan_exprs(ecx, exprs)?,
4272            None,
4273        )?,
4274    );
4275    Ok(expr.into())
4276}
4277
4278fn plan_field_access(
4279    ecx: &ExprContext,
4280    expr: &Expr<Aug>,
4281    field: &Ident,
4282) -> Result<CoercibleScalarExpr, PlanError> {
4283    let field = normalize::column_name(field.clone());
4284    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4285    let ty = ecx.scalar_type(&expr);
4286    let i = match &ty {
4287        SqlScalarType::Record { fields, .. } => {
4288            fields.iter().position(|(name, _ty)| *name == field)
4289        }
4290        ty => sql_bail!(
4291            "column notation applied to type {}, which is not a composite type",
4292            ecx.humanize_sql_scalar_type(ty, false)
4293        ),
4294    };
4295    match i {
4296        None => sql_bail!(
4297            "field {} not found in data type {}",
4298            field,
4299            ecx.humanize_sql_scalar_type(&ty, false)
4300        ),
4301        Some(i) => Ok(expr
4302            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4303            .into()),
4304    }
4305}
4306
4307fn plan_subscript(
4308    ecx: &ExprContext,
4309    expr: &Expr<Aug>,
4310    positions: &[SubscriptPosition<Aug>],
4311) -> Result<CoercibleScalarExpr, PlanError> {
4312    assert!(
4313        !positions.is_empty(),
4314        "subscript expression must contain at least one position"
4315    );
4316
4317    let ecx = &ecx.with_name("subscripting");
4318    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4319    let ty = ecx.scalar_type(&expr);
4320    match &ty {
4321        SqlScalarType::Array(..) | SqlScalarType::Int2Vector => plan_subscript_array(
4322            ecx,
4323            expr,
4324            positions,
4325            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
4326            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
4327            // track in its backing data).
4328            if ty == SqlScalarType::Int2Vector {
4329                1
4330            } else {
4331                0
4332            },
4333        ),
4334        SqlScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
4335        SqlScalarType::List { element_type, .. } => {
4336            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
4337            let elem_type_name = ecx.humanize_sql_scalar_type(element_type, false);
4338            let n_layers = ty.unwrap_list_n_layers();
4339            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
4340        }
4341        ty => sql_bail!(
4342            "cannot subscript type {}",
4343            ecx.humanize_sql_scalar_type(ty, false)
4344        ),
4345    }
4346}
4347
4348// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4349// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4350// any were slices (i.e. included colon).
4351fn extract_scalar_subscript_from_positions<'a>(
4352    positions: &'a [SubscriptPosition<Aug>],
4353    expr_type_name: &str,
4354) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4355    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4356    for p in positions {
4357        if p.explicit_slice {
4358            sql_bail!("{} subscript does not support slices", expr_type_name);
4359        }
4360        assert!(
4361            p.end.is_none(),
4362            "index-appearing subscripts cannot have end value"
4363        );
4364        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4365    }
4366    Ok(scalar_subscripts)
4367}
4368
4369fn plan_subscript_array(
4370    ecx: &ExprContext,
4371    expr: HirScalarExpr,
4372    positions: &[SubscriptPosition<Aug>],
4373    offset: i64,
4374) -> Result<CoercibleScalarExpr, PlanError> {
4375    let mut exprs = Vec::with_capacity(positions.len() + 1);
4376    exprs.push(expr);
4377
4378    // Subscripting arrays doesn't yet support slicing, so we always want to
4379    // extract scalars or error.
4380    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4381
4382    for i in indexes {
4383        exprs.push(plan_expr(ecx, i)?.cast_to(
4384            ecx,
4385            CastContext::Explicit,
4386            &SqlScalarType::Int64,
4387        )?);
4388    }
4389
4390    Ok(HirScalarExpr::call_variadic(ArrayIndex { offset }, exprs).into())
4391}
4392
4393fn plan_subscript_list(
4394    ecx: &ExprContext,
4395    mut expr: HirScalarExpr,
4396    positions: &[SubscriptPosition<Aug>],
4397    mut remaining_layers: usize,
4398    elem_type_name: &str,
4399) -> Result<CoercibleScalarExpr, PlanError> {
4400    let mut i = 0;
4401
4402    while i < positions.len() {
4403        // Take all contiguous index operations, i.e. find next slice operation.
4404        let j = positions[i..]
4405            .iter()
4406            .position(|p| p.explicit_slice)
4407            .unwrap_or(positions.len() - i);
4408        if j != 0 {
4409            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
4410            let (n, e) = plan_index_list(
4411                ecx,
4412                expr,
4413                indexes.as_slice(),
4414                remaining_layers,
4415                elem_type_name,
4416            )?;
4417            remaining_layers = n;
4418            expr = e;
4419            i += j;
4420        }
4421
4422        // Take all contiguous slice operations, i.e. find next index operation.
4423        let j = positions[i..]
4424            .iter()
4425            .position(|p| !p.explicit_slice)
4426            .unwrap_or(positions.len() - i);
4427        if j != 0 {
4428            expr = plan_slice_list(
4429                ecx,
4430                expr,
4431                &positions[i..i + j],
4432                remaining_layers,
4433                elem_type_name,
4434            )?;
4435            i += j;
4436        }
4437    }
4438
4439    Ok(expr.into())
4440}
4441
4442fn plan_index_list(
4443    ecx: &ExprContext,
4444    expr: HirScalarExpr,
4445    indexes: &[&Expr<Aug>],
4446    n_layers: usize,
4447    elem_type_name: &str,
4448) -> Result<(usize, HirScalarExpr), PlanError> {
4449    let depth = indexes.len();
4450
4451    if depth > n_layers {
4452        if n_layers == 0 {
4453            sql_bail!("cannot subscript type {}", elem_type_name)
4454        } else {
4455            sql_bail!(
4456                "cannot index into {} layers; list only has {} layer{}",
4457                depth,
4458                n_layers,
4459                if n_layers == 1 { "" } else { "s" }
4460            )
4461        }
4462    }
4463
4464    let mut exprs = Vec::with_capacity(depth + 1);
4465    exprs.push(expr);
4466
4467    for i in indexes {
4468        exprs.push(plan_expr(ecx, i)?.cast_to(
4469            ecx,
4470            CastContext::Explicit,
4471            &SqlScalarType::Int64,
4472        )?);
4473    }
4474
4475    Ok((
4476        n_layers - depth,
4477        HirScalarExpr::call_variadic(ListIndex, exprs),
4478    ))
4479}
4480
4481fn plan_slice_list(
4482    ecx: &ExprContext,
4483    expr: HirScalarExpr,
4484    slices: &[SubscriptPosition<Aug>],
4485    n_layers: usize,
4486    elem_type_name: &str,
4487) -> Result<HirScalarExpr, PlanError> {
4488    if n_layers == 0 {
4489        sql_bail!("cannot subscript type {}", elem_type_name)
4490    }
4491
4492    // first arg will be list
4493    let mut exprs = Vec::with_capacity(slices.len() + 1);
4494    exprs.push(expr);
4495    // extract (start, end) parts from collected slices
4496    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4497        Ok(match position {
4498            Some(p) => {
4499                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &SqlScalarType::Int64)?
4500            }
4501            None => HirScalarExpr::literal(Datum::Int64(default), SqlScalarType::Int64),
4502        })
4503    };
4504    for p in slices {
4505        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4506        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4507        exprs.push(start);
4508        exprs.push(end);
4509    }
4510
4511    Ok(HirScalarExpr::call_variadic(ListSliceLinear, exprs))
4512}
4513
4514fn plan_like(
4515    ecx: &ExprContext,
4516    expr: &Expr<Aug>,
4517    pattern: &Expr<Aug>,
4518    escape: Option<&Expr<Aug>>,
4519    case_insensitive: bool,
4520    not: bool,
4521) -> Result<HirScalarExpr, PlanError> {
4522    use CastContext::Implicit;
4523    let ecx = ecx.with_name("LIKE argument");
4524    let expr = plan_expr(&ecx, expr)?;
4525    let haystack = match ecx.scalar_type(&expr) {
4526        CoercibleScalarType::Coerced(ref ty @ SqlScalarType::Char { length }) => expr
4527            .type_as(&ecx, ty)?
4528            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
4529        _ => expr.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4530    };
4531    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?;
4532    if let Some(escape) = escape {
4533        pattern = pattern.call_binary(
4534            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &SqlScalarType::String)?,
4535            expr_func::LikeEscape,
4536        );
4537    }
4538    let func: BinaryFunc = if case_insensitive {
4539        expr_func::IsLikeMatchCaseInsensitive.into()
4540    } else {
4541        expr_func::IsLikeMatchCaseSensitive.into()
4542    };
4543    let like = haystack.call_binary(pattern, func);
4544    if not {
4545        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
4546    } else {
4547        Ok(like)
4548    }
4549}
4550
4551fn plan_subscript_jsonb(
4552    ecx: &ExprContext,
4553    expr: HirScalarExpr,
4554    positions: &[SubscriptPosition<Aug>],
4555) -> Result<CoercibleScalarExpr, PlanError> {
4556    use CastContext::Implicit;
4557    use SqlScalarType::{Int64, String};
4558
4559    // JSONB doesn't support the slicing syntax, so simply error if you
4560    // encounter any explicit slices.
4561    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4562
4563    let mut exprs = Vec::with_capacity(subscripts.len());
4564    for s in subscripts {
4565        let subscript = plan_expr(ecx, s)?;
4566        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4567            subscript
4568        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4569            // Integers are converted to a string here and then re-parsed as an
4570            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4571            // do it.
4572            typeconv::to_string(ecx, subscript)
4573        } else {
4574            sql_bail!("jsonb subscript type must be coercible to integer or text");
4575        };
4576        exprs.push(subscript);
4577    }
4578
4579    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4580    // `expr->subscript` as you might expect.
4581    let expr = expr.call_binary(
4582        HirScalarExpr::call_variadic(
4583            ArrayCreate {
4584                elem_type: SqlScalarType::String,
4585            },
4586            exprs,
4587        ),
4588        expr_func::JsonbGetPath,
4589    );
4590    Ok(expr.into())
4591}
4592
4593fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4594    if !ecx.allow_subqueries {
4595        sql_bail!("{} does not allow subqueries", ecx.name)
4596    }
4597    let mut qcx = ecx.derived_query_context();
4598    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4599    Ok(expr.exists().into())
4600}
4601
4602fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4603    if !ecx.allow_subqueries {
4604        sql_bail!("{} does not allow subqueries", ecx.name)
4605    }
4606    let mut qcx = ecx.derived_query_context();
4607    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4608    let column_types = qcx.relation_type(&expr).column_types;
4609    if column_types.len() != 1 {
4610        sql_bail!(
4611            "Expected subselect to return 1 column, got {} columns",
4612            column_types.len()
4613        );
4614    }
4615    Ok(expr.select().into())
4616}
4617
4618fn plan_list_subquery(
4619    ecx: &ExprContext,
4620    query: &Query<Aug>,
4621) -> Result<CoercibleScalarExpr, PlanError> {
4622    plan_vector_like_subquery(
4623        ecx,
4624        query,
4625        |_| false,
4626        |elem_type| ListCreate { elem_type }.into(),
4627        |order_by| AggregateFunc::ListConcat { order_by },
4628        expr_func::ListListConcat.into(),
4629        |elem_type| {
4630            HirScalarExpr::literal(
4631                Datum::empty_list(),
4632                SqlScalarType::List {
4633                    element_type: Box::new(elem_type),
4634                    custom_id: None,
4635                },
4636            )
4637        },
4638        "list",
4639    )
4640}
4641
4642fn plan_array_subquery(
4643    ecx: &ExprContext,
4644    query: &Query<Aug>,
4645) -> Result<CoercibleScalarExpr, PlanError> {
4646    plan_vector_like_subquery(
4647        ecx,
4648        query,
4649        |elem_type| {
4650            matches!(
4651                elem_type,
4652                SqlScalarType::Char { .. }
4653                    | SqlScalarType::Array { .. }
4654                    | SqlScalarType::List { .. }
4655                    | SqlScalarType::Map { .. }
4656            )
4657        },
4658        |elem_type| ArrayCreate { elem_type }.into(),
4659        |order_by| AggregateFunc::ArrayConcat { order_by },
4660        expr_func::ArrayArrayConcat.into(),
4661        |elem_type| {
4662            HirScalarExpr::literal(
4663                Datum::empty_array(),
4664                SqlScalarType::Array(Box::new(elem_type)),
4665            )
4666        },
4667        "[]",
4668    )
4669}
4670
4671/// Generic function used to plan both array subqueries and list subqueries
4672fn plan_vector_like_subquery<F1, F2, F3, F4>(
4673    ecx: &ExprContext,
4674    query: &Query<Aug>,
4675    is_unsupported_type: F1,
4676    vector_create: F2,
4677    aggregate_concat: F3,
4678    binary_concat: BinaryFunc,
4679    empty_literal: F4,
4680    vector_type_string: &str,
4681) -> Result<CoercibleScalarExpr, PlanError>
4682where
4683    F1: Fn(&SqlScalarType) -> bool,
4684    F2: Fn(SqlScalarType) -> VariadicFunc,
4685    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
4686    F4: Fn(SqlScalarType) -> HirScalarExpr,
4687{
4688    if !ecx.allow_subqueries {
4689        sql_bail!("{} does not allow subqueries", ecx.name)
4690    }
4691
4692    let mut qcx = ecx.derived_query_context();
4693    let mut planned_query = plan_query(&mut qcx, query)?;
4694    if planned_query.limit.is_some()
4695        || !planned_query
4696            .offset
4697            .clone()
4698            .try_into_literal_int64()
4699            .is_ok_and(|offset| offset == 0)
4700    {
4701        planned_query.expr = HirRelationExpr::top_k(
4702            planned_query.expr,
4703            vec![],
4704            planned_query.order_by.clone(),
4705            planned_query.limit,
4706            planned_query.offset,
4707            planned_query.group_size_hints.limit_input_group_size,
4708        );
4709    }
4710
4711    if planned_query.project.len() != 1 {
4712        sql_bail!(
4713            "Expected subselect to return 1 column, got {} columns",
4714            planned_query.project.len()
4715        );
4716    }
4717
4718    let project_column = *planned_query.project.get(0).unwrap();
4719    let elem_type = qcx
4720        .relation_type(&planned_query.expr)
4721        .column_types
4722        .get(project_column)
4723        .cloned()
4724        .unwrap()
4725        .scalar_type();
4726
4727    if is_unsupported_type(&elem_type) {
4728        bail_unsupported!(format!(
4729            "cannot build array from subquery because return type {}{}",
4730            ecx.humanize_sql_scalar_type(&elem_type, false),
4731            vector_type_string
4732        ));
4733    }
4734
4735    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
4736    // subquery above.
4737    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4738        vector_create(elem_type.clone()),
4739        vec![HirScalarExpr::column(project_column)],
4740    ))
4741    .chain(
4742        planned_query
4743            .order_by
4744            .iter()
4745            .map(|co| HirScalarExpr::column(co.column)),
4746    )
4747    .collect();
4748
4749    // However, column references for `aggregation_projection` and `aggregation_order_by`
4750    // are with reference to the `exprs` of the aggregation expression.  Here that is
4751    // `aggregation_exprs`.
4752    let aggregation_projection = vec![0];
4753    let aggregation_order_by = planned_query
4754        .order_by
4755        .into_iter()
4756        .enumerate()
4757        .map(|(i, order)| ColumnOrder { column: i, ..order })
4758        .collect();
4759
4760    let reduced_expr = planned_query
4761        .expr
4762        .reduce(
4763            vec![],
4764            vec![AggregateExpr {
4765                func: aggregate_concat(aggregation_order_by),
4766                expr: Box::new(HirScalarExpr::call_variadic(
4767                    RecordCreate {
4768                        field_names: iter::repeat(ColumnName::from(""))
4769                            .take(aggregation_exprs.len())
4770                            .collect(),
4771                    },
4772                    aggregation_exprs,
4773                )),
4774                distinct: false,
4775            }],
4776            None,
4777        )
4778        .project(aggregation_projection);
4779
4780    // If `expr` has no rows, return an empty array/list rather than NULL.
4781    Ok(reduced_expr
4782        .select()
4783        .call_binary(empty_literal(elem_type), binary_concat)
4784        .into())
4785}
4786
4787fn plan_map_subquery(
4788    ecx: &ExprContext,
4789    query: &Query<Aug>,
4790) -> Result<CoercibleScalarExpr, PlanError> {
4791    if !ecx.allow_subqueries {
4792        sql_bail!("{} does not allow subqueries", ecx.name)
4793    }
4794
4795    let mut qcx = ecx.derived_query_context();
4796    let mut query = plan_query(&mut qcx, query)?;
4797    if query.limit.is_some()
4798        || !query
4799            .offset
4800            .clone()
4801            .try_into_literal_int64()
4802            .is_ok_and(|offset| offset == 0)
4803    {
4804        query.expr = HirRelationExpr::top_k(
4805            query.expr,
4806            vec![],
4807            query.order_by.clone(),
4808            query.limit,
4809            query.offset,
4810            query.group_size_hints.limit_input_group_size,
4811        );
4812    }
4813    if query.project.len() != 2 {
4814        sql_bail!(
4815            "expected map subquery to return 2 columns, got {} columns",
4816            query.project.len()
4817        );
4818    }
4819
4820    let query_types = qcx.relation_type(&query.expr).column_types;
4821    let key_column = query.project[0];
4822    let key_type = query_types[key_column].clone().scalar_type();
4823    let value_column = query.project[1];
4824    let value_type = query_types[value_column].clone().scalar_type();
4825
4826    if key_type != SqlScalarType::String {
4827        sql_bail!("cannot build map from subquery because first column is not of type text");
4828    }
4829
4830    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::call_variadic(
4831        RecordCreate {
4832            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
4833        },
4834        vec![
4835            HirScalarExpr::column(key_column),
4836            HirScalarExpr::column(value_column),
4837        ],
4838    ))
4839    .chain(
4840        query
4841            .order_by
4842            .iter()
4843            .map(|co| HirScalarExpr::column(co.column)),
4844    )
4845    .collect();
4846
4847    let expr = query
4848        .expr
4849        .reduce(
4850            vec![],
4851            vec![AggregateExpr {
4852                func: AggregateFunc::MapAgg {
4853                    order_by: query
4854                        .order_by
4855                        .into_iter()
4856                        .enumerate()
4857                        .map(|(i, order)| ColumnOrder { column: i, ..order })
4858                        .collect(),
4859                    value_type: value_type.clone(),
4860                },
4861                expr: Box::new(HirScalarExpr::call_variadic(
4862                    RecordCreate {
4863                        field_names: iter::repeat(ColumnName::from(""))
4864                            .take(aggregation_exprs.len())
4865                            .collect(),
4866                    },
4867                    aggregation_exprs,
4868                )),
4869                distinct: false,
4870            }],
4871            None,
4872        )
4873        .project(vec![0]);
4874
4875    // If `expr` has no rows, return an empty map rather than NULL.
4876    let expr = HirScalarExpr::call_variadic(
4877        Coalesce,
4878        vec![
4879            expr.select(),
4880            HirScalarExpr::literal(
4881                Datum::empty_map(),
4882                SqlScalarType::Map {
4883                    value_type: Box::new(value_type),
4884                    custom_id: None,
4885                },
4886            ),
4887        ],
4888    );
4889
4890    Ok(expr.into())
4891}
4892
4893fn plan_collate(
4894    ecx: &ExprContext,
4895    expr: &Expr<Aug>,
4896    collation: &UnresolvedItemName,
4897) -> Result<CoercibleScalarExpr, PlanError> {
4898    if collation.0.len() == 2
4899        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4900        && collation.0[1] == ident!("default")
4901    {
4902        plan_expr(ecx, expr)
4903    } else {
4904        bail_unsupported!("COLLATE");
4905    }
4906}
4907
4908/// Plans a slice of expressions.
4909///
4910/// This function is a simple convenience function for mapping [`plan_expr`]
4911/// over a slice of expressions. The planned expressions are returned in the
4912/// same order as the input. If any of the expressions fail to plan, returns an
4913/// error instead.
4914fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4915where
4916    E: std::borrow::Borrow<Expr<Aug>>,
4917{
4918    let mut out = vec![];
4919    for expr in exprs {
4920        out.push(plan_expr(ecx, expr.borrow())?);
4921    }
4922    Ok(out)
4923}
4924
4925/// Plans an `ARRAY` expression.
4926fn plan_array(
4927    ecx: &ExprContext,
4928    exprs: &[Expr<Aug>],
4929    type_hint: Option<&SqlScalarType>,
4930) -> Result<CoercibleScalarExpr, PlanError> {
4931    // Plan each element expression.
4932    let mut out = vec![];
4933    for expr in exprs {
4934        out.push(match expr {
4935            // Special case nested ARRAY expressions so we can plumb
4936            // the type hint through.
4937            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4938            _ => plan_expr(ecx, expr)?,
4939        });
4940    }
4941
4942    // Attempt to make use of the type hint.
4943    let type_hint = match type_hint {
4944        // The user has provided an explicit cast to an array type. We know the
4945        // element type to coerce to. Need to be careful, though: if there's
4946        // evidence that any of the array elements are themselves arrays, we
4947        // want to coerce to the array type, not the element type.
4948        Some(SqlScalarType::Array(elem_type)) => {
4949            let multidimensional = out.iter().any(|e| {
4950                matches!(
4951                    ecx.scalar_type(e),
4952                    CoercibleScalarType::Coerced(SqlScalarType::Array(_))
4953                )
4954            });
4955            if multidimensional {
4956                type_hint
4957            } else {
4958                Some(&**elem_type)
4959            }
4960        }
4961        // The user provided an explicit cast to a non-array type. We'll have to
4962        // guess what the correct type for the array. Our caller will then
4963        // handle converting that array type to the desired non-array type.
4964        Some(_) => None,
4965        // No type hint. We'll have to guess the correct type for the array.
4966        None => None,
4967    };
4968
4969    // Coerce all elements to the same type.
4970    let (elem_type, exprs) = if exprs.is_empty() {
4971        if let Some(elem_type) = type_hint {
4972            (elem_type.clone(), vec![])
4973        } else {
4974            sql_bail!("cannot determine type of empty array");
4975        }
4976    } else {
4977        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4978        (ecx.scalar_type(&out[0]), out)
4979    };
4980
4981    // Arrays of `char` type are disallowed due to a known limitation:
4982    // https://github.com/MaterializeInc/database-issues/issues/2360.
4983    //
4984    // Arrays of `list` and `map` types are disallowed due to mind-bending
4985    // semantics.
4986    if matches!(
4987        elem_type,
4988        SqlScalarType::Char { .. } | SqlScalarType::List { .. } | SqlScalarType::Map { .. }
4989    ) {
4990        bail_unsupported!(format!(
4991            "{}[]",
4992            ecx.humanize_sql_scalar_type(&elem_type, false)
4993        ));
4994    }
4995
4996    Ok(HirScalarExpr::call_variadic(ArrayCreate { elem_type }, exprs).into())
4997}
4998
4999fn plan_list(
5000    ecx: &ExprContext,
5001    exprs: &[Expr<Aug>],
5002    type_hint: Option<&SqlScalarType>,
5003) -> Result<CoercibleScalarExpr, PlanError> {
5004    let (elem_type, exprs) = if exprs.is_empty() {
5005        if let Some(SqlScalarType::List { element_type, .. }) = type_hint {
5006            (element_type.without_modifiers(), vec![])
5007        } else {
5008            sql_bail!("cannot determine type of empty list");
5009        }
5010    } else {
5011        let type_hint = match type_hint {
5012            Some(SqlScalarType::List { element_type, .. }) => Some(&**element_type),
5013            _ => None,
5014        };
5015
5016        let mut out = vec![];
5017        for expr in exprs {
5018            out.push(match expr {
5019                // Special case nested LIST expressions so we can plumb
5020                // the type hint through.
5021                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
5022                _ => plan_expr(ecx, expr)?,
5023            });
5024        }
5025        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
5026        (ecx.scalar_type(&out[0]).without_modifiers(), out)
5027    };
5028
5029    if matches!(elem_type, SqlScalarType::Char { .. }) {
5030        bail_unsupported!("char list");
5031    }
5032
5033    Ok(HirScalarExpr::call_variadic(ListCreate { elem_type }, exprs).into())
5034}
5035
5036fn plan_map(
5037    ecx: &ExprContext,
5038    entries: &[MapEntry<Aug>],
5039    type_hint: Option<&SqlScalarType>,
5040) -> Result<CoercibleScalarExpr, PlanError> {
5041    let (value_type, exprs) = if entries.is_empty() {
5042        if let Some(SqlScalarType::Map { value_type, .. }) = type_hint {
5043            (value_type.without_modifiers(), vec![])
5044        } else {
5045            sql_bail!("cannot determine type of empty map");
5046        }
5047    } else {
5048        let type_hint = match type_hint {
5049            Some(SqlScalarType::Map { value_type, .. }) => Some(&**value_type),
5050            _ => None,
5051        };
5052
5053        let mut keys = vec![];
5054        let mut values = vec![];
5055        for MapEntry { key, value } in entries {
5056            let key = plan_expr(ecx, key)?.type_as(ecx, &SqlScalarType::String)?;
5057            let value = match value {
5058                // Special case nested MAP expressions so we can plumb
5059                // the type hint through.
5060                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
5061                _ => plan_expr(ecx, value)?,
5062            };
5063            keys.push(key);
5064            values.push(value);
5065        }
5066        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
5067        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
5068        let out = itertools::interleave(keys, values).collect();
5069        (value_type, out)
5070    };
5071
5072    if matches!(value_type, SqlScalarType::Char { .. }) {
5073        bail_unsupported!("char map");
5074    }
5075
5076    let expr = HirScalarExpr::call_variadic(MapBuild { value_type }, exprs);
5077    Ok(expr.into())
5078}
5079
5080/// Coerces a list of expressions such that all input expressions will be cast
5081/// to the same type. If successful, returns a new list of expressions in the
5082/// same order as the input, where each expression has the appropriate casts to
5083/// make them all of a uniform type.
5084///
5085/// If `force_type` is `Some`, the expressions are forced to the specified type
5086/// via an explicit cast. Otherwise the best common type is guessed via
5087/// [`typeconv::guess_best_common_type`] and conversions are attempted via
5088/// implicit casts
5089///
5090/// Note that this is our implementation of Postgres' type conversion for
5091/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
5092/// isn't yet used in all of those cases.
5093///
5094/// [union-type-conv]:
5095/// https://www.postgresql.org/docs/12/typeconv-union-case.html
5096pub fn coerce_homogeneous_exprs(
5097    ecx: &ExprContext,
5098    exprs: Vec<CoercibleScalarExpr>,
5099    force_type: Option<&SqlScalarType>,
5100) -> Result<Vec<HirScalarExpr>, PlanError> {
5101    assert!(!exprs.is_empty());
5102
5103    let target_holder;
5104    let target = match force_type {
5105        Some(t) => t,
5106        None => {
5107            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
5108            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
5109            &target_holder
5110        }
5111    };
5112
5113    // Try to cast all expressions to `target`.
5114    let mut out = Vec::new();
5115    for expr in exprs {
5116        let arg = typeconv::plan_coerce(ecx, expr, target)?;
5117        let ccx = match force_type {
5118            None => CastContext::Implicit,
5119            Some(_) => CastContext::Explicit,
5120        };
5121        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
5122            Ok(expr) => out.push(expr),
5123            Err(_) => sql_bail!(
5124                "{} could not convert type {} to {}",
5125                ecx.name,
5126                ecx.humanize_sql_scalar_type(&ecx.scalar_type(&arg), false),
5127                ecx.humanize_sql_scalar_type(target, false),
5128            ),
5129        }
5130    }
5131    Ok(out)
5132}
5133
5134/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
5135/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
5136pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
5137    obe: &OrderByExpr<T>,
5138    column: usize,
5139) -> ColumnOrder {
5140    let desc = !obe.asc.unwrap_or(true);
5141    ColumnOrder {
5142        column,
5143        desc,
5144        // https://www.postgresql.org/docs/14/queries-order.html
5145        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
5146        nulls_last: obe.nulls_last.unwrap_or(!desc),
5147    }
5148}
5149
5150/// Plans the ORDER BY clause of a window function.
5151///
5152/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
5153/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
5154/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
5155/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
5156/// `Vec<HirScalarExpr>` that we return.
5157fn plan_function_order_by(
5158    ecx: &ExprContext,
5159    order_by: &[OrderByExpr<Aug>],
5160) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
5161    let mut order_by_exprs = vec![];
5162    let mut col_orders = vec![];
5163    {
5164        for (i, obe) in order_by.iter().enumerate() {
5165            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
5166            // do not support ordinal references in PostgreSQL. So we use
5167            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
5168            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
5169            order_by_exprs.push(expr);
5170            col_orders.push(resolve_desc_and_nulls_last(obe, i));
5171        }
5172    }
5173    Ok((order_by_exprs, col_orders))
5174}
5175
5176/// Returns a human-readable rendering of `name`, falling back to a debug
5177/// dump of the `ResolvedItemName` if humanization fails. Used to construct
5178/// user-facing error messages on already-failing paths, where we'd rather
5179/// surface the raw resolved name than emit a useless `<unknown>` placeholder.
5180fn humanize_or_debug(scx: &StatementContext, name: &ResolvedItemName) -> String {
5181    scx.humanize_resolved_name(name)
5182        .map(|n| n.to_string())
5183        .unwrap_or_else(|_| format!("<error when trying to humanize `{name:?}`>"))
5184}
5185
5186/// Common part of the planning of windowed and non-windowed aggregation functions.
5187fn plan_aggregate_common(
5188    ecx: &ExprContext,
5189    Function::<Aug> {
5190        name,
5191        args,
5192        filter,
5193        over: _,
5194        distinct,
5195    }: &Function<Aug>,
5196) -> Result<AggregateExpr, PlanError> {
5197    // Normal aggregate functions, like `sum`, expect as input a single expression
5198    // which yields the datum to aggregate. Order sensitive aggregate functions,
5199    // like `jsonb_agg`, are special, and instead expect a Record whose first
5200    // element yields the datum to aggregate and whose successive elements yield
5201    // keys to order by. This expectation is hard coded within the implementation
5202    // of each of the order-sensitive aggregates. The specification of how many
5203    // order by keys to consider, and in what order, is passed via the `order_by`
5204    // field on the `AggregateFunc` variant.
5205
5206    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
5207    // most, so explicitly drop it if the function doesn't care about order. This
5208    // prevents the projection into Record below from triggering on unsupported
5209    // functions.
5210
5211    let impls = match resolve_func(ecx, name, args)? {
5212        Func::Aggregate(impls) => impls,
5213        _ => bail_internal!("plan_aggregate_common called on non-aggregate function"),
5214    };
5215
5216    // We follow PostgreSQL's rule here for mapping `count(*)` into the
5217    // generalized function selection framework. The rule is simple: the user
5218    // must type `count(*)`, but the function selection framework sees an empty
5219    // parameter list, as if the user had typed `count()`. But if the user types
5220    // `count()` directly, that is an error. Like PostgreSQL, we apply these
5221    // rules to all aggregates, not just `count`, since we may one day support
5222    // user-defined aggregates, including user-defined aggregates that take no
5223    // parameters.
5224    let (args, order_by) = match &args {
5225        FunctionArgs::Star => (vec![], vec![]),
5226        FunctionArgs::Args { args, order_by } => {
5227            if args.is_empty() {
5228                sql_bail!(
5229                    "{}(*) must be used to call a parameterless aggregate function",
5230                    humanize_or_debug(ecx.qcx.scx, name)
5231                );
5232            }
5233            let args = plan_exprs(ecx, args)?;
5234            (args, order_by.clone())
5235        }
5236    };
5237
5238    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5239
5240    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5241    if let Some(filter) = &filter {
5242        // If a filter is present, as in
5243        //
5244        //     <agg>(<expr>) FILTER (WHERE <cond>)
5245        //
5246        // we plan it by essentially rewriting the expression to
5247        //
5248        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5249        //
5250        // where <identity> is the identity input for <agg>.
5251        let cond =
5252            plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &SqlScalarType::Bool)?;
5253        let expr_typ = ecx.scalar_type(&expr);
5254        expr = HirScalarExpr::if_then_else(
5255            cond,
5256            expr,
5257            HirScalarExpr::literal(func.identity_datum(), expr_typ),
5258        );
5259    }
5260
5261    let mut seen_outer = false;
5262    let mut seen_inner = false;
5263    #[allow(deprecated)]
5264    expr.visit_columns(0, &mut |depth, col| {
5265        if depth == 0 && col.level == 0 {
5266            seen_inner = true;
5267        } else if col.level > depth {
5268            seen_outer = true;
5269        }
5270    });
5271    if seen_outer && !seen_inner {
5272        bail_unsupported!(
5273            3720,
5274            "aggregate functions that refer exclusively to outer columns"
5275        );
5276    }
5277
5278    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5279    // map the needed expressions into the aggregate datum.
5280    if func.is_order_sensitive() {
5281        let field_names = iter::repeat(ColumnName::from(""))
5282            .take(1 + order_by_exprs.len())
5283            .collect();
5284        let mut exprs = vec![expr];
5285        exprs.extend(order_by_exprs);
5286        expr = HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs);
5287    }
5288
5289    Ok(AggregateExpr {
5290        func,
5291        expr: Box::new(expr),
5292        distinct: *distinct,
5293    })
5294}
5295
5296fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
5297    let mut names = names.to_vec();
5298    // The parser guarantees that an `Expr::Identifier` is constructed with a
5299    // non-empty list of name parts, so an empty list here is an internal bug.
5300    let Some(last) = names.pop() else {
5301        bail_internal!("empty identifier");
5302    };
5303    let col_name = normalize::column_name(last);
5304
5305    // If the name is qualified, it must refer to a column in a table.
5306    if !names.is_empty() {
5307        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
5308        let (i, i_name) = ecx.scope.resolve_table_column(
5309            &ecx.qcx.outer_scopes,
5310            &table_name,
5311            &col_name,
5312            &mut ecx.qcx.name_manager.borrow_mut(),
5313        )?;
5314        return Ok(HirScalarExpr::named_column(i, i_name));
5315    }
5316
5317    // If the name is unqualified, first check if it refers to a column. Track any similar names
5318    // that might exist for a better error message.
5319    let similar_names = match ecx.scope.resolve_column(
5320        &ecx.qcx.outer_scopes,
5321        &col_name,
5322        &mut ecx.qcx.name_manager.borrow_mut(),
5323    ) {
5324        Ok((i, i_name)) => {
5325            return Ok(HirScalarExpr::named_column(i, i_name));
5326        }
5327        Err(PlanError::UnknownColumn { similar, .. }) => similar,
5328        Err(e) => return Err(e),
5329    };
5330
5331    // The name doesn't refer to a column. Check if it is a whole-row reference
5332    // to a table.
5333    let items = ecx.scope.items_from_table(
5334        &ecx.qcx.outer_scopes,
5335        &PartialItemName {
5336            database: None,
5337            schema: None,
5338            item: col_name.as_str().to_owned(),
5339        },
5340    )?;
5341    match items.as_slice() {
5342        // The name doesn't refer to a table either. Return an error.
5343        [] => Err(PlanError::UnknownColumn {
5344            table: None,
5345            column: col_name,
5346            similar: similar_names,
5347        }),
5348        // The name refers to a table that is the result of a function that
5349        // returned a single column. Per PostgreSQL, this is a special case
5350        // that returns the value directly.
5351        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
5352        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::named_column(
5353            *column,
5354            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5355        )),
5356        // The name refers to a normal table. Return a record containing all the
5357        // columns of the table.
5358        _ => {
5359            let mut has_exists_column = None;
5360            let (exprs, field_names): (Vec<_>, Vec<_>) = items
5361                .into_iter()
5362                .filter_map(|(column, item)| {
5363                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
5364                        has_exists_column = Some(column);
5365                        None
5366                    } else {
5367                        let expr = HirScalarExpr::named_column(
5368                            column,
5369                            ecx.qcx.name_manager.borrow_mut().intern_scope_item(item),
5370                        );
5371                        let name = item.column_name.clone();
5372                        Some((expr, name))
5373                    }
5374                })
5375                .unzip();
5376            // For the special case of a table function with a single column, the single column is instead not wrapped.
5377            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
5378                exprs.into_element()
5379            } else {
5380                HirScalarExpr::call_variadic(RecordCreate { field_names }, exprs)
5381            };
5382            if let Some(has_exists_column) = has_exists_column {
5383                Ok(HirScalarExpr::if_then_else(
5384                    HirScalarExpr::unnamed_column(has_exists_column)
5385                        .call_unary(UnaryFunc::IsNull(mz_expr::func::IsNull)),
5386                    HirScalarExpr::literal_null(ecx.scalar_type(&expr)),
5387                    expr,
5388                ))
5389            } else {
5390                Ok(expr)
5391            }
5392        }
5393    }
5394}
5395
5396fn plan_op(
5397    ecx: &ExprContext,
5398    op: &str,
5399    expr1: &Expr<Aug>,
5400    expr2: Option<&Expr<Aug>>,
5401) -> Result<HirScalarExpr, PlanError> {
5402    let impls = func::resolve_op(op)?;
5403    let args = match expr2 {
5404        None => plan_exprs(ecx, &[expr1])?,
5405        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5406    };
5407    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5408}
5409
5410fn plan_function<'a>(
5411    ecx: &ExprContext,
5412    f @ Function {
5413        name,
5414        args,
5415        filter,
5416        over,
5417        distinct,
5418    }: &'a Function<Aug>,
5419) -> Result<HirScalarExpr, PlanError> {
5420    let impls = match resolve_func(ecx, name, args)? {
5421        Func::Table(_) => {
5422            sql_bail!(
5423                "table functions are not allowed in {} (function {})",
5424                ecx.name,
5425                name
5426            );
5427        }
5428        Func::Scalar(impls) => {
5429            if over.is_some() {
5430                sql_bail!(
5431                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
5432                );
5433            }
5434            impls
5435        }
5436        Func::ScalarWindow(impls) => {
5437            let (
5438                ignore_nulls,
5439                order_by_exprs,
5440                col_orders,
5441                _window_frame,
5442                partition_by,
5443                scalar_args,
5444            ) = plan_window_function_non_aggr(ecx, f)?;
5445
5446            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
5447            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
5448            // msg there is less informative.)
5449            if !scalar_args.is_empty() {
5450                if let ResolvedItemName::Item {
5451                    full_name: FullItemName { item, .. },
5452                    ..
5453                } = name
5454                {
5455                    sql_bail!(
5456                        "function {} has 0 parameters, but was called with {}",
5457                        item,
5458                        scalar_args.len()
5459                    );
5460                }
5461            }
5462
5463            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
5464            // accept a window frame here without an error msg. (Postgres also does this.)
5465            // TODO: maybe we should give a notice
5466
5467            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
5468
5469            if ignore_nulls {
5470                // If we ever add a scalar window function that supports ignore, then don't forget
5471                // to also update HIR EXPLAIN.
5472                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5473            }
5474
5475            return Ok(HirScalarExpr::windowing(WindowExpr {
5476                func: WindowExprType::Scalar(ScalarWindowExpr {
5477                    func,
5478                    order_by: col_orders,
5479                }),
5480                partition_by,
5481                order_by: order_by_exprs,
5482            }));
5483        }
5484        Func::ValueWindow(impls) => {
5485            let window_plan = plan_window_function_non_aggr(ecx, f)?;
5486            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, win_args) =
5487                window_plan;
5488
5489            let (args_encoded, func) =
5490                func::select_impl(ecx, FuncSpec::Func(name), impls, win_args, vec![])?;
5491
5492            if ignore_nulls {
5493                match func {
5494                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
5495                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
5496                }
5497            }
5498
5499            return Ok(HirScalarExpr::windowing(WindowExpr {
5500                func: WindowExprType::Value(ValueWindowExpr {
5501                    func,
5502                    args: Box::new(args_encoded),
5503                    order_by: col_orders,
5504                    window_frame,
5505                    ignore_nulls, // (RESPECT NULLS is the default)
5506                }),
5507                partition_by,
5508                order_by: order_by_exprs,
5509            }));
5510        }
5511        Func::Aggregate(_) => {
5512            if f.over.is_none() {
5513                // Not a window aggregate. Something is wrong.
5514                if ecx.allow_aggregates {
5515                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
5516                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
5517                    sql_bail!(
5518                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
5519                        name,
5520                    );
5521                } else {
5522                    // scope.resolve_expr didn't catch it because we have not yet planned it,
5523                    // because it was in an unsupported context.
5524                    sql_bail!(
5525                        "aggregate functions are not allowed in {} (function {})",
5526                        ecx.name,
5527                        name
5528                    );
5529                }
5530            } else {
5531                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
5532                    plan_window_function_common(ecx, &f.name, &f.over)?;
5533
5534                // https://github.com/MaterializeInc/database-issues/issues/6720
5535                match (&window_frame.start_bound, &window_frame.end_bound) {
5536                    (
5537                        mz_expr::WindowFrameBound::UnboundedPreceding,
5538                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5539                    )
5540                    | (
5541                        mz_expr::WindowFrameBound::UnboundedPreceding,
5542                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5543                    )
5544                    | (
5545                        mz_expr::WindowFrameBound::OffsetPreceding(..),
5546                        mz_expr::WindowFrameBound::UnboundedFollowing,
5547                    )
5548                    | (
5549                        mz_expr::WindowFrameBound::OffsetFollowing(..),
5550                        mz_expr::WindowFrameBound::UnboundedFollowing,
5551                    ) => bail_unsupported!("mixed unbounded - offset frames"),
5552                    (_, _) => {} // other cases are ok
5553                }
5554
5555                if ignore_nulls {
5556                    // https://github.com/MaterializeInc/database-issues/issues/6722
5557                    // If we ever add support for ignore_nulls for a window aggregate, then don't
5558                    // forget to also update HIR EXPLAIN.
5559                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
5560                }
5561
5562                let aggregate_expr = plan_aggregate_common(ecx, f)?;
5563
5564                if aggregate_expr.distinct {
5565                    // https://github.com/MaterializeInc/database-issues/issues/6626
5566                    bail_unsupported!("DISTINCT in window aggregates");
5567                }
5568
5569                return Ok(HirScalarExpr::windowing(WindowExpr {
5570                    func: WindowExprType::Aggregate(AggregateWindowExpr {
5571                        aggregate_expr,
5572                        order_by: col_orders,
5573                        window_frame,
5574                    }),
5575                    partition_by,
5576                    order_by: order_by_exprs,
5577                }));
5578            }
5579        }
5580    };
5581
5582    if over.is_some() {
5583        bail_internal!("OVER clause should have been handled by the window function path above");
5584    }
5585
5586    if *distinct {
5587        sql_bail!(
5588            "DISTINCT specified, but {} is not an aggregate function",
5589            humanize_or_debug(ecx.qcx.scx, name)
5590        );
5591    }
5592    if filter.is_some() {
5593        sql_bail!(
5594            "FILTER specified, but {} is not an aggregate function",
5595            humanize_or_debug(ecx.qcx.scx, name)
5596        );
5597    }
5598
5599    let scalar_args = match &args {
5600        FunctionArgs::Star => {
5601            sql_bail!(
5602                "* argument is invalid with non-aggregate function {}",
5603                humanize_or_debug(ecx.qcx.scx, name)
5604            )
5605        }
5606        FunctionArgs::Args { args, order_by } => {
5607            if !order_by.is_empty() {
5608                sql_bail!(
5609                    "ORDER BY specified, but {} is not an aggregate function",
5610                    humanize_or_debug(ecx.qcx.scx, name)
5611                );
5612            }
5613            plan_exprs(ecx, args)?
5614        }
5615    };
5616
5617    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
5618}
5619
5620pub const IGNORE_NULLS_ERROR_MSG: &str =
5621    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5622
5623/// Resolves the name to a set of function implementations.
5624///
5625/// If the name does not specify a known built-in function, returns an error.
5626pub fn resolve_func(
5627    ecx: &ExprContext,
5628    name: &ResolvedItemName,
5629    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5630) -> Result<&'static Func, PlanError> {
5631    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5632        if let Ok(f) = i.func() {
5633            return Ok(f);
5634        }
5635    }
5636
5637    // Couldn't resolve function with this name, so generate verbose error
5638    // message.
5639    let cexprs = match args {
5640        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5641        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5642            if !order_by.is_empty() {
5643                sql_bail!(
5644                    "ORDER BY specified, but {} is not an aggregate function",
5645                    name
5646                );
5647            }
5648            plan_exprs(ecx, args)?
5649        }
5650    };
5651
5652    let arg_types: Vec<_> = cexprs
5653        .into_iter()
5654        .map(|ty| match ecx.scalar_type(&ty) {
5655            CoercibleScalarType::Coerced(ty) => ecx.humanize_sql_scalar_type(&ty, false),
5656            CoercibleScalarType::Record(_) => "record".to_string(),
5657            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5658        })
5659        .collect();
5660
5661    Err(PlanError::UnknownFunction {
5662        name: name.to_string(),
5663        arg_types,
5664    })
5665}
5666
5667fn plan_is_expr<'a>(
5668    ecx: &ExprContext,
5669    expr: &'a Expr<Aug>,
5670    construct: &IsExprConstruct<Aug>,
5671    not: bool,
5672) -> Result<HirScalarExpr, PlanError> {
5673    let expr_hir = plan_expr(ecx, expr)?;
5674
5675    let mut result = match construct {
5676        IsExprConstruct::Null => {
5677            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
5678            // at odds with our type coercion rules, which treat `NULL` literals
5679            // and unconstrained parameters identically. Providing a type hint
5680            // of string means we wind up supporting both.
5681            expr_hir.type_as_any(ecx)?.call_is_null()
5682        }
5683        IsExprConstruct::Unknown => expr_hir.type_as(ecx, &SqlScalarType::Bool)?.call_is_null(),
5684        IsExprConstruct::True => expr_hir
5685            .type_as(ecx, &SqlScalarType::Bool)?
5686            .call_unary(UnaryFunc::IsTrue(expr_func::IsTrue)),
5687        IsExprConstruct::False => expr_hir
5688            .type_as(ecx, &SqlScalarType::Bool)?
5689            .call_unary(UnaryFunc::IsFalse(expr_func::IsFalse)),
5690        IsExprConstruct::DistinctFrom(expr2) => {
5691            // There are three cases:
5692            // 1. Both terms are non-null, in which case the result should be `a != b`.
5693            // 2. Exactly one term is null, in which case the result should be true.
5694            // 3. Both terms are null, in which case the result should be false.
5695            //
5696            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
5697
5698            // We'll need `expr != expr2`, but don't just construct this HIR directly. Instead,
5699            // construct an AST expression for `expr != expr2` and plan it to get proper type
5700            // checking, implicit casts, etc. (This seems to be also what Postgres does.)
5701            let ne_ast = expr.clone().not_equals(expr2.as_ref().clone());
5702            let ne_hir = plan_expr(ecx, &ne_ast)?.type_as_any(ecx)?;
5703
5704            let expr1_hir = expr_hir.type_as_any(ecx)?;
5705            let expr2_hir = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
5706
5707            let term1 = HirScalarExpr::variadic_or(vec![
5708                ne_hir,
5709                expr1_hir.clone().call_is_null(),
5710                expr2_hir.clone().call_is_null(),
5711            ]);
5712            let term2 = HirScalarExpr::variadic_or(vec![
5713                expr1_hir.call_is_null().not(),
5714                expr2_hir.call_is_null().not(),
5715            ]);
5716            term1.and(term2)
5717        }
5718    };
5719    if not {
5720        result = result.not();
5721    }
5722    Ok(result)
5723}
5724
5725fn plan_case<'a>(
5726    ecx: &ExprContext,
5727    operand: &'a Option<Box<Expr<Aug>>>,
5728    conditions: &'a [Expr<Aug>],
5729    results: &'a [Expr<Aug>],
5730    else_result: &'a Option<Box<Expr<Aug>>>,
5731) -> Result<HirScalarExpr, PlanError> {
5732    let mut cond_exprs = Vec::new();
5733    let mut result_exprs = Vec::new();
5734    for (c, r) in conditions.iter().zip_eq(results) {
5735        let c = match operand {
5736            Some(operand) => operand.clone().equals(c.clone()),
5737            None => c.clone(),
5738        };
5739        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &SqlScalarType::Bool)?;
5740        cond_exprs.push(cexpr);
5741        result_exprs.push(r);
5742    }
5743    result_exprs.push(match else_result {
5744        Some(else_result) => else_result,
5745        None => &Expr::Value(Value::Null),
5746    });
5747    let mut result_exprs = coerce_homogeneous_exprs(
5748        &ecx.with_name("CASE"),
5749        plan_exprs(ecx, &result_exprs)?,
5750        None,
5751    )?;
5752    let mut expr = result_exprs.pop().unwrap();
5753    assert_eq!(cond_exprs.len(), result_exprs.len());
5754    for (cexpr, rexpr) in cond_exprs
5755        .into_iter()
5756        .rev()
5757        .zip_eq(result_exprs.into_iter().rev())
5758    {
5759        expr = HirScalarExpr::if_then_else(cexpr, rexpr, expr);
5760    }
5761    Ok(expr)
5762}
5763
5764fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5765    let (datum, scalar_type) = match l {
5766        Value::Number(s) => {
5767            let d = strconv::parse_numeric(s.as_str())?;
5768            if !s.contains(&['E', '.'][..]) {
5769                // Maybe representable as an int?
5770                if let Ok(n) = d.0.try_into() {
5771                    (Datum::Int32(n), SqlScalarType::Int32)
5772                } else if let Ok(n) = d.0.try_into() {
5773                    (Datum::Int64(n), SqlScalarType::Int64)
5774                } else {
5775                    (
5776                        Datum::Numeric(d),
5777                        SqlScalarType::Numeric { max_scale: None },
5778                    )
5779                }
5780            } else {
5781                (
5782                    Datum::Numeric(d),
5783                    SqlScalarType::Numeric { max_scale: None },
5784                )
5785            }
5786        }
5787        Value::HexString(_) => bail_unsupported!("hex string literals"),
5788        Value::Boolean(b) => match b {
5789            false => (Datum::False, SqlScalarType::Bool),
5790            true => (Datum::True, SqlScalarType::Bool),
5791        },
5792        Value::Interval(i) => {
5793            let i = literal::plan_interval(i)?;
5794            (Datum::Interval(i), SqlScalarType::Interval)
5795        }
5796        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5797        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5798    };
5799    let expr = HirScalarExpr::literal(datum, scalar_type);
5800    Ok(expr.into())
5801}
5802
5803/// The common part of the planning of non-aggregate window functions, i.e.,
5804/// scalar window functions and value window functions.
5805fn plan_window_function_non_aggr<'a>(
5806    ecx: &ExprContext,
5807    Function {
5808        name,
5809        args,
5810        filter,
5811        over,
5812        distinct,
5813    }: &'a Function<Aug>,
5814) -> Result<
5815    (
5816        bool,
5817        Vec<HirScalarExpr>,
5818        Vec<ColumnOrder>,
5819        mz_expr::WindowFrame,
5820        Vec<HirScalarExpr>,
5821        Vec<CoercibleScalarExpr>,
5822    ),
5823    PlanError,
5824> {
5825    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5826        plan_window_function_common(ecx, name, over)?;
5827
5828    if *distinct {
5829        sql_bail!(
5830            "DISTINCT specified, but {} is not an aggregate function",
5831            name
5832        );
5833    }
5834
5835    if filter.is_some() {
5836        bail_unsupported!("FILTER in non-aggregate window functions");
5837    }
5838
5839    let scalar_args = match &args {
5840        FunctionArgs::Star => {
5841            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5842        }
5843        FunctionArgs::Args { args, order_by } => {
5844            if !order_by.is_empty() {
5845                sql_bail!(
5846                    "ORDER BY specified, but {} is not an aggregate function",
5847                    name
5848                );
5849            }
5850            plan_exprs(ecx, args)?
5851        }
5852    };
5853
5854    Ok((
5855        ignore_nulls,
5856        order_by_exprs,
5857        col_orders,
5858        window_frame,
5859        partition,
5860        scalar_args,
5861    ))
5862}
5863
5864/// The common part of the planning of all window functions.
5865fn plan_window_function_common(
5866    ecx: &ExprContext,
5867    name: &<Aug as AstInfo>::ItemName,
5868    over: &Option<WindowSpec<Aug>>,
5869) -> Result<
5870    (
5871        bool,
5872        Vec<HirScalarExpr>,
5873        Vec<ColumnOrder>,
5874        mz_expr::WindowFrame,
5875        Vec<HirScalarExpr>,
5876    ),
5877    PlanError,
5878> {
5879    if !ecx.allow_windows {
5880        sql_bail!(
5881            "window functions are not allowed in {} (function {})",
5882            ecx.name,
5883            name
5884        );
5885    }
5886
5887    let window_spec = match over.as_ref() {
5888        Some(over) => over,
5889        None => sql_bail!("window function {} requires an OVER clause", name),
5890    };
5891    if window_spec.ignore_nulls && window_spec.respect_nulls {
5892        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5893    }
5894    let window_frame = match window_spec.window_frame.as_ref() {
5895        Some(frame) => plan_window_frame(frame)?,
5896        None => mz_expr::WindowFrame::default(),
5897    };
5898    let mut partition = Vec::new();
5899    for expr in &window_spec.partition_by {
5900        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5901    }
5902
5903    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5904
5905    Ok((
5906        window_spec.ignore_nulls,
5907        order_by_exprs,
5908        col_orders,
5909        window_frame,
5910        partition,
5911    ))
5912}
5913
5914fn plan_window_frame(
5915    WindowFrame {
5916        units,
5917        start_bound,
5918        end_bound,
5919    }: &WindowFrame,
5920) -> Result<mz_expr::WindowFrame, PlanError> {
5921    use mz_expr::WindowFrameBound::*;
5922    let units = window_frame_unit_ast_to_expr(units)?;
5923    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5924    let end_bound = end_bound
5925        .as_ref()
5926        .map(window_frame_bound_ast_to_expr)
5927        .unwrap_or(CurrentRow);
5928
5929    // Validate bounds according to Postgres rules
5930    match (&start_bound, &end_bound) {
5931        // Start bound can't be UNBOUNDED FOLLOWING
5932        (UnboundedFollowing, _) => {
5933            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5934        }
5935        // End bound can't be UNBOUNDED PRECEDING
5936        (_, UnboundedPreceding) => {
5937            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5938        }
5939        // Start bound should come before end bound in the list of bound definitions
5940        (CurrentRow, OffsetPreceding(_)) => {
5941            sql_bail!("frame starting from current row cannot have preceding rows")
5942        }
5943        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5944            sql_bail!("frame starting from following row cannot have preceding rows")
5945        }
5946        // The above rules are adopted from Postgres.
5947        // The following rules are Materialize-specific.
5948        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5949            // Note that the only hard limit is that partition size + offset should fit in i64, so
5950            // in theory, we could support much larger offsets than this. But for our current
5951            // performance, even 1000000 is quite big.
5952            if *o1 > 1000000 || *o2 > 1000000 {
5953                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5954            }
5955        }
5956        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5957            if *o1 > 1000000 || *o2 > 1000000 {
5958                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5959            }
5960        }
5961        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5962            if *o1 > 1000000 || *o2 > 1000000 {
5963                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5964            }
5965        }
5966        (OffsetPreceding(o), CurrentRow) => {
5967            if *o > 1000000 {
5968                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5969            }
5970        }
5971        (CurrentRow, OffsetFollowing(o)) => {
5972            if *o > 1000000 {
5973                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5974            }
5975        }
5976        // Other bounds are valid
5977        (_, _) => (),
5978    }
5979
5980    // RANGE is only supported in the default frame
5981    // https://github.com/MaterializeInc/database-issues/issues/6585
5982    if units == mz_expr::WindowFrameUnits::Range
5983        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5984    {
5985        bail_unsupported!("RANGE in non-default window frames")
5986    }
5987
5988    let frame = mz_expr::WindowFrame {
5989        units,
5990        start_bound,
5991        end_bound,
5992    };
5993    Ok(frame)
5994}
5995
5996fn window_frame_unit_ast_to_expr(
5997    unit: &WindowFrameUnits,
5998) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5999    match unit {
6000        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
6001        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
6002        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
6003    }
6004}
6005
6006fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
6007    match bound {
6008        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
6009        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
6010        WindowFrameBound::Preceding(Some(offset)) => {
6011            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
6012        }
6013        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
6014        WindowFrameBound::Following(Some(offset)) => {
6015            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
6016        }
6017    }
6018}
6019
6020pub fn scalar_type_from_sql(
6021    scx: &StatementContext,
6022    data_type: &ResolvedDataType,
6023) -> Result<SqlScalarType, PlanError> {
6024    match data_type {
6025        ResolvedDataType::AnonymousList(elem_type) => {
6026            let elem_type = scalar_type_from_sql(scx, elem_type)?;
6027            if matches!(elem_type, SqlScalarType::Char { .. }) {
6028                bail_unsupported!("char list");
6029            }
6030            Ok(SqlScalarType::List {
6031                element_type: Box::new(elem_type),
6032                custom_id: None,
6033            })
6034        }
6035        ResolvedDataType::AnonymousMap {
6036            key_type,
6037            value_type,
6038        } => {
6039            match scalar_type_from_sql(scx, key_type)? {
6040                SqlScalarType::String => {}
6041                other => sql_bail!(
6042                    "map key type must be {}, got {}",
6043                    scx.humanize_sql_scalar_type(&SqlScalarType::String, false),
6044                    scx.humanize_sql_scalar_type(&other, false)
6045                ),
6046            }
6047            Ok(SqlScalarType::Map {
6048                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
6049                custom_id: None,
6050            })
6051        }
6052        ResolvedDataType::Named { id, modifiers, .. } => {
6053            scalar_type_from_catalog(scx.catalog, *id, modifiers)
6054        }
6055        ResolvedDataType::Error => bail_internal!("should have been caught in name resolution"),
6056    }
6057}
6058
6059pub fn scalar_type_from_catalog(
6060    catalog: &dyn SessionCatalog,
6061    id: CatalogItemId,
6062    modifiers: &[i64],
6063) -> Result<SqlScalarType, PlanError> {
6064    let entry = catalog.get_item(&id);
6065    let type_details = match entry.type_details() {
6066        Some(type_details) => type_details,
6067        None => {
6068            // Resolution should never produce a `ResolvedDataType::Named` with
6069            // an ID of a non-type, but we error gracefully just in case.
6070            sql_bail!(
6071                "internal error: {} does not refer to a type",
6072                catalog.resolve_full_name(entry.name()).to_string().quoted()
6073            );
6074        }
6075    };
6076    match &type_details.typ {
6077        CatalogType::Numeric => {
6078            let mut modifiers = modifiers.iter().fuse();
6079            let precision = match modifiers.next() {
6080                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
6081                    sql_bail!(
6082                        "precision for type numeric must be between 1 and {}",
6083                        NUMERIC_DATUM_MAX_PRECISION,
6084                    );
6085                }
6086                Some(p) => Some(*p),
6087                None => None,
6088            };
6089            let scale = match modifiers.next() {
6090                Some(scale) => {
6091                    if let Some(precision) = precision {
6092                        if *scale > precision {
6093                            sql_bail!(
6094                                "scale for type numeric must be between 0 and precision {}",
6095                                precision
6096                            );
6097                        }
6098                    }
6099                    Some(NumericMaxScale::try_from(*scale)?)
6100                }
6101                None => None,
6102            };
6103            if modifiers.next().is_some() {
6104                sql_bail!("type numeric supports at most two type modifiers");
6105            }
6106            Ok(SqlScalarType::Numeric { max_scale: scale })
6107        }
6108        CatalogType::Char => {
6109            let mut modifiers = modifiers.iter().fuse();
6110            let length = match modifiers.next() {
6111                Some(l) => Some(CharLength::try_from(*l)?),
6112                None => Some(CharLength::ONE),
6113            };
6114            if modifiers.next().is_some() {
6115                sql_bail!("type character supports at most one type modifier");
6116            }
6117            Ok(SqlScalarType::Char { length })
6118        }
6119        CatalogType::VarChar => {
6120            let mut modifiers = modifiers.iter().fuse();
6121            let length = match modifiers.next() {
6122                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
6123                None => None,
6124            };
6125            if modifiers.next().is_some() {
6126                sql_bail!("type character varying supports at most one type modifier");
6127            }
6128            Ok(SqlScalarType::VarChar { max_length: length })
6129        }
6130        CatalogType::Timestamp => {
6131            let mut modifiers = modifiers.iter().fuse();
6132            let precision = match modifiers.next() {
6133                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6134                None => None,
6135            };
6136            if modifiers.next().is_some() {
6137                sql_bail!("type timestamp supports at most one type modifier");
6138            }
6139            Ok(SqlScalarType::Timestamp { precision })
6140        }
6141        CatalogType::TimestampTz => {
6142            let mut modifiers = modifiers.iter().fuse();
6143            let precision = match modifiers.next() {
6144                Some(p) => Some(TimestampPrecision::try_from(*p)?),
6145                None => None,
6146            };
6147            if modifiers.next().is_some() {
6148                sql_bail!("type timestamp with time zone supports at most one type modifier");
6149            }
6150            Ok(SqlScalarType::TimestampTz { precision })
6151        }
6152        t => {
6153            if !modifiers.is_empty() {
6154                sql_bail!(
6155                    "{} does not support type modifiers",
6156                    catalog.resolve_full_name(entry.name()).to_string()
6157                );
6158            }
6159            match t {
6160                CatalogType::Array {
6161                    element_reference: element_id,
6162                } => Ok(SqlScalarType::Array(Box::new(scalar_type_from_catalog(
6163                    catalog,
6164                    *element_id,
6165                    modifiers,
6166                )?))),
6167                CatalogType::List {
6168                    element_reference: element_id,
6169                    element_modifiers,
6170                } => Ok(SqlScalarType::List {
6171                    element_type: Box::new(scalar_type_from_catalog(
6172                        catalog,
6173                        *element_id,
6174                        element_modifiers,
6175                    )?),
6176                    custom_id: Some(id),
6177                }),
6178                CatalogType::Map {
6179                    key_reference: _,
6180                    key_modifiers: _,
6181                    value_reference: value_id,
6182                    value_modifiers,
6183                } => Ok(SqlScalarType::Map {
6184                    value_type: Box::new(scalar_type_from_catalog(
6185                        catalog,
6186                        *value_id,
6187                        value_modifiers,
6188                    )?),
6189                    custom_id: Some(id),
6190                }),
6191                CatalogType::Range {
6192                    element_reference: element_id,
6193                } => Ok(SqlScalarType::Range {
6194                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
6195                }),
6196                CatalogType::Record { fields } => {
6197                    let scalars: Box<[(ColumnName, SqlColumnType)]> = fields
6198                        .iter()
6199                        .map(|f| {
6200                            let scalar_type = scalar_type_from_catalog(
6201                                catalog,
6202                                f.type_reference,
6203                                &f.type_modifiers,
6204                            )?;
6205                            Ok((
6206                                f.name.clone(),
6207                                SqlColumnType {
6208                                    scalar_type,
6209                                    nullable: true,
6210                                },
6211                            ))
6212                        })
6213                        .collect::<Result<Box<_>, PlanError>>()?;
6214                    Ok(SqlScalarType::Record {
6215                        fields: scalars,
6216                        custom_id: Some(id),
6217                    })
6218                }
6219                CatalogType::AclItem => Ok(SqlScalarType::AclItem),
6220                CatalogType::Bool => Ok(SqlScalarType::Bool),
6221                CatalogType::Bytes => Ok(SqlScalarType::Bytes),
6222                CatalogType::Date => Ok(SqlScalarType::Date),
6223                CatalogType::Float32 => Ok(SqlScalarType::Float32),
6224                CatalogType::Float64 => Ok(SqlScalarType::Float64),
6225                CatalogType::Int16 => Ok(SqlScalarType::Int16),
6226                CatalogType::Int32 => Ok(SqlScalarType::Int32),
6227                CatalogType::Int64 => Ok(SqlScalarType::Int64),
6228                CatalogType::UInt16 => Ok(SqlScalarType::UInt16),
6229                CatalogType::UInt32 => Ok(SqlScalarType::UInt32),
6230                CatalogType::UInt64 => Ok(SqlScalarType::UInt64),
6231                CatalogType::MzTimestamp => Ok(SqlScalarType::MzTimestamp),
6232                CatalogType::Interval => Ok(SqlScalarType::Interval),
6233                CatalogType::Jsonb => Ok(SqlScalarType::Jsonb),
6234                CatalogType::Oid => Ok(SqlScalarType::Oid),
6235                CatalogType::PgLegacyChar => Ok(SqlScalarType::PgLegacyChar),
6236                CatalogType::PgLegacyName => Ok(SqlScalarType::PgLegacyName),
6237                CatalogType::Pseudo => {
6238                    sql_bail!(
6239                        "cannot reference pseudo type {}",
6240                        catalog.resolve_full_name(entry.name()).to_string()
6241                    )
6242                }
6243                CatalogType::RegClass => Ok(SqlScalarType::RegClass),
6244                CatalogType::RegProc => Ok(SqlScalarType::RegProc),
6245                CatalogType::RegType => Ok(SqlScalarType::RegType),
6246                CatalogType::String => Ok(SqlScalarType::String),
6247                CatalogType::Time => Ok(SqlScalarType::Time),
6248                CatalogType::Uuid => Ok(SqlScalarType::Uuid),
6249                CatalogType::Int2Vector => Ok(SqlScalarType::Int2Vector),
6250                CatalogType::MzAclItem => Ok(SqlScalarType::MzAclItem),
6251                CatalogType::Numeric => unreachable!("handled above"),
6252                CatalogType::Char => unreachable!("handled above"),
6253                CatalogType::VarChar => unreachable!("handled above"),
6254                CatalogType::Timestamp => unreachable!("handled above"),
6255                CatalogType::TimestampTz => unreachable!("handled above"),
6256            }
6257        }
6258    }
6259}
6260
6261/// This is used to collect aggregates and table functions from within an `Expr`.
6262/// See the explanation of aggregate handling at the top of the file for more details.
6263struct AggregateTableFuncVisitor<'a> {
6264    scx: &'a StatementContext<'a>,
6265    aggs: Vec<Function<Aug>>,
6266    within_aggregate: bool,
6267    tables: BTreeMap<Function<Aug>, String>,
6268    table_disallowed_context: Vec<&'static str>,
6269    in_select_item: bool,
6270    id_gen: IdGen,
6271    err: Option<PlanError>,
6272}
6273
6274impl<'a> AggregateTableFuncVisitor<'a> {
6275    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6276        AggregateTableFuncVisitor {
6277            scx,
6278            aggs: Vec::new(),
6279            within_aggregate: false,
6280            tables: BTreeMap::new(),
6281            table_disallowed_context: Vec::new(),
6282            in_select_item: false,
6283            id_gen: Default::default(),
6284            err: None,
6285        }
6286    }
6287
6288    fn into_result(
6289        self,
6290    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6291        match self.err {
6292            Some(err) => Err(err),
6293            None => {
6294                // Dedup while preserving the order. We don't care what the order is, but it
6295                // has to be reproducible so that EXPLAIN PLAN tests work.
6296                let mut seen = BTreeSet::new();
6297                let aggs = self
6298                    .aggs
6299                    .into_iter()
6300                    .filter(move |agg| seen.insert(agg.clone()))
6301                    .collect();
6302                Ok((aggs, self.tables))
6303            }
6304        }
6305    }
6306}
6307
6308impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
6309    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
6310        let item = match self.scx.get_item_by_resolved_name(&func.name) {
6311            Ok(i) => i,
6312            // Catching missing functions later in planning improves error messages.
6313            Err(_) => return,
6314        };
6315
6316        match item.func() {
6317            // We don't want to collect window aggregations, because these will be handled not by
6318            // plan_aggregate, but by plan_function.
6319            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
6320                if self.within_aggregate {
6321                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
6322                    return;
6323                }
6324                self.aggs.push(func.clone());
6325                let Function {
6326                    name: _,
6327                    args,
6328                    filter,
6329                    over: _,
6330                    distinct: _,
6331                } = func;
6332                if let Some(filter) = filter {
6333                    self.visit_expr_mut(filter);
6334                }
6335                let old_within_aggregate = self.within_aggregate;
6336                self.within_aggregate = true;
6337                self.table_disallowed_context
6338                    .push("aggregate function calls");
6339
6340                self.visit_function_args_mut(args);
6341
6342                self.within_aggregate = old_within_aggregate;
6343                self.table_disallowed_context.pop();
6344            }
6345            Ok(Func::Table { .. }) => {
6346                self.table_disallowed_context.push("other table functions");
6347                visit_mut::visit_function_mut(self, func);
6348                self.table_disallowed_context.pop();
6349            }
6350            _ => visit_mut::visit_function_mut(self, func),
6351        }
6352    }
6353
6354    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
6355        // Don't go into subqueries.
6356    }
6357
6358    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
6359        let (disallowed_context, func) = match expr {
6360            Expr::Case { .. } => (Some("CASE"), None),
6361            Expr::HomogenizingFunction {
6362                function: HomogenizingFunction::Coalesce,
6363                ..
6364            } => (Some("COALESCE"), None),
6365            Expr::Function(func) if self.in_select_item => {
6366                // If we're in a SELECT list, replace table functions with a uuid identifier
6367                // and save the table func so it can be planned elsewhere.
6368                let mut table_func = None;
6369                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
6370                    if let Ok(Func::Table { .. }) = item.func() {
6371                        if let Some(context) = self.table_disallowed_context.last() {
6372                            self.err = Some(sql_err!(
6373                                "table functions are not allowed in {} (function {})",
6374                                context,
6375                                func.name
6376                            ));
6377                            return;
6378                        }
6379                        table_func = Some(func.clone());
6380                    }
6381                }
6382                // Since we will descend into the table func below, don't add its own disallow
6383                // context here, instead use visit_function to set that.
6384                (None, table_func)
6385            }
6386            _ => (None, None),
6387        };
6388        if let Some(func) = func {
6389            // Since we are trading out expr, we need to visit the table func here.
6390            visit_mut::visit_expr_mut(self, expr);
6391            // Don't attempt to replace table functions with unsupported syntax.
6392            if let Function {
6393                name: _,
6394                args: _,
6395                filter: None,
6396                over: None,
6397                distinct: false,
6398            } = &func
6399            {
6400                // Identical table functions can be de-duplicated.
6401                let unique_id = self.id_gen.allocate_id();
6402                let id = self
6403                    .tables
6404                    .entry(func)
6405                    .or_insert_with(|| format!("table_func_{unique_id}"));
6406                // We know this is okay because id is is 11 characters + <=20 characters, which is
6407                // less than our max length.
6408                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
6409            }
6410        }
6411        if let Some(context) = disallowed_context {
6412            self.table_disallowed_context.push(context);
6413        }
6414
6415        visit_mut::visit_expr_mut(self, expr);
6416
6417        if disallowed_context.is_some() {
6418            self.table_disallowed_context.pop();
6419        }
6420    }
6421
6422    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
6423        let old = self.in_select_item;
6424        self.in_select_item = true;
6425        visit_mut::visit_select_item_mut(self, si);
6426        self.in_select_item = old;
6427    }
6428}
6429
6430#[derive(Default)]
6431struct WindowFuncCollector {
6432    window_funcs: Vec<Expr<Aug>>,
6433}
6434
6435impl WindowFuncCollector {
6436    fn into_result(self) -> Vec<Expr<Aug>> {
6437        // Dedup while preserving the order.
6438        let mut seen = BTreeSet::new();
6439        let window_funcs_dedupped = self
6440            .window_funcs
6441            .into_iter()
6442            .filter(move |expr| seen.insert(expr.clone()))
6443            // Reverse the order, so that in case of a nested window function call, the
6444            // inner one is evaluated first.
6445            .rev()
6446            .collect();
6447        window_funcs_dedupped
6448    }
6449}
6450
6451impl Visit<'_, Aug> for WindowFuncCollector {
6452    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6453        match expr {
6454            Expr::Function(func) => {
6455                if func.over.is_some() {
6456                    self.window_funcs.push(expr.clone());
6457                }
6458            }
6459            _ => (),
6460        }
6461        visit::visit_expr(self, expr);
6462    }
6463
6464    fn visit_query(&mut self, _query: &Query<Aug>) {
6465        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6466    }
6467}
6468
6469/// Specifies how long a query will live.
6470#[derive(Debug, Eq, PartialEq, Copy, Clone)]
6471pub enum QueryLifetime {
6472    /// The query's (or the expression's) result will be computed at one point in time.
6473    OneShot,
6474    /// The query (or expression) is used in a dataflow that maintains an index.
6475    Index,
6476    /// The query (or expression) is used in a dataflow that maintains a materialized view.
6477    MaterializedView,
6478    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
6479    Subscribe,
6480    /// The query (or expression) is part of a (non-materialized) view.
6481    View,
6482    /// The expression is part of a source definition.
6483    Source,
6484}
6485
6486impl QueryLifetime {
6487    /// (This used to impact whether the query is allowed to reason about the time at which it is
6488    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
6489    /// mechanism, see `ExprPrepStyle`.)
6490    pub fn is_one_shot(&self) -> bool {
6491        let result = match self {
6492            QueryLifetime::OneShot => true,
6493            QueryLifetime::Index => false,
6494            QueryLifetime::MaterializedView => false,
6495            QueryLifetime::Subscribe => false,
6496            QueryLifetime::View => false,
6497            QueryLifetime::Source => false,
6498        };
6499        assert_eq!(!result, self.is_maintained());
6500        result
6501    }
6502
6503    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
6504    /// turned into a `TopK`.
6505    pub fn is_maintained(&self) -> bool {
6506        match self {
6507            QueryLifetime::OneShot => false,
6508            QueryLifetime::Index => true,
6509            QueryLifetime::MaterializedView => true,
6510            QueryLifetime::Subscribe => true,
6511            QueryLifetime::View => true,
6512            QueryLifetime::Source => true,
6513        }
6514    }
6515
6516    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
6517    pub fn allow_show(&self) -> bool {
6518        match self {
6519            QueryLifetime::OneShot => true,
6520            QueryLifetime::Index => false,
6521            QueryLifetime::MaterializedView => false,
6522            QueryLifetime::Subscribe => true, // SUBSCRIBE allows SHOW commands!
6523            QueryLifetime::View => false,
6524            QueryLifetime::Source => false,
6525        }
6526    }
6527}
6528
6529/// Description of a CTE sufficient for query planning.
6530#[derive(Debug, Clone)]
6531pub struct CteDesc {
6532    pub name: String,
6533    pub desc: RelationDesc,
6534}
6535
6536/// The state required when planning a `Query`.
6537#[derive(Debug, Clone)]
6538pub struct QueryContext<'a> {
6539    /// The context for the containing `Statement`.
6540    pub scx: &'a StatementContext<'a>,
6541    /// The lifetime that the planned query will have.
6542    pub lifetime: QueryLifetime,
6543    /// The scopes of the outer relation expression.
6544    pub outer_scopes: Vec<Scope>,
6545    /// The type of the outer relation expressions.
6546    pub outer_relation_types: Vec<SqlRelationType>,
6547    /// CTEs for this query, mapping their assigned LocalIds to their definition.
6548    pub ctes: BTreeMap<LocalId, CteDesc>,
6549    /// A name manager, for interning column names that will be stored in HIR and MIR.
6550    pub name_manager: Rc<RefCell<NameManager>>,
6551    pub recursion_guard: RecursionGuard,
6552}
6553
6554impl CheckedRecursion for QueryContext<'_> {
6555    fn recursion_guard(&self) -> &RecursionGuard {
6556        &self.recursion_guard
6557    }
6558}
6559
6560impl<'a> QueryContext<'a> {
6561    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6562        QueryContext {
6563            scx,
6564            lifetime,
6565            outer_scopes: vec![],
6566            outer_relation_types: vec![],
6567            ctes: BTreeMap::new(),
6568            name_manager: Rc::new(RefCell::new(NameManager::new())),
6569            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6570        }
6571    }
6572
6573    fn relation_type(&self, expr: &HirRelationExpr) -> SqlRelationType {
6574        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6575    }
6576
6577    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6578    /// `self`.
6579    fn derived_context(&self, scope: Scope, relation_type: SqlRelationType) -> QueryContext<'a> {
6580        let ctes = self.ctes.clone();
6581        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6582        let outer_relation_types = iter::once(relation_type)
6583            .chain(self.outer_relation_types.clone())
6584            .collect();
6585        // These shenanigans are simpler than adding `&mut NameManager` arguments everywhere.
6586        let name_manager = Rc::clone(&self.name_manager);
6587
6588        QueryContext {
6589            scx: self.scx,
6590            lifetime: self.lifetime,
6591            outer_scopes,
6592            outer_relation_types,
6593            ctes,
6594            name_manager,
6595            recursion_guard: self.recursion_guard.clone(),
6596        }
6597    }
6598
6599    /// Derives a `QueryContext` for a scope that contains no columns.
6600    fn empty_derived_context(&self) -> QueryContext<'a> {
6601        let scope = Scope::empty();
6602        let ty = SqlRelationType::empty();
6603        self.derived_context(scope, ty)
6604    }
6605
6606    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6607    /// CTE.
6608    pub fn resolve_table_name(
6609        &self,
6610        object: ResolvedItemName,
6611    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6612        match object {
6613            ResolvedItemName::Item {
6614                id,
6615                full_name,
6616                version,
6617                ..
6618            } => {
6619                let item = self.scx.get_item(&id).at_version(version);
6620                let desc = match item.relation_desc() {
6621                    Some(desc) => desc.clone(),
6622                    None => {
6623                        return Err(PlanError::InvalidDependency {
6624                            name: full_name.to_string(),
6625                            item_type: item.item_type().to_string(),
6626                        });
6627                    }
6628                };
6629                let expr = HirRelationExpr::Get {
6630                    id: Id::Global(item.global_id()),
6631                    typ: desc.typ().clone(),
6632                };
6633
6634                let name = full_name.into();
6635                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6636
6637                Ok((expr, scope))
6638            }
6639            ResolvedItemName::Cte { id, name } => {
6640                let name = name.into();
6641                let cte = self.ctes.get(&id).unwrap();
6642                let expr = HirRelationExpr::Get {
6643                    id: Id::Local(id),
6644                    typ: cte.desc.typ().clone(),
6645                };
6646
6647                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6648
6649                Ok((expr, scope))
6650            }
6651            ResolvedItemName::Error => bail_internal!("should have been caught in name resolution"),
6652        }
6653    }
6654
6655    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6656    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6657    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6658        self.scx.humanize_sql_scalar_type(typ, postgres_compat)
6659    }
6660}
6661
6662/// A bundle of unrelated things that we need for planning `Expr`s.
6663#[derive(Debug, Clone)]
6664pub struct ExprContext<'a> {
6665    pub qcx: &'a QueryContext<'a>,
6666    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
6667    pub name: &'a str,
6668    /// The context for the `Query` that contains this `Expr`.
6669    /// The current scope.
6670    pub scope: &'a Scope,
6671    /// The type of the current relation expression upon which this scalar
6672    /// expression will be evaluated.
6673    pub relation_type: &'a SqlRelationType,
6674    /// Are aggregate functions allowed in this context
6675    pub allow_aggregates: bool,
6676    /// Are subqueries allowed in this context
6677    pub allow_subqueries: bool,
6678    /// Are parameters allowed in this context.
6679    pub allow_parameters: bool,
6680    /// Are window functions allowed in this context
6681    pub allow_windows: bool,
6682}
6683
6684impl CheckedRecursion for ExprContext<'_> {
6685    fn recursion_guard(&self) -> &RecursionGuard {
6686        &self.qcx.recursion_guard
6687    }
6688}
6689
6690impl<'a> ExprContext<'a> {
6691    pub fn catalog(&self) -> &dyn SessionCatalog {
6692        self.qcx.scx.catalog
6693    }
6694
6695    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
6696        let mut ecx = self.clone();
6697        ecx.name = name;
6698        ecx
6699    }
6700
6701    pub fn column_type<E>(&self, expr: &E) -> E::Type
6702    where
6703        E: AbstractExpr,
6704    {
6705        expr.typ(
6706            &self.qcx.outer_relation_types,
6707            self.relation_type,
6708            &self.qcx.scx.param_types.borrow(),
6709        )
6710    }
6711
6712    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
6713    where
6714        E: AbstractExpr,
6715    {
6716        self.column_type(expr).scalar_type()
6717    }
6718
6719    fn derived_query_context(&self) -> QueryContext<'_> {
6720        let mut scope = self.scope.clone();
6721        scope.lateral_barrier = true;
6722        self.qcx.derived_context(scope, self.relation_type.clone())
6723    }
6724
6725    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
6726        self.qcx.scx.require_feature_flag(flag)
6727    }
6728
6729    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, SqlScalarType>> {
6730        &self.qcx.scx.param_types
6731    }
6732
6733    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6734    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6735    pub fn humanize_sql_scalar_type(&self, typ: &SqlScalarType, postgres_compat: bool) -> String {
6736        self.qcx.scx.humanize_sql_scalar_type(typ, postgres_compat)
6737    }
6738
6739    pub fn intern(&self, item: &ScopeItem) -> Arc<str> {
6740        self.qcx.name_manager.borrow_mut().intern_scope_item(item)
6741    }
6742}
6743
6744/// Manages column names, doing lightweight string internment.
6745///
6746/// Names are stored in `HirScalarExpr` and `MirScalarExpr` using
6747/// `Option<Arc<str>>`; we use the `NameManager` when lowering from SQL to HIR
6748/// to ensure maximal sharing.
6749#[derive(Debug, Clone)]
6750pub struct NameManager(BTreeSet<Arc<str>>);
6751
6752impl NameManager {
6753    /// Creates a new `NameManager`, with no interned names
6754    pub fn new() -> Self {
6755        Self(BTreeSet::new())
6756    }
6757
6758    /// Interns a string, returning a reference-counted pointer to the interned
6759    /// string.
6760    fn intern<S: AsRef<str>>(&mut self, s: S) -> Arc<str> {
6761        let s = s.as_ref();
6762        if let Some(interned) = self.0.get(s) {
6763            Arc::clone(interned)
6764        } else {
6765            let interned: Arc<str> = Arc::from(s);
6766            self.0.insert(Arc::clone(&interned));
6767            interned
6768        }
6769    }
6770
6771    /// Interns a string representing a reference to a `ScopeItem`, returning a
6772    /// reference-counted pointer to the interned string.
6773    pub fn intern_scope_item(&mut self, item: &ScopeItem) -> Arc<str> {
6774        // TODO(mgree): extracting the table name from `item` leads to an issue with the catalog
6775        //
6776        // After an `ALTER ... RENAME` on a table, the catalog will have out-of-date
6777        // name information. Note that as of 2025-04-09, we don't support column
6778        // renames.
6779        //
6780        // A few bad alternatives:
6781        //
6782        // (1) Store it but don't write it down. This fails because the expression
6783        //     cache will erase our names on restart.
6784        // (2) When `ALTER ... RENAME` is run, re-optimize all downstream objects to
6785        //     get the right names. But the world now and the world when we made
6786        //     those objects may be different.
6787        // (3) Just don't write down the table name. Nothing fails... for now.
6788
6789        self.intern(item.column_name.as_str())
6790    }
6791}
6792
6793#[cfg(test)]
6794mod test {
6795    use super::*;
6796
6797    /// Ensure that `NameManager`'s string interning works as expected.
6798    ///
6799    /// In particular, structurally but not referentially identical strings should
6800    /// be interned to the same `Arc`ed pointer.
6801    #[mz_ore::test]
6802    pub fn test_name_manager_string_interning() {
6803        let mut nm = NameManager::new();
6804
6805        let orig_hi = "hi";
6806        let hi = nm.intern(orig_hi);
6807        let hello = nm.intern("hello");
6808
6809        assert_ne!(hi.as_ptr(), hello.as_ptr());
6810
6811        // this static string is _likely_ the same as `orig_hi``
6812        let hi2 = nm.intern("hi");
6813        assert_eq!(hi.as_ptr(), hi2.as_ptr());
6814
6815        // generate a "hi" string that doesn't get optimized to the same static string
6816        let s = format!(
6817            "{}{}",
6818            hi.chars().nth(0).unwrap(),
6819            hi2.chars().nth(1).unwrap()
6820        );
6821        // make sure that we're testing with a fresh string!
6822        assert_ne!(orig_hi.as_ptr(), s.as_ptr());
6823
6824        let hi3 = nm.intern(s);
6825        assert_eq!(hi.as_ptr(), hi3.as_ptr());
6826    }
6827}