1use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24 ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25 CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26 CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27 CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28 CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29 MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30 UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37pub fn ident(ident: Ident) -> String {
39 ident.into_string()
40}
41
42pub fn ident_ref(ident: &Ident) -> &str {
44 ident.as_str()
45}
46
47pub fn column_name(id: Ident) -> ColumnName {
49 ColumnName::from(ident(id))
50}
51
52pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54 if name.0.len() < 1 || name.0.len() > 3 {
55 return Err(PlanError::MisqualifiedName(name.to_string()));
56 }
57 let out = PartialItemName {
58 item: ident(
59 name.0
60 .pop()
61 .expect("name checked to have at least one component"),
62 ),
63 schema: name.0.pop().map(ident),
64 database: name.0.pop().map(ident),
65 };
66 assert!(name.0.is_empty());
67 Ok(out)
68}
69
70pub fn unresolved_schema_name(
72 mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74 if name.0.len() < 1 || name.0.len() > 2 {
75 return Err(PlanError::MisqualifiedName(name.to_string()));
76 }
77 let out = PartialSchemaName {
78 schema: ident(
79 name.0
80 .pop()
81 .expect("name checked to have at least one component"),
82 ),
83 database: name.0.pop().map(ident),
84 };
85 assert!(name.0.is_empty());
86 Ok(out)
87}
88
89pub fn op(op: &Op) -> Result<&str, PlanError> {
93 if let Some(namespace) = &op.namespace {
94 if namespace.len() != 0
95 && (namespace.len() != 1
96 || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97 {
98 sql_bail!(
99 "operator does not exist: {}.{}",
100 namespace.iter().map(|n| n.to_string()).join("."),
101 op.op,
102 )
103 }
104 }
105 Ok(&op.op)
106}
107
/// An option value that is either a plain SQL [`Value`] or a reference to a
/// secret.
#[derive(Debug, Clone)]
pub enum SqlValueOrSecret {
    /// A literal SQL value.
    Value(Value),
    /// A [`GlobalId`]; presumably the ID of the secret object holding the
    /// value (confirm against callers).
    Secret(GlobalId),
}
113
114impl fmt::Display for SqlValueOrSecret {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118 SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119 }
120 }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124 fn from(s: SqlValueOrSecret) -> Self {
125 match s {
126 SqlValueOrSecret::Value(v) => Some(v),
127 SqlValueOrSecret::Secret(_id) => None,
128 }
129 }
130}
131
132pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136 let mut out = vec![];
138 if let RawDatabaseSpecifier::Name(n) = name.database {
139 out.push(Ident::new_unchecked(n));
140 }
141 out.push(Ident::new_unchecked(name.schema));
142 out.push(Ident::new_unchecked(name.item));
143 UnresolvedItemName(out)
144}
145
146pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149 match raw_name.0.len() {
150 3 => Ok(FullItemName {
151 item: ident(raw_name.0.pop().unwrap()),
152 schema: ident(raw_name.0.pop().unwrap()),
153 database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154 }),
155 2 => Ok(FullItemName {
156 item: ident(raw_name.0.pop().unwrap()),
157 schema: ident(raw_name.0.pop().unwrap()),
158 database: RawDatabaseSpecifier::Ambient,
159 }),
160 _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161 }
162}
163
164pub fn create_statement(
173 scx: &StatementContext,
174 mut stmt: Statement<Aug>,
175) -> Result<String, PlanError> {
176 let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
177 Ok(unresolve(
178 scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
179 ))
180 };
181
182 let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
183 Ok(unresolve(scx.allocate_temporary_full_name(
184 unresolved_item_name(name.clone())?,
185 )))
186 };
187
188 struct QueryNormalizer {
189 ctes: Vec<Ident>,
190 err: Option<PlanError>,
191 }
192
193 impl QueryNormalizer {
194 fn new() -> QueryNormalizer {
195 QueryNormalizer {
196 ctes: vec![],
197 err: None,
198 }
199 }
200 }
201
202 impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
203 fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
204 let n = self.ctes.len();
205 match &query.ctes {
206 CteBlock::Simple(ctes) => {
207 for cte in ctes.iter() {
208 self.ctes.push(cte.alias.name.clone());
209 }
210 }
211 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
212 for cte in ctes.iter() {
213 self.ctes.push(cte.name.clone());
214 }
215 }
216 }
217 visit_mut::visit_query_mut(self, query);
218 self.ctes.truncate(n);
219 }
220
221 fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
222 match &mut func.args {
223 FunctionArgs::Star => (),
224 FunctionArgs::Args { args, order_by } => {
225 for arg in args {
226 self.visit_expr_mut(arg);
227 }
228 for expr in order_by {
229 self.visit_order_by_expr_mut(expr);
230 }
231 }
232 }
233 if let Some(over) = &mut func.over {
234 self.visit_window_spec_mut(over);
235 }
236 }
237
238 fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
239 match table_factor {
240 TableFactor::Table { name, alias, .. } => {
241 self.visit_item_name_mut(name);
242 if let Some(alias) = alias {
243 self.visit_table_alias_mut(alias);
244 }
245 }
246 _ => visit_mut::visit_table_factor_mut(self, table_factor),
249 }
250 }
251 }
252
253 match &mut stmt {
264 Statement::CreateSource(CreateSourceStatement {
265 name,
266 in_cluster: _,
267 col_names: _,
268 connection: _,
269 format: _,
270 include_metadata: _,
271 envelope: _,
272 if_not_exists,
273 key_constraint: _,
274 with_options: _,
275 external_references: _,
276 progress_subsource: _,
277 }) => {
278 *name = allocate_name(name)?;
279 *if_not_exists = false;
280 }
281
282 Statement::CreateSubsource(CreateSubsourceStatement {
283 name,
284 columns,
285 constraints: _,
286 of_source: _,
287 if_not_exists,
288 with_options: _,
289 }) => {
290 *name = allocate_name(name)?;
291 let mut normalizer = QueryNormalizer::new();
292 for c in columns {
293 normalizer.visit_column_def_mut(c);
294 }
295 if let Some(err) = normalizer.err {
296 return Err(err);
297 }
298 *if_not_exists = false;
299 }
300
301 Statement::CreateTableFromSource(CreateTableFromSourceStatement {
302 name,
303 columns,
304 constraints: _,
305 external_reference: _,
306 source: _,
307 if_not_exists,
308 format: _,
309 include_metadata: _,
310 envelope: _,
311 with_options: _,
312 }) => {
313 *name = allocate_name(name)?;
314 let mut normalizer = QueryNormalizer::new();
315 if let TableFromSourceColumns::Defined(columns) = columns {
316 for c in columns {
317 normalizer.visit_column_def_mut(c);
318 }
319 }
320 if let Some(err) = normalizer.err {
321 return Err(err);
322 }
323 *if_not_exists = false;
324 }
325
326 Statement::CreateTable(CreateTableStatement {
327 name,
328 columns,
329 constraints: _,
330 if_not_exists,
331 temporary,
332 with_options: _,
333 }) => {
334 *name = if *temporary {
335 allocate_temporary_name(name)?
336 } else {
337 allocate_name(name)?
338 };
339 let mut normalizer = QueryNormalizer::new();
340 for c in columns {
341 normalizer.visit_column_def_mut(c);
342 }
343 if let Some(err) = normalizer.err {
344 return Err(err);
345 }
346 *if_not_exists = false;
347 }
348
349 Statement::CreateWebhookSource(CreateWebhookSourceStatement {
350 name,
351 is_table: _,
352 if_not_exists,
353 include_headers: _,
354 body_format: _,
355 validate_using: _,
356 in_cluster: _,
357 }) => {
358 *name = allocate_name(name)?;
359 *if_not_exists = false;
360 }
361
362 Statement::CreateSink(CreateSinkStatement {
363 name,
364 in_cluster: _,
365 connection: _,
366 format: _,
367 envelope: _,
368 if_not_exists,
369 ..
370 }) => {
371 if let Some(name) = name {
372 *name = allocate_name(name)?;
373 }
374 *if_not_exists = false;
375 }
376
377 Statement::CreateView(CreateViewStatement {
378 temporary,
379 if_exists,
380 definition:
381 ViewDefinition {
382 name,
383 query,
384 columns: _,
385 },
386 }) => {
387 *name = if *temporary {
388 allocate_temporary_name(name)?
389 } else {
390 allocate_name(name)?
391 };
392 {
393 let mut normalizer = QueryNormalizer::new();
394 normalizer.visit_query_mut(query);
395 if let Some(err) = normalizer.err {
396 return Err(err);
397 }
398 }
399 *if_exists = IfExistsBehavior::Error;
400 }
401
402 Statement::CreateMaterializedView(CreateMaterializedViewStatement {
403 if_exists,
404 name,
405 columns: _,
406 replacement_for: _,
407 in_cluster: _,
408 in_cluster_replica: _,
409 query,
410 with_options: _,
411 as_of: _,
412 }) => {
413 *name = allocate_name(name)?;
414 {
415 let mut normalizer = QueryNormalizer::new();
416 normalizer.visit_query_mut(query);
417 if let Some(err) = normalizer.err {
418 return Err(err);
419 }
420 }
421 *if_exists = IfExistsBehavior::Error;
422 }
423
424 Statement::CreateContinualTask(CreateContinualTaskStatement {
425 name,
426 columns: _,
427 input,
428 with_options: _,
429 stmts,
430 in_cluster: _,
431 as_of: _,
432 sugar,
433 }) => {
434 let mut normalizer = QueryNormalizer::new();
435 normalizer.visit_item_name_mut(name);
436 normalizer.visit_item_name_mut(input);
437 for stmt in stmts {
438 match stmt {
439 ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
440 ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
441 }
442 }
443 match sugar {
444 Some(CreateContinualTaskSugar::Transform { transform }) => {
445 normalizer.visit_query_mut(transform)
446 }
447 Some(CreateContinualTaskSugar::Retain { retain }) => {
448 normalizer.visit_expr_mut(retain)
449 }
450 None => {}
451 }
452 if let Some(err) = normalizer.err {
453 return Err(err);
454 }
455 }
456
457 Statement::CreateIndex(CreateIndexStatement {
458 name: _,
459 in_cluster: _,
460 key_parts,
461 with_options: _,
462 if_not_exists,
463 ..
464 }) => {
465 let mut normalizer = QueryNormalizer::new();
466 if let Some(key_parts) = key_parts {
467 for key_part in key_parts {
468 normalizer.visit_expr_mut(key_part);
469 if let Some(err) = normalizer.err {
470 return Err(err);
471 }
472 }
473 }
474 *if_not_exists = false;
475 }
476
477 Statement::CreateType(CreateTypeStatement { name, as_type }) => {
478 *name = allocate_name(name)?;
479 let mut normalizer = QueryNormalizer::new();
480 normalizer.visit_create_type_as_mut(as_type);
481 if let Some(err) = normalizer.err {
482 return Err(err);
483 }
484 }
485 Statement::CreateSecret(CreateSecretStatement {
486 name,
487 if_not_exists,
488 value: _,
489 }) => {
490 *name = allocate_name(name)?;
491 *if_not_exists = false;
492 }
493 Statement::CreateConnection(CreateConnectionStatement {
494 name,
495 connection_type: _,
496 values,
497 with_options,
498 if_not_exists,
499 }) => {
500 *name = allocate_name(name)?;
501 *if_not_exists = false;
502
503 values.sort();
504
505 with_options
508 .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
509 }
510
511 _ => unreachable!(),
512 }
513
514 Ok(stmt.to_ast_string_stable())
515}
516
/// Generates an `<OptionTy>Extracted` struct, plus conversions, from a list of
/// option descriptions.
///
/// Invocation shape (handled by the entry rule near the bottom):
///
/// ```text
/// generate_extracted_config!(
///     OptionTy,
///     (OptionName, Type),
///     (OptionName, Type, Default(expr)),
///     (OptionName, Type, AllowMultiple)
/// );
/// ```
///
/// Each entry is rewritten into the tuple
/// `(name, field_type, default_expr, allow_multiple)`:
///
/// * `(Name, T)` becomes `(Name, Option::<T>, None, false)`;
/// * `(Name, T, Default(v))` becomes `(Name, T, v, false)`;
/// * `(Name, T, AllowMultiple)` becomes `(Name, T, vec![], true)`.
///
/// The final rule then emits:
///
/// * `pub struct <OptionTy>Extracted` with a `seen` set of `<OptionTy>Name`
///   and one snake-case field per option (`Vec<T>` for `AllowMultiple`
///   options, the field type otherwise);
/// * a `Default` impl that fills each field from its default expression;
/// * `TryFrom<Vec<OptionTy<Aug>>>`, which rejects an option that is repeated
///   without `AllowMultiple` and converts each value with
///   `TryFromValue::try_from_value`;
/// * a private `into_values` method that turns the struct back into a vector
///   of options with `TryFromValue::try_into_value`, skipping values that
///   convert to `None`.
macro_rules! generate_extracted_config {
    // Accumulate `(Name, Type)` when more entries follow.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty), $($tail:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, Option::<$t>, None, false)],
            $($tail),*
        );
    };
    // Accumulate `(Name, Type)` as the last entry.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, Option::<$t>, None, false)]
        );
    };
    // Accumulate `(Name, Type, Default(expr))` when more entries follow.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, $v, false)],
            $($tail),*
        );
    };
    // Accumulate `(Name, Type, Default(expr))` as the last entry.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, Default($v:expr))
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, $v, false)]
        );
    };
    // Accumulate `(Name, Type, AllowMultiple)` when more entries follow.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, vec![], true)],
            $($tail),*
        );
    };
    // Accumulate `(Name, Type, AllowMultiple)` as the last entry.
    (
        $option_ty:ty, [$($processed:tt)*],
        ($option_name:path, $t:ty, AllowMultiple)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($processed)* ($option_name, $t, vec![], true)]
        );
    };
    // Final rule: every entry is in the 4-tuple form, so emit the generated
    // items.
    ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$option_ty Extracted>] {
                // Names of the options encountered while extracting.
                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
                $(
                    // `Vec<$t>` for `AllowMultiple` options, `$t` otherwise.
                    pub [<$option_name:snake>]: generate_extracted_config!(
                        @ifty $allow_multiple,
                        Vec::<$t>,
                        $t
                    ),
                )*
            }

            impl std::default::Default for [<$option_ty Extracted>] {
                fn default() -> Self {
                    [<$option_ty Extracted>] {
                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
                        $(
                            // Build the field from its default expression via
                            // `From`.
                            [<$option_name:snake>]: <generate_extracted_config!(
                                @ifty $allow_multiple,
                                Vec::<$t>,
                                $t
                            )>::from($v),
                        )*
                    }
                }
            }

            impl std::convert::TryFrom<Vec<$option_ty<Aug>>>
                for [<$option_ty Extracted>]
            {
                type Error = $crate::plan::PlanError;
                fn try_from(
                    v: Vec<$option_ty<Aug>>,
                ) -> Result<[<$option_ty Extracted>], Self::Error> {
                    use [<$option_ty Name>]::*;
                    let mut extracted = [<$option_ty Extracted>]::default();
                    for option in v {
                        match option.name {
                            $(
                                $option_name => {
                                    // Because `&&` short-circuits, the name is
                                    // recorded in `seen` only for options that
                                    // do not allow multiple occurrences.
                                    if !$allow_multiple
                                        && !extracted.seen.insert(option.name.clone())
                                    {
                                        sql_bail!(
                                            "{} specified more than once",
                                            option.name.to_ast_string_simple(),
                                        );
                                    }
                                    let val: $t = $crate::plan::with_options
                                        ::TryFromValue::try_from_value(option.value)
                                        .map_err(|e| sql_err!(
                                            "invalid {}: {}",
                                            option.name.to_ast_string_simple(),
                                            e,
                                        ))?;
                                    // Append for `AllowMultiple` options,
                                    // overwrite otherwise.
                                    generate_extracted_config!(
                                        @ifexpr $allow_multiple,
                                        extracted.[<$option_name:snake>].push(val),
                                        extracted.[<$option_name:snake>] = val
                                    );
                                }
                            )*
                        }
                    }
                    Ok(extracted)
                }
            }

            impl [<$option_ty Extracted>] {
                #[allow(unused)]
                fn into_values(
                    self,
                    catalog: &dyn crate::catalog::SessionCatalog,
                ) -> Vec<$option_ty<Aug>> {
                    use [<$option_ty Name>]::*;
                    let mut options = Vec::new();
                    $(
                        let value = self.[<$option_name:snake>];
                        // Treat single-valued fields as a one-element list so
                        // both kinds share the loop below.
                        let values: Vec<_> = generate_extracted_config!(
                            @ifexpr $allow_multiple,
                            value,
                            Vec::from([value])
                        );
                        for value in values {
                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
                            >>::try_into_value(value, catalog);
                            // Values that convert to `None` are omitted from
                            // the output.
                            match maybe_value {
                                Some(value) => {
                                    let option = $option_ty {name: $option_name, value};
                                    options.push(option);
                                },
                                None => (),
                            }
                        }
                    )*
                    options
                }
            }
        }
    };
    // Entry rule: start accumulating with an empty list of processed entries.
    ($option_ty:ty, $($h:tt),+) => {
        generate_extracted_config!{$option_ty, [], $($h),+}
    };
    // `@ifexpr <bool>, lhs, rhs` expands to `lhs` when true and `rhs` when
    // false.
    (@ifexpr false, $lhs:expr, $rhs:expr) => {
        $rhs
    };
    (@ifexpr true, $lhs:expr, $rhs:expr) => {
        $lhs
    };
    // `@ifty <bool>, lhs, rhs` is the type-level counterpart of `@ifexpr`.
    (@ifty false, $lhs:ty, $rhs:ty) => {
        $rhs
    };
    (@ifty true, $lhs:ty, $rhs:ty) => {
        $lhs
    };
}
729
730pub(crate) use generate_extracted_config;