1use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, SqlScalarType};
16use mz_sql_parser::ast::InspectShardStatement;
17use std::time::Duration;
18use uncased::UncasedStr;
19
20use crate::ast::display::AstDisplay;
21use crate::ast::{
22 CloseStatement, DeallocateStatement, DeclareStatement, DiscardStatement, DiscardTarget,
23 ExecuteStatement, FetchOption, FetchOptionName, FetchStatement, PrepareStatement,
24 ResetVariableStatement, SetVariableStatement, SetVariableTo, ShowVariableStatement,
25};
26use crate::names::{self, Aug};
27use crate::plan::statement::{StatementContext, StatementDesc};
28use crate::plan::{
29 ClosePlan, DeallocatePlan, DeclarePlan, ExecutePlan, ExecuteTimeout, FetchPlan,
30 InspectShardPlan, Params, Plan, PlanError, PreparePlan, ResetVariablePlan, SetVariablePlan,
31 ShowVariablePlan, VariableValue, describe, query,
32};
33use crate::session::vars;
34use crate::session::vars::{SCHEMA_ALIAS, VarInput};
35
36pub fn describe_set_variable(
37 _: &StatementContext,
38 _: SetVariableStatement,
39) -> Result<StatementDesc, PlanError> {
40 Ok(StatementDesc::new(None))
41}
42
43pub fn plan_set_variable(
44 scx: &StatementContext,
45 SetVariableStatement {
46 local,
47 variable,
48 to,
49 }: SetVariableStatement,
50) -> Result<Plan, PlanError> {
51 let value = plan_set_variable_to(to)?;
52 let name = variable.into_string();
53
54 if let VariableValue::Values(values) = &value {
58 vars::check_transaction_isolation_feature_flag(
59 &name,
60 VarInput::SqlSet(values),
61 scx.catalog.system_vars(),
62 )?;
63 }
64
65 Ok(Plan::SetVariable(SetVariablePlan { name, value, local }))
66}
67
68pub fn plan_set_variable_to(to: SetVariableTo) -> Result<VariableValue, PlanError> {
69 match to {
70 SetVariableTo::Default => Ok(VariableValue::Default),
71 SetVariableTo::Values(values) => {
72 let values = values
78 .into_iter()
79 .map(|v| v.into_unquoted_value())
80 .collect();
81 Ok(VariableValue::Values(values))
82 }
83 }
84}
85
86pub fn describe_reset_variable(
87 _: &StatementContext,
88 _: ResetVariableStatement,
89) -> Result<StatementDesc, PlanError> {
90 Ok(StatementDesc::new(None))
91}
92
93pub fn plan_reset_variable(
94 _: &StatementContext,
95 ResetVariableStatement { variable }: ResetVariableStatement,
96) -> Result<Plan, PlanError> {
97 Ok(Plan::ResetVariable(ResetVariablePlan {
98 name: variable.to_string(),
99 }))
100}
101
102pub fn describe_show_variable(
103 _: &StatementContext,
104 ShowVariableStatement { variable, .. }: ShowVariableStatement,
105) -> Result<StatementDesc, PlanError> {
106 let desc = if variable.as_str() == UncasedStr::new("ALL") {
107 RelationDesc::builder()
108 .with_column("name", SqlScalarType::String.nullable(false))
109 .with_column("setting", SqlScalarType::String.nullable(false))
110 .with_column("description", SqlScalarType::String.nullable(false))
111 .finish()
112 } else if variable.as_str() == SCHEMA_ALIAS {
113 RelationDesc::builder()
114 .with_column(variable.as_str(), SqlScalarType::String.nullable(true))
115 .finish()
116 } else {
117 RelationDesc::builder()
118 .with_column(variable.as_str(), SqlScalarType::String.nullable(false))
119 .finish()
120 };
121 Ok(StatementDesc::new(Some(desc)))
122}
123
124pub fn plan_show_variable(
125 _: &StatementContext,
126 ShowVariableStatement { variable }: ShowVariableStatement,
127) -> Result<Plan, PlanError> {
128 if variable.as_str() == UncasedStr::new("ALL") {
129 Ok(Plan::ShowAllVariables)
130 } else {
131 Ok(Plan::ShowVariable(ShowVariablePlan {
132 name: variable.to_string(),
133 }))
134 }
135}
136
137pub fn describe_inspect_shard(
138 _: &StatementContext,
139 InspectShardStatement { .. }: InspectShardStatement,
140) -> Result<StatementDesc, PlanError> {
141 let desc = RelationDesc::builder()
142 .with_column("state", SqlScalarType::Jsonb.nullable(false))
143 .finish();
144 Ok(StatementDesc::new(Some(desc)))
145}
146
147pub fn plan_inspect_shard(
148 scx: &StatementContext,
149 InspectShardStatement { id }: InspectShardStatement,
150) -> Result<Plan, PlanError> {
151 let id: CatalogItemId = id.parse().map_err(|_| sql_err!("invalid shard id"))?;
152 let gid = scx
154 .catalog
155 .try_get_item(&id)
156 .ok_or_else(|| sql_err!("item doesn't exist"))?
157 .at_version(RelationVersionSelector::Latest)
158 .global_id();
159 Ok(Plan::InspectShard(InspectShardPlan { id: gid }))
160}
161
162pub fn describe_discard(
163 _: &StatementContext,
164 _: DiscardStatement,
165) -> Result<StatementDesc, PlanError> {
166 Ok(StatementDesc::new(None))
167}
168
169pub fn plan_discard(
170 _: &StatementContext,
171 DiscardStatement { target }: DiscardStatement,
172) -> Result<Plan, PlanError> {
173 match target {
174 DiscardTarget::All => Ok(Plan::DiscardAll),
175 DiscardTarget::Temp => Ok(Plan::DiscardTemp),
176 DiscardTarget::Sequences => bail_unsupported!("DISCARD SEQUENCES"),
177 DiscardTarget::Plans => bail_unsupported!("DISCARD PLANS"),
178 }
179}
180
181pub fn describe_declare(
182 scx: &StatementContext,
183 DeclareStatement { stmt, .. }: DeclareStatement<Aug>,
184 param_types_in: &[Option<SqlScalarType>],
185) -> Result<StatementDesc, PlanError> {
186 let (stmt_resolved, _) = names::resolve(scx.catalog, *stmt)?;
187 let desc = describe(scx.pcx()?, scx.catalog, stmt_resolved, param_types_in)?;
190 for (i, ty) in desc.param_types.into_iter().enumerate() {
193 scx.param_types.borrow_mut().insert(i + 1, ty);
194 }
195 Ok(StatementDesc::new(None))
196}
197
198pub fn plan_declare(
199 _: &StatementContext,
200 DeclareStatement { name, stmt, sql }: DeclareStatement<Aug>,
201 params: &Params,
202) -> Result<Plan, PlanError> {
203 Ok(Plan::Declare(DeclarePlan {
204 name: name.to_string(),
205 stmt: *stmt,
206 sql,
207 params: params.clone(),
208 }))
209}
210
211pub fn describe_fetch(
212 scx: &StatementContext,
213 FetchStatement {
214 name,
215 count: _,
216 options: _,
217 }: FetchStatement<Aug>,
218) -> Result<StatementDesc, PlanError> {
219 if let Some(mut desc) = scx
220 .catalog
221 .get_portal_desc_unverified(&name.to_string())
222 .cloned()
223 {
224 desc.param_types = Vec::new();
227 Ok(desc)
228 } else {
229 Err(PlanError::UnknownCursor(name.to_string()))
230 }
231}
232
233generate_extracted_config!(FetchOption, (Timeout, Duration));
234
235pub fn plan_fetch(
236 _: &StatementContext,
237 FetchStatement {
238 name,
239 count,
240 options,
241 }: FetchStatement<Aug>,
242) -> Result<Plan, PlanError> {
243 let FetchOptionExtracted { timeout, .. } = options.try_into()?;
244 let timeout = match timeout {
245 Some(timeout) => {
246 const DAY: Duration = Duration::from_secs(60 * 60 * 24);
250 if timeout > DAY {
251 sql_bail!("timeout out of range: {}s", timeout.as_secs_f64());
252 }
253 ExecuteTimeout::Seconds(timeout.as_secs_f64())
254 }
255 None => ExecuteTimeout::WaitOnce,
257 };
258 Ok(Plan::Fetch(FetchPlan {
259 name: name.to_string(),
260 count,
261 timeout,
262 }))
263}
264
265pub fn describe_close(_: &StatementContext, _: CloseStatement) -> Result<StatementDesc, PlanError> {
266 Ok(StatementDesc::new(None))
267}
268
269pub fn plan_close(
270 _: &StatementContext,
271 CloseStatement { name }: CloseStatement,
272) -> Result<Plan, PlanError> {
273 Ok(Plan::Close(ClosePlan {
274 name: name.to_string(),
275 }))
276}
277
278pub fn describe_prepare(
279 _: &StatementContext,
280 _: PrepareStatement<Aug>,
281) -> Result<StatementDesc, PlanError> {
282 Ok(StatementDesc::new(None))
283}
284
285pub fn plan_prepare(
286 scx: &StatementContext,
287 PrepareStatement { name, stmt, sql }: PrepareStatement<Aug>,
288) -> Result<Plan, PlanError> {
289 let param_types = [];
291 let (stmt_resolved, _) = names::resolve(scx.catalog, *stmt.clone())?;
292 let desc = describe(scx.pcx()?, scx.catalog, stmt_resolved, ¶m_types)?;
293 Ok(Plan::Prepare(PreparePlan {
294 name: name.to_string(),
295 stmt: *stmt,
296 desc,
297 sql,
298 }))
299}
300
301pub fn describe_execute(
302 scx: &StatementContext,
303 stmt: ExecuteStatement<Aug>,
304) -> Result<StatementDesc, PlanError> {
305 Ok(plan_execute_desc(scx, stmt)?.0.clone())
311}
312
313pub fn plan_execute(
314 scx: &StatementContext,
315 stmt: ExecuteStatement<Aug>,
316) -> Result<Plan, PlanError> {
317 Ok(plan_execute_desc(scx, stmt)?.1)
318}
319
320fn plan_execute_desc<'a>(
321 scx: &'a StatementContext,
322 ExecuteStatement { name, params }: ExecuteStatement<Aug>,
323) -> Result<(&'a StatementDesc, Plan), PlanError> {
324 let name = name.to_string();
325 let desc = match scx.catalog.get_prepared_statement_desc(&name) {
326 Some(desc) => desc,
327 None => sql_bail!("unknown prepared statement {}", name),
329 };
330 Ok((
331 desc,
332 Plan::Execute(ExecutePlan {
333 name,
334 params: query::plan_params(scx, params, desc)?,
335 }),
336 ))
337}
338
339pub fn describe_deallocate(
340 _: &StatementContext,
341 _: DeallocateStatement,
342) -> Result<StatementDesc, PlanError> {
343 Ok(StatementDesc::new(None))
344}
345
346pub fn plan_deallocate(
347 _: &StatementContext,
348 DeallocateStatement { name }: DeallocateStatement,
349) -> Result<Plan, PlanError> {
350 Ok(Plan::Deallocate(DeallocatePlan {
351 name: name.map(|name| name.to_string()),
352 }))
353}