// mz_sql/plan/query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL `Query`s are the declarative, computational part of SQL.
11//! This module turns `Query`s into `HirRelationExpr`s - a more explicit, algebraic way of
12//! describing computation.
13
14//! Functions named plan_* are typically responsible for handling a single node of the SQL ast.
15//! E.g. `plan_query` is responsible for handling `sqlparser::ast::Query`.
16//! plan_* functions which correspond to operations on relations typically return a `HirRelationExpr`.
17//! plan_* functions which correspond to operations on scalars typically return a `HirScalarExpr`
18//! and a `ScalarType`. (The latter is because it's not always possible to infer from a
19//! `HirScalarExpr` what the intended type is - notably in the case of decimals where the
20//! scale/precision are encoded only in the type).
21
22//! Aggregates are particularly twisty.
23//!
24//! In SQL, a GROUP BY turns any columns not in the group key into vectors of
25//! values. Then anywhere later in the scope, an aggregate function can be
26//! applied to that group. Inside the arguments of an aggregate function, other
27//! normal functions are applied element-wise over the vectors. Thus, `SELECT
28//! sum(foo.x + foo.y) FROM foo GROUP BY x` means adding the scalar `x` to the
29//! vector `y` and summing the results.
30//!
31//! In `HirRelationExpr`, aggregates can only be applied immediately at the time
32//! of grouping.
33//!
34//! To deal with this, whenever we see a SQL GROUP BY we look ahead for
35//! aggregates and precompute them in the `HirRelationExpr::Reduce`. When we
36//! reach the same aggregates during normal planning later on, we look them up
37//! in an `ExprContext` to find the precomputed versions.
38
39use std::borrow::Cow;
40use std::cell::RefCell;
41use std::collections::{BTreeMap, BTreeSet};
42use std::convert::{TryFrom, TryInto};
43use std::num::NonZeroU64;
44use std::{iter, mem};
45
46use itertools::Itertools;
47use mz_expr::virtual_syntax::AlgExcept;
48use mz_expr::{
49    Id, LetRecLimit, LocalId, MapFilterProject, MirScalarExpr, RowSetFinishing, func as expr_func,
50};
51use mz_ore::assert_none;
52use mz_ore::collections::CollectionExt;
53use mz_ore::option::FallibleMapExt;
54use mz_ore::stack::{CheckedRecursion, RecursionGuard};
55use mz_ore::str::StrExt;
56use mz_repr::adt::char::CharLength;
57use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
58use mz_repr::adt::timestamp::TimestampPrecision;
59use mz_repr::adt::varchar::VarCharMaxLength;
60use mz_repr::{
61    CatalogItemId, ColumnIndex, ColumnName, ColumnType, Datum, RelationDesc, RelationType,
62    RelationVersionSelector, Row, RowArena, ScalarType, strconv,
63};
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_sql_parser::ast::visit::Visit;
66use mz_sql_parser::ast::visit_mut::{self, VisitMut};
67use mz_sql_parser::ast::{
68    AsOf, Assignment, AstInfo, CreateWebhookSourceBody, CreateWebhookSourceCheck,
69    CreateWebhookSourceHeader, CreateWebhookSourceSecret, CteBlock, DeleteStatement, Distinct,
70    Expr, Function, FunctionArgs, HomogenizingFunction, Ident, InsertSource, IsExprConstruct, Join,
71    JoinConstraint, JoinOperator, Limit, MapEntry, MutRecBlock, MutRecBlockOption,
72    MutRecBlockOptionName, OrderByExpr, Query, Select, SelectItem, SelectOption, SelectOptionName,
73    SetExpr, SetOperator, ShowStatement, SubscriptPosition, TableAlias, TableFactor,
74    TableWithJoins, UnresolvedItemName, UpdateStatement, Value, Values, WindowFrame,
75    WindowFrameBound, WindowFrameUnits, WindowSpec, visit,
76};
77use mz_sql_parser::ident;
78use uuid::Uuid;
79
80use crate::catalog::{CatalogItemType, CatalogType, SessionCatalog};
81use crate::func::{self, Func, FuncSpec};
82use crate::names::{
83    Aug, FullItemName, PartialItemName, ResolvedDataType, ResolvedItemName, SchemaSpecifier,
84};
85use crate::normalize;
86use crate::plan::PlanError::InvalidWmrRecursionLimit;
87use crate::plan::error::PlanError;
88use crate::plan::hir::{
89    AbstractColumnType, AbstractExpr, AggregateExpr, AggregateFunc, AggregateWindowExpr,
90    BinaryFunc, CoercibleScalarExpr, CoercibleScalarType, ColumnOrder, ColumnRef, Hir,
91    HirRelationExpr, HirScalarExpr, JoinKind, ScalarWindowExpr, ScalarWindowFunc, UnaryFunc,
92    ValueWindowExpr, ValueWindowFunc, VariadicFunc, WindowExpr, WindowExprType,
93};
94use crate::plan::plan_utils::{self, GroupSizeHints, JoinSide};
95use crate::plan::scope::{Scope, ScopeItem, ScopeUngroupedColumn};
96use crate::plan::statement::{StatementContext, StatementDesc, show};
97use crate::plan::typeconv::{self, CastContext};
98use crate::plan::{
99    Params, PlanContext, QueryWhen, ShowCreatePlan, WebhookValidation, WebhookValidationSecret,
100    literal, transform_ast,
101};
102use crate::session::vars::{self, FeatureFlag};
103
/// The result of planning a root (top-level) query: the planned expression
/// plus everything a caller needs to interpret and post-process its output.
#[derive(Debug)]
pub struct PlannedRootQuery<E> {
    /// The planned expression. `HirRelationExpr` for a full query, or
    /// `Vec<HirScalarExpr>` for e.g. a `RETURNING` clause.
    pub expr: E,
    /// The shape of the result set *after* `finishing` has been applied.
    pub desc: RelationDesc,
    /// Post-processing (ordering, limit/offset, projection) to apply before
    /// results are returned to the client.
    pub finishing: RowSetFinishing<HirScalarExpr>,
    /// Scope naming the columns produced by `expr`.
    pub scope: Scope,
}
111
112/// Plans a top-level query, returning the `HirRelationExpr` describing the query
113/// plan, the `RelationDesc` describing the shape of the result set, a
114/// `RowSetFinishing` describing post-processing that must occur before results
115/// are sent to the client, and the types of the parameters in the query, if any
116/// were present.
117///
118/// Note that the returned `RelationDesc` describes the expression after
119/// applying the returned `RowSetFinishing`.
120#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
121pub fn plan_root_query(
122    scx: &StatementContext,
123    mut query: Query<Aug>,
124    lifetime: QueryLifetime,
125) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
126    transform_ast::transform(scx, &mut query)?;
127    let mut qcx = QueryContext::root(scx, lifetime);
128    let PlannedQuery {
129        mut expr,
130        scope,
131        order_by,
132        limit,
133        offset,
134        project,
135        group_size_hints,
136    } = plan_query(&mut qcx, &query)?;
137
138    let mut finishing = RowSetFinishing {
139        limit,
140        offset,
141        project,
142        order_by,
143    };
144
145    // Attempt to push the finishing's ordering past its projection. This allows
146    // data to be projected down on the workers rather than the coordinator. It
147    // also improves the optimizer's demand analysis, as the optimizer can only
148    // reason about demand information in `expr` (i.e., it can't see
149    // `finishing.project`).
150    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
151
152    if lifetime.is_maintained() {
153        expr.finish_maintained(&mut finishing, group_size_hints);
154    }
155
156    let typ = qcx.relation_type(&expr);
157    let typ = RelationType::new(
158        finishing
159            .project
160            .iter()
161            .map(|i| typ.column_types[*i].clone())
162            .collect(),
163    );
164    let desc = RelationDesc::new(typ, scope.column_names());
165
166    Ok(PlannedRootQuery {
167        expr,
168        desc,
169        finishing,
170        scope,
171    })
172}
173
174/// TODO(ct2): Dedup this with [plan_root_query].
175#[mz_ore::instrument(target = "compiler", level = "trace", name = "ast_to_hir")]
176pub fn plan_ct_query(
177    qcx: &mut QueryContext,
178    mut query: Query<Aug>,
179) -> Result<PlannedRootQuery<HirRelationExpr>, PlanError> {
180    transform_ast::transform(qcx.scx, &mut query)?;
181    let PlannedQuery {
182        mut expr,
183        scope,
184        order_by,
185        limit,
186        offset,
187        project,
188        group_size_hints,
189    } = plan_query(qcx, &query)?;
190
191    let mut finishing = RowSetFinishing {
192        limit,
193        offset,
194        project,
195        order_by,
196    };
197
198    // Attempt to push the finishing's ordering past its projection. This allows
199    // data to be projected down on the workers rather than the coordinator. It
200    // also improves the optimizer's demand analysis, as the optimizer can only
201    // reason about demand information in `expr` (i.e., it can't see
202    // `finishing.project`).
203    try_push_projection_order_by(&mut expr, &mut finishing.project, &mut finishing.order_by);
204
205    expr.finish_maintained(&mut finishing, group_size_hints);
206
207    let typ = qcx.relation_type(&expr);
208    let typ = RelationType::new(
209        finishing
210            .project
211            .iter()
212            .map(|i| typ.column_types[*i].clone())
213            .collect(),
214    );
215    let desc = RelationDesc::new(typ, scope.column_names());
216
217    Ok(PlannedRootQuery {
218        expr,
219        desc,
220        finishing,
221        scope,
222    })
223}
224
225/// Attempts to push a projection through an order by.
226///
227/// The returned bool indicates whether the pushdown was successful or not.
228/// Successful pushdown requires that all the columns referenced in `order_by`
229/// are included in `project`.
230///
231/// When successful, `expr` is wrapped in a projection node, `order_by` is
232/// rewritten to account for the pushed-down projection, and `project` is
233/// replaced with the trivial projection. When unsuccessful, no changes are made
234/// to any of the inputs.
235fn try_push_projection_order_by(
236    expr: &mut HirRelationExpr,
237    project: &mut Vec<usize>,
238    order_by: &mut Vec<ColumnOrder>,
239) -> bool {
240    let mut unproject = vec![None; expr.arity()];
241    for (out_i, in_i) in project.iter().copied().enumerate() {
242        unproject[in_i] = Some(out_i);
243    }
244    if order_by
245        .iter()
246        .all(|ob| ob.column < unproject.len() && unproject[ob.column].is_some())
247    {
248        let trivial_project = (0..project.len()).collect();
249        *expr = expr.take().project(mem::replace(project, trivial_project));
250        for ob in order_by {
251            ob.column = unproject[ob.column].unwrap();
252        }
253        true
254    } else {
255        false
256    }
257}
258
/// Plans an `INSERT INTO ...` statement.
///
/// Returns the [`CatalogItemId`] of the target table, an [`HirRelationExpr`]
/// producing the rows to insert (one column per table column, in table
/// order, with defaults filled in for omitted columns), and a
/// [`PlannedRootQuery`] of scalar expressions for the `RETURNING` clause.
pub fn plan_insert_query(
    scx: &StatementContext,
    table_name: ResolvedItemName,
    columns: Vec<Ident>,
    source: InsertSource<Aug>,
    returning: Vec<SelectItem<Aug>>,
) -> Result<
    (
        CatalogItemId,
        HirRelationExpr,
        PlannedRootQuery<Vec<HirScalarExpr>>,
    ),
    PlanError,
