1use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24 ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25 CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26 CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27 CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28 CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29 MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30 UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37pub fn ident(ident: Ident) -> String {
39 ident.into_string()
40}
41
42pub fn ident_ref(ident: &Ident) -> &str {
44 ident.as_str()
45}
46
47pub fn column_name(id: Ident) -> ColumnName {
49 ColumnName::from(ident(id))
50}
51
52pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54 if name.0.len() < 1 || name.0.len() > 3 {
55 return Err(PlanError::MisqualifiedName(name.to_string()));
56 }
57 let out = PartialItemName {
58 item: ident(
59 name.0
60 .pop()
61 .expect("name checked to have at least one component"),
62 ),
63 schema: name.0.pop().map(ident),
64 database: name.0.pop().map(ident),
65 };
66 assert!(name.0.is_empty());
67 Ok(out)
68}
69
70pub fn unresolved_schema_name(
72 mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74 if name.0.len() < 1 || name.0.len() > 2 {
75 return Err(PlanError::MisqualifiedName(name.to_string()));
76 }
77 let out = PartialSchemaName {
78 schema: ident(
79 name.0
80 .pop()
81 .expect("name checked to have at least one component"),
82 ),
83 database: name.0.pop().map(ident),
84 };
85 assert!(name.0.is_empty());
86 Ok(out)
87}
88
89pub fn op(op: &Op) -> Result<&str, PlanError> {
93 if let Some(namespace) = &op.namespace {
94 if namespace.len() != 0
95 && (namespace.len() != 1
96 || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97 {
98 sql_bail!(
99 "operator does not exist: {}.{}",
100 namespace.iter().map(|n| n.to_string()).join("."),
101 op.op,
102 )
103 }
104 }
105 Ok(&op.op)
106}
107
108#[derive(Debug, Clone)]
109pub enum SqlValueOrSecret {
110 Value(Value),
111 Secret(GlobalId),
112}
113
114impl fmt::Display for SqlValueOrSecret {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118 SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119 }
120 }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124 fn from(s: SqlValueOrSecret) -> Self {
125 match s {
126 SqlValueOrSecret::Value(v) => Some(v),
127 SqlValueOrSecret::Secret(_id) => None,
128 }
129 }
130}
131
132pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136 let mut out = vec![];
138 if let RawDatabaseSpecifier::Name(n) = name.database {
139 out.push(Ident::new_unchecked(n));
140 }
141 out.push(Ident::new_unchecked(name.schema));
142 out.push(Ident::new_unchecked(name.item));
143 UnresolvedItemName(out)
144}
145
146pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149 match raw_name.0.len() {
150 3 => Ok(FullItemName {
151 item: ident(raw_name.0.pop().unwrap()),
152 schema: ident(raw_name.0.pop().unwrap()),
153 database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154 }),
155 2 => Ok(FullItemName {
156 item: ident(raw_name.0.pop().unwrap()),
157 schema: ident(raw_name.0.pop().unwrap()),
158 database: RawDatabaseSpecifier::Ambient,
159 }),
160 _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161 }
162}
163
164pub fn create_statement(
173 scx: &StatementContext,
174 mut stmt: Statement<Aug>,
175) -> Result<String, PlanError> {
176 let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
177 Ok(unresolve(
178 scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
179 ))
180 };
181
182 let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
183 Ok(unresolve(scx.allocate_temporary_full_name(
184 unresolved_item_name(name.clone())?,
185 )))
186 };
187
188 struct QueryNormalizer {
189 ctes: Vec<Ident>,
190 err: Option<PlanError>,
191 }
192
193 impl QueryNormalizer {
194 fn new() -> QueryNormalizer {
195 QueryNormalizer {
196 ctes: vec![],
197 err: None,
198 }
199 }
200 }
201
202 impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
203 fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
204 let n = self.ctes.len();
205 match &query.ctes {
206 CteBlock::Simple(ctes) => {
207 for cte in ctes.iter() {
208 self.ctes.push(cte.alias.name.clone());
209 }
210 }
211 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
212 for cte in ctes.iter() {
213 self.ctes.push(cte.name.clone());
214 }
215 }
216 }
217 visit_mut::visit_query_mut(self, query);
218 self.ctes.truncate(n);
219 }
220
221 fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
222 match &mut func.args {
223 FunctionArgs::Star => (),
224 FunctionArgs::Args { args, order_by } => {
225 for arg in args {
226 self.visit_expr_mut(arg);
227 }
228 for expr in order_by {
229 self.visit_order_by_expr_mut(expr);
230 }
231 }
232 }
233 if let Some(over) = &mut func.over {
234 self.visit_window_spec_mut(over);
235 }
236 }
237
238 fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
239 match table_factor {
240 TableFactor::Table { name, alias, .. } => {
241 self.visit_item_name_mut(name);
242 if let Some(alias) = alias {
243 self.visit_table_alias_mut(alias);
244 }
245 }
246 _ => visit_mut::visit_table_factor_mut(self, table_factor),
249 }
250 }
251 }
252
253 match &mut stmt {
264 Statement::CreateSource(CreateSourceStatement {
265 name,
266 in_cluster: _,
267 col_names: _,
268 connection: _,
269 format: _,
270 include_metadata: _,
271 envelope: _,
272 if_not_exists,
273 key_constraint: _,
274 with_options: _,
275 external_references: _,
276 progress_subsource: _,
277 }) => {
278 *name = allocate_name(name)?;
279 *if_not_exists = false;
280 }
281
282 Statement::CreateSubsource(CreateSubsourceStatement {
283 name,
284 columns,
285 constraints: _,
286 of_source: _,
287 if_not_exists,
288 with_options: _,
289 }) => {
290 *name = allocate_name(name)?;
291 let mut normalizer = QueryNormalizer::new();
292 for c in columns {
293 normalizer.visit_column_def_mut(c);
294 }
295 if let Some(err) = normalizer.err {
296 return Err(err);
297 }
298 *if_not_exists = false;
299 }
300
301 Statement::CreateTableFromSource(CreateTableFromSourceStatement {
302 name,
303 columns,
304 constraints: _,
305 external_reference: _,
306 source: _,
307 if_not_exists,
308 format: _,
309 include_metadata: _,
310 envelope: _,
311 with_options: _,
312 }) => {
313 *name = allocate_name(name)?;
314 let mut normalizer = QueryNormalizer::new();
315 if let TableFromSourceColumns::Defined(columns) = columns {
316 for c in columns {
317 normalizer.visit_column_def_mut(c);
318 }
319 }
320 if let Some(err) = normalizer.err {
321 return Err(err);
322 }
323 *if_not_exists = false;
324 }
325
326 Statement::CreateTable(CreateTableStatement {
327 name,
328 columns,
329 constraints: _,
330 if_not_exists,
331 temporary,
332 with_options: _,
333 }) => {
334 *name = if *temporary {
335 allocate_temporary_name(name)?
336 } else {
337 allocate_name(name)?
338 };
339 let mut normalizer = QueryNormalizer::new();
340 for c in columns {
341 normalizer.visit_column_def_mut(c);
342 }
343 if let Some(err) = normalizer.err {
344 return Err(err);
345 }
346 *if_not_exists = false;
347 }
348
349 Statement::CreateWebhookSource(CreateWebhookSourceStatement {
350 name,
351 is_table: _,
352 if_not_exists,
353 include_headers: _,
354 body_format: _,
355 validate_using: _,
356 in_cluster: _,
357 }) => {
358 *name = allocate_name(name)?;
359 *if_not_exists = false;
360 }
361
362 Statement::CreateSink(CreateSinkStatement {
363 name,
364 in_cluster: _,
365 connection: _,
366 format: _,
367 envelope: _,
368 if_not_exists,
369 ..
370 }) => {
371 if let Some(name) = name {
372 *name = allocate_name(name)?;
373 }
374 *if_not_exists = false;
375 }
376
377 Statement::CreateView(CreateViewStatement {
378 temporary,
379 if_exists,
380 definition:
381 ViewDefinition {
382 name,
383 query,
384 columns: _,
385 },
386 }) => {
387 *name = if *temporary {
388 allocate_temporary_name(name)?
389 } else {
390 allocate_name(name)?
391 };
392 {
393 let mut normalizer = QueryNormalizer::new();
394 normalizer.visit_query_mut(query);
395 if let Some(err) = normalizer.err {
396 return Err(err);
397 }
398 }
399 *if_exists = IfExistsBehavior::Error;
400 }
401
402 Statement::CreateMaterializedView(CreateMaterializedViewStatement {
403 if_exists,
404 name,
405 columns: _,
406 replacing: _,
407 in_cluster: _,
408 query,
409 with_options: _,
410 as_of: _,
411 }) => {
412 *name = allocate_name(name)?;
413 {
414 let mut normalizer = QueryNormalizer::new();
415 normalizer.visit_query_mut(query);
416 if let Some(err) = normalizer.err {
417 return Err(err);
418 }
419 }
420 *if_exists = IfExistsBehavior::Error;
421 }
422
423 Statement::CreateContinualTask(CreateContinualTaskStatement {
424 name,
425 columns: _,
426 input,
427 with_options: _,
428 stmts,
429 in_cluster: _,
430 as_of: _,
431 sugar,
432 }) => {
433 let mut normalizer = QueryNormalizer::new();
434 normalizer.visit_item_name_mut(name);
435 normalizer.visit_item_name_mut(input);
436 for stmt in stmts {
437 match stmt {
438 ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
439 ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
440 }
441 }
442 match sugar {
443 Some(CreateContinualTaskSugar::Transform { transform }) => {
444 normalizer.visit_query_mut(transform)
445 }
446 Some(CreateContinualTaskSugar::Retain { retain }) => {
447 normalizer.visit_expr_mut(retain)
448 }
449 None => {}
450 }
451 if let Some(err) = normalizer.err {
452 return Err(err);
453 }
454 }
455
456 Statement::CreateIndex(CreateIndexStatement {
457 name: _,
458 in_cluster: _,
459 key_parts,
460 with_options: _,
461 if_not_exists,
462 ..
463 }) => {
464 let mut normalizer = QueryNormalizer::new();
465 if let Some(key_parts) = key_parts {
466 for key_part in key_parts {
467 normalizer.visit_expr_mut(key_part);
468 if let Some(err) = normalizer.err {
469 return Err(err);
470 }
471 }
472 }
473 *if_not_exists = false;
474 }
475
476 Statement::CreateType(CreateTypeStatement { name, as_type }) => {
477 *name = allocate_name(name)?;
478 let mut normalizer = QueryNormalizer::new();
479 normalizer.visit_create_type_as_mut(as_type);
480 if let Some(err) = normalizer.err {
481 return Err(err);
482 }
483 }
484 Statement::CreateSecret(CreateSecretStatement {
485 name,
486 if_not_exists,
487 value: _,
488 }) => {
489 *name = allocate_name(name)?;
490 *if_not_exists = false;
491 }
492 Statement::CreateConnection(CreateConnectionStatement {
493 name,
494 connection_type: _,
495 values,
496 with_options,
497 if_not_exists,
498 }) => {
499 *name = allocate_name(name)?;
500 *if_not_exists = false;
501
502 values.sort();
503
504 with_options
507 .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
508 }
509
510 _ => unreachable!(),
511 }
512
513 Ok(stmt.to_ast_string_stable())
514}
515
/// Generates an `<OptionTy>Extracted` struct that holds the typed values of a
/// list of `<OptionTy><Aug>` options, plus conversions in both directions.
///
/// # Invocation
///
/// `generate_extracted_config!(OptionTy, entry, entry, ...)`, where each entry
/// is one of:
///
/// - `(Name, T)`: the field has type `Option<T>` and defaults to `None`.
/// - `(Name, T, Default(v))`: the field has type `T` and defaults to
///   `T::from(v)`.
/// - `(Name, T, AllowMultiple)`: the field has type `Vec<T>`, defaults to an
///   empty vector, and the option may be given more than once.
///
/// `Name` must be a variant of the enum `<OptionTy>Name`; the generated field
/// is named after the snake-cased variant.
///
/// # Generated items
///
/// - `struct <OptionTy>Extracted` with a `seen` set of option names and one
///   field per entry.
/// - `impl Default` for it, using the per-entry defaults above.
/// - `impl TryFrom<Vec<OptionTy<Aug>>>`, which converts every option value via
///   `TryFromValue::try_from_value` and rejects duplicates of options that
///   are not `AllowMultiple`.
/// - `fn into_values`, which converts the fields back into options via
///   `TryFromValue::try_into_value`, skipping fields that convert to `None`.
///
/// The expansion refers to `Aug`, `sql_bail!`, `sql_err!` and the `paste`
/// crate, which must be resolvable at the call site.
macro_rules! generate_extracted_config {
    // Plain entry, more entries follow: record it as `Option<T>` with a `None`
    // default and keep munching the tail.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty), $($tail:tt),*) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)], $(
            $tail
        ),*);
    };
    // Plain entry, last one.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty)) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)]);
    };
    // Entry with `Default(v)`, more entries follow: the field keeps type `T`
    // and uses `v` as its default.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)], $(
            $tail
        ),*);
    };
    // Entry with `Default(v)`, last one.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr))) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)]);
    };
    // Entry with `AllowMultiple`, more entries follow: the trailing `true`
    // marks the option as repeatable, with an empty vector as default.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)], $(
            $tail
        ),*);
    };
    // Entry with `AllowMultiple`, last one.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple)) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)]);
    };
    // Terminal arm: every entry has been normalized to
    // `(name, type, default, allow_multiple)`; emit the struct and its impls.
    ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$option_ty Extracted>] {
                // Names of the options encountered by `try_from`; used to
                // detect duplicates of non-repeatable options.
                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
                $(
                    // `Vec<T>` for repeatable options, `T` otherwise.
                    pub [<$option_name:snake>]: generate_extracted_config!(
                        @ifty $allow_multiple,
                        Vec::<$t>,
                        $t
                    ),
                )*
            }

            impl std::default::Default for [<$option_ty Extracted>] {
                fn default() -> Self {
                    [<$option_ty Extracted>] {
                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
                        $(
                            // The default expression is converted into the
                            // field type with `From`.
                            [<$option_name:snake>]: <generate_extracted_config!(
                                @ifty $allow_multiple,
                                Vec::<$t>,
                                $t
                            )>::from($v),
                        )*
                    }
                }
            }

            impl std::convert::TryFrom<Vec<$option_ty<Aug>>> for [<$option_ty Extracted>] {
                type Error = $crate::plan::PlanError;
                fn try_from(v: Vec<$option_ty<Aug>>) -> Result<[<$option_ty Extracted>], Self::Error> {
                    use [<$option_ty Name>]::*;
                    let mut extracted = [<$option_ty Extracted>]::default();
                    for option in v {
                        match option.name {
                            $(
                                $option_name => {
                                    // Repeatable options skip the duplicate check
                                    // (and are therefore never added to `seen`).
                                    if !$allow_multiple && !extracted.seen.insert(option.name.clone()) {
                                        sql_bail!("{} specified more than once", option.name.to_ast_string_simple());
                                    }
                                    let val: $t = $crate::plan::with_options::TryFromValue::try_from_value(option.value)
                                        .map_err(|e| sql_err!("invalid {}: {}", option.name.to_ast_string_simple(), e))?;
                                    // Append for repeatable options, overwrite otherwise.
                                    generate_extracted_config!(
                                        @ifexpr $allow_multiple,
                                        extracted.[<$option_name:snake>].push(val),
                                        extracted.[<$option_name:snake>] = val
                                    );
                                }
                            )*
                        }
                    }
                    Ok(extracted)
                }
            }

            impl [<$option_ty Extracted>] {
                #[allow(unused)]
                fn into_values(self, catalog: &dyn crate::catalog::SessionCatalog) -> Vec<$option_ty<Aug>> {
                    use [<$option_ty Name>]::*;
                    let mut options = Vec::new();
                    $(
                        let value = self.[<$option_name:snake>];
                        // Treat single-valued fields as a one-element list so
                        // both cases share the loop below.
                        let values: Vec<_> = generate_extracted_config!(
                            @ifexpr $allow_multiple,
                            value,
                            Vec::from([value])
                        );
                        for value in values {
                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
                            >>::try_into_value(value, catalog);
                            // Fields that convert to `None` produce no option.
                            match maybe_value {
                                Some(value) => {
                                    let option = $option_ty {name: $option_name, value};
                                    options.push(option);
                                },
                                None => (),
                            }
                        }
                    )*
                    options
                }
            }
        }
    };
    // Public entry point: start the muncher with an empty accumulator.
    ($option_ty:ty, $($h:tt),+) => {
        generate_extracted_config!{$option_ty, [], $($h),+}
    };
    // Compile-time selection helpers: `@ifexpr` / `@ifty` expand to `$lhs`
    // when the flag is `true` and to `$rhs` when it is `false`.
    (@ifexpr false, $lhs:expr, $rhs:expr) => {
        $rhs
    };
    (@ifexpr true, $lhs:expr, $rhs:expr) => {
        $lhs
    };
    (@ifty false, $lhs:ty, $rhs:ty) => {
        $rhs
    };
    (@ifty true, $lhs:ty, $rhs:ty) => {
        $lhs
    };
}
678
679pub(crate) use generate_extracted_config;