1use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, ScalarType};
16use mz_sql_parser::ast::InspectShardStatement;
17use std::time::Duration;
18use uncased::UncasedStr;
19
20use crate::ast::display::AstDisplay;
21use crate::ast::{
22 CloseStatement, DeallocateStatement, DeclareStatement, DiscardStatement, DiscardTarget,
23 ExecuteStatement, FetchOption, FetchOptionName, FetchStatement, PrepareStatement,
24 ResetVariableStatement, SetVariableStatement, SetVariableTo, ShowVariableStatement,
25};
26use crate::names::{self, Aug};
27use crate::plan::statement::{StatementContext, StatementDesc};
28use crate::plan::{
29 ClosePlan, DeallocatePlan, DeclarePlan, ExecutePlan, ExecuteTimeout, FetchPlan,
30 InspectShardPlan, Params, Plan, PlanError, PreparePlan, ResetVariablePlan, SetVariablePlan,
31 ShowVariablePlan, VariableValue, describe, query,
32};
33use crate::session::vars;
34use crate::session::vars::{IsolationLevel, SCHEMA_ALIAS, TRANSACTION_ISOLATION_VAR_NAME};
35
36pub fn describe_set_variable(
37 _: &StatementContext,
38 _: SetVariableStatement,
39) -> Result<StatementDesc, PlanError> {
40 Ok(StatementDesc::new(None))
41}
42
43pub fn plan_set_variable(
44 scx: &StatementContext,
45 SetVariableStatement {
46 local,
47 variable,
48 to,
49 }: SetVariableStatement,
50) -> Result<Plan, PlanError> {
51 let value = plan_set_variable_to(to)?;
52 let name = variable.into_string();
53
54 if let VariableValue::Values(values) = &value {
55 if let Some(value) = values.first() {
56 if name.as_str() == TRANSACTION_ISOLATION_VAR_NAME
57 && value == IsolationLevel::StrongSessionSerializable.as_str()
58 {
59 scx.require_feature_flag(&vars::ENABLE_SESSION_TIMELINES)?;
60 }
61 }
62 }
63
64 Ok(Plan::SetVariable(SetVariablePlan { name, value, local }))
65}
66
67pub fn plan_set_variable_to(to: SetVariableTo) -> Result<VariableValue, PlanError> {
68 match to {
69 SetVariableTo::Default => Ok(VariableValue::Default),
70 SetVariableTo::Values(values) => {
71 let values = values
77 .into_iter()
78 .map(|v| v.into_unquoted_value())
79 .collect();
80 Ok(VariableValue::Values(values))
81 }
82 }
83}
84
85pub fn describe_reset_variable(
86 _: &StatementContext,
87 _: ResetVariableStatement,
88) -> Result<StatementDesc, PlanError> {
89 Ok(StatementDesc::new(None))
90}
91
92pub fn plan_reset_variable(
93 _: &StatementContext,
94 ResetVariableStatement { variable }: ResetVariableStatement,
95) -> Result<Plan, PlanError> {
96 Ok(Plan::ResetVariable(ResetVariablePlan {
97 name: variable.to_string(),
98 }))
99}
100
101pub fn describe_show_variable(
102 _: &StatementContext,
103 ShowVariableStatement { variable, .. }: ShowVariableStatement,
104) -> Result<StatementDesc, PlanError> {
105 let desc = if variable.as_str() == UncasedStr::new("ALL") {
106 RelationDesc::builder()
107 .with_column("name", ScalarType::String.nullable(false))
108 .with_column("setting", ScalarType::String.nullable(false))
109 .with_column("description", ScalarType::String.nullable(false))
110 .finish()
111 } else if variable.as_str() == SCHEMA_ALIAS {
112 RelationDesc::builder()
113 .with_column(variable.as_str(), ScalarType::String.nullable(true))
114 .finish()
115 } else {
116 RelationDesc::builder()
117 .with_column(variable.as_str(), ScalarType::String.nullable(false))
118 .finish()
119 };
120 Ok(StatementDesc::new(Some(desc)))
121}
122
123pub fn plan_show_variable(
124 _: &StatementContext,
125 ShowVariableStatement { variable }: ShowVariableStatement,
126) -> Result<Plan, PlanError> {
127 if variable.as_str() == UncasedStr::new("ALL") {
128 Ok(Plan::ShowAllVariables)
129 } else {
130 Ok(Plan::ShowVariable(ShowVariablePlan {
131 name: variable.to_string(),
132 }))
133 }
134}
135
136pub fn describe_inspect_shard(
137 _: &StatementContext,
138 InspectShardStatement { .. }: InspectShardStatement,
139) -> Result<StatementDesc, PlanError> {
140 let desc = RelationDesc::builder()
141 .with_column("state", ScalarType::Jsonb.nullable(false))
142 .finish();
143 Ok(StatementDesc::new(Some(desc)))
144}
145
146pub fn plan_inspect_shard(
147 scx: &StatementContext,
148 InspectShardStatement { id }: InspectShardStatement,
149) -> Result<Plan, PlanError> {
150 let id: CatalogItemId = id.parse().map_err(|_| sql_err!("invalid shard id"))?;
151 let gid = scx
153 .catalog
154 .try_get_item(&id)
155 .ok_or_else(|| sql_err!("item doesn't exist"))?
156 .at_version(RelationVersionSelector::Latest)
157 .global_id();
158 Ok(Plan::InspectShard(InspectShardPlan { id: gid }))
159}
160
161pub fn describe_discard(
162 _: &StatementContext,
163 _: DiscardStatement,
164) -> Result<StatementDesc, PlanError> {
165 Ok(StatementDesc::new(None))
166}
167
168pub fn plan_discard(
169 _: &StatementContext,
170 DiscardStatement { target }: DiscardStatement,
171) -> Result<Plan, PlanError> {
172 match target {
173 DiscardTarget::All => Ok(Plan::DiscardAll),
174 DiscardTarget::Temp => Ok(Plan::DiscardTemp),
175 DiscardTarget::Sequences => bail_unsupported!("DISCARD SEQUENCES"),
176 DiscardTarget::Plans => bail_unsupported!("DISCARD PLANS"),
177 }
178}
179
180pub fn describe_declare(
181 scx: &StatementContext,
182 DeclareStatement { stmt, .. }: DeclareStatement<Aug>,
183 param_types_in: &[Option<ScalarType>],
184) -> Result<StatementDesc, PlanError> {
185 let (stmt_resolved, _) = names::resolve(scx.catalog, *stmt)?;
186 let desc = describe(scx.pcx()?, scx.catalog, stmt_resolved, param_types_in)?;
189 for (i, ty) in desc.param_types.into_iter().enumerate() {
192 scx.param_types.borrow_mut().insert(i + 1, ty);
193 }
194 Ok(StatementDesc::new(None))
195}
196
197pub fn plan_declare(
198 _: &StatementContext,
199 DeclareStatement { name, stmt, sql }: DeclareStatement<Aug>,
200 params: &Params,
201) -> Result<Plan, PlanError> {
202 Ok(Plan::Declare(DeclarePlan {
203 name: name.to_string(),
204 stmt: *stmt,
205 sql,
206 params: params.clone(),
207 }))
208}
209
210pub fn describe_fetch(
211 scx: &StatementContext,
212 FetchStatement {
213 name,
214 count: _,
215 options: _,
216 }: FetchStatement<Aug>,
217) -> Result<StatementDesc, PlanError> {
218 if let Some(mut desc) = scx
219 .catalog
220 .get_portal_desc_unverified(&name.to_string())
221 .cloned()
222 {
223 desc.param_types = Vec::new();
226 Ok(desc)
227 } else {
228 Err(PlanError::UnknownCursor(name.to_string()))
229 }
230}
231
232generate_extracted_config!(FetchOption, (Timeout, Duration));
233
234pub fn plan_fetch(
235 _: &StatementContext,
236 FetchStatement {
237 name,
238 count,
239 options,
240 }: FetchStatement<Aug>,
241) -> Result<Plan, PlanError> {
242 let FetchOptionExtracted { timeout, .. } = options.try_into()?;
243 let timeout = match timeout {
244 Some(timeout) => {
245 const DAY: Duration = Duration::from_secs(60 * 60 * 24);
249 if timeout > DAY {
250 sql_bail!("timeout out of range: {}s", timeout.as_secs_f64());
251 }
252 ExecuteTimeout::Seconds(timeout.as_secs_f64())
253 }
254 None => ExecuteTimeout::WaitOnce,
256 };
257 Ok(Plan::Fetch(FetchPlan {
258 name: name.to_string(),
259 count,
260 timeout,
261 }))
262}
263
264pub fn describe_close(_: &StatementContext, _: CloseStatement) -> Result<StatementDesc, PlanError> {
265 Ok(StatementDesc::new(None))
266}
267
268pub fn plan_close(
269 _: &StatementContext,
270 CloseStatement { name }: CloseStatement,
271) -> Result<Plan, PlanError> {
272 Ok(Plan::Close(ClosePlan {
273 name: name.to_string(),
274 }))
275}
276
277pub fn describe_prepare(
278 _: &StatementContext,
279 _: PrepareStatement<Aug>,
280) -> Result<StatementDesc, PlanError> {
281 Ok(StatementDesc::new(None))
282}
283
284pub fn plan_prepare(
285 scx: &StatementContext,
286 PrepareStatement { name, stmt, sql }: PrepareStatement<Aug>,
287) -> Result<Plan, PlanError> {
288 let param_types = [];
290 let (stmt_resolved, _) = names::resolve(scx.catalog, *stmt.clone())?;
291 let desc = describe(scx.pcx()?, scx.catalog, stmt_resolved, ¶m_types)?;
292 Ok(Plan::Prepare(PreparePlan {
293 name: name.to_string(),
294 stmt: *stmt,
295 desc,
296 sql,
297 }))
298}
299
300pub fn describe_execute(
301 scx: &StatementContext,
302 stmt: ExecuteStatement<Aug>,
303) -> Result<StatementDesc, PlanError> {
304 Ok(plan_execute_desc(scx, stmt)?.0.clone())
310}
311
312pub fn plan_execute(
313 scx: &StatementContext,
314 stmt: ExecuteStatement<Aug>,
315) -> Result<Plan, PlanError> {
316 Ok(plan_execute_desc(scx, stmt)?.1)
317}
318
319fn plan_execute_desc<'a>(
320 scx: &'a StatementContext,
321 ExecuteStatement { name, params }: ExecuteStatement<Aug>,
322) -> Result<(&'a StatementDesc, Plan), PlanError> {
323 let name = name.to_string();
324 let desc = match scx.catalog.get_prepared_statement_desc(&name) {
325 Some(desc) => desc,
326 None => sql_bail!("unknown prepared statement {}", name),
328 };
329 Ok((
330 desc,
331 Plan::Execute(ExecutePlan {
332 name,
333 params: query::plan_params(scx, params, desc)?,
334 }),
335 ))
336}
337
338pub fn describe_deallocate(
339 _: &StatementContext,
340 _: DeallocateStatement,
341) -> Result<StatementDesc, PlanError> {
342 Ok(StatementDesc::new(None))
343}
344
345pub fn plan_deallocate(
346 _: &StatementContext,
347 DeallocateStatement { name }: DeallocateStatement,
348) -> Result<Plan, PlanError> {
349 Ok(Plan::Deallocate(DeallocatePlan {
350 name: name.map(|name| name.to_string()),
351 }))
352}