> {
    let mut qcx = QueryContext::root(scx, QueryLifetime::OneShot);
    let table = scx.get_item_by_resolved_name(&table_name)?;

    // Validate the target of the insert: it must be a writable, non-system
    // table.
    if table.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot insert into {} '{}'",
            table.item_type(),
            table_name.full_name_str()
        );
    }
    let desc = table.desc(&scx.catalog.resolve_full_name(table.name()))?;
    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| {
            sql_err!(
                "cannot insert into non-writeable table '{}'",
                table_name.full_name_str()
            )
        })?
        .to_vec();

    // Default-value expressions go through the same AST rewrites as the query
    // itself.
    for default in &mut defaults {
        transform_ast::transform(scx, default)?;
    }

    if table.id().is_system() {
        sql_bail!(
            "cannot insert into system table '{}'",
            table_name.full_name_str()
        );
    }

    let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();

    // Validate target column order. `source_types[i]` is the expected type of
    // the i-th source column, and `ordering[i]` is the table column it feeds.
    let mut source_types = Vec::with_capacity(columns.len());
    let mut ordering = Vec::with_capacity(columns.len());

    if columns.is_empty() {
        // Columns in source query must be in order. Let's guess the full shape and truncate to the
        // right size later after planning the source query
        source_types.extend(desc.iter_types().map(|x| &x.scalar_type));
        ordering.extend(0..desc.arity());
    } else {
        let column_by_name: BTreeMap<&ColumnName, (usize, &ColumnType)> = desc
            .iter()
            .enumerate()
            .map(|(idx, (name, typ))| (name, (idx, typ)))
            .collect();

        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push(&typ.scalar_type);
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.as_str().quoted(),
                    table_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.as_str().quoted());
        }
    };

    // Plan the source.
    let expr = match source {
        InsertSource::Query(mut query) => {
            transform_ast::transform(scx, &mut query)?;

            match query {
                // Special-case simple VALUES clauses as PostgreSQL does.
                Query {
                    body: SetExpr::Values(Values(values)),
                    ctes,
                    order_by,
                    limit: None,
                    offset: None,
                } if ctes.is_empty() && order_by.is_empty() => {
                    let names: Vec<_> = ordering.iter().map(|i| desc.get_name(*i)).collect();
                    plan_values_insert(&qcx, &names, &source_types, &values)?
                }
                _ => {
                    let (expr, _scope) = plan_nested_query(&mut qcx, &query)?;
                    expr
                }
            }
        }
        InsertSource::DefaultValues => {
            // `INSERT ... DEFAULT VALUES`: a single empty row; every column is
            // filled in from the table defaults below.
            HirRelationExpr::constant(vec![vec![]], RelationType::empty())
        }
    };

    let expr_arity = expr.arity();

    // Validate that the arity of the source query is at most the size of declared columns or the
    // size of the table if none are declared
    let max_columns = if columns.is_empty() {
        desc.arity()
    } else {
        columns.len()
    };
    if expr_arity > max_columns {
        sql_bail!("INSERT has more expressions than target columns");
    }
    // But it should never have less than the declared columns (or zero)
    if expr_arity < columns.len() {
        sql_bail!("INSERT has more target columns than expressions");
    }

    // Trim now that we know for sure the correct arity of the source query
    source_types.truncate(expr_arity);
    ordering.truncate(expr_arity);

    // Ensure the types of the source query match the types of the target table,
    // installing assignment casts where necessary and possible.
    let expr = cast_relation(&qcx, CastContext::Assignment, expr, source_types).map_err(|e| {
        sql_err!(
            "column {} is of type {} but expression is of type {}",
            desc.get_name(ordering[e.column]).as_str().quoted(),
            qcx.humanize_scalar_type(&e.target_type, false),
            qcx.humanize_scalar_type(&e.source_type, false),
        )
    })?;

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = ordering.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_types().zip_eq(defaults).enumerate();
    for (col_idx, (col_typ, default)) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            project_key.push(*src_idx);
        } else {
            // Column not supplied by the source: append its default expression
            // and project it into place.
            let hir = plan_default_expr(scx, &default, &col_typ.scalar_type)?;
            project_key.push(expr_arity + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    // Plan the RETURNING clause against the scope of the target table.
    let returning = {
        let (scope, typ) = if let ResolvedItemName::Item {
            full_name,
            version: _,
            ..
        } = table_name
        {
            let scope = Scope::from_source(Some(full_name.clone().into()), desc.iter_names());
            let typ = desc.typ().clone();
            (scope, typ)
        } else {
            (Scope::empty(), RelationType::empty())
        };
        let ecx = &ExprContext {
            qcx: &qcx,
            name: "RETURNING clause",
            scope: &scope,
            relation_type: &typ,
            allow_aggregates: false,
            allow_subqueries: false,
            allow_parameters: false,
            allow_windows: false,
        };
        let table_func_names = BTreeMap::new();
        let mut output_columns = vec![];
        let mut new_exprs = vec![];
        let mut new_type = RelationType::empty();
        for mut si in returning {
            transform_ast::transform(scx, &mut si)?;
            // Each `RETURNING` item may expand to several columns (e.g. `*`).
            for (select_item, column_name) in expand_select_item(ecx, &si, &table_func_names)? {
                let expr = match &select_item {
                    ExpandedSelectItem::InputOrdinal(i) => HirScalarExpr::column(*i),
                    ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
                };
                output_columns.push(column_name);
                let typ = ecx.column_type(&expr);
                new_type.column_types.push(typ);
                new_exprs.push(expr);
            }
        }
        let desc = RelationDesc::new(new_type, output_columns);
        let desc_arity = desc.arity();
        PlannedRootQuery {
            expr: new_exprs,
            desc,
            finishing: RowSetFinishing {
                order_by: vec![],
                limit: None,
                offset: 0,
                project: (0..desc_arity).collect(),
            },
            scope,
        }
    };

    Ok((
        table.id(),
        expr.map(map_exprs).project(project_key),
        returning,
    ))
}
480
/// Determines the mapping between some external data and a Materialize relation.
///
/// Returns the following:
/// * [`CatalogItemId`] for the destination table.
/// * [`RelationDesc`] representing the shape of the __input__ data we are copying from.
/// * The [`ColumnIndex`]es that the source data maps to. TODO(cf2): We don't need this mapping
///   since we now return a [`MapFilterProject`].
/// * [`MapFilterProject`] which will map and project the input data to match the shape of the
///   destination table. `None` when the item is not a writable table.
///
pub fn plan_copy_item(
    scx: &StatementContext,
    item_name: ResolvedItemName,
    columns: Vec<Ident>,
) -> Result<
    (
        CatalogItemId,
        RelationDesc,
        Vec<ColumnIndex>,
        Option<MapFilterProject>,
    ),
    PlanError,
> {
    let item = scx.get_item_by_resolved_name(&item_name)?;
    let fullname = scx.catalog.resolve_full_name(item.name());
    let table_desc = item.desc(&fullname)?.into_owned();
    let mut ordering = Vec::with_capacity(columns.len());

    // TODO(cf2): The logic here to create the `source_desc` and the MFP are a bit duplicated and
    // should be simplified. The reason they are currently separate code paths is so we can roll
    // out `COPY ... FROM <url>` without touching the current `COPY ... FROM ... STDIN` behavior.

    // If we're copying data into a table that users can write into (e.g. not a `CREATE TABLE ...
    // FROM SOURCE ...`), then we generate an MFP.
    //
    // Note: This method is called for both `COPY INTO <table> FROM` and `COPY <expr> TO <external>`
    // so it's not always guaranteed that our `item` is a table.
    let mfp = if let Some(table_defaults) = item.writable_table_details() {
        let mut table_defaults = table_defaults.to_vec();

        // Default-value expressions go through the same AST rewrites as
        // queries.
        for default in &mut table_defaults {
            transform_ast::transform(scx, default)?;
        }

        // Fill in any omitted columns and rearrange into correct order
        let source_column_names: Vec<_> = columns
            .iter()
            .cloned()
            .map(normalize::column_name)
            .collect();

        let mut default_exprs = Vec::new();
        let mut project_keys = Vec::with_capacity(table_desc.arity());

        // For each column in the destination table, either project it from the source data, or provide
        // an expression to fill in a default value.
        let column_details = table_desc.iter().zip_eq(table_defaults);
        for ((col_name, col_type), col_default) in column_details {
            let maybe_src_idx = source_column_names.iter().position(|name| name == col_name);
            if let Some(src_idx) = maybe_src_idx {
                project_keys.push(src_idx);
            } else {
                // If a column from the table does not exist in the source data, then a default
                // value will get appended to the end of the input Row from the source data.
                let hir = plan_default_expr(scx, &col_default, &col_type.scalar_type)?;
                let mir = hir.lower_uncorrelated()?;
                project_keys.push(source_column_names.len() + default_exprs.len());
                default_exprs.push(mir);
            }
        }

        let mfp = MapFilterProject::new(source_column_names.len())
            .map(default_exprs)
            .project(project_keys);
        Some(mfp)
    } else {
        None
    };

    // Create a mapping from input data to the table we're copying into.
    let source_desc = if columns.is_empty() {
        let indexes = (0..table_desc.arity()).map(ColumnIndex::from_raw);
        ordering.extend(indexes);

        // The source data should be in the same order as the table.
        table_desc
    } else {
        let columns: Vec<_> = columns.into_iter().map(normalize::column_name).collect();
        let column_by_name: BTreeMap<&ColumnName, (ColumnIndex, &ColumnType)> = table_desc
            .iter_all()
            .map(|(idx, name, typ)| (name, (*idx, typ)))
            .collect();

        let mut names = Vec::with_capacity(columns.len());
        let mut source_types = Vec::with_capacity(columns.len());

        // Resolve each named source column against the table's columns.
        for c in &columns {
            if let Some((idx, typ)) = column_by_name.get(c) {
                ordering.push(*idx);
                source_types.push((*typ).clone());
                names.push(c.clone());
            } else {
                sql_bail!(
                    "column {} of relation {} does not exist",
                    c.as_str().quoted(),
                    item_name.full_name_str().quoted()
                );
            }
        }
        if let Some(dup) = columns.iter().duplicates().next() {
            sql_bail!("column {} specified more than once", dup.as_str().quoted());
        }

        // The source data is a different shape than the destination table.
        RelationDesc::new(RelationType::new(source_types), names)
    };

    Ok((item.id(), source_desc, ordering, mfp))
}
600
601/// See the doc comment on [`plan_copy_item`] for the details of what this function returns.
602///
603/// TODO(cf3): Merge this method with [`plan_copy_item`].
604pub fn plan_copy_from(
605    scx: &StatementContext,
606    table_name: ResolvedItemName,
607    columns: Vec<Ident>,
608) -> Result<
609    (
610        CatalogItemId,
611        RelationDesc,
612        Vec<ColumnIndex>,
613        Option<MapFilterProject>,
614    ),
615    PlanError,
616> {
617    let table = scx.get_item_by_resolved_name(&table_name)?;
618
619    // Validate the target of the insert.
620    if table.item_type() != CatalogItemType::Table {
621        sql_bail!(
622            "cannot insert into {} '{}'",
623            table.item_type(),
624            table_name.full_name_str()
625        );
626    }
627
628    let _ = table.writable_table_details().ok_or_else(|| {
629        sql_err!(
630            "cannot insert into non-writeable table '{}'",
631            table_name.full_name_str()
632        )
633    })?;
634
635    if table.id().is_system() {
636        sql_bail!(
637            "cannot insert into system table '{}'",
638            table_name.full_name_str()
639        );
640    }
641    let (id, desc, ordering, mfp) = plan_copy_item(scx, table_name, columns)?;
642
643    Ok((id, desc, ordering, mfp))
644}
645
/// Builds a plan that adds the default values for the missing columns and re-orders
/// the datums in the given rows to match the order in the target table.
pub fn plan_copy_from_rows(
    pcx: &PlanContext,
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    columns: Vec<ColumnIndex>,
    rows: Vec<mz_repr::Row>,
) -> Result<HirRelationExpr, PlanError> {
    let scx = StatementContext::new(Some(pcx), catalog);

    // Always copy at the latest version of the table.
    let table = catalog
        .get_item(&id)
        .at_version(RelationVersionSelector::Latest);
    let desc = table.desc(&catalog.resolve_full_name(table.name()))?;

    let mut defaults = table
        .writable_table_details()
        .ok_or_else(|| sql_err!("cannot copy into non-writeable table"))?
        .to_vec();

    // Default-value expressions go through the same AST rewrites as queries.
    for default in &mut defaults {
        transform_ast::transform(&scx, default)?;
    }

    let column_types = columns
        .iter()
        .map(|x| desc.get_type(x).clone())
        .map(|mut x| {
            // Null constraint is enforced later, when inserting the row in the table.
            // Without this, an assert is hit during lowering.
            x.nullable = true;
            x
        })
        .collect();
    let typ = RelationType::new(column_types);
    let expr = HirRelationExpr::Constant {
        rows,
        typ: typ.clone(),
    };

    // Exit early with just the raw constant if we know that all columns are present
    // and in the correct order. This lets us bypass expensive downstream optimizations
    // more easily, as at every stage we know this expression is nothing more than
    // a constant (as opposed to e.g. a constant with an identity map and identity
    // projection).
    let default: Vec<_> = (0..desc.arity()).map(ColumnIndex::from_raw).collect();
    if columns == default {
        return Ok(expr);
    }

    // Fill in any omitted columns and rearrange into correct order
    let mut map_exprs = vec![];
    let mut project_key = Vec::with_capacity(desc.arity());

    // Maps from table column index to position in the source query
    let col_to_source: BTreeMap<_, _> = columns.iter().enumerate().map(|(a, b)| (b, a)).collect();

    let column_details = desc.iter_all().zip_eq(defaults);
    for ((col_idx, _col_name, col_typ), default) in column_details {
        if let Some(src_idx) = col_to_source.get(&col_idx) {
            // Column supplied by the input data: project it into place.
            project_key.push(*src_idx);
        } else {
            // Column omitted: append its default expression and project it in.
            let hir = plan_default_expr(&scx, &default, &col_typ.scalar_type)?;
            project_key.push(typ.arity() + map_exprs.len());
            map_exprs.push(hir);
        }
    }

    Ok(expr.map(map_exprs).project(project_key))
}
718
/// Common information used for DELETE, UPDATE, and INSERT INTO ... SELECT plans.
pub struct ReadThenWritePlan {
    /// ID of the table being mutated.
    pub id: CatalogItemId,
    /// Read portion of query.
    ///
    /// NOTE: Even if the WHERE filter is left off, we still need to perform a read to generate
    /// retractions.
    pub selection: HirRelationExpr,
    /// Map from column index to SET expression. Empty for DELETE statements.
    pub assignments: BTreeMap<usize, HirScalarExpr>,
    /// Post-processing applied to the rows read by `selection` (identity
    /// projection over the table's columns; no ordering or limit).
    pub finishing: RowSetFinishing,
}
731
732pub fn plan_delete_query(
733    scx: &StatementContext,
734    mut delete_stmt: DeleteStatement<Aug>,
735) -> Result<ReadThenWritePlan, PlanError> {
736    transform_ast::transform(scx, &mut delete_stmt)?;
737
738    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
739    plan_mutation_query_inner(
740        qcx,
741        delete_stmt.table_name,
742        delete_stmt.alias,
743        delete_stmt.using,
744        vec![],
745        delete_stmt.selection,
746    )
747}
748
749pub fn plan_update_query(
750    scx: &StatementContext,
751    mut update_stmt: UpdateStatement<Aug>,
752) -> Result<ReadThenWritePlan, PlanError> {
753    transform_ast::transform(scx, &mut update_stmt)?;
754
755    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
756
757    plan_mutation_query_inner(
758        qcx,
759        update_stmt.table_name,
760        update_stmt.alias,
761        vec![],
762        update_stmt.assignments,
763        update_stmt.selection,
764    )
765}
766
/// Plans the shared portion of DELETE and UPDATE statements.
///
/// Validates that `table_name` refers to a mutable user table, plans the
/// optional `WHERE` filter and `USING` clause, and plans the `SET`
/// assignments (empty for DELETE).
pub fn plan_mutation_query_inner(
    qcx: QueryContext,
    table_name: ResolvedItemName,
    alias: Option<TableAlias>,
    using: Vec<TableWithJoins<Aug>>,
    assignments: Vec<Assignment<Aug>>,
    selection: Option<Expr<Aug>>,
) -> Result<ReadThenWritePlan, PlanError> {
    // Get ID and version of the relation desc.
    let (id, version) = match table_name {
        ResolvedItemName::Item { id, version, .. } => (id, version),
        _ => sql_bail!("cannot mutate non-user table"),
    };

    // Perform checks on item with given ID: it must be a writable, non-system
    // table.
    let item = qcx.scx.get_item(&id).at_version(version);
    if item.item_type() != CatalogItemType::Table {
        sql_bail!(
            "cannot mutate {} '{}'",
            item.item_type(),
            table_name.full_name_str()
        );
    }
    let _ = item.writable_table_details().ok_or_else(|| {
        sql_err!(
            "cannot mutate non-writeable table '{}'",
            table_name.full_name_str()
        )
    })?;
    if id.is_system() {
        sql_bail!(
            "cannot mutate system table '{}'",
            table_name.full_name_str()
        );
    }

    // Derive structs for operation from validated table
    let (mut get, scope) = qcx.resolve_table_name(table_name)?;
    let scope = plan_table_alias(scope, alias.as_ref())?;
    let desc = item.desc(&qcx.scx.catalog.resolve_full_name(item.name()))?;
    let relation_type = qcx.relation_type(&get);

    if using.is_empty() {
        // Plain `WHERE` filter over the target table.
        if let Some(expr) = selection {
            let ecx = &ExprContext {
                qcx: &qcx,
                name: "WHERE clause",
                scope: &scope,
                relation_type: &relation_type,
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, &expr)?.type_as(ecx, &ScalarType::Bool)?;
            get = get.filter(vec![expr]);
        }
    } else {
        // `USING` clause: rewrite the filter as an existential subquery over
        // the `USING` relations; see `handle_mutation_using_clause`.
        get = handle_mutation_using_clause(&qcx, selection, using, get, scope.clone())?;
    }

    // Plan each `SET <column> = <expr>` assignment, installing an assignment
    // cast to the column's declared type.
    let mut sets = BTreeMap::new();
    for Assignment { id, value } in assignments {
        // Get the index and type of the column.
        let name = normalize::column_name(id);
        match desc.get_by_name(&name) {
            Some((idx, typ)) => {
                let ecx = &ExprContext {
                    qcx: &qcx,
                    name: "SET clause",
                    scope: &scope,
                    relation_type: &relation_type,
                    allow_aggregates: false,
                    allow_subqueries: false,
                    allow_parameters: true,
                    allow_windows: false,
                };
                let expr = plan_expr(ecx, &value)?.cast_to(
                    ecx,
                    CastContext::Assignment,
                    &typ.scalar_type,
                )?;

                if sets.insert(idx, expr).is_some() {
                    sql_bail!("column {} set twice", name)
                }
            }
            None => sql_bail!("unknown column {}", name),
        };
    }

    // No ordering or limit; project all of the table's columns as-is.
    let finishing = RowSetFinishing {
        order_by: vec![],
        limit: None,
        offset: 0,
        project: (0..desc.arity()).collect(),
    };

    Ok(ReadThenWritePlan {
        id,
        selection: get,
        finishing,
        assignments: sets,
    })
}
872
873// Adjust `get` to perform an existential subquery on `using` accounting for
874// `selection`.
875//
876// If `USING`, we essentially want to rewrite the query as a correlated
877// existential subquery, i.e.
878// ```
879// ...WHERE EXISTS (SELECT 1 FROM <using> WHERE <selection>)
880// ```
881// However, we can't do that directly because of esoteric rules w/r/t `lateral`
882// subqueries.
883// https://github.com/postgres/postgres/commit/158b7fa6a34006bdc70b515e14e120d3e896589b
fn handle_mutation_using_clause(
    qcx: &QueryContext,
    selection: Option<Expr<Aug>>,
    using: Vec<TableWithJoins<Aug>>,
    get: HirRelationExpr,
    outer_scope: Scope,
) -> Result<HirRelationExpr, PlanError> {
    // Plan `USING` as a cross-joined `FROM` without knowledge of the
    // statement's `FROM` target. This prevents `lateral` subqueries from
    // "seeing" the `FROM` target.
    let (mut using_rel_expr, using_scope) =
        using.into_iter().try_fold(plan_join_identity(), |l, twj| {
            let (left, left_scope) = l;
            // Each `USING` element is cross-joined onto the accumulated
            // relation; wrapping it as a NestedJoin preserves its internal
            // join structure.
            plan_join(
                qcx,
                left,
                left_scope,
                &Join {
                    relation: TableFactor::NestedJoin {
                        join: Box::new(twj),
                        alias: None,
                    },
                    join_operator: JoinOperator::CrossJoin,
                },
            )
        })?;

    if let Some(expr) = selection {
        // Join `FROM` with `USING` tables, like `USING..., FROM`. This gives us
        // PG-like semantics e.g. expressing ambiguous column references. We put
        // `USING...` first for no real reason, but making a different decision
        // would require adjusting the column references on this relation
        // differently.
        let on = HirScalarExpr::literal_true();
        let joined = using_rel_expr
            .clone()
            .join(get.clone(), on, JoinKind::Inner);
        let joined_scope = using_scope.product(outer_scope)?;
        let joined_relation_type = qcx.relation_type(&joined);

        let ecx = &ExprContext {
            qcx,
            name: "WHERE clause",
            scope: &joined_scope,
            relation_type: &joined_relation_type,
            allow_aggregates: false,
            allow_subqueries: true,
            allow_parameters: true,
            allow_windows: false,
        };

        // Plan the filter expression on `FROM, USING...`.
        let mut expr = plan_expr(ecx, &expr)?.type_as(ecx, &ScalarType::Bool)?;

        // Rewrite all column referring to the `FROM` section of `joined` (i.e.
        // those to the right of `using_rel_expr`) to instead be correlated to
        // the outer relation, i.e. `get`.
        let using_rel_arity = qcx.relation_type(&using_rel_expr).arity();
        // local import to not get confused with `mz_sql_parser::ast::visit::Visit`
        use mz_expr::visit::Visit;
        expr.visit_mut_post(&mut |e| {
            if let HirScalarExpr::Column(c) = e {
                if c.column >= using_rel_arity {
                    // Bump the nesting level so the reference escapes the
                    // subquery, and rebase the column index onto `get`.
                    c.level += 1;
                    c.column -= using_rel_arity;
                };
            }
        })?;

        // Filter `USING` tables like `<using_rel_expr> WHERE <expr>`. Note that
        // this filters the `USING` tables, _not_ the joined `USING..., FROM`
        // relation.
        using_rel_expr = using_rel_expr.filter(vec![expr]);
    } else {
        // Check that scopes are at compatible (i.e. do not double-reference
        // same table), despite lack of selection
        let _joined_scope = using_scope.product(outer_scope)?;
    }
    // From pg: Since the result [of EXISTS (<subquery>)] depends only on
    // whether any rows are returned, and not on the contents of those rows,
    // the output list of the subquery is normally unimportant.
    //
    // This means we don't need to worry about projecting/mapping any
    // additional expressions here.
    //
    // https://www.postgresql.org/docs/14/functions-subquery.html

    // Filter `get` like `...WHERE EXISTS (<using_rel_expr>)`.
    Ok(get.filter(vec![using_rel_expr.exists()]))
}
974
/// Error returned by [`cast_relation`] when a column cannot be cast to its
/// requested target type.
#[derive(Debug)]
pub(crate) struct CastRelationError {
    /// 0-based index of the offending column.
    pub(crate) column: usize,
    /// The column's planned (source) type.
    pub(crate) source_type: ScalarType,
    /// The type the column was supposed to be cast to.
    pub(crate) target_type: ScalarType,
}
981
982/// Cast a relation from one type to another using the specified type of cast.
983///
984/// The length of `target_types` must match the arity of `expr`.
985pub(crate) fn cast_relation<'a, I>(
986    qcx: &QueryContext,
987    ccx: CastContext,
988    expr: HirRelationExpr,
989    target_types: I,
990) -> Result<HirRelationExpr, CastRelationError>
991where
992    I: IntoIterator<Item = &'a ScalarType>,
993{
994    let ecx = &ExprContext {
995        qcx,
996        name: "values",
997        scope: &Scope::empty(),
998        relation_type: &qcx.relation_type(&expr),
999        allow_aggregates: false,
1000        allow_subqueries: true,
1001        allow_parameters: true,
1002        allow_windows: false,
1003    };
1004    let mut map_exprs = vec![];
1005    let mut project_key = vec![];
1006    for (i, target_typ) in target_types.into_iter().enumerate() {
1007        let expr = HirScalarExpr::column(i);
1008        // We plan every cast and check the evaluated expressions rather than
1009        // checking the types directly because of some complex casting rules
1010        // between types not expressed in `ScalarType` equality.
1011        match typeconv::plan_cast(ecx, ccx, expr.clone(), target_typ) {
1012            Ok(cast_expr) => {
1013                if expr == cast_expr {
1014                    // Cast between types was unnecessary
1015                    project_key.push(i);
1016                } else {
1017                    // Cast between types required
1018                    project_key.push(ecx.relation_type.arity() + map_exprs.len());
1019                    map_exprs.push(cast_expr);
1020                }
1021            }
1022            Err(_) => {
1023                return Err(CastRelationError {
1024                    column: i,
1025                    source_type: ecx.scalar_type(&expr),
1026                    target_type: target_typ.clone(),
1027                });
1028            }
1029        }
1030    }
1031    Ok(expr.map(map_exprs).project(project_key))
1032}
1033
1034/// Plans an expression in the `UP TO` position of a `SUBSCRIBE` statement.
1035pub fn plan_up_to(
1036    scx: &StatementContext,
1037    mut up_to: Expr<Aug>,
1038) -> Result<MirScalarExpr, PlanError> {
1039    let scope = Scope::empty();
1040    let desc = RelationDesc::empty();
1041    // Even though this is part of a SUBSCRIBE, we need a QueryLifetime::OneShot (instead of
1042    // QueryLifetime::Subscribe), because the UP TO is evaluated only once.
1043    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1044    transform_ast::transform(scx, &mut up_to)?;
1045    let ecx = &ExprContext {
1046        qcx: &qcx,
1047        name: "UP TO",
1048        scope: &scope,
1049        relation_type: desc.typ(),
1050        allow_aggregates: false,
1051        allow_subqueries: false,
1052        allow_parameters: false,
1053        allow_windows: false,
1054    };
1055    plan_expr(ecx, &up_to)?
1056        .type_as_any(ecx)?
1057        .lower_uncorrelated()
1058}
1059
1060/// Plans an expression in the AS OF position of a `SELECT` or `SUBSCRIBE`, or `CREATE MATERIALIZED
1061/// VIEW` statement.
1062pub fn plan_as_of(
1063    scx: &StatementContext,
1064    as_of: Option<AsOf<Aug>>,
1065) -> Result<QueryWhen, PlanError> {
1066    match as_of {
1067        None => Ok(QueryWhen::Immediately),
1068        Some(mut as_of) => match as_of {
1069            AsOf::At(ref mut expr) | AsOf::AtLeast(ref mut expr) => {
1070                let scope = Scope::empty();
1071                let desc = RelationDesc::empty();
1072                // Even for a SUBSCRIBE, we need QueryLifetime::OneShot, because the AS OF is
1073                // evaluated only once.
1074                let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1075                transform_ast::transform(scx, expr)?;
1076                let ecx = &ExprContext {
1077                    qcx: &qcx,
1078                    name: "AS OF",
1079                    scope: &scope,
1080                    relation_type: desc.typ(),
1081                    allow_aggregates: false,
1082                    allow_subqueries: false,
1083                    allow_parameters: false,
1084                    allow_windows: false,
1085                };
1086                let expr = plan_expr(ecx, expr)?
1087                    .type_as_any(ecx)?
1088                    .lower_uncorrelated()?;
1089                match as_of {
1090                    AsOf::At(_) => Ok(QueryWhen::AtTimestamp(expr)),
1091                    AsOf::AtLeast(_) => Ok(QueryWhen::AtLeastTimestamp(expr)),
1092                }
1093            }
1094        },
1095    }
1096}
1097
1098/// Plans an expression in the AS position of a `CREATE SECRET`.
1099pub fn plan_secret_as(
1100    scx: &StatementContext,
1101    mut expr: Expr<Aug>,
1102) -> Result<MirScalarExpr, PlanError> {
1103    let scope = Scope::empty();
1104    let desc = RelationDesc::empty();
1105    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1106
1107    transform_ast::transform(scx, &mut expr)?;
1108
1109    let ecx = &ExprContext {
1110        qcx: &qcx,
1111        name: "AS",
1112        scope: &scope,
1113        relation_type: desc.typ(),
1114        allow_aggregates: false,
1115        allow_subqueries: false,
1116        allow_parameters: false,
1117        allow_windows: false,
1118    };
1119    let expr = plan_expr(ecx, &expr)?
1120        .type_as(ecx, &ScalarType::Bytes)?
1121        .lower_uncorrelated()?;
1122    Ok(expr)
1123}
1124
1125/// Plans an expression in the CHECK position of a `CREATE SOURCE ... FROM WEBHOOK`.
1126pub fn plan_webhook_validate_using(
1127    scx: &StatementContext,
1128    validate_using: CreateWebhookSourceCheck<Aug>,
1129) -> Result<WebhookValidation, PlanError> {
1130    let qcx = QueryContext::root(scx, QueryLifetime::Source);
1131
1132    let CreateWebhookSourceCheck {
1133        options,
1134        using: mut expr,
1135    } = validate_using;
1136
1137    let mut column_typs = vec![];
1138    let mut column_names = vec![];
1139
1140    let (bodies, headers, secrets) = options
1141        .map(|o| (o.bodies, o.headers, o.secrets))
1142        .unwrap_or_default();
1143
1144    // Append all of the bodies so they can be used in the expression.
1145    let mut body_tuples = vec![];
1146    for CreateWebhookSourceBody { alias, use_bytes } in bodies {
1147        let scalar_type = use_bytes
1148            .then_some(ScalarType::Bytes)
1149            .unwrap_or(ScalarType::String);
1150        let name = alias
1151            .map(|a| a.into_string())
1152            .unwrap_or_else(|| "body".to_string());
1153
1154        column_typs.push(ColumnType {
1155            scalar_type,
1156            nullable: false,
1157        });
1158        column_names.push(name);
1159
1160        // Store the column index so we can be sure to provide this body correctly.
1161        let column_idx = column_typs.len() - 1;
1162        // Double check we're consistent with column names.
1163        assert_eq!(
1164            column_idx,
1165            column_names.len() - 1,
1166            "body column names and types don't match"
1167        );
1168        body_tuples.push((column_idx, use_bytes));
1169    }
1170
1171    // Append all of the headers so they can be used in the expression.
1172    let mut header_tuples = vec![];
1173
1174    for CreateWebhookSourceHeader { alias, use_bytes } in headers {
1175        let value_type = use_bytes
1176            .then_some(ScalarType::Bytes)
1177            .unwrap_or(ScalarType::String);
1178        let name = alias
1179            .map(|a| a.into_string())
1180            .unwrap_or_else(|| "headers".to_string());
1181
1182        column_typs.push(ColumnType {
1183            scalar_type: ScalarType::Map {
1184                value_type: Box::new(value_type),
1185                custom_id: None,
1186            },
1187            nullable: false,
1188        });
1189        column_names.push(name);
1190
1191        // Store the column index so we can be sure to provide this body correctly.
1192        let column_idx = column_typs.len() - 1;
1193        // Double check we're consistent with column names.
1194        assert_eq!(
1195            column_idx,
1196            column_names.len() - 1,
1197            "header column names and types don't match"
1198        );
1199        header_tuples.push((column_idx, use_bytes));
1200    }
1201
1202    // Append all secrets so they can be used in the expression.
1203    let mut validation_secrets = vec![];
1204
1205    for CreateWebhookSourceSecret {
1206        secret,
1207        alias,
1208        use_bytes,
1209    } in secrets
1210    {
1211        // Either provide the secret to the validation expression as Bytes or a String.
1212        let scalar_type = use_bytes
1213            .then_some(ScalarType::Bytes)
1214            .unwrap_or(ScalarType::String);
1215
1216        column_typs.push(ColumnType {
1217            scalar_type,
1218            nullable: false,
1219        });
1220        let ResolvedItemName::Item {
1221            id,
1222            full_name: FullItemName { item, .. },
1223            ..
1224        } = secret
1225        else {
1226            return Err(PlanError::InvalidSecret(Box::new(secret)));
1227        };
1228
1229        // Plan the expression using the secret's alias, if one is provided.
1230        let name = if let Some(alias) = alias {
1231            alias.into_string()
1232        } else {
1233            item
1234        };
1235        column_names.push(name);
1236
1237        // Get the column index that corresponds for this secret, so we can make sure to provide the
1238        // secrets in the correct order during evaluation.
1239        let column_idx = column_typs.len() - 1;
1240        // Double check that our column names and types match.
1241        assert_eq!(
1242            column_idx,
1243            column_names.len() - 1,
1244            "column names and types don't match"
1245        );
1246
1247        validation_secrets.push(WebhookValidationSecret {
1248            id,
1249            column_idx,
1250            use_bytes,
1251        });
1252    }
1253
1254    let relation_typ = RelationType::new(column_typs);
1255    let desc = RelationDesc::new(relation_typ, column_names.clone());
1256    let scope = Scope::from_source(None, column_names);
1257
1258    transform_ast::transform(scx, &mut expr)?;
1259
1260    let ecx = &ExprContext {
1261        qcx: &qcx,
1262        name: "CHECK",
1263        scope: &scope,
1264        relation_type: desc.typ(),
1265        allow_aggregates: false,
1266        allow_subqueries: false,
1267        allow_parameters: false,
1268        allow_windows: false,
1269    };
1270    let expr = plan_expr(ecx, &expr)?
1271        .type_as(ecx, &ScalarType::Bool)?
1272        .lower_uncorrelated()?;
1273    let validation = WebhookValidation {
1274        expression: expr,
1275        relation_desc: desc,
1276        bodies: body_tuples,
1277        headers: header_tuples,
1278        secrets: validation_secrets,
1279    };
1280    Ok(validation)
1281}
1282
1283pub fn plan_default_expr(
1284    scx: &StatementContext,
1285    expr: &Expr<Aug>,
1286    target_ty: &ScalarType,
1287) -> Result<HirScalarExpr, PlanError> {
1288    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1289    let ecx = &ExprContext {
1290        qcx: &qcx,
1291        name: "DEFAULT expression",
1292        scope: &Scope::empty(),
1293        relation_type: &RelationType::empty(),
1294        allow_aggregates: false,
1295        allow_subqueries: false,
1296        allow_parameters: false,
1297        allow_windows: false,
1298    };
1299    let hir = plan_expr(ecx, expr)?.cast_to(ecx, CastContext::Assignment, target_ty)?;
1300    Ok(hir)
1301}
1302
1303pub fn plan_params<'a>(
1304    scx: &'a StatementContext,
1305    params: Vec<Expr<Aug>>,
1306    desc: &StatementDesc,
1307) -> Result<Params, PlanError> {
1308    if params.len() != desc.param_types.len() {
1309        sql_bail!(
1310            "expected {} params, got {}",
1311            desc.param_types.len(),
1312            params.len()
1313        );
1314    }
1315
1316    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
1317    let scope = Scope::empty();
1318    let rel_type = RelationType::empty();
1319
1320    let mut datums = Row::default();
1321    let mut packer = datums.packer();
1322    let mut types = Vec::new();
1323    let temp_storage = &RowArena::new();
1324    for (mut expr, ty) in params.into_iter().zip(&desc.param_types) {
1325        transform_ast::transform(scx, &mut expr)?;
1326
1327        let ecx = &ExprContext {
1328            qcx: &qcx,
1329            name: "EXECUTE",
1330            scope: &scope,
1331            relation_type: &rel_type,
1332            allow_aggregates: false,
1333            allow_subqueries: false,
1334            allow_parameters: false,
1335            allow_windows: false,
1336        };
1337        let ex = plan_expr(ecx, &expr)?.type_as_any(ecx)?;
1338        let st = ecx.scalar_type(&ex);
1339        if st != *ty {
1340            sql_bail!(
1341                "mismatched parameter type: expected {}, got {}",
1342                ecx.humanize_scalar_type(ty, false),
1343                ecx.humanize_scalar_type(&st, false),
1344            );
1345        }
1346        let ex = ex.lower_uncorrelated()?;
1347        let evaled = ex.eval(&[], temp_storage)?;
1348        packer.push(evaled);
1349        types.push(st);
1350    }
1351    Ok(Params { datums, types })
1352}
1353
1354pub fn plan_index_exprs<'a>(
1355    scx: &'a StatementContext,
1356    on_desc: &RelationDesc,
1357    exprs: Vec<Expr<Aug>>,
1358) -> Result<Vec<mz_expr::MirScalarExpr>, PlanError> {
1359    let scope = Scope::from_source(None, on_desc.iter_names());
1360    let qcx = QueryContext::root(scx, QueryLifetime::Index);
1361
1362    let ecx = &ExprContext {
1363        qcx: &qcx,
1364        name: "CREATE INDEX",
1365        scope: &scope,
1366        relation_type: on_desc.typ(),
1367        allow_aggregates: false,
1368        allow_subqueries: false,
1369        allow_parameters: false,
1370        allow_windows: false,
1371    };
1372    let mut out = vec![];
1373    for mut expr in exprs {
1374        transform_ast::transform(scx, &mut expr)?;
1375        let expr = plan_expr_or_col_index(ecx, &expr)?;
1376        let mut expr = expr.lower_uncorrelated()?;
1377        expr.reduce(&on_desc.typ().column_types);
1378        out.push(expr);
1379    }
1380    Ok(out)
1381}
1382
1383fn plan_expr_or_col_index(ecx: &ExprContext, e: &Expr<Aug>) -> Result<HirScalarExpr, PlanError> {
1384    match check_col_index(ecx.name, e, ecx.relation_type.column_types.len())? {
1385        Some(column) => Ok(HirScalarExpr::column(column)),
1386        _ => plan_expr(ecx, e)?.type_as_any(ecx),
1387    }
1388}
1389
1390fn check_col_index(name: &str, e: &Expr<Aug>, max: usize) -> Result<Option<usize>, PlanError> {
1391    match e {
1392        Expr::Value(Value::Number(n)) => {
1393            let n = n.parse::<usize>().map_err(|e| {
1394                sql_err!("unable to parse column reference in {}: {}: {}", name, n, e)
1395            })?;
1396            if n < 1 || n > max {
1397                sql_bail!(
1398                    "column reference {} in {} is out of range (1 - {})",
1399                    n,
1400                    name,
1401                    max
1402                );
1403            }
1404            Ok(Some(n - 1))
1405        }
1406        _ => Ok(None),
1407    }
1408}
1409
/// The output of planning a `Query`: the relation expression plus the
/// ordering/finishing metadata that applies on top of it.
struct PlannedQuery {
    /// The planned relation expression.
    expr: HirRelationExpr,
    /// Scope describing the expression's output columns.
    scope: Scope,
    /// `ORDER BY` keys, referring to columns of `expr`.
    order_by: Vec<ColumnOrder>,
    /// Optional planned `LIMIT` expression.
    limit: Option<HirScalarExpr>,
    /// `OFFSET`, which must have been an integer constant.
    offset: usize,
    /// Columns of `expr` that form the query's visible output.
    project: Vec<usize>,
    /// User-supplied hints for sizing reduction group state.
    group_size_hints: GroupSizeHints,
}
1419
/// Plans a `Query`, wrapping the real work in a recursion-depth check; see
/// `plan_query_inner` for the actual planning logic.
fn plan_query(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    qcx.checked_recur_mut(|qcx| plan_query_inner(qcx, q))
}
1423
/// Plans a `Query`: its CTEs, `LIMIT`/`OFFSET`, body, and `ORDER BY`, and
/// finally wraps the result in `Let`/`LetRec` bindings for the CTEs.
fn plan_query_inner(qcx: &mut QueryContext, q: &Query<Aug>) -> Result<PlannedQuery, PlanError> {
    // Plan CTEs and introduce bindings to `qcx.ctes`. Returns shadowed bindings
    // for the identifiers, so that they can be re-installed before returning.
    let cte_bindings = plan_ctes(qcx, q)?;

    // Plan LIMIT. Constant expressions are evaluated and validated eagerly;
    // non-constant expressions are gated behind a feature flag.
    let limit = match &q.limit {
        None => None,
        Some(Limit {
            quantity,
            with_ties: false,
        }) => {
            let ecx = &ExprContext {
                qcx,
                name: "LIMIT",
                scope: &Scope::empty(),
                relation_type: &RelationType::empty(),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let limit = plan_expr(ecx, quantity)?;
            let limit = limit.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?;

            let limit = if limit.is_constant() {
                let arena = RowArena::new();
                let limit = limit.lower_uncorrelated()?;

                // Evaluate now so a negative or non-integer constant LIMIT is
                // rejected at planning time.
                match limit.eval(&[], &arena)? {
                    d @ Datum::Int64(v) if v >= 0 => HirScalarExpr::literal(d, ScalarType::Int64),
                    d @ Datum::Null => HirScalarExpr::literal(d, ScalarType::Int64),
                    Datum::Int64(_) => sql_bail!("LIMIT must not be negative"),
                    _ => sql_bail!("constant LIMIT expression must reduce to an INT or NULL value"),
                }
            } else {
                // Gate non-constant LIMIT expressions behind a feature flag
                qcx.scx
                    .require_feature_flag(&vars::ENABLE_EXPRESSIONS_IN_LIMIT_SYNTAX)?;
                limit
            };

            Some(limit)
        }
        Some(Limit {
            quantity: _,
            with_ties: true,
        }) => bail_unsupported!("FETCH ... WITH TIES"),
    };
    // OFFSET must be a literal integer constant.
    let offset = match &q.offset {
        None => 0,
        Some(Expr::Value(Value::Number(x))) => x.parse()?,
        _ => sql_bail!("OFFSET must be an integer constant"),
    };

    let mut planned_query = match &q.body {
        SetExpr::Select(s) => {
            // Extract query options.
            let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
            let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;

            // A plain SELECT: its ORDER BY is planned together with the
            // select itself.
            let plan = plan_select_from_where(qcx, *s.clone(), q.order_by.clone())?;
            PlannedQuery {
                expr: plan.expr,
                scope: plan.scope,
                order_by: plan.order_by,
                project: plan.project,
                limit,
                offset,
                group_size_hints,
            }
        }
        _ => {
            // A set expression (e.g. UNION, VALUES): plan it, then plan ORDER
            // BY over its output columns.
            let (expr, scope) = plan_set_expr(qcx, &q.body)?;
            let ecx = &ExprContext {
                qcx,
                name: "ORDER BY clause of a set expression",
                scope: &scope,
                relation_type: &qcx.relation_type(&expr),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let output_columns: Vec<_> = scope.column_names().enumerate().collect();
            let (order_by, map_exprs) = plan_order_by_exprs(ecx, &q.order_by, &output_columns)?;
            // Project only the original output columns; `map_exprs` exist
            // solely to support the ordering.
            let project = (0..ecx.relation_type.arity()).collect();
            PlannedQuery {
                expr: expr.map(map_exprs),
                scope,
                order_by,
                limit,
                project,
                offset,
                group_size_hints: GroupSizeHints::default(),
            }
        }
    };

    // Both introduce `Let` bindings atop `result` and re-install shadowed bindings.
    match &q.ctes {
        CteBlock::Simple(_) => {
            // Wrap the body in one `Let` per CTE, innermost binding last.
            for (id, value, shadowed_val) in cte_bindings.into_iter().rev() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    planned_query.expr = HirRelationExpr::Let {
                        name: cte.name,
                        id: id.clone(),
                        value: Box::new(value),
                        body: Box::new(planned_query.expr),
                    };
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
        }
        CteBlock::MutuallyRecursive(MutRecBlock { options, ctes: _ }) => {
            // At most one of the three recursion-limit options may be given;
            // they differ only in whether hitting the limit returns or errors.
            let MutRecBlockOptionExtracted {
                recursion_limit,
                return_at_recursion_limit,
                error_at_recursion_limit,
                seen: _,
            } = MutRecBlockOptionExtracted::try_from(options.clone())?;
            let limit = match (recursion_limit, return_at_recursion_limit, error_at_recursion_limit) {
                (None, None, None) => None,
                (Some(max_iters), None, None) => Some((max_iters, LetRecLimit::RETURN_AT_LIMIT_DEFAULT)),
                (None, Some(max_iters), None) => Some((max_iters, true)),
                (None, None, Some(max_iters)) => Some((max_iters, false)),
                _ => {
                    return Err(InvalidWmrRecursionLimit("More than one recursion limit given. Please give at most one of RECURSION LIMIT, ERROR AT RECURSION LIMIT, RETURN AT RECURSION LIMIT.".to_owned()));
                }
            }.try_map(|(max_iters, return_at_limit)| Ok::<LetRecLimit, PlanError>(LetRecLimit {
                max_iters: NonZeroU64::new(*max_iters).ok_or(InvalidWmrRecursionLimit("Recursion limit has to be greater than 0.".to_owned()))?,
                return_at_limit: *return_at_limit,
            }))?;

            // Collect all recursive bindings into a single `LetRec`.
            let mut bindings = Vec::new();
            for (id, value, shadowed_val) in cte_bindings.into_iter() {
                if let Some(cte) = qcx.ctes.remove(&id) {
                    bindings.push((cte.name, id, value, cte.desc.typ().clone()));
                }
                if let Some(shadowed_val) = shadowed_val {
                    qcx.ctes.insert(id, shadowed_val);
                }
            }
            if !bindings.is_empty() {
                planned_query.expr = HirRelationExpr::LetRec {
                    limit,
                    bindings,
                    body: Box::new(planned_query.expr),
                }
            }
        }
    }

    Ok(planned_query)
}
1580
// Generates `MutRecBlockOptionExtracted`, which extracts the `RECURSION
// LIMIT`, `RETURN AT RECURSION LIMIT`, and `ERROR AT RECURSION LIMIT`
// options of a `WITH MUTUALLY RECURSIVE` block.
generate_extracted_config!(
    MutRecBlockOption,
    (RecursionLimit, u64),
    (ReturnAtRecursionLimit, u64),
    (ErrorAtRecursionLimit, u64)
);
1587
1588/// Creates plans for CTEs and introduces them to `qcx.ctes`.
1589///
1590/// Returns for each identifier a planned `HirRelationExpr` value, and an optional
1591/// shadowed value that can be reinstalled once the planning has completed.
1592pub fn plan_ctes(
1593    qcx: &mut QueryContext,
1594    q: &Query<Aug>,
1595) -> Result<Vec<(LocalId, HirRelationExpr, Option<CteDesc>)>, PlanError> {
1596    // Accumulate planned expressions and shadowed descriptions.
1597    let mut result = Vec::new();
1598    // Retain the old descriptions of CTE bindings so that we can restore them
1599    // after we're done planning this SELECT.
1600    let mut shadowed_descs = BTreeMap::new();
1601
1602    // A reused identifier indicates a reused name.
1603    if let Some(ident) = q.ctes.bound_identifiers().duplicates().next() {
1604        sql_bail!(
1605            "WITH query name {} specified more than once",
1606            normalize::ident_ref(ident).quoted()
1607        )
1608    }
1609
1610    match &q.ctes {
1611        CteBlock::Simple(ctes) => {
1612            // Plan all CTEs, introducing the types for non-recursive CTEs as we go.
1613            for cte in ctes.iter() {
1614                let cte_name = normalize::ident(cte.alias.name.clone());
1615                let (val, scope) = plan_nested_query(qcx, &cte.query)?;
1616                let typ = qcx.relation_type(&val);
1617                let mut desc = RelationDesc::new(typ, scope.column_names());
1618                plan_utils::maybe_rename_columns(
1619                    format!("CTE {}", cte.alias.name),
1620                    &mut desc,
1621                    &cte.alias.columns,
1622                )?;
1623                // Capture the prior value if it exists, so that it can be re-installed.
1624                let shadowed = qcx.ctes.insert(
1625                    cte.id,
1626                    CteDesc {
1627                        name: cte_name,
1628                        desc,
1629                    },
1630                );
1631
1632                result.push((cte.id, val, shadowed));
1633            }
1634        }
1635        CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
1636            // Insert column types into `qcx.ctes` first for recursive bindings.
1637            for cte in ctes.iter() {
1638                let cte_name = normalize::ident(cte.name.clone());
1639                let mut desc_columns = Vec::with_capacity(cte.columns.capacity());
1640                for column in cte.columns.iter() {
1641                    desc_columns.push((
1642                        normalize::column_name(column.name.clone()),
1643                        ColumnType {
1644                            scalar_type: scalar_type_from_sql(qcx.scx, &column.data_type)?,
1645                            nullable: true,
1646                        },
1647                    ));
1648                }
1649                let desc = RelationDesc::from_names_and_types(desc_columns);
1650                let shadowed = qcx.ctes.insert(
1651                    cte.id,
1652                    CteDesc {
1653                        name: cte_name,
1654                        desc,
1655                    },
1656                );
1657                // Capture the prior value if it exists, so that it can be re-installed.
1658                if let Some(shadowed) = shadowed {
1659                    shadowed_descs.insert(cte.id, shadowed);
1660                }
1661            }
1662
1663            // Plan all CTEs and validate the proposed types.
1664            for cte in ctes.iter() {
1665                let (val, _scope) = plan_nested_query(qcx, &cte.query)?;
1666
1667                let proposed_typ = qcx.ctes[&cte.id].desc.typ();
1668
1669                if proposed_typ.column_types.iter().any(|c| !c.nullable) {
1670                    // Once WMR CTEs support NOT NULL constraints, check that
1671                    // nullability of derived column types are compatible.
1672                    sql_bail!(
1673                        "[internal error]: WMR CTEs do not support NOT NULL constraints on proposed column types"
1674                    );
1675                }
1676
1677                if !proposed_typ.keys.is_empty() {
1678                    // Once WMR CTEs support keys, check that keys exactly
1679                    // overlap.
1680                    sql_bail!("[internal error]: WMR CTEs do not support keys");
1681                }
1682
1683                // Validate that the derived and proposed types are the same.
1684                let derived_typ = qcx.relation_type(&val);
1685
1686                let type_err = |proposed_typ: &RelationType, derived_typ: RelationType| {
1687                    let cte_name = normalize::ident(cte.name.clone());
1688                    let proposed_typ = proposed_typ
1689                        .column_types
1690                        .iter()
1691                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1692                        .collect::<Vec<_>>();
1693                    let inferred_typ = derived_typ
1694                        .column_types
1695                        .iter()
1696                        .map(|ty| qcx.humanize_scalar_type(&ty.scalar_type, false))
1697                        .collect::<Vec<_>>();
1698                    Err(PlanError::RecursiveTypeMismatch(
1699                        cte_name,
1700                        proposed_typ,
1701                        inferred_typ,
1702                    ))
1703                };
1704
1705                if derived_typ.column_types.len() != proposed_typ.column_types.len() {
1706                    return type_err(proposed_typ, derived_typ);
1707                }
1708
1709                // Cast derived types to proposed types or error.
1710                let val = match cast_relation(
1711                    qcx,
1712                    // Choose `CastContext::Assignment`` because the user has
1713                    // been explicit about the types they expect. Choosing
1714                    // `CastContext::Implicit` is not "strong" enough to impose
1715                    // typmods from proposed types onto values.
1716                    CastContext::Assignment,
1717                    val,
1718                    proposed_typ.column_types.iter().map(|c| &c.scalar_type),
1719                ) {
1720                    Ok(val) => val,
1721                    Err(_) => return type_err(proposed_typ, derived_typ),
1722                };
1723
1724                result.push((cte.id, val, shadowed_descs.remove(&cte.id)));
1725            }
1726        }
1727    }
1728
1729    Ok(result)
1730}
1731
1732pub fn plan_nested_query(
1733    qcx: &mut QueryContext,
1734    q: &Query<Aug>,
1735) -> Result<(HirRelationExpr, Scope), PlanError> {
1736    let PlannedQuery {
1737        mut expr,
1738        scope,
1739        order_by,
1740        limit,
1741        offset,
1742        project,
1743        group_size_hints,
1744    } = qcx.checked_recur_mut(|qcx| plan_query(qcx, q))?;
1745    if limit.is_some() || offset > 0 {
1746        expr = HirRelationExpr::top_k(
1747            expr,
1748            vec![],
1749            order_by,
1750            limit,
1751            offset,
1752            group_size_hints.limit_input_group_size,
1753        );
1754    }
1755    Ok((expr.project(project), scope))
1756}
1757
/// Plans a set expression: a plain `SELECT`, a set operation (`UNION`,
/// `EXCEPT`, `INTERSECT`), a `VALUES` list, a `TABLE` reference, a
/// parenthesized subquery, or a `SHOW` statement.
fn plan_set_expr(
    qcx: &mut QueryContext,
    q: &SetExpr<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    match q {
        SetExpr::Select(select) => {
            let order_by_exprs = Vec::new();
            let plan = plan_select_from_where(qcx, *select.clone(), order_by_exprs)?;
            // We didn't provide any `order_by_exprs`, so `plan_select_from_where`
            // should not have planned any ordering.
            assert!(plan.order_by.is_empty());
            Ok((plan.expr.project(plan.project), plan.scope))
        }
        SetExpr::SetOperation {
            op,
            all,
            left,
            right,
        } => {
            // Plan the LHS and RHS.
            let (left_expr, left_scope) = qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, left))?;
            let (right_expr, right_scope) =
                qcx.checked_recur_mut(|qcx| plan_set_expr(qcx, right))?;

            // Validate that the LHS and RHS are the same width.
            let left_type = qcx.relation_type(&left_expr);
            let right_type = qcx.relation_type(&right_expr);
            if left_type.arity() != right_type.arity() {
                sql_bail!(
                    "each {} query must have the same number of columns: {} vs {}",
                    op,
                    left_type.arity(),
                    right_type.arity(),
                );
            }

            // Match the types of the corresponding columns on the LHS and RHS
            // using the normal type coercion rules. This is equivalent to
            // `coerce_homogeneous_exprs`, but implemented in terms of
            // `HirRelationExpr` rather than `HirScalarExpr`.
            let left_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &left_scope,
                relation_type: &left_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let right_ecx = &ExprContext {
                qcx,
                name: &op.to_string(),
                scope: &right_scope,
                relation_type: &right_type,
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // For each column pair, guess the best common type and plan an
            // implicit cast of both sides to it; either cast failing is
            // reported as the operands being unmatchable.
            let mut left_casts = vec![];
            let mut right_casts = vec![];
            for (i, (left_type, right_type)) in left_type
                .column_types
                .iter()
                .zip(right_type.column_types.iter())
                .enumerate()
            {
                let types = &[
                    CoercibleScalarType::Coerced(left_type.scalar_type.clone()),
                    CoercibleScalarType::Coerced(right_type.scalar_type.clone()),
                ];
                let target =
                    typeconv::guess_best_common_type(&left_ecx.with_name(&op.to_string()), types)?;
                match typeconv::plan_cast(
                    left_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => left_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&left_type.scalar_type, false),
                        qcx.humanize_scalar_type(&target, false),
                    ),
                }
                match typeconv::plan_cast(
                    right_ecx,
                    CastContext::Implicit,
                    HirScalarExpr::column(i),
                    &target,
                ) {
                    Ok(expr) => right_casts.push(expr),
                    Err(_) => sql_bail!(
                        "{} types {} and {} cannot be matched",
                        op,
                        qcx.humanize_scalar_type(&target, false),
                        qcx.humanize_scalar_type(&right_type.scalar_type, false),
                    ),
                }
            }
            // Install the casts only if at least one is not the identity
            // column reference: map appends the cast columns, then project
            // keeps just the appended half (indices arity..arity*2).
            let lhs = if left_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (left_type.arity()..left_type.arity() * 2).collect();
                left_expr.map(left_casts).project(project_key)
            } else {
                left_expr
            };
            let rhs = if right_casts
                .iter()
                .enumerate()
                .any(|(i, e)| e != &HirScalarExpr::column(i))
            {
                let project_key: Vec<_> = (right_type.arity()..right_type.arity() * 2).collect();
                right_expr.map(right_casts).project(project_key)
            } else {
                right_expr
            };

            // Combine the coerced sides; `distinct()` implements the
            // non-ALL (set, rather than bag) semantics.
            let relation_expr = match op {
                SetOperator::Union => {
                    if *all {
                        lhs.union(rhs)
                    } else {
                        lhs.union(rhs).distinct()
                    }
                }
                SetOperator::Except => Hir::except(all, lhs, rhs),
                SetOperator::Intersect => {
                    // TODO: Let's not duplicate the left-hand expression into TWO dataflows!
                    // Though we believe that render() does The Right Thing (TM)
                    // Also note that we do *not* need another threshold() at the end of the method chain
                    // because the right-hand side of the outer union only produces existing records,
                    // i.e., the record counts for differential data flow definitely remain non-negative.
                    let left_clone = lhs.clone();
                    if *all {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                    } else {
                        lhs.union(left_clone.union(rhs.negate()).threshold().negate())
                            .distinct()
                    }
                }
            };
            let scope = Scope::from_source(
                None,
                // Column names are taken from the left, as in Postgres.
                left_scope.column_names(),
            );

            Ok((relation_expr, scope))
        }
        SetExpr::Values(Values(values)) => plan_values(qcx, values),
        SetExpr::Table(name) => {
            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
            Ok((expr, scope))
        }
        SetExpr::Query(query) => {
            let (expr, scope) = plan_nested_query(qcx, query)?;
            Ok((expr, scope))
        }
        SetExpr::Show(stmt) => {
            // The create SQL definition of involving this query, will have the explicit `SHOW`
            // command in it. Many `SHOW` commands will expand into a sub-query that involves the
            // current schema of the executing user. When Materialize restarts and tries to re-plan
            // these queries, it will only have access to the raw `SHOW` command and have no idea
            // what schema to use. As a result Materialize will fail to boot.
            //
            // Some `SHOW` commands are ok, like `SHOW CLUSTERS`, and there are probably other ways
            // around this issue. Such as expanding the `SHOW` command in the SQL definition.
            // However, banning show commands in views gives us more flexibility to change their
            // output.
            //
            // TODO(jkosh44) Add message to error that prints out an equivalent view definition
            // with all show commands expanded into their equivalent SELECT statements.
            if !qcx.lifetime.allow_show() {
                return Err(PlanError::ShowCommandInView);
            }

            // Some SHOW statements are a SELECT query. Others produces Rows
            // directly. Convert both of these to the needed Hir and Scope.
            fn to_hirscope(
                plan: ShowCreatePlan,
                desc: StatementDesc,
            ) -> Result<(HirRelationExpr, Scope), PlanError> {
                let rows = vec![plan.row.iter().collect::<Vec<_>>()];
                let desc = desc.relation_desc.expect("must exist");
                let expr = HirRelationExpr::constant(rows, desc.typ().clone());
                let scope = Scope::from_source(None, desc.iter_names());
                Ok((expr, scope))
            }

            match stmt.clone() {
                ShowStatement::ShowColumns(stmt) => {
                    show::show_columns(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowCreateConnection(stmt) => to_hirscope(
                    show::plan_show_create_connection(qcx.scx, stmt.clone())?,
                    show::describe_show_create_connection(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateCluster(stmt) => to_hirscope(
                    show::plan_show_create_cluster(qcx.scx, stmt.clone())?,
                    show::describe_show_create_cluster(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateIndex(stmt) => to_hirscope(
                    show::plan_show_create_index(qcx.scx, stmt.clone())?,
                    show::describe_show_create_index(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSink(stmt) => to_hirscope(
                    show::plan_show_create_sink(qcx.scx, stmt.clone())?,
                    show::describe_show_create_sink(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateSource(stmt) => to_hirscope(
                    show::plan_show_create_source(qcx.scx, stmt.clone())?,
                    show::describe_show_create_source(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateTable(stmt) => to_hirscope(
                    show::plan_show_create_table(qcx.scx, stmt.clone())?,
                    show::describe_show_create_table(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateView(stmt) => to_hirscope(
                    show::plan_show_create_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowCreateMaterializedView(stmt) => to_hirscope(
                    show::plan_show_create_materialized_view(qcx.scx, stmt.clone())?,
                    show::describe_show_create_materialized_view(qcx.scx, stmt)?,
                ),
                ShowStatement::ShowObjects(stmt) => {
                    show::show_objects(qcx.scx, stmt)?.plan_hir(qcx)
                }
                ShowStatement::ShowVariable(_) => bail_unsupported!("SHOW variable in subqueries"),
                ShowStatement::InspectShard(_) => sql_bail!("unsupported INSPECT statement"),
            }
        }
    }
}
1999
2000/// Plans a `VALUES` clause that appears in a `SELECT` statement.
2001fn plan_values(
2002    qcx: &QueryContext,
2003    values: &[Vec<Expr<Aug>>],
2004) -> Result<(HirRelationExpr, Scope), PlanError> {
2005    assert!(!values.is_empty());
2006
2007    let ecx = &ExprContext {
2008        qcx,
2009        name: "VALUES",
2010        scope: &Scope::empty(),
2011        relation_type: &RelationType::empty(),
2012        allow_aggregates: false,
2013        allow_subqueries: true,
2014        allow_parameters: true,
2015        allow_windows: false,
2016    };
2017
2018    let ncols = values[0].len();
2019    let nrows = values.len();
2020
2021    // Arrange input expressions by columns, not rows, so that we can
2022    // call `coerce_homogeneous_exprs` on each column.
2023    let mut cols = vec![vec![]; ncols];
2024    for row in values {
2025        if row.len() != ncols {
2026            sql_bail!(
2027                "VALUES expression has varying number of columns: {} vs {}",
2028                row.len(),
2029                ncols
2030            );
2031        }
2032        for (i, v) in row.iter().enumerate() {
2033            cols[i].push(v);
2034        }
2035    }
2036
2037    // Plan each column.
2038    let mut col_iters = Vec::with_capacity(ncols);
2039    let mut col_types = Vec::with_capacity(ncols);
2040    for col in &cols {
2041        let col = coerce_homogeneous_exprs(ecx, plan_exprs(ecx, col)?, None)?;
2042        let mut col_type = ecx.column_type(&col[0]);
2043        for val in &col[1..] {
2044            col_type = col_type.union(&ecx.column_type(val))?;
2045        }
2046        col_types.push(col_type);
2047        col_iters.push(col.into_iter());
2048    }
2049
2050    // Build constant relation.
2051    let mut exprs = vec![];
2052    for _ in 0..nrows {
2053        for i in 0..ncols {
2054            exprs.push(col_iters[i].next().unwrap());
2055        }
2056    }
2057    let out = HirRelationExpr::CallTable {
2058        func: mz_expr::TableFunc::Wrap {
2059            width: ncols,
2060            types: col_types,
2061        },
2062        exprs,
2063    };
2064
2065    // Build column names.
2066    let mut scope = Scope::empty();
2067    for i in 0..ncols {
2068        let name = format!("column{}", i + 1);
2069        scope.items.push(ScopeItem::from_column_name(name));
2070    }
2071
2072    Ok((out, scope))
2073}
2074
2075/// Plans a `VALUES` clause that appears at the top level of an `INSERT`
2076/// statement.
2077///
2078/// This is special-cased in PostgreSQL and different enough from `plan_values`
2079/// that it is easier to use a separate function entirely. Unlike a normal
2080/// `VALUES` clause, each value is coerced to the type of the target table
2081/// via an assignment cast.
2082///
2083/// See: <https://github.com/postgres/postgres/blob/ad77039fa/src/backend/parser/analyze.c#L504-L518>
2084fn plan_values_insert(
2085    qcx: &QueryContext,
2086    target_names: &[&ColumnName],
2087    target_types: &[&ScalarType],
2088    values: &[Vec<Expr<Aug>>],
2089) -> Result<HirRelationExpr, PlanError> {
2090    assert!(!values.is_empty());
2091
2092    if !values.iter().map(|row| row.len()).all_equal() {
2093        sql_bail!("VALUES lists must all be the same length");
2094    }
2095
2096    let ecx = &ExprContext {
2097        qcx,
2098        name: "VALUES",
2099        scope: &Scope::empty(),
2100        relation_type: &RelationType::empty(),
2101        allow_aggregates: false,
2102        allow_subqueries: true,
2103        allow_parameters: true,
2104        allow_windows: false,
2105    };
2106
2107    let mut exprs = vec![];
2108    let mut types = vec![];
2109    for row in values {
2110        if row.len() > target_names.len() {
2111            sql_bail!("INSERT has more expressions than target columns");
2112        }
2113        for (column, val) in row.into_iter().enumerate() {
2114            let target_type = &target_types[column];
2115            let val = plan_expr(ecx, val)?;
2116            let val = typeconv::plan_coerce(ecx, val, target_type)?;
2117            let source_type = &ecx.scalar_type(&val);
2118            let val = match typeconv::plan_cast(ecx, CastContext::Assignment, val, target_type) {
2119                Ok(val) => val,
2120                Err(_) => sql_bail!(
2121                    "column {} is of type {} but expression is of type {}",
2122                    target_names[column].as_str().quoted(),
2123                    qcx.humanize_scalar_type(target_type, false),
2124                    qcx.humanize_scalar_type(source_type, false),
2125                ),
2126            };
2127            if column >= types.len() {
2128                types.push(ecx.column_type(&val));
2129            } else {
2130                types[column] = types[column].union(&ecx.column_type(&val))?;
2131            }
2132            exprs.push(val);
2133        }
2134    }
2135
2136    Ok(HirRelationExpr::CallTable {
2137        func: mz_expr::TableFunc::Wrap {
2138            width: values[0].len(),
2139            types,
2140        },
2141        exprs,
2142    })
2143}
2144
2145fn plan_join_identity() -> (HirRelationExpr, Scope) {
2146    let typ = RelationType::new(vec![]);
2147    let expr = HirRelationExpr::constant(vec![vec![]], typ);
2148    let scope = Scope::empty();
2149    (expr, scope)
2150}
2151
/// Describes how to execute a SELECT query.
///
/// `order_by` describes how to order the rows in `expr` *before* applying the
/// projection. The `scope` describes the columns in `expr` *after* the
/// projection has been applied.
#[derive(Debug)]
struct SelectPlan {
    /// The planned relation expression, prior to projection.
    expr: HirRelationExpr,
    /// Names for the columns of `expr` after `project` has been applied.
    scope: Scope,
    /// Ordering of the rows in `expr`, applied before projection.
    order_by: Vec<ColumnOrder>,
    /// Column indices of `expr` to retain, in output order.
    project: Vec<usize>,
}
2164
// Generates `SelectOptionExtracted`, which collects the typed values of a
// SELECT's OPTIONS list (the group-size hints consumed via
// `GroupSizeHints::try_from` below).
generate_extracted_config!(
    SelectOption,
    (ExpectedGroupSize, u64),
    (AggregateInputGroupSize, u64),
    (DistinctOnInputGroupSize, u64),
    (LimitInputGroupSize, u64)
);
2172
2173/// Plans a SELECT query. The SELECT query may contain an intrusive ORDER BY clause.
2174///
2175/// Normally, the ORDER BY clause occurs after the columns specified in the
2176/// SELECT list have been projected. In a query like
2177///
2178///   CREATE TABLE (a int, b int)
2179///   (SELECT a FROM t) UNION (SELECT a FROM t) ORDER BY a
2180///
2181/// it is valid to refer to `a`, because it is explicitly selected, but it would
2182/// not be valid to refer to unselected column `b`.
2183///
2184/// But PostgreSQL extends the standard to permit queries like
2185///
2186///   SELECT a FROM t ORDER BY b
2187///
2188/// where expressions in the ORDER BY clause can refer to *both* input columns
2189/// and output columns.
2190fn plan_select_from_where(
2191    qcx: &QueryContext,
2192    mut s: Select<Aug>,
2193    mut order_by_exprs: Vec<OrderByExpr<Aug>>,
2194) -> Result<SelectPlan, PlanError> {
2195    // TODO: Both `s` and `order_by_exprs` are not references because the
2196    // AggregateTableFuncVisitor needs to be able to rewrite the expressions for
2197    // table function support (the UUID mapping). Attempt to change this so callers
2198    // don't need to clone the Select.
2199
2200    // Extract query options.
2201    let select_option_extracted = SelectOptionExtracted::try_from(s.options.clone())?;
2202    let group_size_hints = GroupSizeHints::try_from(select_option_extracted)?;
2203
2204    // Step 1. Handle FROM clause, including joins.
2205    let (mut relation_expr, mut from_scope) =
2206        s.from.iter().try_fold(plan_join_identity(), |l, twj| {
2207            let (left, left_scope) = l;
2208            plan_join(
2209                qcx,
2210                left,
2211                left_scope,
2212                &Join {
2213                    relation: TableFactor::NestedJoin {
2214                        join: Box::new(twj.clone()),
2215                        alias: None,
2216                    },
2217                    join_operator: JoinOperator::CrossJoin,
2218                },
2219            )
2220        })?;
2221
2222    // Step 2. Handle WHERE clause.
2223    if let Some(selection) = &s.selection {
2224        let ecx = &ExprContext {
2225            qcx,
2226            name: "WHERE clause",
2227            scope: &from_scope,
2228            relation_type: &qcx.relation_type(&relation_expr),
2229            allow_aggregates: false,
2230            allow_subqueries: true,
2231            allow_parameters: true,
2232            allow_windows: false,
2233        };
2234        let expr = plan_expr(ecx, selection)
2235            .map_err(|e| sql_err!("WHERE clause error: {}", e))?
2236            .type_as(ecx, &ScalarType::Bool)?;
2237        relation_expr = relation_expr.filter(vec![expr]);
2238    }
2239
2240    // Step 3. Gather aggregates and table functions.
2241    // (But skip window aggregates.)
2242    let (aggregates, table_funcs) = {
2243        let mut visitor = AggregateTableFuncVisitor::new(qcx.scx);
2244        visitor.visit_select_mut(&mut s);
2245        for o in order_by_exprs.iter_mut() {
2246            visitor.visit_order_by_expr_mut(o);
2247        }
2248        visitor.into_result()?
2249    };
2250    let mut table_func_names: BTreeMap<String, Ident> = BTreeMap::new();
2251    if !table_funcs.is_empty() {
2252        let (expr, scope) = plan_scalar_table_funcs(
2253            qcx,
2254            table_funcs,
2255            &mut table_func_names,
2256            &relation_expr,
2257            &from_scope,
2258        )?;
2259        relation_expr = relation_expr.join(expr, HirScalarExpr::literal_true(), JoinKind::Inner);
2260        from_scope = from_scope.product(scope)?;
2261    }
2262
2263    // Step 4. Expand SELECT clause.
2264    let projection = {
2265        let ecx = &ExprContext {
2266            qcx,
2267            name: "SELECT clause",
2268            scope: &from_scope,
2269            relation_type: &qcx.relation_type(&relation_expr),
2270            allow_aggregates: true,
2271            allow_subqueries: true,
2272            allow_parameters: true,
2273            allow_windows: true,
2274        };
2275        let mut out = vec![];
2276        for si in &s.projection {
2277            if *si == SelectItem::Wildcard && s.from.is_empty() {
2278                sql_bail!("SELECT * with no tables specified is not valid");
2279            }
2280            out.extend(expand_select_item(ecx, si, &table_func_names)?);
2281        }
2282        out
2283    };
2284
2285    // Step 5. Handle GROUP BY clause.
2286    // This will also plan the aggregates gathered in Step 3.
2287    // See an overview of how aggregates are planned in the doc comment at the top of the file.
2288    let (mut group_scope, select_all_mapping) = {
2289        // Compute GROUP BY expressions.
2290        let ecx = &ExprContext {
2291            qcx,
2292            name: "GROUP BY clause",
2293            scope: &from_scope,
2294            relation_type: &qcx.relation_type(&relation_expr),
2295            allow_aggregates: false,
2296            allow_subqueries: true,
2297            allow_parameters: true,
2298            allow_windows: false,
2299        };
2300        let mut group_key = vec![];
2301        let mut group_exprs: BTreeMap<HirScalarExpr, ScopeItem> = BTreeMap::new();
2302        let mut group_hir_exprs = vec![];
2303        let mut group_scope = Scope::empty();
2304        let mut select_all_mapping = BTreeMap::new();
2305
2306        for group_expr in &s.group_by {
2307            let (group_expr, expr) = plan_group_by_expr(ecx, group_expr, &projection)?;
2308            let new_column = group_key.len();
2309
2310            if let Some(group_expr) = group_expr {
2311                // Multiple AST expressions can map to the same HIR expression.
2312                // If we already have a ScopeItem for this HIR, we can add this
2313                // next AST expression to its set
2314                if let Some(existing_scope_item) = group_exprs.get_mut(&expr) {
2315                    existing_scope_item.exprs.insert(group_expr.clone());
2316                    continue;
2317                }
2318            }
2319
2320            let mut scope_item = if let HirScalarExpr::Column(ColumnRef {
2321                level: 0,
2322                column: old_column,
2323            }) = &expr
2324            {
2325                // If we later have `SELECT foo.*` then we have to find all
2326                // the `foo` items in `from_scope` and figure out where they
2327                // ended up in `group_scope`. This is really hard to do
2328                // right using SQL name resolution, so instead we just track
2329                // the movement here.
2330                select_all_mapping.insert(*old_column, new_column);
2331                let scope_item = ecx.scope.items[*old_column].clone();
2332                scope_item
2333            } else {
2334                ScopeItem::empty()
2335            };
2336
2337            if let Some(group_expr) = group_expr.cloned() {
2338                scope_item.exprs.insert(group_expr);
2339            }
2340
2341            group_key.push(from_scope.len() + group_exprs.len());
2342            group_hir_exprs.push(expr.clone());
2343            group_exprs.insert(expr, scope_item);
2344        }
2345
2346        assert_eq!(group_hir_exprs.len(), group_exprs.len());
2347        for expr in &group_hir_exprs {
2348            if let Some(scope_item) = group_exprs.remove(expr) {
2349                group_scope.items.push(scope_item);
2350            }
2351        }
2352
2353        // Plan aggregates.
2354        let ecx = &ExprContext {
2355            qcx,
2356            name: "aggregate function",
2357            scope: &from_scope,
2358            relation_type: &qcx.relation_type(&relation_expr.clone().map(group_hir_exprs.clone())),
2359            allow_aggregates: false,
2360            allow_subqueries: true,
2361            allow_parameters: true,
2362            allow_windows: false,
2363        };
2364        let mut agg_exprs = vec![];
2365        for sql_function in aggregates {
2366            if sql_function.over.is_some() {
2367                unreachable!(
2368                    "Window aggregate; AggregateTableFuncVisitor explicitly filters these out"
2369                );
2370            }
2371            agg_exprs.push(plan_aggregate_common(ecx, &sql_function)?);
2372            group_scope
2373                .items
2374                .push(ScopeItem::from_expr(Expr::Function(sql_function.clone())));
2375        }
2376        if !agg_exprs.is_empty() || !group_key.is_empty() || s.having.is_some() {
2377            // apply GROUP BY / aggregates
2378            relation_expr = relation_expr.map(group_hir_exprs).reduce(
2379                group_key,
2380                agg_exprs,
2381                group_size_hints.aggregate_input_group_size,
2382            );
2383
2384            // For every old column that wasn't a group key, add a scope item
2385            // that errors when referenced. We can't simply drop these items
2386            // from scope. These items need to *exist* because they might shadow
2387            // variables in outer scopes that would otherwise be valid to
2388            // reference, but accessing them needs to produce an error.
2389            for i in 0..from_scope.len() {
2390                if !select_all_mapping.contains_key(&i) {
2391                    let scope_item = &ecx.scope.items[i];
2392                    group_scope.ungrouped_columns.push(ScopeUngroupedColumn {
2393                        table_name: scope_item.table_name.clone(),
2394                        column_name: scope_item.column_name.clone(),
2395                        allow_unqualified_references: scope_item.allow_unqualified_references,
2396                    });
2397                }
2398            }
2399
2400            (group_scope, select_all_mapping)
2401        } else {
2402            // if no GROUP BY, aggregates or having then all columns remain in scope
2403            (
2404                from_scope.clone(),
2405                (0..from_scope.len()).map(|i| (i, i)).collect(),
2406            )
2407        }
2408    };
2409
2410    // Step 6. Handle HAVING clause.
2411    if let Some(ref having) = s.having {
2412        let ecx = &ExprContext {
2413            qcx,
2414            name: "HAVING clause",
2415            scope: &group_scope,
2416            relation_type: &qcx.relation_type(&relation_expr),
2417            allow_aggregates: true,
2418            allow_subqueries: true,
2419            allow_parameters: true,
2420            allow_windows: false,
2421        };
2422        let expr = plan_expr(ecx, having)?.type_as(ecx, &ScalarType::Bool)?;
2423        relation_expr = relation_expr.filter(vec![expr]);
2424    }
2425
2426    // Step 7. Gather window functions from SELECT, ORDER BY, and QUALIFY, and plan them.
2427    // (This includes window aggregations.)
2428    //
2429    // Note that window functions can be present only in SELECT, ORDER BY, or QUALIFY (including
2430    // DISTINCT ON), because they are executed after grouped aggregations and HAVING.
2431    //
2432    // Also note that window functions in the ORDER BY can't refer to columns introduced in the
2433    // SELECT. This is because when an output column appears in ORDER BY, it can only stand alone,
2434    // and can't be part of a bigger expression.
2435    // See https://www.postgresql.org/docs/current/queries-order.html:
2436    // "Note that an output column name has to stand alone, that is, it cannot be used in an
2437    // expression"
2438    let window_funcs = {
2439        let mut visitor = WindowFuncCollector::default();
2440        // The `visit_select` call visits both `SELECT` and `QUALIFY` (and many other things, but
2441        // window functions are excluded from other things by `allow_windows` being false when
2442        // planning those before this code).
2443        visitor.visit_select(&s);
2444        for o in order_by_exprs.iter() {
2445            visitor.visit_order_by_expr(o);
2446        }
2447        visitor.into_result()
2448    };
2449    for window_func in window_funcs {
2450        let ecx = &ExprContext {
2451            qcx,
2452            name: "window function",
2453            scope: &group_scope,
2454            relation_type: &qcx.relation_type(&relation_expr),
2455            allow_aggregates: true,
2456            allow_subqueries: true,
2457            allow_parameters: true,
2458            allow_windows: true,
2459        };
2460        relation_expr = relation_expr.map(vec![plan_expr(ecx, &window_func)?.type_as_any(ecx)?]);
2461        group_scope.items.push(ScopeItem::from_expr(window_func));
2462    }
2463    // From this point on, we shouldn't encounter _valid_ window function calls, because those have
2464    // been already planned now. However, we should still set `allow_windows: true` for the
2465    // remaining planning of `QUALIFY`, `SELECT`, and `ORDER BY`, in order to have a correct error
2466    // msg if an OVER clause is missing from a window function.
2467
2468    // Step 8. Handle QUALIFY clause. (very similar to HAVING)
2469    if let Some(ref qualify) = s.qualify {
2470        let ecx = &ExprContext {
2471            qcx,
2472            name: "QUALIFY clause",
2473            scope: &group_scope,
2474            relation_type: &qcx.relation_type(&relation_expr),
2475            allow_aggregates: true,
2476            allow_subqueries: true,
2477            allow_parameters: true,
2478            allow_windows: true,
2479        };
2480        let expr = plan_expr(ecx, qualify)?.type_as(ecx, &ScalarType::Bool)?;
2481        relation_expr = relation_expr.filter(vec![expr]);
2482    }
2483
2484    // Step 9. Handle SELECT clause.
2485    let output_columns = {
2486        let mut new_exprs = vec![];
2487        let mut new_type = qcx.relation_type(&relation_expr);
2488        let mut output_columns = vec![];
2489        for (select_item, column_name) in &projection {
2490            let ecx = &ExprContext {
2491                qcx,
2492                name: "SELECT clause",
2493                scope: &group_scope,
2494                relation_type: &new_type,
2495                allow_aggregates: true,
2496                allow_subqueries: true,
2497                allow_parameters: true,
2498                allow_windows: true,
2499            };
2500            let expr = match select_item {
2501                ExpandedSelectItem::InputOrdinal(i) => {
2502                    if let Some(column) = select_all_mapping.get(i).copied() {
2503                        HirScalarExpr::column(column)
2504                    } else {
2505                        return Err(PlanError::ungrouped_column(&from_scope.items[*i]));
2506                    }
2507                }
2508                ExpandedSelectItem::Expr(expr) => plan_expr(ecx, expr)?.type_as_any(ecx)?,
2509            };
2510            if let HirScalarExpr::Column(ColumnRef { level: 0, column }) = expr {
2511                // Simple column reference; no need to map on a new expression.
2512                output_columns.push((column, column_name));
2513            } else {
2514                // Complicated expression that requires a map expression. We
2515                // update `group_scope` as we go so that future expressions that
2516                // are textually identical to this one can reuse it. This
2517                // duplicate detection is required for proper determination of
2518                // ambiguous column references with SQL92-style `ORDER BY`
2519                // items. See `plan_order_by_or_distinct_expr` for more.
2520                let typ = ecx.column_type(&expr);
2521                new_type.column_types.push(typ);
2522                new_exprs.push(expr);
2523                output_columns.push((group_scope.len(), column_name));
2524                group_scope
2525                    .items
2526                    .push(ScopeItem::from_expr(select_item.as_expr().cloned()));
2527            }
2528        }
2529        relation_expr = relation_expr.map(new_exprs);
2530        output_columns
2531    };
2532    let mut project_key: Vec<_> = output_columns.iter().map(|(i, _name)| *i).collect();
2533
2534    // Step 10. Handle intrusive ORDER BY and DISTINCT.
2535    let order_by = {
2536        let relation_type = qcx.relation_type(&relation_expr);
2537        let (mut order_by, mut map_exprs) = plan_order_by_exprs(
2538            &ExprContext {
2539                qcx,
2540                name: "ORDER BY clause",
2541                scope: &group_scope,
2542                relation_type: &relation_type,
2543                allow_aggregates: true,
2544                allow_subqueries: true,
2545                allow_parameters: true,
2546                allow_windows: true,
2547            },
2548            &order_by_exprs,
2549            &output_columns,
2550        )?;
2551
2552        match s.distinct {
2553            None => relation_expr = relation_expr.map(map_exprs),
2554            Some(Distinct::EntireRow) => {
2555                if relation_type.arity() == 0 {
2556                    sql_bail!("SELECT DISTINCT must have at least one column");
2557                }
2558                // `SELECT DISTINCT` only distincts on the columns in the SELECT
2559                // list, so we can't proceed if `ORDER BY` has introduced any
2560                // columns for arbitrary expressions. This matches PostgreSQL.
2561                if !try_push_projection_order_by(
2562                    &mut relation_expr,
2563                    &mut project_key,
2564                    &mut order_by,
2565                ) {
2566                    sql_bail!(
2567                        "for SELECT DISTINCT, ORDER BY expressions must appear in select list"
2568                    );
2569                }
2570                assert!(map_exprs.is_empty());
2571                relation_expr = relation_expr.distinct();
2572            }
2573            Some(Distinct::On(exprs)) => {
2574                let ecx = &ExprContext {
2575                    qcx,
2576                    name: "DISTINCT ON clause",
2577                    scope: &group_scope,
2578                    relation_type: &qcx.relation_type(&relation_expr),
2579                    allow_aggregates: true,
2580                    allow_subqueries: true,
2581                    allow_parameters: true,
2582                    allow_windows: true,
2583                };
2584
2585                let mut distinct_exprs = vec![];
2586                for expr in &exprs {
2587                    let expr = plan_order_by_or_distinct_expr(ecx, expr, &output_columns)?;
2588                    distinct_exprs.push(expr);
2589                }
2590
2591                let mut distinct_key = vec![];
2592
2593                // If both `DISTINCT ON` and `ORDER BY` are specified, then the
2594                // `DISTINCT ON` expressions must match the initial `ORDER BY`
2595                // expressions, though the order of `DISTINCT ON` expressions
2596                // does not matter. This matches PostgreSQL and leaves the door
2597                // open to a future optimization where the `DISTINCT ON` and
2598                // `ORDER BY` operations happen in one pass.
2599                //
2600                // On the bright side, any columns that have already been
2601                // computed by `ORDER BY` can be reused in the distinct key.
2602                let arity = relation_type.arity();
2603                for ord in order_by.iter().take(distinct_exprs.len()) {
2604                    // The unusual construction of `expr` here is to ensure the
2605                    // temporary column expression lives long enough.
2606                    let mut expr = &HirScalarExpr::column(ord.column);
2607                    if ord.column >= arity {
2608                        expr = &map_exprs[ord.column - arity];
2609                    };
2610                    match distinct_exprs.iter().position(move |e| e == expr) {
2611                        None => sql_bail!(
2612                            "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
2613                        ),
2614                        Some(pos) => {
2615                            distinct_exprs.remove(pos);
2616                        }
2617                    }
2618                    distinct_key.push(ord.column);
2619                }
2620
2621                // Add any remaining `DISTINCT ON` expressions to the key.
2622                for expr in distinct_exprs {
2623                    // If the expression is a reference to an existing column,
2624                    // do not introduce a new column to support it.
2625                    let column = match expr {
2626                        HirScalarExpr::Column(ColumnRef { level: 0, column }) => column,
2627                        _ => {
2628                            map_exprs.push(expr);
2629                            arity + map_exprs.len() - 1
2630                        }
2631                    };
2632                    distinct_key.push(column);
2633                }
2634
2635                // `DISTINCT ON` is semantically a TopK with limit 1. The
2636                // columns in `ORDER BY` that are not part of the distinct key,
2637                // if there are any, determine the ordering within each group,
2638                // per PostgreSQL semantics.
2639                let distinct_len = distinct_key.len();
2640                relation_expr = HirRelationExpr::top_k(
2641                    relation_expr.map(map_exprs),
2642                    distinct_key,
2643                    order_by.iter().skip(distinct_len).cloned().collect(),
2644                    Some(HirScalarExpr::literal(Datum::Int64(1), ScalarType::Int64)),
2645                    0,
2646                    group_size_hints.distinct_on_input_group_size,
2647                );
2648            }
2649        }
2650
2651        order_by
2652    };
2653
2654    // Construct a clean scope to expose outwards, where all of the state that
2655    // accumulated in the scope during planning of this SELECT is erased. The
2656    // clean scope has at most one name for each column, and the names are not
2657    // associated with any table.
2658    let scope = Scope::from_source(None, projection.into_iter().map(|(_expr, name)| name));
2659
2660    Ok(SelectPlan {
2661        expr: relation_expr,
2662        scope,
2663        order_by,
2664        project: project_key,
2665    })
2666}
2667
/// Plans the table functions in `table_funcs`, returning a relation expression
/// containing their output and a scope describing its columns.
///
/// `table_funcs` maps each table function call to a generated identifier; the
/// returned scope uses that identifier as the table name for the function's
/// columns (so the rewritten `SELECT` list can refer to them), and
/// `table_func_names` is populated with a mapping from each generated
/// identifier back to the function's original name. `relation_expr` and
/// `from_scope` are used only to derive the context in which the functions
/// are planned.
fn plan_scalar_table_funcs(
    qcx: &QueryContext,
    table_funcs: BTreeMap<Function<Aug>, String>,
    table_func_names: &mut BTreeMap<String, Ident>,
    relation_expr: &HirRelationExpr,
    from_scope: &Scope,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // Plan the functions in a context derived from the `FROM` clause's scope
    // and relation type rather than in `qcx` directly.
    let rows_from_qcx = qcx.derived_context(from_scope.clone(), qcx.relation_type(relation_expr));

    // Record, for each generated identifier, the original function name.
    for (table_func, id) in table_funcs.iter() {
        table_func_names.insert(
            id.clone(),
            // TODO(parkmycar): Re-visit after having `FullItemName` use `Ident`s.
            Ident::new_unchecked(table_func.name.full_item_name().item.clone()),
        );
    }
    // If there's only a single table function, we can skip generating
    // ordinality columns.
    if table_funcs.len() == 1 {
        let (table_func, id) = table_funcs.iter().next().unwrap();
        let (expr, mut scope) =
            plan_solitary_table_function(&rows_from_qcx, table_func, None, false)?;

        // A single table-function might return several columns as a record
        let num_cols = scope.len();
        for i in 0..scope.len() {
            // Attribute every output column to the generated identifier, and
            // forbid unqualified references to it.
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
        }
        return Ok((expr, scope));
    }
    // Otherwise, plan as usual, emulating the ROWS FROM behavior
    let (expr, mut scope, num_cols) =
        plan_rows_from_internal(&rows_from_qcx, table_funcs.keys(), None)?;

    // Munge the scope so table names match with the generated ids.
    // `i` walks the scope items: for each function, `num_cols` data columns
    // followed by that function's ordinality column. (BTreeMap guarantees
    // `values()` here iterates in the same order as the `keys()` passed to
    // `plan_rows_from_internal` above.)
    let mut i = 0;
    for (id, num_cols) in table_funcs.values().zip(num_cols) {
        for _ in 0..num_cols {
            scope.items[i].table_name = Some(PartialItemName {
                database: None,
                schema: None,
                item: id.clone(),
            });
            scope.items[i].from_single_column_function = num_cols == 1;
            scope.items[i].allow_unqualified_references = false;
            i += 1;
        }
        // Ordinality column. This doubles as the
        // `is_exists_column_for_a_table_function_that_was_in_the_target_list` later on
        // because it only needs to be NULL or not.
        scope.items[i].table_name = Some(PartialItemName {
            database: None,
            schema: None,
            item: id.clone(),
        });
        scope.items[i].is_exists_column_for_a_table_function_that_was_in_the_target_list = true;
        scope.items[i].allow_unqualified_references = false;
        i += 1;
    }
    // Coalesced ordinality column.
    scope.items[i].allow_unqualified_references = false;
    Ok((expr, scope))
}
2737
2738/// Plans an expression in a `GROUP BY` clause.
2739///
2740/// For historical reasons, PostgreSQL allows `GROUP BY` expressions to refer to
2741/// names/expressions defined in the `SELECT` clause. These special cases are
2742/// handled by this function; see comments within the implementation for
2743/// details.
2744fn plan_group_by_expr<'a>(
2745    ecx: &ExprContext,
2746    group_expr: &'a Expr<Aug>,
2747    projection: &'a [(ExpandedSelectItem, ColumnName)],
2748) -> Result<(Option<&'a Expr<Aug>>, HirScalarExpr), PlanError> {
2749    let plan_projection = |column: usize| match &projection[column].0 {
2750        ExpandedSelectItem::InputOrdinal(column) => Ok((None, HirScalarExpr::column(*column))),
2751        ExpandedSelectItem::Expr(expr) => {
2752            Ok((Some(expr.as_ref()), plan_expr(ecx, expr)?.type_as_any(ecx)?))
2753        }
2754    };
2755
2756    // Check if the expression is a numeric literal, as in `GROUP BY 1`. This is
2757    // a special case that means to use the ith item in the SELECT clause.
2758    if let Some(column) = check_col_index(ecx.name, group_expr, projection.len())? {
2759        return plan_projection(column);
2760    }
2761
2762    // Check if the expression is a simple identifier, as in `GROUP BY foo`.
2763    // The `foo` can refer to *either* an input column or an output column. If
2764    // both exist, the input column is preferred.
2765    match group_expr {
2766        Expr::Identifier(names) => match plan_identifier(ecx, names) {
2767            Err(PlanError::UnknownColumn {
2768                table: None,
2769                column,
2770                similar,
2771            }) => {
2772                // The expression was a simple identifier that did not match an
2773                // input column. See if it matches an output column.
2774                let mut iter = projection.iter().map(|(_expr, name)| name);
2775                if let Some(i) = iter.position(|n| *n == column) {
2776                    if iter.any(|n| *n == column) {
2777                        Err(PlanError::AmbiguousColumn(column))
2778                    } else {
2779                        plan_projection(i)
2780                    }
2781                } else {
2782                    // The name didn't match an output column either. Return the
2783                    // "unknown column" error.
2784                    Err(PlanError::UnknownColumn {
2785                        table: None,
2786                        column,
2787                        similar,
2788                    })
2789                }
2790            }
2791            res => Ok((Some(group_expr), res?)),
2792        },
2793        _ => Ok((
2794            Some(group_expr),
2795            plan_expr(ecx, group_expr)?.type_as_any(ecx)?,
2796        )),
2797    }
2798}
2799
2800/// Plans a slice of `ORDER BY` expressions.
2801///
2802/// See `plan_order_by_or_distinct_expr` for details on the `output_columns`
2803/// parameter.
2804///
2805/// Returns the determined column orderings and a list of scalar expressions
2806/// that must be mapped onto the underlying relation expression.
2807pub(crate) fn plan_order_by_exprs(
2808    ecx: &ExprContext,
2809    order_by_exprs: &[OrderByExpr<Aug>],
2810    output_columns: &[(usize, &ColumnName)],
2811) -> Result<(Vec<ColumnOrder>, Vec<HirScalarExpr>), PlanError> {
2812    let mut order_by = vec![];
2813    let mut map_exprs = vec![];
2814    for obe in order_by_exprs {
2815        let expr = plan_order_by_or_distinct_expr(ecx, &obe.expr, output_columns)?;
2816        // If the expression is a reference to an existing column,
2817        // do not introduce a new column to support it.
2818        let column = match expr {
2819            HirScalarExpr::Column(ColumnRef { level: 0, column }) => column,
2820            _ => {
2821                map_exprs.push(expr);
2822                ecx.relation_type.arity() + map_exprs.len() - 1
2823            }
2824        };
2825        order_by.push(resolve_desc_and_nulls_last(obe, column));
2826    }
2827    Ok((order_by, map_exprs))
2828}
2829
2830/// Plans an expression that appears in an `ORDER BY` or `DISTINCT ON` clause.
2831///
2832/// The `output_columns` parameter describes, in order, the physical index and
2833/// name of each expression in the `SELECT` list. For example, `[(3, "a")]`
2834/// corresponds to a `SELECT` list with a single entry named "a" that can be
2835/// found at index 3 in the underlying relation expression.
2836///
2837/// There are three cases to handle.
2838///
2839///    1. A simple numeric literal, as in `ORDER BY 1`. This is an ordinal
2840///       reference to the specified output column.
2841///    2. An unqualified identifier, as in `ORDER BY a`. This is a reference to
2842///       an output column, if it exists; otherwise it is a reference to an
2843///       input column.
2844///    3. An arbitrary expression, as in `ORDER BY -a`. Column references in
2845///       arbitrary expressions exclusively refer to input columns, never output
2846///       columns.
2847fn plan_order_by_or_distinct_expr(
2848    ecx: &ExprContext,
2849    expr: &Expr<Aug>,
2850    output_columns: &[(usize, &ColumnName)],
2851) -> Result<HirScalarExpr, PlanError> {
2852    if let Some(i) = check_col_index(ecx.name, expr, output_columns.len())? {
2853        return Ok(HirScalarExpr::column(output_columns[i].0));
2854    }
2855
2856    if let Expr::Identifier(names) = expr {
2857        if let [name] = &names[..] {
2858            let name = normalize::column_name(name.clone());
2859            let mut iter = output_columns.iter().filter(|(_, n)| **n == name);
2860            if let Some((i, _)) = iter.next() {
2861                match iter.next() {
2862                    // Per SQL92, names are not considered ambiguous if they
2863                    // refer to identical target list expressions, as in
2864                    // `SELECT a + 1 AS foo, a + 1 AS foo ... ORDER BY foo`.
2865                    Some((i2, _)) if i != i2 => return Err(PlanError::AmbiguousColumn(name)),
2866                    _ => return Ok(HirScalarExpr::column(*i)),
2867                }
2868            }
2869        }
2870    }
2871
2872    plan_expr(ecx, expr)?.type_as_any(ecx)
2873}
2874
2875fn plan_table_with_joins(
2876    qcx: &QueryContext,
2877    table_with_joins: &TableWithJoins<Aug>,
2878) -> Result<(HirRelationExpr, Scope), PlanError> {
2879    let (mut expr, mut scope) = plan_table_factor(qcx, &table_with_joins.relation)?;
2880    for join in &table_with_joins.joins {
2881        let (new_expr, new_scope) = plan_join(qcx, expr, scope, join)?;
2882        expr = new_expr;
2883        scope = new_scope;
2884    }
2885    Ok((expr, scope))
2886}
2887
2888fn plan_table_factor(
2889    qcx: &QueryContext,
2890    table_factor: &TableFactor<Aug>,
2891) -> Result<(HirRelationExpr, Scope), PlanError> {
2892    match table_factor {
2893        TableFactor::Table { name, alias } => {
2894            let (expr, scope) = qcx.resolve_table_name(name.clone())?;
2895            let scope = plan_table_alias(scope, alias.as_ref())?;
2896            Ok((expr, scope))
2897        }
2898
2899        TableFactor::Function {
2900            function,
2901            alias,
2902            with_ordinality,
2903        } => plan_solitary_table_function(qcx, function, alias.as_ref(), *with_ordinality),
2904
2905        TableFactor::RowsFrom {
2906            functions,
2907            alias,
2908            with_ordinality,
2909        } => plan_rows_from(qcx, functions, alias.as_ref(), *with_ordinality),
2910
2911        TableFactor::Derived {
2912            lateral,
2913            subquery,
2914            alias,
2915        } => {
2916            let mut qcx = (*qcx).clone();
2917            if !lateral {
2918                // Since this derived table was not marked as `LATERAL`,
2919                // make elements in outer scopes invisible until we reach the
2920                // next lateral barrier.
2921                for scope in &mut qcx.outer_scopes {
2922                    if scope.lateral_barrier {
2923                        break;
2924                    }
2925                    scope.items.clear();
2926                }
2927            }
2928            qcx.outer_scopes[0].lateral_barrier = true;
2929            let (expr, scope) = plan_nested_query(&mut qcx, subquery)?;
2930            let scope = plan_table_alias(scope, alias.as_ref())?;
2931            Ok((expr, scope))
2932        }
2933
2934        TableFactor::NestedJoin { join, alias } => {
2935            let (expr, scope) = plan_table_with_joins(qcx, join)?;
2936            let scope = plan_table_alias(scope, alias.as_ref())?;
2937            Ok((expr, scope))
2938        }
2939    }
2940}
2941
2942/// Plans a `ROWS FROM` expression.
2943///
2944/// `ROWS FROM` concatenates table functions into a single table, filling in
2945/// `NULL`s in places where one table function has fewer rows than another. We
2946/// can achieve this by augmenting each table function with a row number, doing
2947/// a `FULL JOIN` between each table function on the row number and eventually
2948/// projecting away the row number columns. Concretely, the following query
2949/// using `ROWS FROM`
2950///
2951/// ```sql
2952/// SELECT
2953///     *
2954/// FROM
2955///     ROWS FROM (
2956///         generate_series(1, 2),
2957///         information_schema._pg_expandarray(ARRAY[9]),
2958///         generate_series(3, 6)
2959///     );
2960/// ```
2961///
2962/// is equivalent to the following query that does not use `ROWS FROM`:
2963///
2964/// ```sql
2965/// SELECT
2966///     gs1.generate_series, expand.x, expand.n, gs2.generate_series
2967/// FROM
2968///     generate_series(1, 2) WITH ORDINALITY AS gs1
2969///     FULL JOIN information_schema._pg_expandarray(ARRAY[9]) WITH ORDINALITY AS expand
2970///         ON gs1.ordinality = expand.ordinality
2971///     FULL JOIN generate_series(3, 6) WITH ORDINALITY AS gs3
2972///         ON coalesce(gs1.ordinality, expand.ordinality) = gs3.ordinality;
2973/// ```
2974///
2975/// Note the call to `coalesce` in the last join condition, which ensures that
2976/// `gs3` will align with whichever of `gs1` or `expand` has more rows.
2977///
2978/// This function creates a HirRelationExpr that follows the structure of the
2979/// latter query.
2980///
2981/// `with_ordinality` can be used to have the output expression contain a
2982/// single coalesced ordinality column at the end of the entire expression.
2983fn plan_rows_from(
2984    qcx: &QueryContext,
2985    functions: &[Function<Aug>],
2986    alias: Option<&TableAlias>,
2987    with_ordinality: bool,
2988) -> Result<(HirRelationExpr, Scope), PlanError> {
2989    // If there's only a single table function, planning proceeds as if `ROWS
2990    // FROM` hadn't been written at all.
2991    if let [function] = functions {
2992        return plan_solitary_table_function(qcx, function, alias, with_ordinality);
2993    }
2994
2995    // Per PostgreSQL, all scope items take the name of the first function
2996    // (unless aliased).
2997    // See: https://github.com/postgres/postgres/blob/639a86e36/src/backend/parser/parse_relation.c#L1701-L1705
2998    let (expr, mut scope, num_cols) = plan_rows_from_internal(
2999        qcx,
3000        functions,
3001        Some(functions[0].name.full_item_name().clone()),
3002    )?;
3003
3004    // Columns tracks the set of columns we will keep in the projection.
3005    let mut columns = Vec::new();
3006    let mut offset = 0;
3007    // Retain table function's non-ordinality columns.
3008    for (idx, cols) in num_cols.into_iter().enumerate() {
3009        for i in 0..cols {
3010            columns.push(offset + i);
3011        }
3012        offset += cols + 1;
3013
3014        // Remove the ordinality column from the scope, accounting for previous scope
3015        // changes from this loop.
3016        scope.items.remove(offset - idx - 1);
3017    }
3018
3019    // If `WITH ORDINALITY` was specified, include the coalesced ordinality
3020    // column. Otherwise remove it from the scope.
3021    if with_ordinality {
3022        columns.push(scope.items.len());
3023    } else {
3024        scope.items.pop();
3025    }
3026
3027    let expr = expr.project(columns);
3028
3029    let scope = plan_table_alias(scope, alias)?;
3030    Ok((expr, scope))
3031}
3032
/// Plans an expression coalescing multiple table functions. Each table
/// function is followed by its row ordinality. The entire expression is
/// followed by the coalesced row ordinality.
///
/// The returned Scope will set all item's table_name's to the `table_name`
/// parameter if it is `Some`. If `None`, they will be the name of each table
/// function.
///
/// The returned `Vec<usize>` is the number of (non-ordinality) columns from
/// each table function.
///
/// For example, with table functions tf1 returning 1 column (a) and tf2
/// returning 2 columns (b, c), this function will return an expr with 6
/// columns:
///
/// - tf1.a
/// - tf1.ordinality
/// - tf2.b
/// - tf2.c
/// - tf2.ordinality
/// - coalesced_ordinality
///
/// And a `Vec<usize>` of `[1, 2]`.
fn plan_rows_from_internal<'a>(
    qcx: &QueryContext,
    functions: impl IntoIterator<Item = &'a Function<Aug>>,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope, Vec<usize>), PlanError> {
    let mut functions = functions.into_iter();
    let mut num_cols = Vec::new();

    // Join together each of the table functions in turn. The last column is
    // always the column to join against and is maintained to be the coalescence
    // of the row number column for all prior functions.
    let (mut left_expr, mut left_scope) =
        plan_table_function_internal(qcx, functions.next().unwrap(), true, table_name.clone())?;
    // The function was planned with ordinality, so its data-column count is
    // one less than its scope length.
    num_cols.push(left_scope.len() - 1);
    // Create the coalesced ordinality column. For a single function this is
    // simply a copy of its own ordinality column.
    left_expr = left_expr.map(vec![HirScalarExpr::column(left_scope.len() - 1)]);
    left_scope
        .items
        .push(ScopeItem::from_column_name("ordinality"));

    for function in functions {
        // The right hand side of a join must be planned in a new scope.
        let qcx = qcx.empty_derived_context();
        let (right_expr, mut right_scope) =
            plan_table_function_internal(&qcx, function, true, table_name.clone())?;
        num_cols.push(right_scope.len() - 1);
        // `left_col` indexes the current coalesced ordinality column (the
        // last column of the left relation); `right_col` indexes the right
        // function's ordinality column (the last column of the joined
        // relation).
        let left_col = left_scope.len() - 1;
        let right_col = left_scope.len() + right_scope.len() - 1;
        // Join on equality of the coalesced ordinality so far with the right
        // function's ordinality.
        let on = HirScalarExpr::CallBinary {
            func: BinaryFunc::Eq,
            expr1: Box::new(HirScalarExpr::column(left_col)),
            expr2: Box::new(HirScalarExpr::column(right_col)),
        };
        // Full outer join (so neither side's extra rows are lost), then
        // append the new coalesced ordinality as the last column.
        left_expr = left_expr
            .join(right_expr, on, JoinKind::FullOuter)
            .map(vec![HirScalarExpr::CallVariadic {
                func: VariadicFunc::Coalesce,
                exprs: vec![
                    HirScalarExpr::column(left_col),
                    HirScalarExpr::column(right_col),
                ],
            }]);

        // Project off the previous iteration's coalesced column, but keep both of this
        // iteration's ordinality columns.
        left_expr = left_expr.project(
            (0..left_col) // all left columns except the old coalesced column
                .chain(left_col + 1..right_col + 2) // right columns, right ordinality, new coalesced column
                .collect(),
        );
        // Move the coalesced ordinality scope item to the end so it tracks
        // the coalesced column's position in the projected expression.
        right_scope.items.push(left_scope.items.pop().unwrap());

        left_scope.items.extend(right_scope.items);
    }

    Ok((left_expr, left_scope, num_cols))
}
3113
3114/// Plans a table function that appears alone, i.e., that is not part of a `ROWS
3115/// FROM` clause that contains other table functions. Special aliasing rules
3116/// apply.
3117fn plan_solitary_table_function(
3118    qcx: &QueryContext,
3119    function: &Function<Aug>,
3120    alias: Option<&TableAlias>,
3121    with_ordinality: bool,
3122) -> Result<(HirRelationExpr, Scope), PlanError> {
3123    let (expr, mut scope) = plan_table_function_internal(qcx, function, with_ordinality, None)?;
3124
3125    let single_column_function = scope.len() == 1 + if with_ordinality { 1 } else { 0 };
3126    if single_column_function {
3127        let item = &mut scope.items[0];
3128
3129        // Mark that the function only produced a single column. This impacts
3130        // whole-row references.
3131        item.from_single_column_function = true;
3132
3133        // Strange special case for solitary table functions that output one
3134        // column whose name matches the name of the table function. If a table
3135        // alias is provided, the column name is changed to the table alias's
3136        // name. Concretely, the following query returns a column named `x`
3137        // rather than a column named `generate_series`:
3138        //
3139        //     SELECT * FROM generate_series(1, 5) AS x
3140        //
3141        // Note that this case does not apply to e.g. `jsonb_array_elements`,
3142        // since its output column is explicitly named `value`, not
3143        // `jsonb_array_elements`.
3144        //
3145        // Note also that we may (correctly) change the column name again when
3146        // we plan the table alias below if the `alias.columns` is non-empty.
3147        if let Some(alias) = alias {
3148            if let ScopeItem {
3149                table_name: Some(table_name),
3150                column_name,
3151                ..
3152            } = item
3153            {
3154                if table_name.item.as_str() == column_name.as_str() {
3155                    *column_name = normalize::column_name(alias.name.clone());
3156                }
3157            }
3158        }
3159    }
3160
3161    let scope = plan_table_alias(scope, alias)?;
3162    Ok((expr, scope))
3163}
3164
/// Plans a table function.
///
/// You generally should call `plan_rows_from` or `plan_solitary_table_function`
/// instead to get the appropriate aliasing behavior.
///
/// When `with_ordinality` is true, an `ordinality` column computed with the
/// `row_number` scalar window function is appended to the output. When
/// `table_name` is `None`, the returned scope's table name is the function's
/// own name.
fn plan_table_function_internal(
    qcx: &QueryContext,
    Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &Function<Aug>,
    with_ordinality: bool,
    table_name: Option<FullItemName>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    // These are internal invariants rather than user-facing errors: the parser
    // is not expected to attach FILTER, OVER, or DISTINCT to a function in
    // FROM position.
    assert_none!(filter, "cannot parse table function with FILTER");
    assert_none!(over, "cannot parse table function with OVER");
    assert!(!*distinct, "cannot parse table function with DISTINCT");

    // Table function arguments are planned in an empty scope: they may contain
    // subqueries and parameters, but not column references, aggregates, or
    // window functions.
    let ecx = &ExprContext {
        qcx,
        name: "table function arguments",
        scope: &Scope::empty(),
        relation_type: &RelationType::empty(),
        allow_aggregates: false,
        allow_subqueries: true,
        allow_parameters: true,
        allow_windows: false,
    };

    let scalar_args = match args {
        FunctionArgs::Star => sql_bail!("{} does not accept * as an argument", name),
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    name
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    // The name used to qualify the output columns: the caller-provided table
    // name if any, otherwise the function's own name.
    let table_name = match table_name {
        Some(table_name) => table_name.item,
        None => name.full_item_name().item.clone(),
    };

    let scope_name = Some(PartialItemName {
        database: None,
        schema: None,
        item: table_name,
    });

    let (mut expr, mut scope) = match resolve_func(ecx, name, args)? {
        // A true table function: select the matching implementation and use
        // its declared column names for the scope.
        Func::Table(impls) => {
            let tf = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let scope = Scope::from_source(scope_name.clone(), tf.column_names);
            (tf.expr, scope)
        }
        // A scalar function used in FROM position: wrap it as a single-column
        // table ("tabletized" scalar) whose column is named after the function.
        Func::Scalar(impls) => {
            let expr = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;
            let output = expr.typ(
                &qcx.outer_relation_types,
                &RelationType::new(vec![]),
                &qcx.scx.param_types.borrow(),
            );

            let relation = RelationType::new(vec![output]);

            let function_ident = Ident::new(name.full_item_name().item.clone())?;
            let column_name = normalize::column_name(function_ident);
            let name = column_name.to_string();

            let scope = Scope::from_source(scope_name.clone(), vec![column_name]);

            (
                HirRelationExpr::CallTable {
                    func: mz_expr::TableFunc::TabletizedScalar { relation, name },
                    exprs: vec![expr],
                },
                scope,
            )
        }
        // Any other class of function (e.g. aggregates) cannot appear in FROM.
        o => sql_bail!(
            "{} functions are not supported in functions in FROM",
            o.class()
        ),
    };

    if with_ordinality {
        // Number the function's rows with the `row_number` scalar window
        // function and expose the result as an `ordinality` column.
        expr = expr.map(vec![HirScalarExpr::Windowing(WindowExpr {
            func: WindowExprType::Scalar(ScalarWindowExpr {
                func: ScalarWindowFunc::RowNumber,
                order_by: vec![],
            }),
            partition_by: vec![],
            order_by: vec![],
        })]);
        scope
            .items
            .push(ScopeItem::from_name(scope_name, "ordinality"));
    }

    Ok((expr, scope))
}
3272
3273fn plan_table_alias(mut scope: Scope, alias: Option<&TableAlias>) -> Result<Scope, PlanError> {
3274    if let Some(TableAlias {
3275        name,
3276        columns,
3277        strict,
3278    }) = alias
3279    {
3280        if (columns.len() > scope.items.len()) || (*strict && columns.len() != scope.items.len()) {
3281            sql_bail!(
3282                "{} has {} columns available but {} columns specified",
3283                name,
3284                scope.items.len(),
3285                columns.len()
3286            );
3287        }
3288
3289        let table_name = normalize::ident(name.to_owned());
3290        for (i, item) in scope.items.iter_mut().enumerate() {
3291            item.table_name = if item.allow_unqualified_references {
3292                Some(PartialItemName {
3293                    database: None,
3294                    schema: None,
3295                    item: table_name.clone(),
3296                })
3297            } else {
3298                // Columns that prohibit unqualified references are special
3299                // columns from the output of a NATURAL or USING join that can
3300                // only be referenced by their full, pre-join name. Applying an
3301                // alias to the output of that join renders those columns
3302                // inaccessible, which we accomplish here by setting the
3303                // table name to `None`.
3304                //
3305                // Concretely, consider:
3306                //
3307                //      CREATE TABLE t1 (a int);
3308                //      CREATE TABLE t2 (a int);
3309                //  (1) SELECT ... FROM (t1 NATURAL JOIN t2);
3310                //  (2) SELECT ... FROM (t1 NATURAL JOIN t2) AS t;
3311                //
3312                // In (1), the join has no alias. The underlying columns from
3313                // either side of the join can be referenced as `t1.a` and
3314                // `t2.a`, respectively, and the unqualified name `a` refers to
3315                // a column whose value is `coalesce(t1.a, t2.a)`.
3316                //
3317                // In (2), the join is aliased as `t`. The columns from either
3318                // side of the join (`t1.a` and `t2.a`) are inaccessible, and
3319                // the coalesced column can be named as either `a` or `t.a`.
3320                //
3321                // We previously had a bug [0] that mishandled this subtle
3322                // logic.
3323                //
3324                // NOTE(benesch): We could in theory choose to project away
3325                // those inaccessible columns and drop them from the scope
3326                // entirely, but that would require that this function also
3327                // take and return the `HirRelationExpr` that is being aliased,
3328                // which is a rather large refactor.
3329                //
3330                // [0]: https://github.com/MaterializeInc/database-issues/issues/4887
3331                None
3332            };
3333            item.column_name = columns
3334                .get(i)
3335                .map(|a| normalize::column_name(a.clone()))
3336                .unwrap_or_else(|| item.column_name.clone());
3337        }
3338    }
3339    Ok(scope)
3340}
3341
/// Invents a column name for the select-list expression `expr`, following
/// PostgreSQL's naming rules. Returns `None` when no name can be invented (the
/// caller typically falls back to `?column?`).
///
/// `table_func_names` is a mapping from a UUID to the original function
/// name. The UUIDs are identifiers that have been rewritten from some table
/// function expression, and this mapping restores the original names.
fn invent_column_name(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Option<ColumnName> {
    // We follow PostgreSQL exactly here, which has some complicated rules
    // around "high" and "low" quality names. Low quality names override other
    // low quality names but not high quality names.
    //
    // See: https://github.com/postgres/postgres/blob/1f655fdc3/src/backend/parser/parse_target.c#L1716-L1728

    #[derive(Debug)]
    enum NameQuality {
        Low,
        High,
    }

    fn invent(
        ecx: &ExprContext,
        expr: &Expr<Aug>,
        table_func_names: &BTreeMap<String, Ident>,
    ) -> Option<(ColumnName, NameQuality)> {
        match expr {
            Expr::Identifier(names) => {
                // A bare identifier that was rewritten from a table function
                // expression takes the original function's name.
                if let [name] = names.as_slice() {
                    if let Some(table_func_name) = table_func_names.get(name.as_str()) {
                        return Some((
                            normalize::column_name(table_func_name.clone()),
                            NameQuality::High,
                        ));
                    }
                }
                // Otherwise use the last component of the (possibly qualified)
                // identifier.
                names
                    .last()
                    .map(|n| (normalize::column_name(n.clone()), NameQuality::High))
            }
            Expr::Value(v) => match v {
                // Per PostgreSQL, `bool` and `interval` literals take on the name
                // of their type, but not other literal types.
                Value::Boolean(_) => Some(("bool".into(), NameQuality::High)),
                Value::Interval(_) => Some(("interval".into(), NameQuality::High)),
                _ => None,
            },
            Expr::Function(func) => {
                let (schema, item) = match &func.name {
                    ResolvedItemName::Item {
                        qualifiers,
                        full_name,
                        ..
                    } => (&qualifiers.schema_spec, full_name.item.clone()),
                    _ => unreachable!(),
                };

                // Functions in the mz_internal and mz_unsafe schemas do not
                // contribute a name; other functions name the column after
                // themselves.
                if schema == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_internal_schema_id())
                    || schema
                        == &SchemaSpecifier::from(ecx.qcx.scx.catalog.get_mz_unsafe_schema_id())
                {
                    None
                } else {
                    Some((item.into(), NameQuality::High))
                }
            }
            Expr::HomogenizingFunction { function, .. } => Some((
                function.to_string().to_lowercase().into(),
                NameQuality::High,
            )),
            Expr::NullIf { .. } => Some(("nullif".into(), NameQuality::High)),
            Expr::Array { .. } => Some(("array".into(), NameQuality::High)),
            Expr::List { .. } => Some(("list".into(), NameQuality::High)),
            Expr::Map { .. } | Expr::MapSubquery(_) => Some(("map".into(), NameQuality::High)),
            // A cast prefers the name invented for its input expression,
            // falling back to the target type's name (low quality).
            Expr::Cast { expr, data_type } => match invent(ecx, expr, table_func_names) {
                Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                _ => Some((data_type.unqualified_item_name().into(), NameQuality::Low)),
            },
            // CASE prefers the name invented for its ELSE branch, falling back
            // to "case" (low quality).
            Expr::Case { else_result, .. } => {
                match else_result
                    .as_ref()
                    .and_then(|else_result| invent(ecx, else_result, table_func_names))
                {
                    Some((name, NameQuality::High)) => Some((name, NameQuality::High)),
                    _ => Some(("case".into(), NameQuality::Low)),
                }
            }
            Expr::FieldAccess { field, .. } => {
                Some((normalize::column_name(field.clone()), NameQuality::High))
            }
            Expr::Exists { .. } => Some(("exists".into(), NameQuality::High)),
            Expr::Subscript { expr, .. } => invent(ecx, expr, table_func_names),
            Expr::Subquery(query) | Expr::ListSubquery(query) | Expr::ArraySubquery(query) => {
                // A bit silly to have to plan the query here just to get its column
                // name, since we throw away the planned expression, but fixing this
                // requires a separate semantic analysis phase. Note that a planning
                // error is swallowed here (`.ok()?`): a failed plan simply yields
                // no invented name.
                let (_expr, scope) =
                    plan_nested_query(&mut ecx.derived_query_context(), query).ok()?;
                scope
                    .items
                    .first()
                    .map(|name| (name.column_name.clone(), NameQuality::High))
            }
            Expr::Row { .. } => Some(("row".into(), NameQuality::High)),
            _ => None,
        }
    }

    // The quality only arbitrates between candidate names above; the caller
    // receives just the chosen name.
    invent(ecx, expr, table_func_names).map(|(name, _quality)| name)
}
3451
3452#[derive(Debug)]
3453enum ExpandedSelectItem<'a> {
3454    InputOrdinal(usize),
3455    Expr(Cow<'a, Expr<Aug>>),
3456}
3457
3458impl ExpandedSelectItem<'_> {
3459    fn as_expr(&self) -> Option<&Expr<Aug>> {
3460        match self {
3461            ExpandedSelectItem::InputOrdinal(_) => None,
3462            ExpandedSelectItem::Expr(expr) => Some(expr),
3463        }
3464    }
3465}
3466
/// Expands a single `SELECT` list item into the output columns it produces,
/// each paired with its column name.
///
/// Wildcard forms (`*`, `t.*`, `expr.*`) expand to one entry per matching
/// column; an ordinary expression expands to exactly one entry.
/// `table_func_names` is forwarded to `invent_column_name` to restore original
/// table function names for rewritten identifiers.
fn expand_select_item<'a>(
    ecx: &ExprContext,
    s: &'a SelectItem<Aug>,
    table_func_names: &BTreeMap<String, Ident>,
) -> Result<Vec<(ExpandedSelectItem<'a>, ColumnName)>, PlanError> {
    match s {
        // `SELECT t.*`: expand to every column in scope belonging to table `t`.
        SelectItem::Expr {
            expr: Expr::QualifiedWildcard(table_name),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let table_name =
                normalize::unresolved_item_name(UnresolvedItemName(table_name.clone()))?;
            let out: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.is_from_table(&table_name))
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();
            if out.is_empty() {
                sql_bail!("no table named '{}' in scope", table_name);
            }
            Ok(out)
        }
        // `SELECT expr.*`: expand a record-typed expression to one field
        // access per field.
        SelectItem::Expr {
            expr: Expr::WildcardAccess(sql_expr),
            alias: _,
        } => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            // A bit silly to have to plan the expression here just to get its
            // type, since we throw away the planned expression, but fixing this
            // requires a separate semantic analysis phase. Luckily this is an
            // uncommon operation and the PostgreSQL docs have a warning that
            // this operation is slow in Postgres too.
            let expr = plan_expr(ecx, sql_expr)?.type_as_any(ecx)?;
            let fields = match ecx.scalar_type(&expr) {
                ScalarType::Record { fields, .. } => fields,
                ty => sql_bail!(
                    "type {} is not composite",
                    ecx.humanize_scalar_type(&ty, false)
                ),
            };
            // When the wildcard target is a bare table reference, collect the
            // internal "exists" columns synthesized for table functions that
            // appeared in the target list, so we can exclude them below.
            let mut skip_cols: BTreeSet<ColumnName> = BTreeSet::new();
            if let Expr::Identifier(ident) = sql_expr.as_ref() {
                if let [name] = ident.as_slice() {
                    if let Ok(items) = ecx.scope.items_from_table(
                        &[],
                        &PartialItemName {
                            database: None,
                            schema: None,
                            item: name.as_str().to_string(),
                        },
                    ) {
                        for (_, item) in items {
                            if item
                                .is_exists_column_for_a_table_function_that_was_in_the_target_list
                            {
                                skip_cols.insert(item.column_name.clone());
                            }
                        }
                    }
                }
            }
            // One `FieldAccess` expression per remaining record field.
            let items = fields
                .iter()
                .filter_map(|(name, _ty)| {
                    if skip_cols.contains(name) {
                        None
                    } else {
                        let item = ExpandedSelectItem::Expr(Cow::Owned(Expr::FieldAccess {
                            expr: sql_expr.clone(),
                            field: name.clone().into(),
                        }));
                        Some((item, name.clone()))
                    }
                })
                .collect();
            Ok(items)
        }
        // `SELECT *`: expand to every column in scope that permits unqualified
        // references (excluding, e.g., columns hidden by USING/NATURAL joins).
        SelectItem::Wildcard => {
            *ecx.qcx.scx.ambiguous_columns.borrow_mut() = true;
            let items: Vec<_> = ecx
                .scope
                .items
                .iter()
                .enumerate()
                .filter(|(_i, item)| item.allow_unqualified_references)
                .map(|(i, item)| {
                    let name = item.column_name.clone();
                    (ExpandedSelectItem::InputOrdinal(i), name)
                })
                .collect();

            Ok(items)
        }
        // An ordinary expression: name it from its alias, an invented name, or
        // the PostgreSQL fallback `?column?`.
        SelectItem::Expr { expr, alias } => {
            let name = alias
                .clone()
                .map(normalize::column_name)
                .or_else(|| invent_column_name(ecx, expr, table_func_names))
                .unwrap_or_else(|| "?column?".into());
            Ok(vec![(ExpandedSelectItem::Expr(Cow::Borrowed(expr)), name)])
        }
    }
}
3577
/// Plans a single `JOIN` clause: joins the already-planned left-hand side
/// (`left`, `left_scope`) against the relation in `join`, returning the joined
/// relation and its scope.
///
/// `CROSS JOIN` is planned as an inner join with an `ON true` constraint.
fn plan_join(
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    join: &Join<Aug>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    const ON_TRUE: JoinConstraint<Aug> = JoinConstraint::On(Expr::Value(Value::Boolean(true)));
    let (kind, constraint) = match &join.join_operator {
        JoinOperator::CrossJoin => (JoinKind::Inner, &ON_TRUE),
        JoinOperator::Inner(constraint) => (JoinKind::Inner, constraint),
        JoinOperator::LeftOuter(constraint) => (JoinKind::LeftOuter, constraint),
        JoinOperator::RightOuter(constraint) => (JoinKind::RightOuter, constraint),
        JoinOperator::FullOuter(constraint) => (JoinKind::FullOuter, constraint),
    };

    // Plan the right-hand side with the left-hand side visible as an outer
    // scope, so that (for join kinds that permit it) the right side may contain
    // lateral references to left-side columns.
    let mut right_qcx = left_qcx.derived_context(left_scope.clone(), left_qcx.relation_type(&left));
    if !kind.can_be_correlated() {
        for item in &mut right_qcx.outer_scopes[0].items {
            // Per PostgreSQL (and apparently SQL:2008), we can't simply remove
            // these items from scope. These items need to *exist* because they
            // might shadow variables in outer scopes that would otherwise be
            // valid to reference, but accessing them needs to produce an error.
            item.error_if_referenced =
                Some(|table, column| PlanError::WrongJoinTypeForLateralColumn {
                    table: table.cloned(),
                    column: column.clone(),
                });
        }
    }
    let (right, right_scope) = plan_table_factor(&right_qcx, &join.relation)?;

    let (expr, scope) = match constraint {
        // `ON <expr>`: plan the predicate over the concatenated scopes and
        // require it to have type `bool`.
        JoinConstraint::On(expr) => {
            let product_scope = left_scope.product(right_scope)?;
            let ecx = &ExprContext {
                qcx: left_qcx,
                name: "ON clause",
                scope: &product_scope,
                relation_type: &RelationType::new(
                    left_qcx
                        .relation_type(&left)
                        .column_types
                        .into_iter()
                        .chain(right_qcx.relation_type(&right).column_types)
                        .collect(),
                ),
                allow_aggregates: false,
                allow_subqueries: true,
                allow_parameters: true,
                allow_windows: false,
            };
            let on = plan_expr(ecx, expr)?.type_as(ecx, &ScalarType::Bool)?;
            let joined = left.join(right, on, kind);
            (joined, product_scope)
        }
        // `USING (cols) [AS alias]`: equate the named columns from both sides.
        JoinConstraint::Using { columns, alias } => {
            let column_names = columns
                .iter()
                .map(|ident| normalize::column_name(ident.clone()))
                .collect::<Vec<_>>();

            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                alias.as_ref(),
            )?
        }
        // `NATURAL`: behaves like USING over the column names common to both
        // sides.
        JoinConstraint::Natural => {
            // We shouldn't need to set ambiguous_columns on both the right and left qcx since they
            // have the same scx. However, it doesn't hurt to be safe.
            *left_qcx.scx.ambiguous_columns.borrow_mut() = true;
            *right_qcx.scx.ambiguous_columns.borrow_mut() = true;
            let left_column_names = left_scope.column_names();
            let right_column_names: BTreeSet<_> = right_scope.column_names().collect();
            let column_names: Vec<_> = left_column_names
                .filter(|col| right_column_names.contains(col))
                .cloned()
                .collect();
            plan_using_constraint(
                &column_names,
                left_qcx,
                left,
                left_scope,
                &right_qcx,
                right,
                right_scope,
                kind,
                None,
            )?
        }
    };
    Ok((expr, scope))
}
3677
/// Plans a `USING (columns)` or `NATURAL` join constraint, returning the
/// joined relation and its scope.
///
/// Each name in `column_names` must resolve on both sides; the two columns are
/// coerced to a common type and equated. The join columns are projected to the
/// front of the output and the redundant copies are hidden from unqualified
/// reference. For `FULL OUTER` joins, a new coalesced column stands in for
/// both sides. If `alias` (a `join_using_alias`) is provided, the join columns
/// additionally become referenceable through that qualifier, without hiding
/// the joined relations' own names.
///
/// See page 440 of ANSI SQL 2016 spec for details on scoping of using/natural joins.
#[allow(clippy::too_many_arguments)]
fn plan_using_constraint(
    column_names: &[ColumnName],
    left_qcx: &QueryContext,
    left: HirRelationExpr,
    left_scope: Scope,
    right_qcx: &QueryContext,
    right: HirRelationExpr,
    right_scope: Scope,
    kind: JoinKind,
    alias: Option<&Ident>,
) -> Result<(HirRelationExpr, Scope), PlanError> {
    let mut both_scope = left_scope.clone().product(right_scope.clone())?;

    // Cargo culting PG here; no discernible reason this must fail, but PG does
    // so we do, as well.
    let mut unique_column_names = BTreeSet::new();
    for c in column_names {
        if !unique_column_names.insert(c) {
            return Err(PlanError::Unsupported {
                feature: format!(
                    "column name {} appears more than once in USING clause",
                    c.as_str().quoted()
                ),
                discussion_no: None,
            });
        }
    }

    // The optional `USING (...) AS alias` qualifier for the join columns.
    let alias_item_name = alias.map(|alias| PartialItemName {
        database: None,
        schema: None,
        item: alias.clone().to_string(),
    });

    // The alias must not collide with any table name already visible in the
    // joined scope.
    if let Some(alias_item_name) = &alias_item_name {
        for partial_item_name in both_scope.table_names() {
            if partial_item_name.matches(alias_item_name) {
                sql_bail!(
                    "table name \"{}\" specified more than once",
                    alias_item_name
                )
            }
        }
    }

    // Context for coercing the USING columns: aggregates, subqueries,
    // parameters, and window functions are all disallowed here.
    let ecx = &ExprContext {
        qcx: right_qcx,
        name: "USING clause",
        scope: &both_scope,
        relation_type: &RelationType::new(
            left_qcx
                .relation_type(&left)
                .column_types
                .into_iter()
                .chain(right_qcx.relation_type(&right).column_types)
                .collect(),
        ),
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let mut join_exprs = vec![]; // equality predicates, ANDed into the ON condition
    let mut map_exprs = vec![]; // new columns appended after the join
    let mut new_items = vec![]; // scope items for the appended columns
    let mut join_cols = vec![]; // columns presented (in order) as the join columns
    let mut hidden_cols = vec![]; // columns to exclude from unqualified reference

    for column_name in column_names {
        let lhs = left_scope.resolve_using_column(column_name, JoinSide::Left)?;
        let mut rhs = right_scope.resolve_using_column(column_name, JoinSide::Right)?;

        // Adjust the RHS reference to its post-join location.
        rhs.column += left_scope.len();

        // Join keys must be resolved to same type.
        let mut exprs = coerce_homogeneous_exprs(
            &ecx.with_name(&format!(
                "NATURAL/USING join column {}",
                column_name.as_str().quoted()
            )),
            vec![
                CoercibleScalarExpr::Coerced(HirScalarExpr::Column(lhs)),
                CoercibleScalarExpr::Coerced(HirScalarExpr::Column(rhs)),
            ],
            None,
        )?;
        let (expr1, expr2) = (exprs.remove(0), exprs.remove(0));

        // Decide which side's column represents the join column in the output
        // and which is hidden; for FULL OUTER neither side alone suffices, so
        // a coalesced column is synthesized.
        match kind {
            JoinKind::LeftOuter { .. } | JoinKind::Inner { .. } => {
                join_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
            }
            JoinKind::RightOuter => {
                join_cols.push(rhs.column);
                hidden_cols.push(lhs.column);
            }
            JoinKind::FullOuter => {
                // Create a new column that will be the coalesced value of left
                // and right.
                join_cols.push(both_scope.items.len() + map_exprs.len());
                hidden_cols.push(lhs.column);
                hidden_cols.push(rhs.column);
                map_exprs.push(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::Coalesce,
                    exprs: vec![expr1.clone(), expr2.clone()],
                });
                new_items.push(ScopeItem::from_column_name(column_name));
            }
        }

        // If a `join_using_alias` is present, add a new scope item that accepts
        // only table-qualified references for each specified join column.
        // Unlike regular table aliases, a `join_using_alias` should not hide the
        // names of the joined relations.
        if alias_item_name.is_some() {
            let new_item_col = both_scope.items.len() + new_items.len();
            join_cols.push(new_item_col);
            hidden_cols.push(new_item_col);

            new_items.push(ScopeItem::from_name(
                alias_item_name.clone(),
                column_name.clone().to_string(),
            ));

            // Should be safe to use either `lhs` or `rhs` here since the column
            // is available in both scopes and must have the same type of the new item.
            map_exprs.push(HirScalarExpr::Column(lhs));
        }

        join_exprs.push(HirScalarExpr::CallBinary {
            func: BinaryFunc::Eq,
            expr1: Box::new(expr1),
            expr2: Box::new(expr2),
        });
    }
    both_scope.items.extend(new_items);

    // The columns from the secondary side of the join remain accessible by
    // their table-qualified name, but not by their column name alone. They are
    // also excluded from `SELECT *`.
    for c in hidden_cols {
        both_scope.items[c].allow_unqualified_references = false;
    }

    // Reproject all returned elements to the front of the list.
    let project_key = join_cols
        .into_iter()
        .chain(0..both_scope.items.len())
        .unique()
        .collect::<Vec<_>>();

    both_scope = both_scope.project(&project_key);

    let on = HirScalarExpr::variadic_and(join_exprs);

    let both = left
        .join(right, on, kind)
        .map(map_exprs)
        .project(project_key);
    Ok((both, both_scope))
}
3844
3845pub fn plan_expr<'a>(
3846    ecx: &'a ExprContext,
3847    e: &Expr<Aug>,
3848) -> Result<CoercibleScalarExpr, PlanError> {
3849    ecx.checked_recur(|ecx| plan_expr_inner(ecx, e))
3850}
3851
/// Plans a single expression node, dispatching on the AST variant.
///
/// If the current scope has already computed this exact expression (e.g. an
/// aggregate precomputed during `GROUP BY` planning — see the module docs),
/// the planned result is reused as a direct column reference rather than
/// re-planned. Several variants are expected to have been rewritten away by
/// an earlier desugaring pass and are therefore unreachable here.
fn plan_expr_inner<'a>(
    ecx: &'a ExprContext,
    e: &Expr<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if let Some(i) = ecx.scope.resolve_expr(e) {
        // We've already calculated this expression.
        return Ok(HirScalarExpr::Column(i).into());
    }

    match e {
        // Names.
        Expr::Identifier(names) | Expr::QualifiedWildcard(names) => {
            Ok(plan_identifier(ecx, names)?.into())
        }

        // Literals.
        Expr::Value(val) => plan_literal(val),
        Expr::Parameter(n) => plan_parameter(ecx, *n),
        Expr::Array(exprs) => plan_array(ecx, exprs, None),
        Expr::List(exprs) => plan_list(ecx, exprs, None),
        Expr::Map(exprs) => plan_map(ecx, exprs, None),
        Expr::Row { exprs } => plan_row(ecx, exprs),

        // Generalized functions, operators, and casts.
        Expr::Op { op, expr1, expr2 } => {
            Ok(plan_op(ecx, normalize::op(op)?, expr1, expr2.as_deref())?.into())
        }
        Expr::Cast { expr, data_type } => plan_cast(ecx, expr, data_type),
        Expr::Function(func) => Ok(plan_function(ecx, func)?.into()),

        // Special functions and operators.
        Expr::Not { expr } => plan_not(ecx, expr),
        Expr::And { left, right } => plan_and(ecx, left, right),
        Expr::Or { left, right } => plan_or(ecx, left, right),
        Expr::IsExpr {
            expr,
            construct,
            negated,
        } => Ok(plan_is_expr(ecx, expr, construct, *negated)?.into()),
        Expr::Case {
            operand,
            conditions,
            results,
            else_result,
        } => Ok(plan_case(ecx, operand, conditions, results, else_result)?.into()),
        Expr::HomogenizingFunction { function, exprs } => {
            plan_homogenizing_function(ecx, function, exprs)
        }
        // `NULLIF(l, r)` is planned as `CASE WHEN l = r THEN NULL ELSE l END`.
        Expr::NullIf { l_expr, r_expr } => Ok(plan_case(
            ecx,
            &None,
            &[l_expr.clone().equals(*r_expr.clone())],
            &[Expr::null()],
            &Some(Box::new(*l_expr.clone())),
        )?
        .into()),
        Expr::FieldAccess { expr, field } => plan_field_access(ecx, expr, field),
        // `(e).*` simply plans `e` itself.
        Expr::WildcardAccess(expr) => plan_expr(ecx, expr),
        Expr::Subscript { expr, positions } => plan_subscript(ecx, expr, positions),
        Expr::Like {
            expr,
            pattern,
            escape,
            case_insensitive,
            negated,
        } => Ok(plan_like(
            ecx,
            expr,
            pattern,
            escape.as_deref(),
            *case_insensitive,
            *negated,
        )?
        .into()),

        Expr::InList {
            expr,
            list,
            negated,
        } => plan_in_list(ecx, expr, list, negated),

        // Subqueries.
        Expr::Exists(query) => plan_exists(ecx, query),
        Expr::Subquery(query) => plan_subquery(ecx, query),
        Expr::ListSubquery(query) => plan_list_subquery(ecx, query),
        Expr::MapSubquery(query) => plan_map_subquery(ecx, query),
        Expr::ArraySubquery(query) => plan_array_subquery(ecx, query),
        Expr::Collate { expr, collation } => plan_collate(ecx, expr, collation),
        // These variants are eliminated by an earlier desugaring pass.
        Expr::Nested(_) => unreachable!("Expr::Nested not desugared"),
        Expr::InSubquery { .. } => unreachable!("Expr::InSubquery not desugared"),
        Expr::AnyExpr { .. } => unreachable!("Expr::AnyExpr not desugared"),
        Expr::AllExpr { .. } => unreachable!("Expr::AllExpr not desugared"),
        Expr::AnySubquery { .. } => unreachable!("Expr::AnySubquery not desugared"),
        Expr::AllSubquery { .. } => unreachable!("Expr::AllSubquery not desugared"),
        Expr::Between { .. } => unreachable!("Expr::Between not desugared"),
    }
}
3949
3950fn plan_parameter(ecx: &ExprContext, n: usize) -> Result<CoercibleScalarExpr, PlanError> {
3951    if !ecx.allow_parameters {
3952        // It might be clearer to return an error like "cannot use parameter
3953        // here", but this is how PostgreSQL does it, and so for now we follow
3954        // PostgreSQL.
3955        return Err(PlanError::UnknownParameter(n));
3956    }
3957    if n == 0 || n > 65536 {
3958        return Err(PlanError::UnknownParameter(n));
3959    }
3960    if ecx.param_types().borrow().contains_key(&n) {
3961        Ok(HirScalarExpr::Parameter(n).into())
3962    } else {
3963        Ok(CoercibleScalarExpr::Parameter(n))
3964    }
3965}
3966
3967fn plan_row(ecx: &ExprContext, exprs: &[Expr<Aug>]) -> Result<CoercibleScalarExpr, PlanError> {
3968    let mut out = vec![];
3969    for e in exprs {
3970        out.push(plan_expr(ecx, e)?);
3971    }
3972    Ok(CoercibleScalarExpr::LiteralRecord(out))
3973}
3974
/// Plans a cast expression, e.g. `CAST(<expr> AS <type>)` or `<expr>::<type>`.
///
/// The target SQL type is resolved to a `ScalarType`, the input is first
/// coerced toward that type, and an explicit cast is then planned on top.
fn plan_cast(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    data_type: &ResolvedDataType,
) -> Result<CoercibleScalarExpr, PlanError> {
    let to_scalar_type = scalar_type_from_sql(ecx.qcx.scx, data_type)?;
    let expr = match expr {
        // Special case a direct cast of an ARRAY, LIST, or MAP expression so
        // we can pass in the target type as a type hint. This is
        // a limited form of the coercion that we do for string literals
        // via CoercibleScalarExpr. We used to let CoercibleScalarExpr
        // handle ARRAY/LIST/MAP coercion too, but doing so causes
        // PostgreSQL compatibility trouble.
        //
        // See: https://github.com/postgres/postgres/blob/31f403e95/src/backend/parser/parse_expr.c#L2762-L2768
        Expr::Array(exprs) => plan_array(ecx, exprs, Some(&to_scalar_type))?,
        Expr::List(exprs) => plan_list(ecx, exprs, Some(&to_scalar_type))?,
        Expr::Map(exprs) => plan_map(ecx, exprs, Some(&to_scalar_type))?,
        _ => plan_expr(ecx, expr)?,
    };
    // Rename the context so any error produced below is attributed to "CAST".
    let ecx = &ecx.with_name("CAST");
    // Coerce first (resolving still-untyped values like string literals and
    // parameters), then plan the explicit cast on the now-typed expression.
    let expr = typeconv::plan_coerce(ecx, expr, &to_scalar_type)?;
    let expr = typeconv::plan_cast(ecx, CastContext::Explicit, expr, &to_scalar_type)?;
    Ok(expr.into())
}
4000
4001fn plan_not(ecx: &ExprContext, expr: &Expr<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4002    let ecx = ecx.with_name("NOT argument");
4003    Ok(HirScalarExpr::CallUnary {
4004        func: UnaryFunc::Not(expr_func::Not),
4005        expr: Box::new(plan_expr(&ecx, expr)?.type_as(&ecx, &ScalarType::Bool)?),
4006    }
4007    .into())
4008}
4009
4010fn plan_and(
4011    ecx: &ExprContext,
4012    left: &Expr<Aug>,
4013    right: &Expr<Aug>,
4014) -> Result<CoercibleScalarExpr, PlanError> {
4015    let ecx = ecx.with_name("AND argument");
4016    Ok(HirScalarExpr::variadic_and(vec![
4017        plan_expr(&ecx, left)?.type_as(&ecx, &ScalarType::Bool)?,
4018        plan_expr(&ecx, right)?.type_as(&ecx, &ScalarType::Bool)?,
4019    ])
4020    .into())
4021}
4022
4023fn plan_or(
4024    ecx: &ExprContext,
4025    left: &Expr<Aug>,
4026    right: &Expr<Aug>,
4027) -> Result<CoercibleScalarExpr, PlanError> {
4028    let ecx = ecx.with_name("OR argument");
4029    Ok(HirScalarExpr::variadic_or(vec![
4030        plan_expr(&ecx, left)?.type_as(&ecx, &ScalarType::Bool)?,
4031        plan_expr(&ecx, right)?.type_as(&ecx, &ScalarType::Bool)?,
4032    ])
4033    .into())
4034}
4035
4036fn plan_in_list(
4037    ecx: &ExprContext,
4038    lhs: &Expr<Aug>,
4039    list: &Vec<Expr<Aug>>,
4040    negated: &bool,
4041) -> Result<CoercibleScalarExpr, PlanError> {
4042    let ecx = ecx.with_name("IN list");
4043    let or = HirScalarExpr::variadic_or(
4044        list.into_iter()
4045            .map(|e| {
4046                let eq = lhs.clone().equals(e.clone());
4047                plan_expr(&ecx, &eq)?.type_as(&ecx, &ScalarType::Bool)
4048            })
4049            .collect::<Result<Vec<HirScalarExpr>, PlanError>>()?,
4050    );
4051    Ok(if *negated {
4052        or.call_unary(UnaryFunc::Not(expr_func::Not))
4053    } else {
4054        or
4055    }
4056    .into())
4057}
4058
4059fn plan_homogenizing_function(
4060    ecx: &ExprContext,
4061    function: &HomogenizingFunction,
4062    exprs: &[Expr<Aug>],
4063) -> Result<CoercibleScalarExpr, PlanError> {
4064    assert!(!exprs.is_empty()); // `COALESCE()` is a syntax error
4065    let expr = HirScalarExpr::CallVariadic {
4066        func: match function {
4067            HomogenizingFunction::Coalesce => VariadicFunc::Coalesce,
4068            HomogenizingFunction::Greatest => VariadicFunc::Greatest,
4069            HomogenizingFunction::Least => VariadicFunc::Least,
4070        },
4071        exprs: coerce_homogeneous_exprs(
4072            &ecx.with_name(&function.to_string().to_lowercase()),
4073            plan_exprs(ecx, exprs)?,
4074            None,
4075        )?,
4076    };
4077    Ok(expr.into())
4078}
4079
4080fn plan_field_access(
4081    ecx: &ExprContext,
4082    expr: &Expr<Aug>,
4083    field: &Ident,
4084) -> Result<CoercibleScalarExpr, PlanError> {
4085    let field = normalize::column_name(field.clone());
4086    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
4087    let ty = ecx.scalar_type(&expr);
4088    let i = match &ty {
4089        ScalarType::Record { fields, .. } => fields.iter().position(|(name, _ty)| *name == field),
4090        ty => sql_bail!(
4091            "column notation applied to type {}, which is not a composite type",
4092            ecx.humanize_scalar_type(ty, false)
4093        ),
4094    };
4095    match i {
4096        None => sql_bail!(
4097            "field {} not found in data type {}",
4098            field,
4099            ecx.humanize_scalar_type(&ty, false)
4100        ),
4101        Some(i) => Ok(expr
4102            .call_unary(UnaryFunc::RecordGet(expr_func::RecordGet(i)))
4103            .into()),
4104    }
4105}
4106
/// Plans a subscript expression, e.g. `a[1]`, `l[1:2]`, or `j['k']`,
/// dispatching on the type of the subscripted expression.
///
/// Arrays, `int2vector`, `jsonb`, and lists are subscriptable; all other
/// types produce an error.
fn plan_subscript(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    positions: &[SubscriptPosition<Aug>],
) -> Result<CoercibleScalarExpr, PlanError> {
    // The parser only produces `Expr::Subscript` with at least one position.
    assert!(
        !positions.is_empty(),
        "subscript expression must contain at least one position"
    );

    let ecx = &ecx.with_name("subscripting");
    let expr = plan_expr(ecx, expr)?.type_as_any(ecx)?;
    let ty = ecx.scalar_type(&expr);
    match &ty {
        ScalarType::Array(..) | ScalarType::Int2Vector => plan_subscript_array(
            ecx,
            expr,
            positions,
            // Int2Vector uses 0-based indexing, while arrays use 1-based indexing, so we need to
            // adjust all Int2Vector subscript operations by 1 (both w/r/t input and the values we
            // track in its backing data).
            if ty == ScalarType::Int2Vector { 1 } else { 0 },
        ),
        ScalarType::Jsonb => plan_subscript_jsonb(ecx, expr, positions),
        ScalarType::List { element_type, .. } => {
            // `elem_type_name` is used only in error msgs, so we set `postgres_compat` to false.
            let elem_type_name = ecx.humanize_scalar_type(element_type, false);
            let n_layers = ty.unwrap_list_n_layers();
            plan_subscript_list(ecx, expr, positions, n_layers, &elem_type_name)
        }
        ty => sql_bail!(
            "cannot subscript type {}",
            ecx.humanize_scalar_type(ty, false)
        ),
    }
}
4143
4144// All subscript positions are of the form [<expr>(:<expr>?)?]; extract all
4145// expressions from those that look like indexes (i.e. `[<expr>]`) or error if
4146// any were slices (i.e. included colon).
4147fn extract_scalar_subscript_from_positions<'a>(
4148    positions: &'a [SubscriptPosition<Aug>],
4149    expr_type_name: &str,
4150) -> Result<Vec<&'a Expr<Aug>>, PlanError> {
4151    let mut scalar_subscripts = Vec::with_capacity(positions.len());
4152    for p in positions {
4153        if p.explicit_slice {
4154            sql_bail!("{} subscript does not support slices", expr_type_name);
4155        }
4156        assert!(
4157            p.end.is_none(),
4158            "index-appearing subscripts cannot have end value"
4159        );
4160        scalar_subscripts.push(p.start.as_ref().expect("has start if not slice"));
4161    }
4162    Ok(scalar_subscripts)
4163}
4164
4165fn plan_subscript_array(
4166    ecx: &ExprContext,
4167    expr: HirScalarExpr,
4168    positions: &[SubscriptPosition<Aug>],
4169    offset: i64,
4170) -> Result<CoercibleScalarExpr, PlanError> {
4171    let mut exprs = Vec::with_capacity(positions.len() + 1);
4172    exprs.push(expr);
4173
4174    // Subscripting arrays doesn't yet support slicing, so we always want to
4175    // extract scalars or error.
4176    let indexes = extract_scalar_subscript_from_positions(positions, "array")?;
4177
4178    for i in indexes {
4179        exprs.push(plan_expr(ecx, i)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?);
4180    }
4181
4182    Ok(HirScalarExpr::CallVariadic {
4183        func: VariadicFunc::ArrayIndex { offset },
4184        exprs,
4185    }
4186    .into())
4187}
4188
/// Plans subscripting into a list value, handling arbitrary interleavings of
/// index operations (`[i]`) and slice operations (`[a:b]`).
///
/// Positions are processed left to right: each maximal run of indexes is
/// planned via `plan_index_list` (which reduces the number of remaining list
/// layers), then each maximal run of slices via `plan_slice_list` (which
/// leaves the layer count unchanged).
fn plan_subscript_list(
    ecx: &ExprContext,
    mut expr: HirScalarExpr,
    positions: &[SubscriptPosition<Aug>],
    mut remaining_layers: usize,
    elem_type_name: &str,
) -> Result<CoercibleScalarExpr, PlanError> {
    let mut i = 0;

    while i < positions.len() {
        // Take all contiguous index operations, i.e. find next slice operation.
        // (`position` is relative to `i`, so `j` is the length of the run.)
        let j = positions[i..]
            .iter()
            .position(|p| p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            let indexes = extract_scalar_subscript_from_positions(&positions[i..i + j], "")?;
            let (n, e) = plan_index_list(
                ecx,
                expr,
                indexes.as_slice(),
                remaining_layers,
                elem_type_name,
            )?;
            remaining_layers = n;
            expr = e;
            i += j;
        }

        // Take all contiguous slice operations, i.e. find next index operation.
        let j = positions[i..]
            .iter()
            .position(|p| !p.explicit_slice)
            .unwrap_or(positions.len() - i);
        if j != 0 {
            expr = plan_slice_list(
                ecx,
                expr,
                &positions[i..i + j],
                remaining_layers,
                elem_type_name,
            )?;
            i += j;
        }
    }

    Ok(expr.into())
}
4237
4238fn plan_index_list(
4239    ecx: &ExprContext,
4240    expr: HirScalarExpr,
4241    indexes: &[&Expr<Aug>],
4242    n_layers: usize,
4243    elem_type_name: &str,
4244) -> Result<(usize, HirScalarExpr), PlanError> {
4245    let depth = indexes.len();
4246
4247    if depth > n_layers {
4248        if n_layers == 0 {
4249            sql_bail!("cannot subscript type {}", elem_type_name)
4250        } else {
4251            sql_bail!(
4252                "cannot index into {} layers; list only has {} layer{}",
4253                depth,
4254                n_layers,
4255                if n_layers == 1 { "" } else { "s" }
4256            )
4257        }
4258    }
4259
4260    let mut exprs = Vec::with_capacity(depth + 1);
4261    exprs.push(expr);
4262
4263    for i in indexes {
4264        exprs.push(plan_expr(ecx, i)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?);
4265    }
4266
4267    Ok((
4268        n_layers - depth,
4269        HirScalarExpr::CallVariadic {
4270            func: VariadicFunc::ListIndex,
4271            exprs,
4272        },
4273    ))
4274}
4275
4276fn plan_slice_list(
4277    ecx: &ExprContext,
4278    expr: HirScalarExpr,
4279    slices: &[SubscriptPosition<Aug>],
4280    n_layers: usize,
4281    elem_type_name: &str,
4282) -> Result<HirScalarExpr, PlanError> {
4283    if n_layers == 0 {
4284        sql_bail!("cannot subscript type {}", elem_type_name)
4285    }
4286
4287    // first arg will be list
4288    let mut exprs = Vec::with_capacity(slices.len() + 1);
4289    exprs.push(expr);
4290    // extract (start, end) parts from collected slices
4291    let extract_position_or_default = |position, default| -> Result<HirScalarExpr, PlanError> {
4292        Ok(match position {
4293            Some(p) => {
4294                plan_expr(ecx, p)?.cast_to(ecx, CastContext::Explicit, &ScalarType::Int64)?
4295            }
4296            None => HirScalarExpr::literal(Datum::Int64(default), ScalarType::Int64),
4297        })
4298    };
4299    for p in slices {
4300        let start = extract_position_or_default(p.start.as_ref(), 1)?;
4301        let end = extract_position_or_default(p.end.as_ref(), i64::MAX - 1)?;
4302        exprs.push(start);
4303        exprs.push(end);
4304    }
4305
4306    Ok(HirScalarExpr::CallVariadic {
4307        func: VariadicFunc::ListSliceLinear,
4308        exprs,
4309    })
4310}
4311
/// Plans `<expr> [NOT] [I]LIKE <pattern> [ESCAPE <escape>]`.
///
/// A `char` haystack is padded to its declared length (`PadChar`) before
/// matching; any other haystack is implicitly cast to `text`. A custom
/// escape string, if present, is folded into the pattern via `LikeEscape`
/// before the match is planned.
fn plan_like(
    ecx: &ExprContext,
    expr: &Expr<Aug>,
    pattern: &Expr<Aug>,
    escape: Option<&Expr<Aug>>,
    case_insensitive: bool,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    use CastContext::Implicit;
    let ecx = ecx.with_name("LIKE argument");
    let expr = plan_expr(&ecx, expr)?;
    let haystack = match ecx.scalar_type(&expr) {
        CoercibleScalarType::Coerced(ref ty @ ScalarType::Char { length }) => expr
            .type_as(&ecx, ty)?
            .call_unary(UnaryFunc::PadChar(expr_func::PadChar { length })),
        _ => expr.cast_to(&ecx, Implicit, &ScalarType::String)?,
    };
    let mut pattern = plan_expr(&ecx, pattern)?.cast_to(&ecx, Implicit, &ScalarType::String)?;
    if let Some(escape) = escape {
        pattern = pattern.call_binary(
            plan_expr(&ecx, escape)?.cast_to(&ecx, Implicit, &ScalarType::String)?,
            BinaryFunc::LikeEscape,
        );
    }
    let like = haystack.call_binary(pattern, BinaryFunc::IsLikeMatch { case_insensitive });
    // NOT LIKE is the plain match wrapped in a logical negation.
    if not {
        Ok(like.call_unary(UnaryFunc::Not(expr_func::Not)))
    } else {
        Ok(like)
    }
}
4343
4344fn plan_subscript_jsonb(
4345    ecx: &ExprContext,
4346    expr: HirScalarExpr,
4347    positions: &[SubscriptPosition<Aug>],
4348) -> Result<CoercibleScalarExpr, PlanError> {
4349    use CastContext::Implicit;
4350    use ScalarType::{Int64, String};
4351
4352    // JSONB doesn't support the slicing syntax, so simply error if you
4353    // encounter any explicit slices.
4354    let subscripts = extract_scalar_subscript_from_positions(positions, "jsonb")?;
4355
4356    let mut exprs = Vec::with_capacity(subscripts.len());
4357    for s in subscripts {
4358        let subscript = plan_expr(ecx, s)?;
4359        let subscript = if let Ok(subscript) = subscript.clone().cast_to(ecx, Implicit, &String) {
4360            subscript
4361        } else if let Ok(subscript) = subscript.cast_to(ecx, Implicit, &Int64) {
4362            // Integers are converted to a string here and then re-parsed as an
4363            // integer by `JsonbGetPath`. Weird, but this is how PostgreSQL says to
4364            // do it.
4365            typeconv::to_string(ecx, subscript)
4366        } else {
4367            sql_bail!("jsonb subscript type must be coercible to integer or text");
4368        };
4369        exprs.push(subscript);
4370    }
4371
4372    // Subscripting works like `expr #> ARRAY[subscript]` rather than
4373    // `expr->subscript` as you might expect.
4374    let expr = expr.call_binary(
4375        HirScalarExpr::CallVariadic {
4376            func: VariadicFunc::ArrayCreate {
4377                elem_type: ScalarType::String,
4378            },
4379            exprs,
4380        },
4381        BinaryFunc::JsonbGetPath,
4382    );
4383    Ok(expr.into())
4384}
4385
4386fn plan_exists(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4387    if !ecx.allow_subqueries {
4388        sql_bail!("{} does not allow subqueries", ecx.name)
4389    }
4390    let mut qcx = ecx.derived_query_context();
4391    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4392    Ok(expr.exists().into())
4393}
4394
4395fn plan_subquery(ecx: &ExprContext, query: &Query<Aug>) -> Result<CoercibleScalarExpr, PlanError> {
4396    if !ecx.allow_subqueries {
4397        sql_bail!("{} does not allow subqueries", ecx.name)
4398    }
4399    let mut qcx = ecx.derived_query_context();
4400    let (expr, _scope) = plan_nested_query(&mut qcx, query)?;
4401    let column_types = qcx.relation_type(&expr).column_types;
4402    if column_types.len() != 1 {
4403        sql_bail!(
4404            "Expected subselect to return 1 column, got {} columns",
4405            column_types.len()
4406        );
4407    }
4408    Ok(expr.select().into())
4409}
4410
4411fn plan_list_subquery(
4412    ecx: &ExprContext,
4413    query: &Query<Aug>,
4414) -> Result<CoercibleScalarExpr, PlanError> {
4415    plan_vector_like_subquery(
4416        ecx,
4417        query,
4418        |_| false,
4419        |elem_type| VariadicFunc::ListCreate { elem_type },
4420        |order_by| AggregateFunc::ListConcat { order_by },
4421        BinaryFunc::ListListConcat,
4422        |elem_type| {
4423            HirScalarExpr::literal(
4424                Datum::empty_list(),
4425                ScalarType::List {
4426                    element_type: Box::new(elem_type),
4427                    custom_id: None,
4428                },
4429            )
4430        },
4431        "list",
4432    )
4433}
4434
4435fn plan_array_subquery(
4436    ecx: &ExprContext,
4437    query: &Query<Aug>,
4438) -> Result<CoercibleScalarExpr, PlanError> {
4439    plan_vector_like_subquery(
4440        ecx,
4441        query,
4442        |elem_type| {
4443            matches!(
4444                elem_type,
4445                ScalarType::Char { .. }
4446                    | ScalarType::Array { .. }
4447                    | ScalarType::List { .. }
4448                    | ScalarType::Map { .. }
4449            )
4450        },
4451        |elem_type| VariadicFunc::ArrayCreate { elem_type },
4452        |order_by| AggregateFunc::ArrayConcat { order_by },
4453        BinaryFunc::ArrayArrayConcat,
4454        |elem_type| {
4455            HirScalarExpr::literal(Datum::empty_array(), ScalarType::Array(Box::new(elem_type)))
4456        },
4457        "[]",
4458    )
4459}
4460
/// Generic function used to plan both array subqueries and list subqueries.
///
/// The subquery must return exactly one column. Its rows are aggregated —
/// respecting the subquery's ORDER BY, LIMIT, and OFFSET — into a single
/// container value, and `binary_concat` appends `empty_literal` so that a
/// subquery producing no rows yields an empty container rather than NULL.
///
/// Parameters:
/// * `is_unsupported_type` — rejects element types the container cannot hold.
/// * `vector_create` — builds the single-element container constructor.
/// * `aggregate_concat` — the ordered concatenation aggregate.
/// * `binary_concat` — concatenation function for the empty-container fallback.
/// * `empty_literal` — the empty container literal for a given element type.
/// * `vector_type_string` — suffix used in the unsupported-type error message.
fn plan_vector_like_subquery<F1, F2, F3, F4>(
    ecx: &ExprContext,
    query: &Query<Aug>,
    is_unsupported_type: F1,
    vector_create: F2,
    aggregate_concat: F3,
    binary_concat: BinaryFunc,
    empty_literal: F4,
    vector_type_string: &str,
) -> Result<CoercibleScalarExpr, PlanError>
where
    F1: Fn(&ScalarType) -> bool,
    F2: Fn(ScalarType) -> VariadicFunc,
    F3: Fn(Vec<ColumnOrder>) -> AggregateFunc,
    F4: Fn(ScalarType) -> HirScalarExpr,
{
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut planned_query = plan_query(&mut qcx, query)?;
    // Apply LIMIT/OFFSET via a Top-K so they take effect before aggregation.
    if planned_query.limit.is_some() || planned_query.offset > 0 {
        planned_query.expr = HirRelationExpr::top_k(
            planned_query.expr,
            vec![],
            planned_query.order_by.clone(),
            planned_query.limit,
            planned_query.offset,
            planned_query.group_size_hints.limit_input_group_size,
        );
    }

    if planned_query.project.len() != 1 {
        sql_bail!(
            "Expected subselect to return 1 column, got {} columns",
            planned_query.project.len()
        );
    }

    // Determine the element type from the subquery's single projected column.
    let project_column = *planned_query.project.get(0).unwrap();
    let elem_type = qcx
        .relation_type(&planned_query.expr)
        .column_types
        .get(project_column)
        .cloned()
        .unwrap()
        .scalar_type();

    if is_unsupported_type(&elem_type) {
        bail_unsupported!(format!(
            "cannot build array from subquery because return type {}{}",
            ecx.humanize_scalar_type(&elem_type, false),
            vector_type_string
        ));
    }

    // `ColumnRef`s in `aggregation_exprs` refers to the columns produced by planning the
    // subquery above.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::CallVariadic {
        func: vector_create(elem_type.clone()),
        exprs: vec![HirScalarExpr::column(project_column)],
    })
    .chain(
        planned_query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    // However, column references for `aggregation_projection` and `aggregation_order_by`
    // are with reference to the `exprs` of the aggregation expression.  Here that is
    // `aggregation_exprs`.
    let aggregation_projection = vec![0];
    let aggregation_order_by = planned_query
        .order_by
        .into_iter()
        .enumerate()
        .map(|(i, order)| ColumnOrder { column: i, ..order })
        .collect();

    let reduced_expr = planned_query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: aggregate_concat(aggregation_order_by),
                expr: Box::new(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    exprs: aggregation_exprs,
                }),
                distinct: false,
            }],
            None,
        )
        .project(aggregation_projection);

    // If `expr` has no rows, return an empty array/list rather than NULL.
    Ok(HirScalarExpr::CallBinary {
        func: binary_concat,
        expr1: Box::new(HirScalarExpr::Select(Box::new(reduced_expr))),
        expr2: Box::new(empty_literal(elem_type)),
    }
    .into())
}
4572
/// Plans a `MAP(<query>)` subquery, aggregating the subquery's rows into a
/// single map value.
///
/// The subquery must return exactly two columns: a `text` key column and a
/// value column of any type. Rows are aggregated with `MapAgg`, respecting
/// the subquery's ORDER BY, LIMIT, and OFFSET; a subquery producing no rows
/// yields an empty map rather than NULL.
fn plan_map_subquery(
    ecx: &ExprContext,
    query: &Query<Aug>,
) -> Result<CoercibleScalarExpr, PlanError> {
    if !ecx.allow_subqueries {
        sql_bail!("{} does not allow subqueries", ecx.name)
    }

    let mut qcx = ecx.derived_query_context();
    let mut query = plan_query(&mut qcx, query)?;
    // Apply LIMIT/OFFSET via a Top-K so they take effect before aggregation.
    if query.limit.is_some() || query.offset > 0 {
        query.expr = HirRelationExpr::top_k(
            query.expr,
            vec![],
            query.order_by.clone(),
            query.limit,
            query.offset,
            query.group_size_hints.limit_input_group_size,
        );
    }
    if query.project.len() != 2 {
        sql_bail!(
            "expected map subquery to return 2 columns, got {} columns",
            query.project.len()
        );
    }

    let query_types = qcx.relation_type(&query.expr).column_types;
    let key_column = query.project[0];
    let key_type = query_types[key_column].clone().scalar_type();
    let value_column = query.project[1];
    let value_type = query_types[value_column].clone().scalar_type();

    // Map keys must be text.
    if key_type != ScalarType::String {
        sql_bail!("cannot build map from subquery because first column is not of type text");
    }

    // Each input row becomes a (key, value) record, followed by the
    // ORDER BY key columns so the aggregate can order its input.
    let aggregation_exprs: Vec<_> = iter::once(HirScalarExpr::CallVariadic {
        func: VariadicFunc::RecordCreate {
            field_names: vec![ColumnName::from("key"), ColumnName::from("value")],
        },
        exprs: vec![
            HirScalarExpr::column(key_column),
            HirScalarExpr::column(value_column),
        ],
    })
    .chain(
        query
            .order_by
            .iter()
            .map(|co| HirScalarExpr::column(co.column)),
    )
    .collect();

    let expr = query
        .expr
        .reduce(
            vec![],
            vec![AggregateExpr {
                func: AggregateFunc::MapAgg {
                    // ORDER BY column references are renumbered to match
                    // their position in `aggregation_exprs`.
                    order_by: query
                        .order_by
                        .into_iter()
                        .enumerate()
                        .map(|(i, order)| ColumnOrder { column: i, ..order })
                        .collect(),
                    value_type: value_type.clone(),
                },
                expr: Box::new(HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate {
                        field_names: iter::repeat(ColumnName::from(""))
                            .take(aggregation_exprs.len())
                            .collect(),
                    },
                    exprs: aggregation_exprs,
                }),
                distinct: false,
            }],
            None,
        )
        .project(vec![0]);

    // If `expr` has no rows, return an empty map rather than NULL.
    let expr = HirScalarExpr::CallVariadic {
        func: VariadicFunc::Coalesce,
        exprs: vec![
            HirScalarExpr::Select(Box::new(expr)),
            HirScalarExpr::literal(
                Datum::empty_map(),
                ScalarType::Map {
                    value_type: Box::new(value_type),
                    custom_id: None,
                },
            ),
        ],
    };

    Ok(expr.into())
}
4672
4673fn plan_collate(
4674    ecx: &ExprContext,
4675    expr: &Expr<Aug>,
4676    collation: &UnresolvedItemName,
4677) -> Result<CoercibleScalarExpr, PlanError> {
4678    if collation.0.len() == 2
4679        && collation.0[0] == ident!(mz_repr::namespaces::PG_CATALOG_SCHEMA)
4680        && collation.0[1] == ident!("default")
4681    {
4682        plan_expr(ecx, expr)
4683    } else {
4684        bail_unsupported!("COLLATE");
4685    }
4686}
4687
4688/// Plans a slice of expressions.
4689///
4690/// This function is a simple convenience function for mapping [`plan_expr`]
4691/// over a slice of expressions. The planned expressions are returned in the
4692/// same order as the input. If any of the expressions fail to plan, returns an
4693/// error instead.
4694fn plan_exprs<E>(ecx: &ExprContext, exprs: &[E]) -> Result<Vec<CoercibleScalarExpr>, PlanError>
4695where
4696    E: std::borrow::Borrow<Expr<Aug>>,
4697{
4698    let mut out = vec![];
4699    for expr in exprs {
4700        out.push(plan_expr(ecx, expr.borrow())?);
4701    }
4702    Ok(out)
4703}
4704
4705/// Plans an `ARRAY` expression.
4706fn plan_array(
4707    ecx: &ExprContext,
4708    exprs: &[Expr<Aug>],
4709    type_hint: Option<&ScalarType>,
4710) -> Result<CoercibleScalarExpr, PlanError> {
4711    // Plan each element expression.
4712    let mut out = vec![];
4713    for expr in exprs {
4714        out.push(match expr {
4715            // Special case nested ARRAY expressions so we can plumb
4716            // the type hint through.
4717            Expr::Array(exprs) => plan_array(ecx, exprs, type_hint.clone())?,
4718            _ => plan_expr(ecx, expr)?,
4719        });
4720    }
4721
4722    // Attempt to make use of the type hint.
4723    let type_hint = match type_hint {
4724        // The user has provided an explicit cast to an array type. We know the
4725        // element type to coerce to. Need to be careful, though: if there's
4726        // evidence that any of the array elements are themselves arrays, we
4727        // want to coerce to the array type, not the element type.
4728        Some(ScalarType::Array(elem_type)) => {
4729            let multidimensional = out.iter().any(|e| {
4730                matches!(
4731                    ecx.scalar_type(e),
4732                    CoercibleScalarType::Coerced(ScalarType::Array(_))
4733                )
4734            });
4735            if multidimensional {
4736                type_hint
4737            } else {
4738                Some(&**elem_type)
4739            }
4740        }
4741        // The user provided an explicit cast to a non-array type. We'll have to
4742        // guess what the correct type for the array. Our caller will then
4743        // handle converting that array type to the desired non-array type.
4744        Some(_) => None,
4745        // No type hint. We'll have to guess the correct type for the array.
4746        None => None,
4747    };
4748
4749    // Coerce all elements to the same type.
4750    let (elem_type, exprs) = if exprs.is_empty() {
4751        if let Some(elem_type) = type_hint {
4752            (elem_type.clone(), vec![])
4753        } else {
4754            sql_bail!("cannot determine type of empty array");
4755        }
4756    } else {
4757        let out = coerce_homogeneous_exprs(&ecx.with_name("ARRAY"), out, type_hint)?;
4758        (ecx.scalar_type(&out[0]), out)
4759    };
4760
4761    // Arrays of `char` type are disallowed due to a known limitation:
4762    // https://github.com/MaterializeInc/database-issues/issues/2360.
4763    //
4764    // Arrays of `list` and `map` types are disallowed due to mind-bending
4765    // semantics.
4766    if matches!(
4767        elem_type,
4768        ScalarType::Char { .. } | ScalarType::List { .. } | ScalarType::Map { .. }
4769    ) {
4770        bail_unsupported!(format!("{}[]", ecx.humanize_scalar_type(&elem_type, false)));
4771    }
4772
4773    Ok(HirScalarExpr::CallVariadic {
4774        func: VariadicFunc::ArrayCreate { elem_type },
4775        exprs,
4776    }
4777    .into())
4778}
4779
/// Plans a `LIST` expression.
///
/// `type_hint`, when present, is the type the surrounding context expects for
/// the whole expression; only a [`ScalarType::List`] hint influences how the
/// elements are coerced.
fn plan_list(
    ecx: &ExprContext,
    exprs: &[Expr<Aug>],
    type_hint: Option<&ScalarType>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let (elem_type, exprs) = if exprs.is_empty() {
        // An empty list has no elements to infer a type from, so the hint is
        // required.
        if let Some(ScalarType::List { element_type, .. }) = type_hint {
            (element_type.without_modifiers(), vec![])
        } else {
            sql_bail!("cannot determine type of empty list");
        }
    } else {
        // Extract the element-type hint, if the hint is a list type.
        let type_hint = match type_hint {
            Some(ScalarType::List { element_type, .. }) => Some(&**element_type),
            _ => None,
        };

        let mut out = vec![];
        for expr in exprs {
            out.push(match expr {
                // Special case nested LIST expressions so we can plumb
                // the type hint through.
                Expr::List(exprs) => plan_list(ecx, exprs, type_hint)?,
                _ => plan_expr(ecx, expr)?,
            });
        }
        // Coerce all elements to a common type; the list's element type is
        // that common type, stripped of modifiers.
        let out = coerce_homogeneous_exprs(&ecx.with_name("LIST"), out, type_hint)?;
        (ecx.scalar_type(&out[0]).without_modifiers(), out)
    };

    // Lists of `char` are not supported.
    if matches!(elem_type, ScalarType::Char { .. }) {
        bail_unsupported!("char list");
    }

    Ok(HirScalarExpr::CallVariadic {
        func: VariadicFunc::ListCreate { elem_type },
        exprs,
    }
    .into())
}
4820
/// Plans a `MAP` expression.
///
/// `type_hint`, when present, is the type the surrounding context expects for
/// the whole expression; only a [`ScalarType::Map`] hint influences how the
/// values are coerced. Map keys are always planned as `text`.
fn plan_map(
    ecx: &ExprContext,
    entries: &[MapEntry<Aug>],
    type_hint: Option<&ScalarType>,
) -> Result<CoercibleScalarExpr, PlanError> {
    let (value_type, exprs) = if entries.is_empty() {
        // An empty map has no values to infer a type from, so the hint is
        // required.
        if let Some(ScalarType::Map { value_type, .. }) = type_hint {
            (value_type.without_modifiers(), vec![])
        } else {
            sql_bail!("cannot determine type of empty map");
        }
    } else {
        // Extract the value-type hint, if the hint is a map type.
        let type_hint = match type_hint {
            Some(ScalarType::Map { value_type, .. }) => Some(&**value_type),
            _ => None,
        };

        let mut keys = vec![];
        let mut values = vec![];
        for MapEntry { key, value } in entries {
            // Keys are always coerced to strings.
            let key = plan_expr(ecx, key)?.type_as(ecx, &ScalarType::String)?;
            let value = match value {
                // Special case nested MAP expressions so we can plumb
                // the type hint through.
                Expr::Map(entries) => plan_map(ecx, entries, type_hint)?,
                _ => plan_expr(ecx, value)?,
            };
            keys.push(key);
            values.push(value);
        }
        // Coerce all values to a common type, then interleave keys and values
        // (key1, value1, key2, value2, ...) as the arguments to `MapBuild`.
        let values = coerce_homogeneous_exprs(&ecx.with_name("MAP"), values, type_hint)?;
        let value_type = ecx.scalar_type(&values[0]).without_modifiers();
        let out = itertools::interleave(keys, values).collect();
        (value_type, out)
    };

    // Maps with `char` values are not supported.
    if matches!(value_type, ScalarType::Char { .. }) {
        bail_unsupported!("char map");
    }

    let expr = HirScalarExpr::CallVariadic {
        func: VariadicFunc::MapBuild { value_type },
        exprs,
    };
    Ok(expr.into())
}
4867
4868/// Coerces a list of expressions such that all input expressions will be cast
4869/// to the same type. If successful, returns a new list of expressions in the
4870/// same order as the input, where each expression has the appropriate casts to
4871/// make them all of a uniform type.
4872///
4873/// If `force_type` is `Some`, the expressions are forced to the specified type
4874/// via an explicit cast. Otherwise the best common type is guessed via
4875/// [`typeconv::guess_best_common_type`] and conversions are attempted via
4876/// implicit casts
4877///
4878/// Note that this is our implementation of Postgres' type conversion for
4879/// ["`UNION`, `CASE`, and Related Constructs"][union-type-conv], though it
4880/// isn't yet used in all of those cases.
4881///
4882/// [union-type-conv]:
4883/// https://www.postgresql.org/docs/12/typeconv-union-case.html
4884pub fn coerce_homogeneous_exprs(
4885    ecx: &ExprContext,
4886    exprs: Vec<CoercibleScalarExpr>,
4887    force_type: Option<&ScalarType>,
4888) -> Result<Vec<HirScalarExpr>, PlanError> {
4889    assert!(!exprs.is_empty());
4890
4891    let target_holder;
4892    let target = match force_type {
4893        Some(t) => t,
4894        None => {
4895            let types: Vec<_> = exprs.iter().map(|e| ecx.scalar_type(e)).collect();
4896            target_holder = typeconv::guess_best_common_type(ecx, &types)?;
4897            &target_holder
4898        }
4899    };
4900
4901    // Try to cast all expressions to `target`.
4902    let mut out = Vec::new();
4903    for expr in exprs {
4904        let arg = typeconv::plan_coerce(ecx, expr, target)?;
4905        let ccx = match force_type {
4906            None => CastContext::Implicit,
4907            Some(_) => CastContext::Explicit,
4908        };
4909        match typeconv::plan_cast(ecx, ccx, arg.clone(), target) {
4910            Ok(expr) => out.push(expr),
4911            Err(_) => sql_bail!(
4912                "{} could not convert type {} to {}",
4913                ecx.name,
4914                ecx.humanize_scalar_type(&ecx.scalar_type(&arg), false),
4915                ecx.humanize_scalar_type(target, false),
4916            ),
4917        }
4918    }
4919    Ok(out)
4920}
4921
4922/// Creates a `ColumnOrder` from an `OrderByExpr` and column index.
4923/// Column index is specified by the caller, but `desc` and `nulls_last` is figured out here.
4924pub(crate) fn resolve_desc_and_nulls_last<T: AstInfo>(
4925    obe: &OrderByExpr<T>,
4926    column: usize,
4927) -> ColumnOrder {
4928    let desc = !obe.asc.unwrap_or(true);
4929    ColumnOrder {
4930        column,
4931        desc,
4932        // https://www.postgresql.org/docs/14/queries-order.html
4933        //   "NULLS FIRST is the default for DESC order, and NULLS LAST otherwise"
4934        nulls_last: obe.nulls_last.unwrap_or(!desc),
4935    }
4936}
4937
4938/// Plans the ORDER BY clause of a window function.
4939///
4940/// Unfortunately, we have to create two HIR structs from an AST OrderByExpr:
4941/// A ColumnOrder has asc/desc and nulls first/last, but can't represent an HirScalarExpr, just
4942/// a column reference by index. Therefore, we return both HirScalarExprs and ColumnOrders.
4943/// Note that the column references in the ColumnOrders point NOT to input columns, but into the
4944/// `Vec<HirScalarExpr>` that we return.
4945fn plan_function_order_by(
4946    ecx: &ExprContext,
4947    order_by: &[OrderByExpr<Aug>],
4948) -> Result<(Vec<HirScalarExpr>, Vec<ColumnOrder>), PlanError> {
4949    let mut order_by_exprs = vec![];
4950    let mut col_orders = vec![];
4951    {
4952        for (i, obe) in order_by.iter().enumerate() {
4953            // Unlike `SELECT ... ORDER BY` clauses, function `ORDER BY` clauses
4954            // do not support ordinal references in PostgreSQL. So we use
4955            // `plan_expr` directly rather than `plan_order_by_or_distinct_expr`.
4956            let expr = plan_expr(ecx, &obe.expr)?.type_as_any(ecx)?;
4957            order_by_exprs.push(expr);
4958            col_orders.push(resolve_desc_and_nulls_last(obe, i));
4959        }
4960    }
4961    Ok((order_by_exprs, col_orders))
4962}
4963
4964/// Common part of the planning of windowed and non-windowed aggregation functions.
4965fn plan_aggregate_common(
4966    ecx: &ExprContext,
4967    Function::<Aug> {
4968        name,
4969        args,
4970        filter,
4971        over: _,
4972        distinct,
4973    }: &Function<Aug>,
4974) -> Result<AggregateExpr, PlanError> {
4975    // Normal aggregate functions, like `sum`, expect as input a single expression
4976    // which yields the datum to aggregate. Order sensitive aggregate functions,
4977    // like `jsonb_agg`, are special, and instead expect a Record whose first
4978    // element yields the datum to aggregate and whose successive elements yield
4979    // keys to order by. This expectation is hard coded within the implementation
4980    // of each of the order-sensitive aggregates. The specification of how many
4981    // order by keys to consider, and in what order, is passed via the `order_by`
4982    // field on the `AggregateFunc` variant.
4983
4984    // While all aggregate functions support the ORDER BY syntax, it's a no-op for
4985    // most, so explicitly drop it if the function doesn't care about order. This
4986    // prevents the projection into Record below from triggering on unsupported
4987    // functions.
4988
4989    let impls = match resolve_func(ecx, name, args)? {
4990        Func::Aggregate(impls) => impls,
4991        _ => unreachable!("plan_aggregate_common called on non-aggregate function,"),
4992    };
4993
4994    // We follow PostgreSQL's rule here for mapping `count(*)` into the
4995    // generalized function selection framework. The rule is simple: the user
4996    // must type `count(*)`, but the function selection framework sees an empty
4997    // parameter list, as if the user had typed `count()`. But if the user types
4998    // `count()` directly, that is an error. Like PostgreSQL, we apply these
4999    // rules to all aggregates, not just `count`, since we may one day support
5000    // user-defined aggregates, including user-defined aggregates that take no
5001    // parameters.
5002    let (args, order_by) = match &args {
5003        FunctionArgs::Star => (vec![], vec![]),
5004        FunctionArgs::Args { args, order_by } => {
5005            if args.is_empty() {
5006                sql_bail!(
5007                    "{}(*) must be used to call a parameterless aggregate function",
5008                    ecx.qcx
5009                        .scx
5010                        .humanize_resolved_name(name)
5011                        .expect("name actually resolved")
5012                );
5013            }
5014            let args = plan_exprs(ecx, args)?;
5015            (args, order_by.clone())
5016        }
5017    };
5018
5019    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &order_by)?;
5020
5021    let (mut expr, func) = func::select_impl(ecx, FuncSpec::Func(name), impls, args, col_orders)?;
5022    if let Some(filter) = &filter {
5023        // If a filter is present, as in
5024        //
5025        //     <agg>(<expr>) FILTER (WHERE <cond>)
5026        //
5027        // we plan it by essentially rewriting the expression to
5028        //
5029        //     <agg>(CASE WHEN <cond> THEN <expr> ELSE <identity>)
5030        //
5031        // where <identity> is the identity input for <agg>.
5032        let cond = plan_expr(&ecx.with_name("FILTER"), filter)?.type_as(ecx, &ScalarType::Bool)?;
5033        let expr_typ = ecx.scalar_type(&expr);
5034        expr = HirScalarExpr::If {
5035            cond: Box::new(cond),
5036            then: Box::new(expr),
5037            els: Box::new(HirScalarExpr::literal(func.identity_datum(), expr_typ)),
5038        };
5039    }
5040
5041    let mut seen_outer = false;
5042    let mut seen_inner = false;
5043    #[allow(deprecated)]
5044    expr.visit_columns(0, &mut |depth, col| {
5045        if depth == 0 && col.level == 0 {
5046            seen_inner = true;
5047        } else if col.level > depth {
5048            seen_outer = true;
5049        }
5050    });
5051    if seen_outer && !seen_inner {
5052        bail_unsupported!(
5053            3720,
5054            "aggregate functions that refer exclusively to outer columns"
5055        );
5056    }
5057
5058    // If a function supports ORDER BY (even if there was no ORDER BY specified),
5059    // map the needed expressions into the aggregate datum.
5060    if func.is_order_sensitive() {
5061        let field_names = iter::repeat(ColumnName::from(""))
5062            .take(1 + order_by_exprs.len())
5063            .collect();
5064        let mut exprs = vec![expr];
5065        exprs.extend(order_by_exprs);
5066        expr = HirScalarExpr::CallVariadic {
5067            func: VariadicFunc::RecordCreate { field_names },
5068            exprs,
5069        };
5070    }
5071
5072    Ok(AggregateExpr {
5073        func,
5074        expr: Box::new(expr),
5075        distinct: *distinct,
5076    })
5077}
5078
/// Plans a (possibly qualified) identifier as either a column reference or a
/// whole-row reference to a table.
///
/// A qualified name (`t.c`) must resolve to a column. An unqualified name is
/// first tried as a column; failing that, it is tried as a whole-row table
/// reference, which yields a record of the table's columns.
fn plan_identifier(ecx: &ExprContext, names: &[Ident]) -> Result<HirScalarExpr, PlanError> {
    let mut names = names.to_vec();
    // The final component names the column (or, for a whole-row reference,
    // the table); the preceding components, if any, qualify it.
    let col_name = normalize::column_name(names.pop().unwrap());

    // If the name is qualified, it must refer to a column in a table.
    if !names.is_empty() {
        let table_name = normalize::unresolved_item_name(UnresolvedItemName(names))?;
        let i = ecx
            .scope
            .resolve_table_column(&ecx.qcx.outer_scopes, &table_name, &col_name)?;
        return Ok(HirScalarExpr::Column(i));
    }

    // If the name is unqualified, first check if it refers to a column. Track any similar names
    // that might exist for a better error message.
    let similar_names = match ecx.scope.resolve_column(&ecx.qcx.outer_scopes, &col_name) {
        Ok(i) => return Ok(HirScalarExpr::Column(i)),
        Err(PlanError::UnknownColumn { similar, .. }) => similar,
        Err(e) => return Err(e),
    };

    // The name doesn't refer to a column. Check if it is a whole-row reference
    // to a table.
    let items = ecx.scope.items_from_table(
        &ecx.qcx.outer_scopes,
        &PartialItemName {
            database: None,
            schema: None,
            item: col_name.as_str().to_owned(),
        },
    )?;
    match items.as_slice() {
        // The name doesn't refer to a table either. Return an error.
        [] => Err(PlanError::UnknownColumn {
            table: None,
            column: col_name,
            similar: similar_names,
        }),
        // The name refers to a table that is the result of a function that
        // returned a single column. Per PostgreSQL, this is a special case
        // that returns the value directly.
        // See: https://github.com/postgres/postgres/blob/22592e10b/src/backend/parser/parse_expr.c#L2519-L2524
        [(column, item)] if item.from_single_column_function => Ok(HirScalarExpr::Column(*column)),
        // The name refers to a normal table. Return a record containing all the
        // columns of the table.
        _ => {
            // Any "exists" column (tracking whether a table function in the
            // target list produced a row) is excluded from the record but
            // remembered, so we can return NULL when the function produced no
            // row.
            let mut has_exists_column = None;
            let (exprs, field_names): (Vec<_>, Vec<_>) = items
                .into_iter()
                .filter_map(|(column, item)| {
                    if item.is_exists_column_for_a_table_function_that_was_in_the_target_list {
                        has_exists_column = Some(column);
                        None
                    } else {
                        let expr = HirScalarExpr::Column(column);
                        let name = item.column_name.clone();
                        Some((expr, name))
                    }
                })
                .unzip();
            // For the special case of a table function with a single column, the single column is instead not wrapped.
            let expr = if exprs.len() == 1 && has_exists_column.is_some() {
                exprs.into_element()
            } else {
                HirScalarExpr::CallVariadic {
                    func: VariadicFunc::RecordCreate { field_names },
                    exprs,
                }
            };
            if let Some(has_exists_column) = has_exists_column {
                // NULL exists column means the table function produced no
                // row, so the whole-row value is NULL too.
                Ok(HirScalarExpr::If {
                    cond: Box::new(HirScalarExpr::CallUnary {
                        func: UnaryFunc::IsNull(mz_expr::func::IsNull),
                        expr: Box::new(HirScalarExpr::Column(has_exists_column)),
                    }),
                    then: Box::new(HirScalarExpr::literal_null(ecx.scalar_type(&expr))),
                    els: Box::new(expr),
                })
            } else {
                Ok(expr)
            }
        }
    }
}
5163
5164fn plan_op(
5165    ecx: &ExprContext,
5166    op: &str,
5167    expr1: &Expr<Aug>,
5168    expr2: Option<&Expr<Aug>>,
5169) -> Result<HirScalarExpr, PlanError> {
5170    let impls = func::resolve_op(op)?;
5171    let args = match expr2 {
5172        None => plan_exprs(ecx, &[expr1])?,
5173        Some(expr2) => plan_exprs(ecx, &[expr1, expr2])?,
5174    };
5175    func::select_impl(ecx, FuncSpec::Op(op), impls, args, vec![])
5176}
5177
/// Plans a function call in scalar position, dispatching on the resolved
/// function's kind (scalar, scalar window, value window, or aggregate).
///
/// Window functions (including window aggregates) return early from their
/// match arms; the code after the match handles only plain scalar functions,
/// rejecting `DISTINCT`, `FILTER`, `*`, and `ORDER BY`, which are
/// aggregate-only syntax. Table functions and non-windowed aggregates are
/// errors here.
fn plan_function<'a>(
    ecx: &ExprContext,
    f @ Function {
        name,
        args,
        filter,
        over,
        distinct,
    }: &'a Function<Aug>,
) -> Result<HirScalarExpr, PlanError> {
    let impls = match resolve_func(ecx, name, args)? {
        Func::Table(_) => {
            sql_bail!(
                "table functions are not allowed in {} (function {})",
                ecx.name,
                name
            );
        }
        Func::Scalar(impls) => {
            if over.is_some() {
                sql_bail!(
                    "OVER clause not allowed on {name}. The OVER clause can only be used with window functions (including aggregations)."
                );
            }
            impls
        }
        Func::ScalarWindow(impls) => {
            let (
                ignore_nulls,
                order_by_exprs,
                col_orders,
                _window_frame,
                partition_by,
                scalar_args,
            ) = plan_window_function_non_aggr(ecx, f)?;

            // All scalar window functions have 0 parameters. Let's print a nice error msg if the
            // user gave some args. (The below `func::select_impl` would fail anyway, but the error
            // msg there is less informative.)
            if !scalar_args.is_empty() {
                if let ResolvedItemName::Item {
                    full_name: FullItemName { item, .. },
                    ..
                } = name
                {
                    sql_bail!(
                        "function {} has 0 parameters, but was called with {}",
                        item,
                        scalar_args.len()
                    );
                }
            }

            // Note: the window frame doesn't affect scalar window funcs, but, strangely, we should
            // accept a window frame here without an error msg. (Postgres also does this.)
            // TODO: maybe we should give a notice

            let func = func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            if ignore_nulls {
                // If we ever add a scalar window function that supports ignore, then don't forget
                // to also update HIR EXPLAIN.
                bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
            }

            return Ok(HirScalarExpr::Windowing(WindowExpr {
                func: WindowExprType::Scalar(ScalarWindowExpr {
                    func,
                    order_by: col_orders,
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::ValueWindow(impls) => {
            let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by, scalar_args) =
                plan_window_function_non_aggr(ecx, f)?;

            let (args_encoded, func) =
                func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])?;

            // Only LAG and LEAD support IGNORE NULLS.
            if ignore_nulls {
                match func {
                    ValueWindowFunc::Lag | ValueWindowFunc::Lead => {}
                    _ => bail_unsupported!(IGNORE_NULLS_ERROR_MSG),
                }
            }

            return Ok(HirScalarExpr::Windowing(WindowExpr {
                func: WindowExprType::Value(ValueWindowExpr {
                    func,
                    args: Box::new(args_encoded),
                    order_by: col_orders,
                    window_frame,
                    ignore_nulls, // (RESPECT NULLS is the default)
                }),
                partition_by,
                order_by: order_by_exprs,
            }));
        }
        Func::Aggregate(_) => {
            if f.over.is_none() {
                // Not a window aggregate. Something is wrong.
                if ecx.allow_aggregates {
                    // Should already have been caught by `scope.resolve_expr` in `plan_expr_inner`
                    // (after having been planned earlier in `Step 5` of `plan_select_from_where`).
                    sql_bail!(
                        "Internal error: encountered unplanned non-windowed aggregate function: {:?}",
                        name,
                    );
                } else {
                    // scope.resolve_expr didn't catch it because we have not yet planned it,
                    // because it was in an unsupported context.
                    sql_bail!(
                        "aggregate functions are not allowed in {} (function {})",
                        ecx.name,
                        name
                    );
                }
            } else {
                // A window aggregate: `<agg>(...) OVER (...)`.
                let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition_by) =
                    plan_window_function_common(ecx, &f.name, &f.over)?;

                // https://github.com/MaterializeInc/database-issues/issues/6720
                match (&window_frame.start_bound, &window_frame.end_bound) {
                    (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::UnboundedPreceding,
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetPreceding(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    )
                    | (
                        mz_expr::WindowFrameBound::OffsetFollowing(..),
                        mz_expr::WindowFrameBound::UnboundedFollowing,
                    ) => bail_unsupported!("mixed unbounded - offset frames"),
                    (_, _) => {} // other cases are ok
                }

                if ignore_nulls {
                    // https://github.com/MaterializeInc/database-issues/issues/6722
                    // If we ever add support for ignore_nulls for a window aggregate, then don't
                    // forget to also update HIR EXPLAIN.
                    bail_unsupported!(IGNORE_NULLS_ERROR_MSG);
                }

                let aggregate_expr = plan_aggregate_common(ecx, f)?;

                if aggregate_expr.distinct {
                    // https://github.com/MaterializeInc/database-issues/issues/6626
                    bail_unsupported!("DISTINCT in window aggregates");
                }

                return Ok(HirScalarExpr::Windowing(WindowExpr {
                    func: WindowExprType::Aggregate(AggregateWindowExpr {
                        aggregate_expr,
                        order_by: col_orders,
                        window_frame,
                    }),
                    partition_by,
                    order_by: order_by_exprs,
                }));
            }
        }
    };

    // From here on, we are planning a plain scalar function call.
    if over.is_some() {
        unreachable!("If there is an OVER clause, we should have returned already above.");
    }

    if *distinct {
        sql_bail!(
            "DISTINCT specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }
    if filter.is_some() {
        sql_bail!(
            "FILTER specified, but {} is not an aggregate function",
            ecx.qcx
                .scx
                .humanize_resolved_name(name)
                .expect("already resolved")
        );
    }

    let scalar_args = match &args {
        FunctionArgs::Star => {
            sql_bail!(
                "* argument is invalid with non-aggregate function {}",
                ecx.qcx
                    .scx
                    .humanize_resolved_name(name)
                    .expect("already resolved")
            )
        }
        FunctionArgs::Args { args, order_by } => {
            if !order_by.is_empty() {
                sql_bail!(
                    "ORDER BY specified, but {} is not an aggregate function",
                    ecx.qcx
                        .scx
                        .humanize_resolved_name(name)
                        .expect("already resolved")
                );
            }
            plan_exprs(ecx, args)?
        }
    };

    func::select_impl(ecx, FuncSpec::Func(name), impls, scalar_args, vec![])
}
5398
/// The unsupported-feature description used with `bail_unsupported!` when
/// `IGNORE NULLS` or `RESPECT NULLS` is specified for a window function that
/// does not support these options (i.e., anything other than LAG and LEAD).
pub const IGNORE_NULLS_ERROR_MSG: &str =
    "IGNORE NULLS and RESPECT NULLS options for functions other than LAG and LEAD";
5401
5402/// Resolves the name to a set of function implementations.
5403///
5404/// If the name does not specify a known built-in function, returns an error.
5405pub fn resolve_func(
5406    ecx: &ExprContext,
5407    name: &ResolvedItemName,
5408    args: &mz_sql_parser::ast::FunctionArgs<Aug>,
5409) -> Result<&'static Func, PlanError> {
5410    if let Ok(i) = ecx.qcx.scx.get_item_by_resolved_name(name) {
5411        if let Ok(f) = i.func() {
5412            return Ok(f);
5413        }
5414    }
5415
5416    // Couldn't resolve function with this name, so generate verbose error
5417    // message.
5418    let cexprs = match args {
5419        mz_sql_parser::ast::FunctionArgs::Star => vec![],
5420        mz_sql_parser::ast::FunctionArgs::Args { args, order_by } => {
5421            if !order_by.is_empty() {
5422                sql_bail!(
5423                    "ORDER BY specified, but {} is not an aggregate function",
5424                    name
5425                );
5426            }
5427            plan_exprs(ecx, args)?
5428        }
5429    };
5430
5431    let arg_types: Vec<_> = cexprs
5432        .into_iter()
5433        .map(|ty| match ecx.scalar_type(&ty) {
5434            CoercibleScalarType::Coerced(ty) => ecx.humanize_scalar_type(&ty, false),
5435            CoercibleScalarType::Record(_) => "record".to_string(),
5436            CoercibleScalarType::Uncoerced => "unknown".to_string(),
5437        })
5438        .collect();
5439
5440    Err(PlanError::UnknownFunction {
5441        name: name.to_string(),
5442        arg_types,
5443    })
5444}
5445
/// Plans an `IS`-style predicate (`IS NULL`, `IS UNKNOWN`, `IS TRUE`,
/// `IS FALSE`, `IS [NOT] DISTINCT FROM`), negating the result when `not` is
/// set (e.g. `IS NOT NULL`).
fn plan_is_expr<'a>(
    ecx: &ExprContext,
    expr: &'a Expr<Aug>,
    construct: &IsExprConstruct<Aug>,
    not: bool,
) -> Result<HirScalarExpr, PlanError> {
    let expr = plan_expr(ecx, expr)?;
    let mut expr = match construct {
        IsExprConstruct::Null => {
            // PostgreSQL can plan `NULL IS NULL` but not `$1 IS NULL`. This is
            // at odds with our type coercion rules, which treat `NULL` literals
            // and unconstrained parameters identically. Providing a type hint
            // of string means we wind up supporting both.
            let expr = expr.type_as_any(ecx)?;
            expr.call_is_null()
        }
        IsExprConstruct::Unknown => {
            // `IS UNKNOWN` is null-ness of a boolean expression.
            let expr = expr.type_as(ecx, &ScalarType::Bool)?;
            expr.call_is_null()
        }
        IsExprConstruct::True => {
            let expr = expr.type_as(ecx, &ScalarType::Bool)?;
            expr.call_unary(UnaryFunc::IsTrue(expr_func::IsTrue))
        }
        IsExprConstruct::False => {
            let expr = expr.type_as(ecx, &ScalarType::Bool)?;
            expr.call_unary(UnaryFunc::IsFalse(expr_func::IsFalse))
        }
        IsExprConstruct::DistinctFrom(expr2) => {
            let expr1 = expr.type_as_any(ecx)?;
            let expr2 = plan_expr(ecx, expr2)?.type_as_any(ecx)?;
            // There are three cases:
            // 1. Both terms are non-null, in which case the result should be `a != b`.
            // 2. Exactly one term is null, in which case the result should be true.
            // 3. Both terms are null, in which case the result should be false.
            //
            // (a != b OR a IS NULL OR b IS NULL) AND (a IS NOT NULL OR b IS NOT NULL)
            let term1 = HirScalarExpr::variadic_or(vec![
                expr1.clone().call_binary(expr2.clone(), BinaryFunc::NotEq),
                expr1.clone().call_is_null(),
                expr2.clone().call_is_null(),
            ]);
            let term2 = HirScalarExpr::variadic_or(vec![
                expr1.call_is_null().not(),
                expr2.call_is_null().not(),
            ]);
            term1.and(term2)
        }
    };
    // Handle the NOT variants (`IS NOT NULL`, `IS NOT TRUE`, ...).
    if not {
        expr = expr.not();
    }
    Ok(expr)
}
5500
/// Plans a SQL `CASE` expression into a chain of nested `HirScalarExpr::If`s.
///
/// Handles both the "simple" form (`CASE operand WHEN c THEN r ...`, where each
/// condition is desugared into `operand = c`) and the "searched" form
/// (`CASE WHEN c THEN r ...`, where `operand` is `None`).
///
/// `conditions` and `results` are expected to be paired WHEN/THEN branches;
/// `else_result` is the optional `ELSE` branch (defaults to `NULL`).
fn plan_case<'a>(
    ecx: &ExprContext,
    operand: &'a Option<Box<Expr<Aug>>>,
    conditions: &'a [Expr<Aug>],
    results: &'a [Expr<Aug>],
    else_result: &'a Option<Box<Expr<Aug>>>,
) -> Result<HirScalarExpr, PlanError> {
    let mut cond_exprs = Vec::new();
    let mut result_exprs = Vec::new();
    for (c, r) in conditions.iter().zip(results) {
        // In the simple form, each WHEN condition is the equality
        // `operand = c`; in the searched form it is used as-is.
        let c = match operand {
            Some(operand) => operand.clone().equals(c.clone()),
            None => c.clone(),
        };
        // Conditions must individually type as booleans.
        let cexpr = plan_expr(ecx, &c)?.type_as(ecx, &ScalarType::Bool)?;
        cond_exprs.push(cexpr);
        result_exprs.push(r);
    }
    // The ELSE branch participates in result-type coercion; a missing ELSE
    // behaves as `ELSE NULL`.
    result_exprs.push(match else_result {
        Some(else_result) => else_result,
        None => &Expr::Value(Value::Null),
    });
    // All result branches (including ELSE) must coerce to one common type.
    let mut result_exprs = coerce_homogeneous_exprs(
        &ecx.with_name("CASE"),
        plan_exprs(ecx, &result_exprs)?,
        None,
    )?;
    // Build the If chain from the inside out: start with the ELSE result and
    // wrap each (condition, result) pair around it, last branch innermost.
    let mut expr = result_exprs.pop().unwrap();
    assert_eq!(cond_exprs.len(), result_exprs.len());
    for (cexpr, rexpr) in cond_exprs.into_iter().zip(result_exprs).rev() {
        expr = HirScalarExpr::If {
            cond: Box::new(cexpr),
            then: Box::new(rexpr),
            els: Box::new(expr),
        }
    }
    Ok(expr)
}
5539
5540fn plan_literal<'a>(l: &'a Value) -> Result<CoercibleScalarExpr, PlanError> {
5541    let (datum, scalar_type) = match l {
5542        Value::Number(s) => {
5543            let d = strconv::parse_numeric(s.as_str())?;
5544            if !s.contains(&['E', '.'][..]) {
5545                // Maybe representable as an int?
5546                if let Ok(n) = d.0.try_into() {
5547                    (Datum::Int32(n), ScalarType::Int32)
5548                } else if let Ok(n) = d.0.try_into() {
5549                    (Datum::Int64(n), ScalarType::Int64)
5550                } else {
5551                    (Datum::Numeric(d), ScalarType::Numeric { max_scale: None })
5552                }
5553            } else {
5554                (Datum::Numeric(d), ScalarType::Numeric { max_scale: None })
5555            }
5556        }
5557        Value::HexString(_) => bail_unsupported!("hex string literals"),
5558        Value::Boolean(b) => match b {
5559            false => (Datum::False, ScalarType::Bool),
5560            true => (Datum::True, ScalarType::Bool),
5561        },
5562        Value::Interval(i) => {
5563            let i = literal::plan_interval(i)?;
5564            (Datum::Interval(i), ScalarType::Interval)
5565        }
5566        Value::String(s) => return Ok(CoercibleScalarExpr::LiteralString(s.clone())),
5567        Value::Null => return Ok(CoercibleScalarExpr::LiteralNull),
5568    };
5569    let expr = HirScalarExpr::literal(datum, scalar_type);
5570    Ok(expr.into())
5571}
5572
5573/// The common part of the planning of non-aggregate window functions, i.e.,
5574/// scalar window functions and value window functions.
5575fn plan_window_function_non_aggr<'a>(
5576    ecx: &ExprContext,
5577    Function {
5578        name,
5579        args,
5580        filter,
5581        over,
5582        distinct,
5583    }: &'a Function<Aug>,
5584) -> Result<
5585    (
5586        bool,
5587        Vec<HirScalarExpr>,
5588        Vec<ColumnOrder>,
5589        mz_expr::WindowFrame,
5590        Vec<HirScalarExpr>,
5591        Vec<CoercibleScalarExpr>,
5592    ),
5593    PlanError,
5594> {
5595    let (ignore_nulls, order_by_exprs, col_orders, window_frame, partition) =
5596        plan_window_function_common(ecx, name, over)?;
5597
5598    if *distinct {
5599        sql_bail!(
5600            "DISTINCT specified, but {} is not an aggregate function",
5601            name
5602        );
5603    }
5604
5605    if filter.is_some() {
5606        bail_unsupported!("FILTER in non-aggregate window functions");
5607    }
5608
5609    let scalar_args = match &args {
5610        FunctionArgs::Star => {
5611            sql_bail!("* argument is invalid with non-aggregate function {}", name)
5612        }
5613        FunctionArgs::Args { args, order_by } => {
5614            if !order_by.is_empty() {
5615                sql_bail!(
5616                    "ORDER BY specified, but {} is not an aggregate function",
5617                    name
5618                );
5619            }
5620            plan_exprs(ecx, args)?
5621        }
5622    };
5623
5624    Ok((
5625        ignore_nulls,
5626        order_by_exprs,
5627        col_orders,
5628        window_frame,
5629        partition,
5630        scalar_args,
5631    ))
5632}
5633
5634/// The common part of the planning of all window functions.
5635fn plan_window_function_common(
5636    ecx: &ExprContext,
5637    name: &<Aug as AstInfo>::ItemName,
5638    over: &Option<WindowSpec<Aug>>,
5639) -> Result<
5640    (
5641        bool,
5642        Vec<HirScalarExpr>,
5643        Vec<ColumnOrder>,
5644        mz_expr::WindowFrame,
5645        Vec<HirScalarExpr>,
5646    ),
5647    PlanError,
5648> {
5649    if !ecx.allow_windows {
5650        sql_bail!(
5651            "window functions are not allowed in {} (function {})",
5652            ecx.name,
5653            name
5654        );
5655    }
5656
5657    let window_spec = match over.as_ref() {
5658        Some(over) => over,
5659        None => sql_bail!("window function {} requires an OVER clause", name),
5660    };
5661    if window_spec.ignore_nulls && window_spec.respect_nulls {
5662        sql_bail!("Both IGNORE NULLS and RESPECT NULLS were given.");
5663    }
5664    let window_frame = match window_spec.window_frame.as_ref() {
5665        Some(frame) => plan_window_frame(frame)?,
5666        None => mz_expr::WindowFrame::default(),
5667    };
5668    let mut partition = Vec::new();
5669    for expr in &window_spec.partition_by {
5670        partition.push(plan_expr(ecx, expr)?.type_as_any(ecx)?);
5671    }
5672
5673    let (order_by_exprs, col_orders) = plan_function_order_by(ecx, &window_spec.order_by)?;
5674
5675    Ok((
5676        window_spec.ignore_nulls,
5677        order_by_exprs,
5678        col_orders,
5679        window_frame,
5680        partition,
5681    ))
5682}
5683
5684fn plan_window_frame(
5685    WindowFrame {
5686        units,
5687        start_bound,
5688        end_bound,
5689    }: &WindowFrame,
5690) -> Result<mz_expr::WindowFrame, PlanError> {
5691    use mz_expr::WindowFrameBound::*;
5692    let units = window_frame_unit_ast_to_expr(units)?;
5693    let start_bound = window_frame_bound_ast_to_expr(start_bound);
5694    let end_bound = end_bound
5695        .as_ref()
5696        .map(window_frame_bound_ast_to_expr)
5697        .unwrap_or(CurrentRow);
5698
5699    // Validate bounds according to Postgres rules
5700    match (&start_bound, &end_bound) {
5701        // Start bound can't be UNBOUNDED FOLLOWING
5702        (UnboundedFollowing, _) => {
5703            sql_bail!("frame start cannot be UNBOUNDED FOLLOWING")
5704        }
5705        // End bound can't be UNBOUNDED PRECEDING
5706        (_, UnboundedPreceding) => {
5707            sql_bail!("frame end cannot be UNBOUNDED PRECEDING")
5708        }
5709        // Start bound should come before end bound in the list of bound definitions
5710        (CurrentRow, OffsetPreceding(_)) => {
5711            sql_bail!("frame starting from current row cannot have preceding rows")
5712        }
5713        (OffsetFollowing(_), OffsetPreceding(_) | CurrentRow) => {
5714            sql_bail!("frame starting from following row cannot have preceding rows")
5715        }
5716        // The above rules are adopted from Postgres.
5717        // The following rules are Materialize-specific.
5718        (OffsetPreceding(o1), OffsetFollowing(o2)) => {
5719            // Note that the only hard limit is that partition size + offset should fit in i64, so
5720            // in theory, we could support much larger offsets than this. But for our current
5721            // performance, even 1000000 is quite big.
5722            if *o1 > 1000000 || *o2 > 1000000 {
5723                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5724            }
5725        }
5726        (OffsetPreceding(o1), OffsetPreceding(o2)) => {
5727            if *o1 > 1000000 || *o2 > 1000000 {
5728                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5729            }
5730        }
5731        (OffsetFollowing(o1), OffsetFollowing(o2)) => {
5732            if *o1 > 1000000 || *o2 > 1000000 {
5733                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5734            }
5735        }
5736        (OffsetPreceding(o), CurrentRow) => {
5737            if *o > 1000000 {
5738                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5739            }
5740        }
5741        (CurrentRow, OffsetFollowing(o)) => {
5742            if *o > 1000000 {
5743                sql_bail!("Window frame offsets greater than 1000000 are currently not supported")
5744            }
5745        }
5746        // Other bounds are valid
5747        (_, _) => (),
5748    }
5749
5750    // RANGE is only supported in the default frame
5751    // https://github.com/MaterializeInc/database-issues/issues/6585
5752    if units == mz_expr::WindowFrameUnits::Range
5753        && (start_bound != UnboundedPreceding || end_bound != CurrentRow)
5754    {
5755        bail_unsupported!("RANGE in non-default window frames")
5756    }
5757
5758    let frame = mz_expr::WindowFrame {
5759        units,
5760        start_bound,
5761        end_bound,
5762    };
5763    Ok(frame)
5764}
5765
5766fn window_frame_unit_ast_to_expr(
5767    unit: &WindowFrameUnits,
5768) -> Result<mz_expr::WindowFrameUnits, PlanError> {
5769    match unit {
5770        WindowFrameUnits::Rows => Ok(mz_expr::WindowFrameUnits::Rows),
5771        WindowFrameUnits::Range => Ok(mz_expr::WindowFrameUnits::Range),
5772        WindowFrameUnits::Groups => bail_unsupported!("GROUPS in window frames"),
5773    }
5774}
5775
5776fn window_frame_bound_ast_to_expr(bound: &WindowFrameBound) -> mz_expr::WindowFrameBound {
5777    match bound {
5778        WindowFrameBound::CurrentRow => mz_expr::WindowFrameBound::CurrentRow,
5779        WindowFrameBound::Preceding(None) => mz_expr::WindowFrameBound::UnboundedPreceding,
5780        WindowFrameBound::Preceding(Some(offset)) => {
5781            mz_expr::WindowFrameBound::OffsetPreceding(*offset)
5782        }
5783        WindowFrameBound::Following(None) => mz_expr::WindowFrameBound::UnboundedFollowing,
5784        WindowFrameBound::Following(Some(offset)) => {
5785            mz_expr::WindowFrameBound::OffsetFollowing(*offset)
5786        }
5787    }
5788}
5789
5790pub fn scalar_type_from_sql(
5791    scx: &StatementContext,
5792    data_type: &ResolvedDataType,
5793) -> Result<ScalarType, PlanError> {
5794    match data_type {
5795        ResolvedDataType::AnonymousList(elem_type) => {
5796            let elem_type = scalar_type_from_sql(scx, elem_type)?;
5797            if matches!(elem_type, ScalarType::Char { .. }) {
5798                bail_unsupported!("char list");
5799            }
5800            Ok(ScalarType::List {
5801                element_type: Box::new(elem_type),
5802                custom_id: None,
5803            })
5804        }
5805        ResolvedDataType::AnonymousMap {
5806            key_type,
5807            value_type,
5808        } => {
5809            match scalar_type_from_sql(scx, key_type)? {
5810                ScalarType::String => {}
5811                other => sql_bail!(
5812                    "map key type must be {}, got {}",
5813                    scx.humanize_scalar_type(&ScalarType::String, false),
5814                    scx.humanize_scalar_type(&other, false)
5815                ),
5816            }
5817            Ok(ScalarType::Map {
5818                value_type: Box::new(scalar_type_from_sql(scx, value_type)?),
5819                custom_id: None,
5820            })
5821        }
5822        ResolvedDataType::Named { id, modifiers, .. } => {
5823            scalar_type_from_catalog(scx.catalog, *id, modifiers)
5824        }
5825        ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
5826    }
5827}
5828
/// Resolves a catalog type, identified by `id`, together with its SQL type
/// modifiers (e.g. the `(p, s)` in `numeric(p, s)`) to a `ScalarType`.
///
/// Errors if `id` does not refer to a type, if a modifier value is invalid or
/// out of range, or if modifiers are supplied to a type that accepts none.
pub fn scalar_type_from_catalog(
    catalog: &dyn SessionCatalog,
    id: CatalogItemId,
    modifiers: &[i64],
) -> Result<ScalarType, PlanError> {
    let entry = catalog.get_item(&id);
    let type_details = match entry.type_details() {
        Some(type_details) => type_details,
        None => {
            // Resolution should never produce a `ResolvedDataType::Named` with
            // an ID of a non-type, but we error gracefully just in case.
            sql_bail!(
                "internal error: {} does not refer to a type",
                catalog.resolve_full_name(entry.name()).to_string().quoted()
            );
        }
    };
    // Types that accept modifiers are handled individually first; every other
    // type rejects modifiers and is translated by the catch-all arm below.
    match &type_details.typ {
        CatalogType::Numeric => {
            // `numeric[(precision[, scale])]`: both modifiers are optional.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) if *p < 1 || *p > i64::from(NUMERIC_DATUM_MAX_PRECISION) => {
                    sql_bail!(
                        "precision for type numeric must be between 1 and {}",
                        NUMERIC_DATUM_MAX_PRECISION,
                    );
                }
                Some(p) => Some(*p),
                None => None,
            };
            let scale = match modifiers.next() {
                Some(scale) => {
                    // The scale may not exceed the declared precision.
                    if let Some(precision) = precision {
                        if *scale > precision {
                            sql_bail!(
                                "scale for type numeric must be between 0 and precision {}",
                                precision
                            );
                        }
                    }
                    Some(NumericMaxScale::try_from(*scale)?)
                }
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type numeric supports at most two type modifiers");
            }
            // Only the scale is retained: the precision is validated above but
            // is not represented in `ScalarType::Numeric`.
            Ok(ScalarType::Numeric { max_scale: scale })
        }
        CatalogType::Char => {
            // `char[(length)]`: an omitted length defaults to 1.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(CharLength::try_from(*l)?),
                None => Some(CharLength::ONE),
            };
            if modifiers.next().is_some() {
                sql_bail!("type character supports at most one type modifier");
            }
            Ok(ScalarType::Char { length })
        }
        CatalogType::VarChar => {
            // `varchar[(max_length)]`: an omitted length means no maximum.
            let mut modifiers = modifiers.iter().fuse();
            let length = match modifiers.next() {
                Some(l) => Some(VarCharMaxLength::try_from(*l)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type character varying supports at most one type modifier");
            }
            Ok(ScalarType::VarChar { max_length: length })
        }
        CatalogType::Timestamp => {
            // `timestamp[(precision)]`.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp supports at most one type modifier");
            }
            Ok(ScalarType::Timestamp { precision })
        }
        CatalogType::TimestampTz => {
            // `timestamptz[(precision)]`.
            let mut modifiers = modifiers.iter().fuse();
            let precision = match modifiers.next() {
                Some(p) => Some(TimestampPrecision::try_from(*p)?),
                None => None,
            };
            if modifiers.next().is_some() {
                sql_bail!("type timestamp with time zone supports at most one type modifier");
            }
            Ok(ScalarType::TimestampTz { precision })
        }
        t => {
            // No other type accepts modifiers.
            if !modifiers.is_empty() {
                sql_bail!(
                    "{} does not support type modifiers",
                    catalog.resolve_full_name(entry.name()).to_string()
                );
            }
            match t {
                // Container types resolve their element/value types
                // recursively; `custom_id` records the defining catalog item
                // where applicable.
                CatalogType::Array {
                    element_reference: element_id,
                } => Ok(ScalarType::Array(Box::new(scalar_type_from_catalog(
                    catalog,
                    *element_id,
                    modifiers,
                )?))),
                CatalogType::List {
                    element_reference: element_id,
                    element_modifiers,
                } => Ok(ScalarType::List {
                    element_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *element_id,
                        element_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Map {
                    key_reference: _,
                    key_modifiers: _,
                    value_reference: value_id,
                    value_modifiers,
                } => Ok(ScalarType::Map {
                    value_type: Box::new(scalar_type_from_catalog(
                        catalog,
                        *value_id,
                        value_modifiers,
                    )?),
                    custom_id: Some(id),
                }),
                CatalogType::Range {
                    element_reference: element_id,
                } => Ok(ScalarType::Range {
                    element_type: Box::new(scalar_type_from_catalog(catalog, *element_id, &[])?),
                }),
                CatalogType::Record { fields } => {
                    // Record fields are resolved individually; all fields are
                    // marked nullable here.
                    let scalars: Box<[(ColumnName, ColumnType)]> = fields
                        .iter()
                        .map(|f| {
                            let scalar_type = scalar_type_from_catalog(
                                catalog,
                                f.type_reference,
                                &f.type_modifiers,
                            )?;
                            Ok((
                                f.name.clone(),
                                ColumnType {
                                    scalar_type,
                                    nullable: true,
                                },
                            ))
                        })
                        .collect::<Result<Box<_>, PlanError>>()?;
                    Ok(ScalarType::Record {
                        fields: scalars,
                        custom_id: Some(id),
                    })
                }
                // The remaining types map one-to-one.
                CatalogType::AclItem => Ok(ScalarType::AclItem),
                CatalogType::Bool => Ok(ScalarType::Bool),
                CatalogType::Bytes => Ok(ScalarType::Bytes),
                CatalogType::Date => Ok(ScalarType::Date),
                CatalogType::Float32 => Ok(ScalarType::Float32),
                CatalogType::Float64 => Ok(ScalarType::Float64),
                CatalogType::Int16 => Ok(ScalarType::Int16),
                CatalogType::Int32 => Ok(ScalarType::Int32),
                CatalogType::Int64 => Ok(ScalarType::Int64),
                CatalogType::UInt16 => Ok(ScalarType::UInt16),
                CatalogType::UInt32 => Ok(ScalarType::UInt32),
                CatalogType::UInt64 => Ok(ScalarType::UInt64),
                CatalogType::MzTimestamp => Ok(ScalarType::MzTimestamp),
                CatalogType::Interval => Ok(ScalarType::Interval),
                CatalogType::Jsonb => Ok(ScalarType::Jsonb),
                CatalogType::Oid => Ok(ScalarType::Oid),
                CatalogType::PgLegacyChar => Ok(ScalarType::PgLegacyChar),
                CatalogType::PgLegacyName => Ok(ScalarType::PgLegacyName),
                CatalogType::Pseudo => {
                    // Pseudo-types (e.g. polymorphic placeholders) cannot be
                    // named directly in SQL.
                    sql_bail!(
                        "cannot reference pseudo type {}",
                        catalog.resolve_full_name(entry.name()).to_string()
                    )
                }
                CatalogType::RegClass => Ok(ScalarType::RegClass),
                CatalogType::RegProc => Ok(ScalarType::RegProc),
                CatalogType::RegType => Ok(ScalarType::RegType),
                CatalogType::String => Ok(ScalarType::String),
                CatalogType::Time => Ok(ScalarType::Time),
                CatalogType::Uuid => Ok(ScalarType::Uuid),
                CatalogType::Int2Vector => Ok(ScalarType::Int2Vector),
                CatalogType::MzAclItem => Ok(ScalarType::MzAclItem),
                // The modifier-accepting types were consumed by the outer match.
                CatalogType::Numeric => unreachable!("handled above"),
                CatalogType::Char => unreachable!("handled above"),
                CatalogType::VarChar => unreachable!("handled above"),
                CatalogType::Timestamp => unreachable!("handled above"),
                CatalogType::TimestampTz => unreachable!("handled above"),
            }
        }
    }
}
6030
/// This is used to collect aggregates and table functions from within an `Expr`.
/// See the explanation of aggregate handling at the top of the file for more details.
struct AggregateTableFuncVisitor<'a> {
    scx: &'a StatementContext<'a>,
    // Aggregate function calls collected so far, in encounter order
    // (deduplicated later by `into_result`).
    aggs: Vec<Function<Aug>>,
    // True while visiting the arguments of an aggregate call; used to reject
    // nested aggregate functions.
    within_aggregate: bool,
    // Table function calls found in SELECT items, keyed by the call and mapped
    // to the generated identifier that replaces them in the expression tree.
    tables: BTreeMap<Function<Aug>, String>,
    // Stack of descriptions of contexts (e.g. "CASE") in which table functions
    // are currently disallowed; the innermost context is last.
    table_disallowed_context: Vec<&'static str>,
    // True while visiting a SELECT list item.
    in_select_item: bool,
    // The first error encountered during traversal, surfaced by `into_result`.
    err: Option<PlanError>,
}
6042
6043impl<'a> AggregateTableFuncVisitor<'a> {
6044    fn new(scx: &'a StatementContext<'a>) -> AggregateTableFuncVisitor<'a> {
6045        AggregateTableFuncVisitor {
6046            scx,
6047            aggs: Vec::new(),
6048            within_aggregate: false,
6049            tables: BTreeMap::new(),
6050            table_disallowed_context: Vec::new(),
6051            in_select_item: false,
6052            err: None,
6053        }
6054    }
6055
6056    fn into_result(
6057        self,
6058    ) -> Result<(Vec<Function<Aug>>, BTreeMap<Function<Aug>, String>), PlanError> {
6059        match self.err {
6060            Some(err) => Err(err),
6061            None => {
6062                // Dedup while preserving the order. We don't care what the order is, but it
6063                // has to be reproducible so that EXPLAIN PLAN tests work.
6064                let mut seen = BTreeSet::new();
6065                let aggs = self
6066                    .aggs
6067                    .into_iter()
6068                    .filter(move |agg| seen.insert(agg.clone()))
6069                    .collect();
6070                Ok((aggs, self.tables))
6071            }
6072        }
6073    }
6074}
6075
impl<'a> VisitMut<'_, Aug> for AggregateTableFuncVisitor<'a> {
    // Collects aggregate function calls and maintains the stack of contexts in
    // which table functions are disallowed while descending into a call.
    fn visit_function_mut(&mut self, func: &mut Function<Aug>) {
        let item = match self.scx.get_item_by_resolved_name(&func.name) {
            Ok(i) => i,
            // Catching missing functions later in planning improves error messages.
            Err(_) => return,
        };

        match item.func() {
            // We don't want to collect window aggregations, because these will be handled not by
            // plan_aggregate, but by plan_function.
            Ok(Func::Aggregate { .. }) if func.over.is_none() => {
                if self.within_aggregate {
                    self.err = Some(sql_err!("nested aggregate functions are not allowed",));
                    return;
                }
                self.aggs.push(func.clone());
                let Function {
                    name: _,
                    args,
                    filter,
                    over: _,
                    distinct: _,
                } = func;
                // Visit the FILTER clause before setting `within_aggregate`,
                // so aggregate calls appearing in it are not treated as nested.
                if let Some(filter) = filter {
                    self.visit_expr_mut(filter);
                }
                let old_within_aggregate = self.within_aggregate;
                self.within_aggregate = true;
                self.table_disallowed_context
                    .push("aggregate function calls");

                self.visit_function_args_mut(args);

                self.within_aggregate = old_within_aggregate;
                self.table_disallowed_context.pop();
            }
            Ok(Func::Table { .. }) => {
                // Table functions may not be nested inside other table functions.
                self.table_disallowed_context.push("other table functions");
                visit_mut::visit_function_mut(self, func);
                self.table_disallowed_context.pop();
            }
            _ => visit_mut::visit_function_mut(self, func),
        }
    }

    fn visit_query_mut(&mut self, _query: &mut Query<Aug>) {
        // Don't go into subqueries.
    }

    // Tracks expressions inside which table functions are disallowed (CASE,
    // COALESCE), and in SELECT items replaces table function calls with a
    // generated identifier, recording the original call in `self.tables`.
    fn visit_expr_mut(&mut self, expr: &mut Expr<Aug>) {
        let (disallowed_context, func) = match expr {
            // Table functions are not allowed inside CASE or COALESCE.
            Expr::Case { .. } => (Some("CASE"), None),
            Expr::HomogenizingFunction {
                function: HomogenizingFunction::Coalesce,
                ..
            } => (Some("COALESCE"), None),
            Expr::Function(func) if self.in_select_item => {
                // If we're in a SELECT list, replace table functions with a uuid identifier
                // and save the table func so it can be planned elsewhere.
                let mut table_func = None;
                if let Ok(item) = self.scx.get_item_by_resolved_name(&func.name) {
                    if let Ok(Func::Table { .. }) = item.func() {
                        if let Some(context) = self.table_disallowed_context.last() {
                            self.err = Some(sql_err!(
                                "table functions are not allowed in {} (function {})",
                                context,
                                func.name
                            ));
                            return;
                        }
                        table_func = Some(func.clone());
                    }
                }
                // Since we will descend into the table func below, don't add its own disallow
                // context here, instead use visit_function to set that.
                (None, table_func)
            }
            _ => (None, None),
        };
        if let Some(func) = func {
            // Since we are trading out expr, we need to visit the table func here.
            visit_mut::visit_expr_mut(self, expr);
            // Don't attempt to replace table functions with unsupported syntax.
            if let Function {
                name: _,
                args: _,
                filter: None,
                over: None,
                distinct: false,
            } = &func
            {
                // Identical table functions can be de-duplicated.
                let id = self
                    .tables
                    .entry(func)
                    .or_insert_with(|| format!("table_func_{}", Uuid::new_v4()));
                // We know this is okay because id is 11 characters + 36 characters, which is
                // less than our max length.
                *expr = Expr::Identifier(vec![Ident::new_unchecked(id.clone())]);
            }
        }
        if let Some(context) = disallowed_context {
            self.table_disallowed_context.push(context);
        }

        visit_mut::visit_expr_mut(self, expr);

        if disallowed_context.is_some() {
            self.table_disallowed_context.pop();
        }
    }

    fn visit_select_item_mut(&mut self, si: &mut SelectItem<Aug>) {
        // Track whether we are inside a SELECT list item: that is the only
        // position where table functions are extracted and replaced above.
        let old = self.in_select_item;
        self.in_select_item = true;
        visit_mut::visit_select_item_mut(self, si);
        self.in_select_item = old;
    }
}
6196
/// Collects window function call expressions from an expression tree, without
/// descending into subqueries (see the `Visit` impl below).
#[derive(Default)]
struct WindowFuncCollector {
    // Window function calls in encounter order; deduplicated and reversed by
    // `into_result`.
    window_funcs: Vec<Expr<Aug>>,
}
6201
6202impl WindowFuncCollector {
6203    fn into_result(self) -> Vec<Expr<Aug>> {
6204        // Dedup while preserving the order.
6205        let mut seen = BTreeSet::new();
6206        let window_funcs_dedupped = self
6207            .window_funcs
6208            .into_iter()
6209            .filter(move |expr| seen.insert(expr.clone()))
6210            // Reverse the order, so that in case of a nested window function call, the
6211            // inner one is evaluated first.
6212            .rev()
6213            .collect();
6214        window_funcs_dedupped
6215    }
6216}
6217
6218impl Visit<'_, Aug> for WindowFuncCollector {
6219    fn visit_expr(&mut self, expr: &Expr<Aug>) {
6220        match expr {
6221            Expr::Function(func) => {
6222                if func.over.is_some() {
6223                    self.window_funcs.push(expr.clone());
6224                }
6225            }
6226            _ => (),
6227        }
6228        visit::visit_expr(self, expr);
6229    }
6230
6231    fn visit_query(&mut self, _query: &Query<Aug>) {
6232        // Don't go into subqueries. Those will be handled by their own `plan_query`.
6233    }
6234}
6235
/// Specifies how long a query will live.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum QueryLifetime {
    /// The query's (or the expression's) result will be computed at one point in time.
    OneShot,
    /// The query (or expression) is used in a dataflow that maintains an index.
    Index,
    /// The query (or expression) is used in a dataflow that maintains a materialized view.
    MaterializedView,
    /// The query (or expression) is used in a dataflow that maintains a SUBSCRIBE.
    Subscribe,
    /// The query (or expression) is part of a (non-materialized) view.
    View,
    /// The expression is part of a source definition.
    Source,
}

