1use std::collections::BTreeMap;
33use std::sync::LazyLock;
34
35use enum_kinds::EnumKind;
36use itertools::Itertools;
37use mz_ore::cast::ReinterpretCast;
38use mz_ore::collections::CollectionExt;
39use mz_ore::result::ResultExt;
40use mz_repr::SqlRelationType;
41use mz_repr::{Datum, RelationDesc, RowArena, SqlColumnType, SqlScalarType};
42use mz_sql_parser::ast::{CteBlock, Expr, Function, FunctionArgs, Select, SelectItem, SetExpr};
43
44use crate::ast::{Query, SelectStatement};
45use crate::func::Func;
46use crate::names::Aug;
47use crate::plan::query::{self, ExprContext, QueryLifetime};
48use crate::plan::scope::Scope;
49use crate::plan::statement::StatementContext;
50use crate::plan::typeconv::CastContext;
51use crate::plan::{HirScalarExpr, Params};
52use crate::plan::{PlanError, QueryContext};
53
54#[derive(Debug, EnumKind, Clone)]
59#[enum_kind(SefKind)]
60pub enum SideEffectingFunc {
61 PgCancelBackend {
63 connection_id: u32,
65 },
66}
67
68pub fn describe_select_if_side_effecting(
72 scx: &StatementContext,
73 select: &SelectStatement<Aug>,
74) -> Result<Option<RelationDesc>, PlanError> {
75 let Some(sef_call) = extract_sef_call(scx, select)? else {
76 return Ok(None);
77 };
78
79 let desc = RelationDesc::builder()
83 .with_column(sef_call.imp.name, sef_call.imp.return_type.clone())
84 .finish();
85
86 Ok(Some(desc))
87}
88
89pub fn plan_select_if_side_effecting(
93 scx: &StatementContext,
94 select: &SelectStatement<Aug>,
95 params: &Params,
96) -> Result<Option<SideEffectingFunc>, PlanError> {
97 let Some(sef_call) = extract_sef_call(scx, select)? else {
98 return Ok(None);
99 };
100
101 let temp_storage = RowArena::new();
104 let mut args = vec![];
105 for mut arg in sef_call.args {
106 arg.bind_parameters_and_simplify_offset(scx, QueryLifetime::OneShot, params)?;
107 let arg = arg.lower_uncorrelated(scx.catalog.system_vars())?;
108 args.push(arg);
109 }
110 let mut datums = vec![];
111 for arg in &args {
112 let datum = arg.eval(&[], &temp_storage)?;
113 datums.push(datum);
114 }
115
116 let func = (sef_call.imp.plan_fn)(&datums);
117
118 Ok(Some(func))
119}
120
121fn extract_sef_call(
124 scx: &StatementContext,
125 select: &SelectStatement<Aug>,
126) -> Result<Option<SefCall>, PlanError> {
127 let SelectStatement {
129 query:
130 Query {
131 ctes: CteBlock::Simple(ctes),
132 body: SetExpr::Select(body),
133 order_by,
134 limit: None,
135 offset: None,
136 },
137 as_of: None,
138 } = select
139 else {
140 return Ok(None);
141 };
142 if !ctes.is_empty() || !order_by.is_empty() {
143 return Ok(None);
144 }
145 let Select {
146 distinct: None,
147 projection,
148 from,
149 selection: None,
150 group_by,
151 having: None,
152 qualify: None,
153 options,
154 } = &**body
155 else {
156 return Ok(None);
157 };
158 if !from.is_empty() || !group_by.is_empty() || !options.is_empty() || projection.len() != 1 {
159 return Ok(None);
160 }
161 let [
162 SelectItem::Expr {
163 expr:
164 Expr::Function(Function {
165 name,
166 args: FunctionArgs::Args { args, order_by },
167 filter: None,
168 over: None,
169 distinct: false,
170 }),
171 alias: None,
172 },
173 ] = &projection[..]
174 else {
175 return Ok(None);
176 };
177 if !order_by.is_empty() {
178 return Ok(None);
179 }
180
181 let Ok(func) = scx
185 .get_item_by_resolved_name(name)
186 .and_then(|item| item.func().err_into())
187 else {
188 return Ok(None);
189 };
190 let func_impl = match func {
191 Func::Scalar(impls) if impls.len() == 1 => impls.into_element(),
192 _ => return Ok(None),
193 };
194
195 let Some(sef_impl) = PG_CATALOG_SEF_BUILTINS.get(&func_impl.oid) else {
197 return Ok(None);
198 };
199
200 if args.len() != sef_impl.param_types.len() {
203 return Ok(None);
207 }
208
209 let mut args_out = vec![];
211 let qcx = QueryContext::root(scx, QueryLifetime::OneShot);
212 let ecx = ExprContext {
213 qcx: &qcx,
214 name: sef_impl.name,
215 scope: &Scope::empty(),
216 relation_type: &SqlRelationType::empty(),
217 allow_aggregates: false,
218 allow_subqueries: false,
219 allow_parameters: true,
220 allow_windows: false,
221 };
222 for (arg, ty) in args.iter().zip_eq(sef_impl.param_types) {
223 let arg = query::plan_expr(&ecx, arg)?;
227
228 let Ok(arg) = arg.cast_to(&ecx, CastContext::Implicit, ty) else {
237 return Ok(None);
238 };
239
240 args_out.push(arg);
241 }
242
243 Ok(Some(SefCall {
244 imp: sef_impl,
245 args: args_out,
246 }))
247}
248
249struct SefCall {
250 imp: &'static SideEffectingFuncImpl,
251 args: Vec<HirScalarExpr>,
252}
253
254pub struct SideEffectingFuncImpl {
259 pub name: &'static str,
261 pub oid: u32,
263 pub param_types: &'static [SqlScalarType],
265 pub return_type: SqlColumnType,
267 pub plan_fn: fn(&[Datum]) -> SideEffectingFunc,
270}
271
272pub static PG_CATALOG_SEF_BUILTINS: LazyLock<BTreeMap<u32, SideEffectingFuncImpl>> =
275 LazyLock::new(|| {
276 [PG_CANCEL_BACKEND]
277 .into_iter()
278 .map(|f| (f.oid, f))
279 .collect()
280 });
281
282const PG_CANCEL_BACKEND: SideEffectingFuncImpl = SideEffectingFuncImpl {
287 name: "pg_cancel_backend",
288 oid: 2171,
289 param_types: &[SqlScalarType::Int32],
290 return_type: SqlScalarType::Bool.nullable(false),
291 plan_fn: |datums| -> SideEffectingFunc {
292 SideEffectingFunc::PgCancelBackend {
293 connection_id: u32::reinterpret_cast(datums[0].unwrap_int32()),
294 }
295 },
296};