1use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24 ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25 CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26 CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27 CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28 CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29 MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30 UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37pub fn ident(ident: Ident) -> String {
39 ident.into_string()
40}
41
42pub fn ident_ref(ident: &Ident) -> &str {
44 ident.as_str()
45}
46
47pub fn column_name(id: Ident) -> ColumnName {
49 ColumnName::from(ident(id))
50}
51
52pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54 if name.0.len() < 1 || name.0.len() > 3 {
55 return Err(PlanError::MisqualifiedName(name.to_string()));
56 }
57 let out = PartialItemName {
58 item: ident(
59 name.0
60 .pop()
61 .expect("name checked to have at least one component"),
62 ),
63 schema: name.0.pop().map(ident),
64 database: name.0.pop().map(ident),
65 };
66 assert!(name.0.is_empty());
67 Ok(out)
68}
69
70pub fn unresolved_schema_name(
72 mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74 if name.0.len() < 1 || name.0.len() > 2 {
75 return Err(PlanError::MisqualifiedName(name.to_string()));
76 }
77 let out = PartialSchemaName {
78 schema: ident(
79 name.0
80 .pop()
81 .expect("name checked to have at least one component"),
82 ),
83 database: name.0.pop().map(ident),
84 };
85 assert!(name.0.is_empty());
86 Ok(out)
87}
88
89pub fn op(op: &Op) -> Result<&str, PlanError> {
93 if let Some(namespace) = &op.namespace {
94 if namespace.len() != 0
95 && (namespace.len() != 1
96 || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97 {
98 sql_bail!(
99 "operator does not exist: {}.{}",
100 namespace.iter().map(|n| n.to_string()).join("."),
101 op.op,
102 )
103 }
104 }
105 Ok(&op.op)
106}
107
/// Either a literal SQL [`Value`] or a [`GlobalId`] standing in for a secret.
#[derive(Debug, Clone)]
pub enum SqlValueOrSecret {
    /// A literal SQL value.
    Value(Value),
    /// The identifier carried in place of a value for a secret.
    Secret(GlobalId),
}
113
114impl fmt::Display for SqlValueOrSecret {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118 SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119 }
120 }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124 fn from(s: SqlValueOrSecret) -> Self {
125 match s {
126 SqlValueOrSecret::Value(v) => Some(v),
127 SqlValueOrSecret::Secret(_id) => None,
128 }
129 }
130}
131
132pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136 let mut out = vec![];
138 if let RawDatabaseSpecifier::Name(n) = name.database {
139 out.push(Ident::new_unchecked(n));
140 }
141 out.push(Ident::new_unchecked(name.schema));
142 out.push(Ident::new_unchecked(name.item));
143 UnresolvedItemName(out)
144}
145
146pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149 match raw_name.0.len() {
150 3 => Ok(FullItemName {
151 item: ident(raw_name.0.pop().unwrap()),
152 schema: ident(raw_name.0.pop().unwrap()),
153 database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154 }),
155 2 => Ok(FullItemName {
156 item: ident(raw_name.0.pop().unwrap()),
157 schema: ident(raw_name.0.pop().unwrap()),
158 database: RawDatabaseSpecifier::Ambient,
159 }),
160 _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161 }
162}
163
/// Normalizes a `CREATE` statement and renders it with
/// `to_ast_string_stable`.
///
/// Depending on the statement kind, the normalization implemented here:
///
/// * replaces the item name with the result of `scx.allocate_full_name`
///   (or `scx.allocate_temporary_full_name` for temporary tables and views),
///   converted back to an `UnresolvedItemName` with [`unresolve`];
/// * resets `if_not_exists` to `false`, or `if_exists` to
///   `IfExistsBehavior::Error`;
/// * walks column definitions, queries, and expressions with the local
///   `QueryNormalizer` visitor;
/// * for `CREATE CONNECTION`, sorts `values` and drops the `Validate` option.
///
/// # Errors
///
/// Returns a `PlanError` if the name is rejected by
/// [`unresolved_item_name`], if `scx.allocate_full_name` fails, or if the
/// visitor has an error stored in `QueryNormalizer::err`.
///
/// # Panics
///
/// Hits `unreachable!()` if `stmt` is any `Statement` variant other than the
/// ones matched below.
pub fn create_statement(
    scx: &StatementContext,
    mut stmt: Statement<Aug>,
) -> Result<String, PlanError> {
    // Allocates a full name for `name` through the statement context and
    // converts it back into an `UnresolvedItemName`.
    let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
        Ok(unresolve(
            scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
        ))
    };

    // Same as `allocate_name`, but uses the temporary-name allocator, whose
    // result is used directly (only the name conversion can fail here).
    let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
        Ok(unresolve(scx.allocate_temporary_full_name(
            unresolved_item_name(name.clone())?,
        )))
    };

    // Visitor state used while walking the statement's AST.
    //
    // NOTE(review): nothing visible in this function reads `ctes` or assigns
    // `err`; confirm whether both fields are still needed.
    struct QueryNormalizer {
        // CTE names pushed on entry to a query and truncated away on exit.
        ctes: Vec<Ident>,
        // First error recorded during the walk, checked by callers afterwards.
        err: Option<PlanError>,
    }

    impl QueryNormalizer {
        fn new() -> QueryNormalizer {
            QueryNormalizer {
                ctes: vec![],
                err: None,
            }
        }
    }

    impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
        fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
            // Remember the stack depth so the CTE names added for this query
            // can be dropped again once the query has been visited.
            let n = self.ctes.len();
            match &query.ctes {
                CteBlock::Simple(ctes) => {
                    for cte in ctes.iter() {
                        self.ctes.push(cte.alias.name.clone());
                    }
                }
                CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
                    for cte in ctes.iter() {
                        self.ctes.push(cte.name.clone());
                    }
                }
            }
            visit_mut::visit_query_mut(self, query);
            self.ctes.truncate(n);
        }

        fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
            // Does not delegate to `visit_mut::visit_function_mut`: only the
            // arguments, the ORDER BY expressions, and the window spec are
            // visited.
            match &mut func.args {
                FunctionArgs::Star => (),
                FunctionArgs::Args { args, order_by } => {
                    for arg in args {
                        self.visit_expr_mut(arg);
                    }
                    for expr in order_by {
                        self.visit_order_by_expr_mut(expr);
                    }
                }
            }
            if let Some(over) = &mut func.over {
                self.visit_window_spec_mut(over);
            }
        }

        fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
            match table_factor {
                // For plain table references only the name and the alias are
                // visited; any other fields of the variant are skipped.
                TableFactor::Table { name, alias, .. } => {
                    self.visit_item_name_mut(name);
                    if let Some(alias) = alias {
                        self.visit_table_alias_mut(alias);
                    }
                }
                // Every other variant gets the default traversal.
                _ => visit_mut::visit_table_factor_mut(self, table_factor),
            }
        }
    }

    // Each arm destructures its statement with most fields named explicitly
    // (bound to `_` when unused), so adding a field to one of those AST
    // structs forces this code to be revisited. The sink and index arms use
    // `..` and are not protected this way.
    match &mut stmt {
        Statement::CreateSource(CreateSourceStatement {
            name,
            in_cluster: _,
            col_names: _,
            connection: _,
            format: _,
            include_metadata: _,
            envelope: _,
            if_not_exists,
            key_constraint: _,
            with_options: _,
            external_references: _,
            progress_subsource: _,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;
        }

        Statement::CreateSubsource(CreateSubsourceStatement {
            name,
            columns,
            constraints: _,
            of_source: _,
            if_not_exists,
            with_options: _,
        }) => {
            *name = allocate_name(name)?;
            let mut normalizer = QueryNormalizer::new();
            for c in columns {
                normalizer.visit_column_def_mut(c);
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
            *if_not_exists = false;
        }

        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
            name,
            columns,
            constraints: _,
            external_reference: _,
            source: _,
            if_not_exists,
            format: _,
            include_metadata: _,
            envelope: _,
            with_options: _,
        }) => {
            *name = allocate_name(name)?;
            let mut normalizer = QueryNormalizer::new();
            // Only explicitly defined columns carry definitions to visit.
            if let TableFromSourceColumns::Defined(columns) = columns {
                for c in columns {
                    normalizer.visit_column_def_mut(c);
                }
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
            *if_not_exists = false;
        }

        Statement::CreateTable(CreateTableStatement {
            name,
            columns,
            constraints: _,
            if_not_exists,
            temporary,
            with_options: _,
        }) => {
            *name = if *temporary {
                allocate_temporary_name(name)?
            } else {
                allocate_name(name)?
            };
            let mut normalizer = QueryNormalizer::new();
            for c in columns {
                normalizer.visit_column_def_mut(c);
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
            *if_not_exists = false;
        }

        Statement::CreateWebhookSource(CreateWebhookSourceStatement {
            name,
            is_table: _,
            if_not_exists,
            include_headers: _,
            body_format: _,
            validate_using: _,
            in_cluster: _,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;
        }

        Statement::CreateSink(CreateSinkStatement {
            name,
            in_cluster: _,
            connection: _,
            format: _,
            envelope: _,
            if_not_exists,
            ..
        }) => {
            // The sink name is optional; it is only rewritten when present.
            if let Some(name) = name {
                *name = allocate_name(name)?;
            }
            *if_not_exists = false;
        }

        Statement::CreateView(CreateViewStatement {
            temporary,
            if_exists,
            definition:
                ViewDefinition {
                    name,
                    query,
                    columns: _,
                },
        }) => {
            *name = if *temporary {
                allocate_temporary_name(name)?
            } else {
                allocate_name(name)?
            };
            {
                let mut normalizer = QueryNormalizer::new();
                normalizer.visit_query_mut(query);
                if let Some(err) = normalizer.err {
                    return Err(err);
                }
            }
            *if_exists = IfExistsBehavior::Error;
        }

        Statement::CreateMaterializedView(CreateMaterializedViewStatement {
            if_exists,
            name,
            columns: _,
            replacement_for: _,
            in_cluster: _,
            query,
            with_options: _,
            as_of: _,
        }) => {
            *name = allocate_name(name)?;
            {
                let mut normalizer = QueryNormalizer::new();
                normalizer.visit_query_mut(query);
                if let Some(err) = normalizer.err {
                    return Err(err);
                }
            }
            *if_exists = IfExistsBehavior::Error;
        }

        Statement::CreateContinualTask(CreateContinualTaskStatement {
            name,
            columns: _,
            input,
            with_options: _,
            stmts,
            in_cluster: _,
            as_of: _,
            sugar,
        }) => {
            // Unlike the other arms, the name is not passed through
            // `allocate_name`; it is only visited, together with the input.
            let mut normalizer = QueryNormalizer::new();
            normalizer.visit_item_name_mut(name);
            normalizer.visit_item_name_mut(input);
            for stmt in stmts {
                match stmt {
                    ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
                    ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
                }
            }
            match sugar {
                Some(CreateContinualTaskSugar::Transform { transform }) => {
                    normalizer.visit_query_mut(transform)
                }
                Some(CreateContinualTaskSugar::Retain { retain }) => {
                    normalizer.visit_expr_mut(retain)
                }
                None => {}
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
        }

        Statement::CreateIndex(CreateIndexStatement {
            name: _,
            in_cluster: _,
            key_parts,
            with_options: _,
            if_not_exists,
            ..
        }) => {
            // The index name is left untouched; only the key expressions are
            // visited, with the error check performed after each key part.
            let mut normalizer = QueryNormalizer::new();
            if let Some(key_parts) = key_parts {
                for key_part in key_parts {
                    normalizer.visit_expr_mut(key_part);
                    if let Some(err) = normalizer.err {
                        return Err(err);
                    }
                }
            }
            *if_not_exists = false;
        }

        Statement::CreateType(CreateTypeStatement { name, as_type }) => {
            *name = allocate_name(name)?;
            let mut normalizer = QueryNormalizer::new();
            normalizer.visit_create_type_as_mut(as_type);
            if let Some(err) = normalizer.err {
                return Err(err);
            }
        }
        Statement::CreateSecret(CreateSecretStatement {
            name,
            if_not_exists,
            value: _,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;
        }
        Statement::CreateConnection(CreateConnectionStatement {
            name,
            connection_type: _,
            values,
            with_options,
            if_not_exists,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;

            // Sort so the rendered statement does not depend on the order in
            // which the values were written.
            values.sort();

            // The `Validate` option is removed from the rendered statement.
            with_options
                .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
        }

        // Callers must only pass the `CREATE` statements handled above.
        _ => unreachable!(),
    }

    Ok(stmt.to_ast_string_stable())
}
515
/// Generates an `<OptionTy>Extracted` struct that collects a
/// `Vec<OptionTy<Aug>>` of options into typed fields, and can turn those
/// fields back into a `Vec` of options.
///
/// Invoke it as `generate_extracted_config!(OptionTy, entry, entry, ...)`,
/// where each entry is one of:
///
/// * `(Name, Type)`: the field has type `Option<Type>` and defaults to
///   `None`;
/// * `(Name, Type, Default(v))`: the field has type `Type` and defaults to
///   `Type::from(v)`;
/// * `(Name, Type, AllowMultiple)`: the field has type `Vec<Type>`, starts
///   empty, and the option may be given any number of times.
///
/// `Name` is matched against `option.name`, whose type is expected to be
/// `<OptionTy>Name`. Each field is named after its option, converted to
/// snake case.
///
/// The generated items are:
///
/// * the `<OptionTy>Extracted` struct, with a `seen` set of option names plus
///   one field per entry;
/// * a `Default` impl;
/// * `TryFrom<Vec<OptionTy<Aug>>>`, which converts each option's value with
///   `TryFromValue::try_from_value` and fails if an option that is not
///   `AllowMultiple` appears more than once or a value fails to convert;
/// * a private `into_values` method that converts the fields back into
///   options with `TryFromValue::try_into_value`, skipping fields for which
///   that returns `None`.
macro_rules! generate_extracted_config {
    // `(Name, Type)` followed by more entries: record the entry as an
    // `Option<Type>` defaulting to `None`, then recurse on the tail.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty), $($tail:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, Option::<$t>, None, false)],
            $($tail),*
        );
    };
    // `(Name, Type)` as the last entry.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, Option::<$t>, None, false)]
        );
    };
    // `(Name, Type, Default(v))` followed by more entries: the field keeps
    // type `Type` and uses `v` as its default.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, $v, false)],
            $($tail),*
        );
    };
    // `(Name, Type, Default(v))` as the last entry.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, Default($v:expr))
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, $v, false)]
        );
    };
    // `(Name, Type, AllowMultiple)` followed by more entries: the entry is
    // flagged `true` and starts from an empty vector.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, vec![], true)],
            $($tail),*
        );
    };
    // `(Name, Type, AllowMultiple)` as the last entry.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, AllowMultiple)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, vec![], true)]
        );
    };
    // All entries have been normalized to `(name, type, default, allow_multiple)`;
    // emit the struct and its impls.
    ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$option_ty Extracted>] {
                // Names of the non-`AllowMultiple` options encountered by `try_from`.
                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
                $(
                    // `Vec<$t>` for `AllowMultiple` entries, `$t` otherwise.
                    pub [<$option_name:snake>]: generate_extracted_config!(
                        @ifty $allow_multiple,
                        Vec::<$t>,
                        $t
                    ),
                )*
            }

            impl std::default::Default for [<$option_ty Extracted>] {
                fn default() -> Self {
                    [<$option_ty Extracted>] {
                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
                        $(
                            // The default goes through `From`, so `$v` only has
                            // to be convertible into the field's type.
                            [<$option_name:snake>]: <generate_extracted_config!(
                                @ifty $allow_multiple,
                                Vec::<$t>,
                                $t
                            )>::from($v),
                        )*
                    }
                }
            }

            impl std::convert::TryFrom<Vec<$option_ty<Aug>>>
                for [<$option_ty Extracted>]
            {
                type Error = $crate::plan::PlanError;
                fn try_from(
                    v: Vec<$option_ty<Aug>>,
                ) -> Result<[<$option_ty Extracted>], Self::Error> {
                    // Bring the name type's items into scope so each
                    // `$option_name` can be used as a match pattern.
                    use [<$option_ty Name>]::*;
                    let mut extracted = [<$option_ty Extracted>]::default();
                    for option in v {
                        // There is no catch-all arm: only the listed names are matched.
                        match option.name {
                            $(
                                $option_name => {
                                    // Because of short-circuiting, `seen` is only
                                    // updated for options that must be unique.
                                    if !$allow_multiple
                                        && !extracted.seen.insert(option.name.clone())
                                    {
                                        sql_bail!(
                                            "{} specified more than once",
                                            option.name.to_ast_string_simple(),
                                        );
                                    }
                                    let val: $t = $crate::plan::with_options
                                        ::TryFromValue::try_from_value(option.value)
                                        .map_err(|e| sql_err!(
                                            "invalid {}: {}",
                                            option.name.to_ast_string_simple(),
                                            e,
                                        ))?;
                                    // Append for `AllowMultiple` entries, overwrite otherwise.
                                    generate_extracted_config!(
                                        @ifexpr $allow_multiple,
                                        extracted.[<$option_name:snake>].push(val),
                                        extracted.[<$option_name:snake>] = val
                                    );
                                }
                            )*
                        }
                    }
                    Ok(extracted)
                }
            }

            impl [<$option_ty Extracted>] {
                // Not every generated struct has a caller for this method.
                #[allow(unused)]
                fn into_values(
                    self,
                    catalog: &dyn crate::catalog::SessionCatalog,
                ) -> Vec<$option_ty<Aug>> {
                    use [<$option_ty Name>]::*;
                    let mut options = Vec::new();
                    $(
                        let value = self.[<$option_name:snake>];
                        // Treat single-valued fields as a one-element vector so
                        // both kinds share the loop below.
                        let values: Vec<_> = generate_extracted_config!(
                            @ifexpr $allow_multiple,
                            value,
                            Vec::from([value])
                        );
                        for value in values {
                            // `None` means this field does not produce an option
                            // and is skipped.
                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
                            >>::try_into_value(value, catalog);
                            match maybe_value {
                                Some(value) => {
                                    let option = $option_ty {name: $option_name, value};
                                    options.push(option);
                                },
                                None => (),
                            }
                        }
                    )*
                    options
                }
            }
        }
    };
    // Public entry point: start with an empty list of processed entries.
    ($option_ty:ty, $($h:tt),+) => {
        generate_extracted_config!{$option_ty, [], $($h),+}
    };
    // Internal helpers: select the first expression or type when the flag is
    // `true`, the second when it is `false`.
    (@ifexpr false, $lhs:expr, $rhs:expr) => {
        $rhs
    };
    (@ifexpr true, $lhs:expr, $rhs:expr) => {
        $lhs
    };
    (@ifty false, $lhs:ty, $rhs:ty) => {
        $rhs
    };
    (@ifty true, $lhs:ty, $rhs:ty) => {
        $lhs
    };
}
728
729pub(crate) use generate_extracted_config;