impl QueryLifetime {
    /// Whether the result is computed at a single point in time, i.e., the
    /// query is not installed as a maintained dataflow.
    ///
    /// (This used to impact whether the query is allowed to reason about the time at which it is
    /// running, e.g., by calling the `now()` function. Nowadays, this is decided by a different
    /// mechanism, see `ExprPrepStyle`.)
    pub fn is_one_shot(&self) -> bool {
        // Exhaustive match (no catch-all), so adding a variant forces a
        // decision here.
        let one_shot = match self {
            QueryLifetime::OneShot => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        };
        // `is_one_shot` and `is_maintained` must stay exact complements.
        assert_eq!(!one_shot, self.is_maintained());
        one_shot
    }

    /// Maintained dataflows can't have a finishing applied directly. Therefore, the finishing is
    /// turned into a `TopK`.
    pub fn is_maintained(&self) -> bool {
        match self {
            QueryLifetime::OneShot => false,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::Subscribe
            | QueryLifetime::View
            | QueryLifetime::Source => true,
        }
    }

    /// Most maintained dataflows don't allow SHOW commands currently. However, SUBSCRIBE does.
    pub fn allow_show(&self) -> bool {
        match self {
            // SUBSCRIBE allows SHOW commands!
            QueryLifetime::OneShot | QueryLifetime::Subscribe => true,
            QueryLifetime::Index
            | QueryLifetime::MaterializedView
            | QueryLifetime::View
            | QueryLifetime::Source => false,
        }
    }
}
6295
/// Description of a CTE sufficient for query planning.
#[derive(Debug, Clone)]
pub struct CteDesc {
    /// The name the CTE was bound to.
    pub name: String,
    /// The relation description (column names and types) of the CTE's body.
    pub desc: RelationDesc,
}
6302
/// The state required when planning a `Query`.
#[derive(Debug, Clone)]
pub struct QueryContext<'a> {
    /// The context for the containing `Statement`.
    pub scx: &'a StatementContext<'a>,
    /// The lifetime that the planned query will have.
    pub lifetime: QueryLifetime,
    /// The scopes of the outer relation expression.
    pub outer_scopes: Vec<Scope>,
    /// The type of the outer relation expressions.
    pub outer_relation_types: Vec<RelationType>,
    /// CTEs for this query, mapping their assigned LocalIds to their definition.
    pub ctes: BTreeMap<LocalId, CteDesc>,
    /// Guards planning against unbounded recursion (e.g., deeply nested
    /// queries); see the `CheckedRecursion` impl below.
    pub recursion_guard: RecursionGuard,
}
6318
// Exposes the recursion guard so that planning routines carrying a
// `QueryContext` can participate in `CheckedRecursion` depth checking.
impl CheckedRecursion for QueryContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
6324
6325impl<'a> QueryContext<'a> {
6326    pub fn root(scx: &'a StatementContext, lifetime: QueryLifetime) -> QueryContext<'a> {
6327        QueryContext {
6328            scx,
6329            lifetime,
6330            outer_scopes: vec![],
6331            outer_relation_types: vec![],
6332            ctes: BTreeMap::new(),
6333            recursion_guard: RecursionGuard::with_limit(1024), // chosen arbitrarily
6334        }
6335    }
6336
6337    fn relation_type(&self, expr: &HirRelationExpr) -> RelationType {
6338        expr.typ(&self.outer_relation_types, &self.scx.param_types.borrow())
6339    }
6340
6341    /// Generate a new `QueryContext` appropriate to be used in subqueries of
6342    /// `self`.
6343    fn derived_context(&self, scope: Scope, relation_type: RelationType) -> QueryContext<'a> {
6344        let ctes = self.ctes.clone();
6345        let outer_scopes = iter::once(scope).chain(self.outer_scopes.clone()).collect();
6346        let outer_relation_types = iter::once(relation_type)
6347            .chain(self.outer_relation_types.clone())
6348            .collect();
6349
6350        QueryContext {
6351            scx: self.scx,
6352            lifetime: self.lifetime,
6353            outer_scopes,
6354            outer_relation_types,
6355            ctes,
6356            recursion_guard: self.recursion_guard.clone(),
6357        }
6358    }
6359
6360    /// Derives a `QueryContext` for a scope that contains no columns.
6361    fn empty_derived_context(&self) -> QueryContext<'a> {
6362        let scope = Scope::empty();
6363        let ty = RelationType::empty();
6364        self.derived_context(scope, ty)
6365    }
6366
6367    /// Resolves `object` to a table expr, i.e. creating a `Get` or inlining a
6368    /// CTE.
6369    pub fn resolve_table_name(
6370        &self,
6371        object: ResolvedItemName,
6372    ) -> Result<(HirRelationExpr, Scope), PlanError> {
6373        match object {
6374            ResolvedItemName::Item {
6375                id,
6376                full_name,
6377                version,
6378                ..
6379            } => {
6380                let name = full_name.into();
6381                let item = self.scx.get_item(&id).at_version(version);
6382                let desc = item
6383                    .desc(&self.scx.catalog.resolve_full_name(item.name()))?
6384                    .clone();
6385                let expr = HirRelationExpr::Get {
6386                    id: Id::Global(item.global_id()),
6387                    typ: desc.typ().clone(),
6388                };
6389
6390                let scope = Scope::from_source(Some(name), desc.iter_names().cloned());
6391
6392                Ok((expr, scope))
6393            }
6394            ResolvedItemName::Cte { id, name } => {
6395                let name = name.into();
6396                let cte = self.ctes.get(&id).unwrap();
6397                let expr = HirRelationExpr::Get {
6398                    id: Id::Local(id),
6399                    typ: cte.desc.typ().clone(),
6400                };
6401
6402                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6403
6404                Ok((expr, scope))
6405            }
6406            ResolvedItemName::ContinualTask { id, name } => {
6407                let cte = self.ctes.get(&id).unwrap();
6408                let expr = HirRelationExpr::Get {
6409                    id: Id::Local(id),
6410                    typ: cte.desc.typ().clone(),
6411                };
6412
6413                let scope = Scope::from_source(Some(name), cte.desc.iter_names());
6414
6415                Ok((expr, scope))
6416            }
6417            ResolvedItemName::Error => unreachable!("should have been caught in name resolution"),
6418        }
6419    }
6420
6421    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
6422    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
6423    pub fn humanize_scalar_type(&self, typ: &ScalarType, postgres_compat: bool) -> String {
6424        self.scx.humanize_scalar_type(typ, postgres_compat)
6425    }
6426}
6427
/// A bundle of unrelated things that we need for planning `Expr`s.
#[derive(Debug, Clone)]
pub struct ExprContext<'a> {
    /// The context for the `Query` that contains this `Expr`.
    pub qcx: &'a QueryContext<'a>,
    /// The name of this kind of expression eg "WHERE clause". Used only for error messages.
    pub name: &'a str,
    /// The current scope.
    pub scope: &'a Scope,
    /// The type of the current relation expression upon which this scalar
    /// expression will be evaluated.
    pub relation_type: &'a RelationType,
    /// Are aggregate functions allowed in this context
    pub allow_aggregates: bool,
    /// Are subqueries allowed in this context
    pub allow_subqueries: bool,
    /// Are parameters allowed in this context.
    pub allow_parameters: bool,
    /// Are window functions allowed in this context
    pub allow_windows: bool,
}
6449
// Exposes the recursion guard of the underlying `QueryContext` so that
// expression planning also participates in `CheckedRecursion` depth checking.
impl CheckedRecursion for ExprContext<'_> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.qcx.recursion_guard
    }
}
6455
impl<'a> ExprContext<'a> {
    /// Returns the session catalog from the containing statement context.
    pub fn catalog(&self) -> &dyn SessionCatalog {
        self.qcx.scx.catalog
    }

