1use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24 CreateConnectionStatement, CreateIndexStatement, CreateMaterializedViewStatement,
25 CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
26 CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
27 CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
28 MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
29 UnresolvedSchemaName, Value, ViewDefinition,
30};
31
32use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
33use crate::plan::error::PlanError;
34use crate::plan::statement::StatementContext;
35
36pub fn ident(ident: Ident) -> String {
38 ident.into_string()
39}
40
41pub fn ident_ref(ident: &Ident) -> &str {
43 ident.as_str()
44}
45
46pub fn column_name(id: Ident) -> ColumnName {
48 ColumnName::from(ident(id))
49}
50
51pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
53 if name.0.len() < 1 || name.0.len() > 3 {
54 return Err(PlanError::MisqualifiedName(name.to_string()));
55 }
56 let out = PartialItemName {
57 item: ident(
58 name.0
59 .pop()
60 .expect("name checked to have at least one component"),
61 ),
62 schema: name.0.pop().map(ident),
63 database: name.0.pop().map(ident),
64 };
65 assert!(name.0.is_empty());
66 Ok(out)
67}
68
69pub fn unresolved_schema_name(
71 mut name: UnresolvedSchemaName,
72) -> Result<PartialSchemaName, PlanError> {
73 if name.0.len() < 1 || name.0.len() > 2 {
74 return Err(PlanError::MisqualifiedName(name.to_string()));
75 }
76 let out = PartialSchemaName {
77 schema: ident(
78 name.0
79 .pop()
80 .expect("name checked to have at least one component"),
81 ),
82 database: name.0.pop().map(ident),
83 };
84 assert!(name.0.is_empty());
85 Ok(out)
86}
87
88pub fn op(op: &Op) -> Result<&str, PlanError> {
92 if let Some(namespace) = &op.namespace {
93 if namespace.len() != 0
94 && (namespace.len() != 1
95 || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
96 {
97 sql_bail!(
98 "operator does not exist: {}.{}",
99 namespace.iter().map(|n| n.to_string()).join("."),
100 op.op,
101 )
102 }
103 }
104 Ok(&op.op)
105}
106
107#[derive(Debug, Clone)]
108pub enum SqlValueOrSecret {
109 Value(Value),
110 Secret(GlobalId),
111}
112
113impl fmt::Display for SqlValueOrSecret {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 match self {
116 SqlValueOrSecret::Value(v) => write!(f, "{}", v),
117 SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
118 }
119 }
120}
121
122impl From<SqlValueOrSecret> for Option<Value> {
123 fn from(s: SqlValueOrSecret) -> Self {
124 match s {
125 SqlValueOrSecret::Value(v) => Some(v),
126 SqlValueOrSecret::Secret(_id) => None,
127 }
128 }
129}
130
131pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
135 let mut out = vec![];
137 if let RawDatabaseSpecifier::Name(n) = name.database {
138 out.push(Ident::new_unchecked(n));
139 }
140 out.push(Ident::new_unchecked(name.schema));
141 out.push(Ident::new_unchecked(name.item));
142 UnresolvedItemName(out)
143}
144
145pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
148 match raw_name.0.len() {
149 3 => Ok(FullItemName {
150 item: ident(raw_name.0.pop().unwrap()),
151 schema: ident(raw_name.0.pop().unwrap()),
152 database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
153 }),
154 2 => Ok(FullItemName {
155 item: ident(raw_name.0.pop().unwrap()),
156 schema: ident(raw_name.0.pop().unwrap()),
157 database: RawDatabaseSpecifier::Ambient,
158 }),
159 _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
160 }
161}
162
163pub fn create_statement(
172 scx: &StatementContext,
173 mut stmt: Statement<Aug>,
174) -> Result<String, PlanError> {
175 let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
176 Ok(unresolve(
177 scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
178 ))
179 };
180
181 let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
182 Ok(unresolve(scx.allocate_temporary_full_name(
183 unresolved_item_name(name.clone())?,
184 )))
185 };
186
187 struct QueryNormalizer {
188 ctes: Vec<Ident>,
189 err: Option<PlanError>,
190 }
191
192 impl QueryNormalizer {
193 fn new() -> QueryNormalizer {
194 QueryNormalizer {
195 ctes: vec![],
196 err: None,
197 }
198 }
199 }
200
201 impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
202 fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
203 let n = self.ctes.len();
204 match &query.ctes {
205 CteBlock::Simple(ctes) => {
206 for cte in ctes.iter() {
207 self.ctes.push(cte.alias.name.clone());
208 }
209 }
210 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
211 for cte in ctes.iter() {
212 self.ctes.push(cte.name.clone());
213 }
214 }
215 }
216 visit_mut::visit_query_mut(self, query);
217 self.ctes.truncate(n);
218 }
219
220 fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
221 match &mut func.args {
222 FunctionArgs::Star => (),
223 FunctionArgs::Args { args, order_by } => {
224 for arg in args {
225 self.visit_expr_mut(arg);
226 }
227 for expr in order_by {
228 self.visit_order_by_expr_mut(expr);
229 }
230 }
231 }
232 if let Some(over) = &mut func.over {
233 self.visit_window_spec_mut(over);
234 }
235 }
236
237 fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
238 match table_factor {
239 TableFactor::Table { name, alias, .. } => {
240 self.visit_item_name_mut(name);
241 if let Some(alias) = alias {
242 self.visit_table_alias_mut(alias);
243 }
244 }
245 _ => visit_mut::visit_table_factor_mut(self, table_factor),
248 }
249 }
250 }
251
252 match &mut stmt {
263 Statement::CreateSource(CreateSourceStatement {
264 name,
265 in_cluster: _,
266 col_names: _,
267 connection: _,
268 format: _,
269 include_metadata: _,
270 envelope: _,
271 if_not_exists,
272 key_constraint: _,
273 with_options: _,
274 external_references: _,
275 progress_subsource: _,
276 }) => {
277 *name = allocate_name(name)?;
278 *if_not_exists = false;
279 }
280
281 Statement::CreateSubsource(CreateSubsourceStatement {
282 name,
283 columns,
284 constraints: _,
285 of_source: _,
286 if_not_exists,
287 with_options: _,
288 }) => {
289 *name = allocate_name(name)?;
290 let mut normalizer = QueryNormalizer::new();
291 for c in columns {
292 normalizer.visit_column_def_mut(c);
293 }
294 if let Some(err) = normalizer.err {
295 return Err(err);
296 }
297 *if_not_exists = false;
298 }
299
300 Statement::CreateTableFromSource(CreateTableFromSourceStatement {
301 name,
302 columns,
303 constraints: _,
304 external_reference: _,
305 source: _,
306 if_not_exists,
307 format: _,
308 include_metadata: _,
309 envelope: _,
310 with_options: _,
311 }) => {
312 *name = allocate_name(name)?;
313 let mut normalizer = QueryNormalizer::new();
314 if let TableFromSourceColumns::Defined(columns) = columns {
315 for c in columns {
316 normalizer.visit_column_def_mut(c);
317 }
318 }
319 if let Some(err) = normalizer.err {
320 return Err(err);
321 }
322 *if_not_exists = false;
323 }
324
325 Statement::CreateTable(CreateTableStatement {
326 name,
327 columns,
328 constraints: _,
329 if_not_exists,
330 temporary,
331 with_options: _,
332 }) => {
333 *name = if *temporary {
334 allocate_temporary_name(name)?
335 } else {
336 allocate_name(name)?
337 };
338 let mut normalizer = QueryNormalizer::new();
339 for c in columns {
340 normalizer.visit_column_def_mut(c);
341 }
342 if let Some(err) = normalizer.err {
343 return Err(err);
344 }
345 *if_not_exists = false;
346 }
347
348 Statement::CreateWebhookSource(CreateWebhookSourceStatement {
349 name,
350 is_table: _,
351 if_not_exists,
352 include_headers: _,
353 body_format: _,
354 validate_using: _,
355 in_cluster: _,
356 }) => {
357 *name = allocate_name(name)?;
358 *if_not_exists = false;
359 }
360
361 Statement::CreateSink(CreateSinkStatement {
362 name,
363 in_cluster: _,
364 connection: _,
365 format: _,
366 envelope: _,
367 if_not_exists,
368 ..
369 }) => {
370 if let Some(name) = name {
371 *name = allocate_name(name)?;
372 }
373 *if_not_exists = false;
374 }
375
376 Statement::CreateView(CreateViewStatement {
377 temporary,
378 if_exists,
379 definition:
380 ViewDefinition {
381 name,
382 query,
383 columns: _,
384 },
385 }) => {
386 *name = if *temporary {
387 allocate_temporary_name(name)?
388 } else {
389 allocate_name(name)?
390 };
391 {
392 let mut normalizer = QueryNormalizer::new();
393 normalizer.visit_query_mut(query);
394 if let Some(err) = normalizer.err {
395 return Err(err);
396 }
397 }
398 *if_exists = IfExistsBehavior::Error;
399 }
400
401 Statement::CreateMaterializedView(CreateMaterializedViewStatement {
402 if_exists,
403 name,
404 columns: _,
405 replacement_for: _,
406 in_cluster: _,
407 in_cluster_replica: _,
408 query,
409 with_options: _,
410 as_of: _,
411 }) => {
412 *name = allocate_name(name)?;
413 {
414 let mut normalizer = QueryNormalizer::new();
415 normalizer.visit_query_mut(query);
416 if let Some(err) = normalizer.err {
417 return Err(err);
418 }
419 }
420 *if_exists = IfExistsBehavior::Error;
421 }
422
423 Statement::CreateIndex(CreateIndexStatement {
424 name: _,
425 in_cluster: _,
426 key_parts,
427 with_options: _,
428 if_not_exists,
429 ..
430 }) => {
431 let mut normalizer = QueryNormalizer::new();
432 if let Some(key_parts) = key_parts {
433 for key_part in key_parts {
434 normalizer.visit_expr_mut(key_part);
435 if let Some(err) = normalizer.err {
436 return Err(err);
437 }
438 }
439 }
440 *if_not_exists = false;
441 }
442
443 Statement::CreateType(CreateTypeStatement { name, as_type }) => {
444 *name = allocate_name(name)?;
445 let mut normalizer = QueryNormalizer::new();
446 normalizer.visit_create_type_as_mut(as_type);
447 if let Some(err) = normalizer.err {
448 return Err(err);
449 }
450 }
451 Statement::CreateSecret(CreateSecretStatement {
452 name,
453 if_not_exists,
454 value: _,
455 }) => {
456 *name = allocate_name(name)?;
457 *if_not_exists = false;
458 }
459 Statement::CreateConnection(CreateConnectionStatement {
460 name,
461 connection_type: _,
462 values,
463 with_options,
464 if_not_exists,
465 }) => {
466 *name = allocate_name(name)?;
467 *if_not_exists = false;
468
469 values.sort();
470
471 with_options
474 .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
475 }
476
477 _ => bail_internal!("unexpected statement type for normalization"),
478 }
479
480 Ok(stmt.to_ast_string_stable())
481}
482
/// Generates an `<OptionTy>Extracted` struct together with the conversions
/// between it and a `Vec<OptionTy<Aug>>`.
///
/// Invoke it as `generate_extracted_config!(OptionTy, spec, spec, ...)`, where
/// each `spec` takes one of these forms:
///
/// * `(Name, Type)`: the field has type `Option<Type>` and defaults to `None`.
/// * `(Name, Type, Default(expr))`: the field has type `Type` and defaults to
///   `expr`.
/// * `(Name, Type, AllowMultiple)`: the field has type `Vec<Type>`, starts out
///   empty, and the option may be given more than once.
///
/// `Name` is used as a pattern for `option.name` with `<OptionTy>Name::*`
/// imported, and the generated field is its snake-case form.
///
/// The expansion contains:
///
/// * `pub struct <OptionTy>Extracted` with one field per spec plus a `seen`
///   set of `<OptionTy>Name`s;
/// * a `Default` impl that applies the defaults above;
/// * `TryFrom<Vec<OptionTy<Aug>>>`, which converts every option value via
///   `TryFromValue::try_from_value` and rejects a repeated option unless it
///   was declared `AllowMultiple`;
/// * `into_values`, which turns the struct back into a `Vec<OptionTy<Aug>>`
///   via `TryFromValue::try_into_value`, skipping values that map to `None`.
macro_rules! generate_extracted_config {
    // `(Name, Type)` with more specs to follow: optional value, no default.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $value_ty:ty), $($rest:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, Option::<$value_ty>, None, false)],
            $($rest),*
        );
    };
    // `(Name, Type)` as the last spec.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $value_ty:ty)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, Option::<$value_ty>, None, false)]
        );
    };
    // `(Name, Type, Default(expr))` with more specs to follow.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $value_ty:ty, Default($default:expr)), $($rest:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $value_ty, $default, false)],
            $($rest),*
        );
    };
    // `(Name, Type, Default(expr))` as the last spec.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $value_ty:ty, Default($default:expr))
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $value_ty, $default, false)]
        );
    };
    // `(Name, Type, AllowMultiple)` with more specs to follow.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $value_ty:ty, AllowMultiple), $($rest:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $value_ty, vec![], true)],
            $($rest),*
        );
    };
    // `(Name, Type, AllowMultiple)` as the last spec.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $value_ty:ty, AllowMultiple)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $value_ty, vec![], true)]
        );
    };
    // Every spec has been rewritten to `(name, type, default, allow_multiple)`:
    // emit the struct and its impls.
    ($option_ty:ty, [$(($option_name:path, $value_ty:ty, $default:expr, $allow_multiple:literal))+]) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$option_ty Extracted>] {
                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
                $(
                    pub [<$option_name:snake>]: generate_extracted_config!(
                        @ifty $allow_multiple,
                        Vec::<$value_ty>,
                        $value_ty
                    ),
                )*
            }

            impl std::default::Default for [<$option_ty Extracted>] {
                fn default() -> Self {
                    Self {
                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
                        $(
                            [<$option_name:snake>]: <generate_extracted_config!(
                                @ifty $allow_multiple,
                                Vec::<$value_ty>,
                                $value_ty
                            )>::from($default),
                        )*
                    }
                }
            }

            impl std::convert::TryFrom<Vec<$option_ty<Aug>>>
                for [<$option_ty Extracted>]
            {
                type Error = $crate::plan::PlanError;
                fn try_from(
                    v: Vec<$option_ty<Aug>>,
                ) -> Result<Self, Self::Error> {
                    use [<$option_ty Name>]::*;
                    let mut extracted = Self::default();
                    for option in v {
                        match option.name {
                            $(
                                $option_name => {
                                    // The `&&` short-circuits, so an option that allows
                                    // multiple values is never inserted into `seen`.
                                    if !$allow_multiple
                                        && !extracted.seen.insert(option.name.clone())
                                    {
                                        sql_bail!(
                                            "{} specified more than once",
                                            option.name.to_ast_string_simple(),
                                        );
                                    }
                                    let parsed: $value_ty = $crate::plan::with_options
                                        ::TryFromValue::try_from_value(option.value)
                                            .map_err(|e| sql_err!(
                                                "invalid {}: {}",
                                                option.name.to_ast_string_simple(),
                                                e,
                                            ))?;
                                    // Repeatable options accumulate; all others overwrite.
                                    generate_extracted_config!(
                                        @ifexpr $allow_multiple,
                                        extracted.[<$option_name:snake>].push(parsed),
                                        extracted.[<$option_name:snake>] = parsed
                                    );
                                }
                            )*
                        }
                    }
                    Ok(extracted)
                }
            }

            impl [<$option_ty Extracted>] {
                #[allow(unused)]
                fn into_values(
                    self,
                    catalog: &dyn crate::catalog::SessionCatalog,
                ) -> Vec<$option_ty<Aug>> {
                    use [<$option_ty Name>]::*;
                    let mut options = Vec::new();
                    $(
                        let value = self.[<$option_name:snake>];
                        // Treat single-valued fields as a one-element list so both
                        // kinds of field share the loop below.
                        let values: Vec<_> = generate_extracted_config!(
                            @ifexpr $allow_multiple,
                            value,
                            Vec::from([value])
                        );
                        for value in values {
                            let maybe_value = <$value_ty as $crate::plan::with_options::TryFromValue<
                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
                            >>::try_into_value(value, catalog);
                            // Values that convert to `None` produce no option.
                            if let Some(value) = maybe_value {
                                options.push($option_ty {name: $option_name, value});
                            }
                        }
                    )*
                    options
                }
            }
        }
    };
    // Entry point: start rewriting the specs with an empty accumulator.
    ($option_ty:ty, $($spec:tt),+) => {
        generate_extracted_config!{$option_ty, [], $($spec),+}
    };
    // Compile-time selection of an expression: `true` picks the first
    // alternative, `false` the second.
    (@ifexpr false, $lhs:expr, $rhs:expr) => {
        $rhs
    };
    (@ifexpr true, $lhs:expr, $rhs:expr) => {
        $lhs
    };
    // Compile-time selection of a type: `true` picks the first alternative,
    // `false` the second.
    (@ifty false, $lhs:ty, $rhs:ty) => {
        $rhs
    };
    (@ifty true, $lhs:ty, $rhs:ty) => {
        $lhs
    };
}
695
696pub(crate) use generate_extracted_config;