mz_sql/plan/
side_effecting_func.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for side-effecting functions.
11//!
12//! In PostgreSQL, these functions can appear anywhere in a query:
13//!
14//! ```sql
15//! SELECT 1 WHERE pg_cancel_backend(1234)
16//! ```
17//!
18//! In Materialize, our compute layer cannot execute functions with side
19//! effects. So we sniff out the common form of calls to side-effecting
20//! functions, i.e. at the top level of a `SELECT`
21//!
22//! ```sql
23//! SELECT side_effecting_function(...)
24//! ```
25//!
26//! where all arguments are literals or bound parameters, and plan them
27//! specially as a `Plan::SideEffectingFunc`. This gets us compatibility with
28//! PostgreSQL for most real-world use cases, without causing stress for the
29//! compute layer (optimizer, dataflow execution, etc.), as we can apply all the
30//! side effects entirely in the adapter layer.
31
32use std::collections::BTreeMap;
33use std::sync::LazyLock;
34
35use enum_kinds::EnumKind;
36use itertools::Itertools;
37use mz_ore::cast::ReinterpretCast;
38use mz_ore::collections::CollectionExt;
39use mz_ore::result::ResultExt;
40use mz_repr::RelationType;
41use mz_repr::{ColumnType, Datum, RelationDesc, RowArena, ScalarType};
42use mz_sql_parser::ast::{CteBlock, Expr, Function, FunctionArgs, Select, SelectItem, SetExpr};
43
44use crate::ast::{Query, SelectStatement};
45use crate::func::Func;
46use crate::names::Aug;
47use crate::plan::query::{self, ExprContext, QueryLifetime};
48use crate::plan::scope::Scope;
49use crate::plan::statement::StatementContext;
50use crate::plan::typeconv::CastContext;
51use crate::plan::{HirScalarExpr, Params};
52use crate::plan::{PlanError, QueryContext};
53
54/// A side-effecting function is a function whose evaluation triggers side
55/// effects.
56///
57/// See the module docs for details.
58#[derive(Debug, EnumKind)]
59#[enum_kind(SefKind)]
60pub enum SideEffectingFunc {
61    /// The `pg_cancel_backend` function, .
62    PgCancelBackend {
63        // The ID of the connection to cancel.
64        connection_id: u32,
65    },
66}
67
68/// Describes a `SELECT` if it contains calls to side-effecting functions.
69///
70/// See the module docs for details.
71pub fn describe_select_if_side_effecting(
72    scx: &StatementContext,
73    select: &SelectStatement<Aug>,
74) -> Result<Option<RelationDesc>, PlanError> {
75    let Some(sef_call) = extract_sef_call(scx, select)? else {
76        return Ok(None);
77    };
78
79    // We currently support only a single call to a side-effecting function
80    // without an alias, so there is always a single output column is named
81    // after the function.
82    let desc = RelationDesc::builder()
83        .with_column(sef_call.imp.name, sef_call.imp.return_type.clone())
84        .finish();
85
86    Ok(Some(desc))
87}
88
89/// Plans the `SELECT` if it contains calls to side-effecting functions.
90///
91/// See the module docs for details.
92pub fn plan_select_if_side_effecting(
93    scx: &StatementContext,
94    select: &SelectStatement<Aug>,
95    params: &Params,
96) -> Result<Option<SideEffectingFunc>, PlanError> {
97    let Some(sef_call) = extract_sef_call(scx, select)? else {
98        return Ok(None);
99    };
100
101    // Bind parameters and then eagerly evaluate each argument. Expressions that
102    // cannot be eagerly evaluated should have been rejected by `extract_sef_call`.
103    let temp_storage = RowArena::new();
104    let mut args = vec![];
105    for mut arg in sef_call.args {
106        arg.bind_parameters(scx, QueryLifetime::OneShot, params)?;
107        let arg = arg.lower_uncorrelated()?;
108        args.push(arg);
109    }
110    let mut datums = vec![];
111    for arg in &args {
112        let datum = arg.eval(&[], &temp_storage)?;
113        datums.push(datum);
114    }
115
116    let func = (sef_call.imp.plan_fn)(&datums);
117
118    Ok(Some(func))
119}
120
121/// Helper function used in both describing and planning a side-effecting
122/// `SELECT`.
123fn extract_sef_call(
124    scx: &StatementContext,
125    select: &SelectStatement<Aug>,
126) -> Result<Option<SefCall>, PlanError> {
127    // First check if the `SELECT` contains exactly one function call.
128    let SelectStatement {
129        query:
130            Query {
131                ctes: CteBlock::Simple(ctes),
132                body: SetExpr::Select(body),
133                order_by,
134                limit: None,
135                offset: None,
136            },
137        as_of: None,
138    } = select
139    else {
140        return Ok(None);
141    };
142    if !ctes.is_empty() || !order_by.is_empty() {
143        return Ok(None);
144    }
145    let Select {
146        distinct: None,
147        projection,
148        from,
149        selection: None,
150        group_by,
151        having: None,
152        qualify: None,
153        options,
154    } = &**body
155    else {
156        return Ok(None);
157    };
158    if !from.is_empty() || !group_by.is_empty() || !options.is_empty() || projection.len() != 1 {
159        return Ok(None);
160    }
161    let [
162        SelectItem::Expr {
163            expr:
164                Expr::Function(Function {
165                    name,
166                    args: FunctionArgs::Args { args, order_by },
167                    filter: None,
168                    over: None,
169                    distinct: false,
170                }),
171            alias: None,
172        },
173    ] = &projection[..]
174    else {
175        return Ok(None);
176    };
177    if !order_by.is_empty() {
178        return Ok(None);
179    }
180
181    // Check if the called function is a scalar function with exactly one
182    // implementation. All side-effecting functions have only a single
183    // implementation.
184    let Ok(func) = scx
185        .get_item_by_resolved_name(name)
186        .and_then(|item| item.func().err_into())
187    else {
188        return Ok(None);
189    };
190    let func_impl = match func {
191        Func::Scalar(impls) if impls.len() == 1 => impls.into_element(),
192        _ => return Ok(None),
193    };
194
195    // Check whether the implementation is a known side-effecting function.
196    let Some(sef_impl) = PG_CATALOG_SEF_BUILTINS.get(&func_impl.oid) else {
197        return Ok(None);
198    };
199
200    // Check that the number of provided arguments matches the function
201    // signature.
202    if args.len() != sef_impl.param_types.len() {
203        // We return `Ok(None)` instead of an error for the same reason to let
204        // the function selection code produce the standard "no function matches
205        // the given name and argument types" error.
206        return Ok(None);
207    }
208
209    // Plan and coerce all argument expressions.
210    let mut args_out = vec![];
211    let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
212    let ecx = ExprContext {
213        qcx: &qcx,
214        name: sef_impl.name,
215        scope: &Scope::empty(),
216        relation_type: &RelationType::empty(),
217        allow_aggregates: false,
218        allow_subqueries: false,
219        allow_parameters: true,
220        allow_windows: false,
221    };
222    for (arg, ty) in args.iter().zip_eq(sef_impl.param_types) {
223        // If we encounter an error when planning the argument expression, that
224        // error is unrelated to planning the function call and can be returned
225        // directly to the user.
226        let arg = query::plan_expr(&ecx, arg)?;
227
228        // Implicitly cast the argument to the correct type. This matches what
229        // the standard function selection code will do.
230        //
231        // If the cast fails, we give up on planning the side-effecting function but
232        // intentionally do not produce an error. This way, we fall into the
233        // standard function selection code, which will produce the correct "no
234        // function matches the given name and argument types" error rather than a
235        // "cast failed" error.
236        let Ok(arg) = arg.cast_to(&ecx, CastContext::Implicit, ty) else {
237            return Ok(None);
238        };
239
240        args_out.push(arg);
241    }
242
243    Ok(Some(SefCall {
244        imp: sef_impl,
245        args: args_out,
246    }))
247}
248
249struct SefCall {
250    imp: &'static SideEffectingFuncImpl,
251    args: Vec<HirScalarExpr>,
252}
253
254/// Defines the implementation of a side-effecting function.
255///
256/// This is a very restricted subset of the [`Func`] struct (no overloads, no
257/// variadic arguments, etc) to make side-effecting functions easier to plan.
258pub struct SideEffectingFuncImpl {
259    /// The name of the function.
260    pub name: &'static str,
261    /// The OID of the function.
262    pub oid: u32,
263    /// The parameter types for the function.
264    pub param_types: &'static [ScalarType],
265    /// The return type of the function.
266    pub return_type: ColumnType,
267    /// A function that will produce a `SideEffectingFunc` given arguments
268    /// that have been evaluated to `Datum`s.
269    pub plan_fn: fn(&[Datum]) -> SideEffectingFunc,
270}
271
272/// A map of the side-effecting functions in the `pg_catalog` schema, keyed by
273/// OID.
274pub static PG_CATALOG_SEF_BUILTINS: LazyLock<BTreeMap<u32, SideEffectingFuncImpl>> =
275    LazyLock::new(|| {
276        [PG_CANCEL_BACKEND]
277            .into_iter()
278            .map(|f| (f.oid, f))
279            .collect()
280    });
281
282// Implementations of each side-effecting function follow.
283//
284// If you add a new side-effecting function, be sure to add it to the map above.
285
286const PG_CANCEL_BACKEND: SideEffectingFuncImpl = SideEffectingFuncImpl {
287    name: "pg_cancel_backend",
288    oid: 2171,
289    param_types: &[ScalarType::Int32],
290    return_type: ScalarType::Bool.nullable(false),
291    plan_fn: |datums| -> SideEffectingFunc {
292        SideEffectingFunc::PgCancelBackend {
293            connection_id: u32::reinterpret_cast(datums[0].unwrap_int32()),
294        }
295    },
296};