    /// Returns a copy of this context with `name` replaced, for use when
    /// planning a sub-expression that should report errors under a
    /// different clause name.
    pub fn with_name(&self, name: &'a str) -> ExprContext<'a> {
        let mut ecx = self.clone();
        ecx.name = name;
        ecx
    }

    /// Computes the column type of `expr` given this context's outer relation
    /// types, current relation type, and parameter types.
    pub fn column_type<E>(&self, expr: &E) -> E::Type
    where
        E: AbstractExpr,
    {
        expr.typ(
            &self.qcx.outer_relation_types,
            self.relation_type,
            &self.qcx.scx.param_types.borrow(),
        )
    }

    /// Computes the scalar type of `expr`, i.e., the `column_type` stripped
    /// down to its scalar type component.
    pub fn scalar_type<E>(&self, expr: &E) -> <E::Type as AbstractColumnType>::AbstractScalarType
    where
        E: AbstractExpr,
    {
        self.column_type(expr).scalar_type()
    }

    /// Derives a `QueryContext` for planning subqueries of this expression.
    /// The current scope (marked with a lateral barrier) and relation type
    /// become the innermost outer scope/type of the derived context.
    fn derived_query_context(&self) -> QueryContext {
        let mut scope = self.scope.clone();
        scope.lateral_barrier = true;
        self.qcx.derived_context(scope, self.relation_type.clone())
    }

    /// Errors unless the given feature flag is enabled; delegates to the
    /// statement context.
    pub fn require_feature_flag(&self, flag: &'static FeatureFlag) -> Result<(), PlanError> {
        self.qcx.scx.require_feature_flag(flag)
    }

    /// Returns the statement-level map of parameter types inferred so far.
    pub fn param_types(&self) -> &RefCell<BTreeMap<usize, ScalarType>> {
        &self.qcx.scx.param_types
    }

    /// The returned String is more detailed when the `postgres_compat` flag is not set. However,
    /// the flag should be set in, e.g., the implementation of the `pg_typeof` function.
    pub fn humanize_scalar_type(&self, typ: &ScalarType, postgres_compat: bool) -> String {
        self.qcx.scx.humanize_scalar_type(typ, postgres_compat)
    }
}