1use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24 ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25 CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26 CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27 CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28 CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29 MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30 UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37pub fn ident(ident: Ident) -> String {
39 ident.into_string()
40}
41
42pub fn ident_ref(ident: &Ident) -> &str {
44 ident.as_str()
45}
46
47pub fn column_name(id: Ident) -> ColumnName {
49 ColumnName::from(ident(id))
50}
51
52pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54 if name.0.len() < 1 || name.0.len() > 3 {
55 return Err(PlanError::MisqualifiedName(name.to_string()));
56 }
57 let out = PartialItemName {
58 item: ident(
59 name.0
60 .pop()
61 .expect("name checked to have at least one component"),
62 ),
63 schema: name.0.pop().map(ident),
64 database: name.0.pop().map(ident),
65 };
66 assert!(name.0.is_empty());
67 Ok(out)
68}
69
70pub fn unresolved_schema_name(
72 mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74 if name.0.len() < 1 || name.0.len() > 2 {
75 return Err(PlanError::MisqualifiedName(name.to_string()));
76 }
77 let out = PartialSchemaName {
78 schema: ident(
79 name.0
80 .pop()
81 .expect("name checked to have at least one component"),
82 ),
83 database: name.0.pop().map(ident),
84 };
85 assert!(name.0.is_empty());
86 Ok(out)
87}
88
89pub fn op(op: &Op) -> Result<&str, PlanError> {
93 if let Some(namespace) = &op.namespace {
94 if namespace.len() != 0
95 && (namespace.len() != 1
96 || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97 {
98 sql_bail!(
99 "operator does not exist: {}.{}",
100 namespace.iter().map(|n| n.to_string()).join("."),
101 op.op,
102 )
103 }
104 }
105 Ok(&op.op)
106}
107
108#[derive(Debug, Clone)]
109pub enum SqlValueOrSecret {
110 Value(Value),
111 Secret(GlobalId),
112}
113
114impl fmt::Display for SqlValueOrSecret {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 match self {
117 SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118 SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119 }
120 }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124 fn from(s: SqlValueOrSecret) -> Self {
125 match s {
126 SqlValueOrSecret::Value(v) => Some(v),
127 SqlValueOrSecret::Secret(_id) => None,
128 }
129 }
130}
131
132pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136 let mut out = vec![];
138 if let RawDatabaseSpecifier::Name(n) = name.database {
139 out.push(Ident::new_unchecked(n));
140 }
141 out.push(Ident::new_unchecked(name.schema));
142 out.push(Ident::new_unchecked(name.item));
143 UnresolvedItemName(out)
144}
145
146pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149 match raw_name.0.len() {
150 3 => Ok(FullItemName {
151 item: ident(raw_name.0.pop().unwrap()),
152 schema: ident(raw_name.0.pop().unwrap()),
153 database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154 }),
155 2 => Ok(FullItemName {
156 item: ident(raw_name.0.pop().unwrap()),
157 schema: ident(raw_name.0.pop().unwrap()),
158 database: RawDatabaseSpecifier::Ambient,
159 }),
160 _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161 }
162}
163
164pub fn create_statement(
173 scx: &StatementContext,
174 mut stmt: Statement<Aug>,
175) -> Result<String, PlanError> {
176 let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
177 Ok(unresolve(
178 scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
179 ))
180 };
181
182 let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
183 Ok(unresolve(scx.allocate_temporary_full_name(
184 unresolved_item_name(name.clone())?,
185 )))
186 };
187
188 struct QueryNormalizer {
189 ctes: Vec<Ident>,
190 err: Option<PlanError>,
191 }
192
193 impl QueryNormalizer {
194 fn new() -> QueryNormalizer {
195 QueryNormalizer {
196 ctes: vec![],
197 err: None,
198 }
199 }
200 }
201
202 impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
203 fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
204 let n = self.ctes.len();
205 match &query.ctes {
206 CteBlock::Simple(ctes) => {
207 for cte in ctes.iter() {
208 self.ctes.push(cte.alias.name.clone());
209 }
210 }
211 CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
212 for cte in ctes.iter() {
213 self.ctes.push(cte.name.clone());
214 }
215 }
216 }
217 visit_mut::visit_query_mut(self, query);
218 self.ctes.truncate(n);
219 }
220
221 fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
222 match &mut func.args {
223 FunctionArgs::Star => (),
224 FunctionArgs::Args { args, order_by } => {
225 for arg in args {
226 self.visit_expr_mut(arg);
227 }
228 for expr in order_by {
229 self.visit_order_by_expr_mut(expr);
230 }
231 }
232 }
233 if let Some(over) = &mut func.over {
234 self.visit_window_spec_mut(over);
235 }
236 }
237
238 fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
239 match table_factor {
240 TableFactor::Table { name, alias, .. } => {
241 self.visit_item_name_mut(name);
242 if let Some(alias) = alias {
243 self.visit_table_alias_mut(alias);
244 }
245 }
246 _ => visit_mut::visit_table_factor_mut(self, table_factor),
249 }
250 }
251 }
252
253 match &mut stmt {
264 Statement::CreateSource(CreateSourceStatement {
265 name,
266 in_cluster: _,
267 col_names: _,
268 connection: _,
269 format: _,
270 include_metadata: _,
271 envelope: _,
272 if_not_exists,
273 key_constraint: _,
274 with_options: _,
275 external_references: _,
276 progress_subsource: _,
277 }) => {
278 *name = allocate_name(name)?;
279 *if_not_exists = false;
280 }
281
282 Statement::CreateSubsource(CreateSubsourceStatement {
283 name,
284 columns,
285 constraints: _,
286 of_source: _,
287 if_not_exists,
288 with_options: _,
289 }) => {
290 *name = allocate_name(name)?;
291 let mut normalizer = QueryNormalizer::new();
292 for c in columns {
293 normalizer.visit_column_def_mut(c);
294 }
295 if let Some(err) = normalizer.err {
296 return Err(err);
297 }
298 *if_not_exists = false;
299 }
300
301 Statement::CreateTableFromSource(CreateTableFromSourceStatement {
302 name,
303 columns,
304 constraints: _,
305 external_reference: _,
306 source: _,
307 if_not_exists,
308 format: _,
309 include_metadata: _,
310 envelope: _,
311 with_options: _,
312 }) => {
313 *name = allocate_name(name)?;
314 let mut normalizer = QueryNormalizer::new();
315 if let TableFromSourceColumns::Defined(columns) = columns {
316 for c in columns {
317 normalizer.visit_column_def_mut(c);
318 }
319 }
320 if let Some(err) = normalizer.err {
321 return Err(err);
322 }
323 *if_not_exists = false;
324 }
325
326 Statement::CreateTable(CreateTableStatement {
327 name,
328 columns,
329 constraints: _,
330 if_not_exists,
331 temporary,
332 with_options: _,
333 }) => {
334 *name = if *temporary {
335 allocate_temporary_name(name)?
336 } else {
337 allocate_name(name)?
338 };
339 let mut normalizer = QueryNormalizer::new();
340 for c in columns {
341 normalizer.visit_column_def_mut(c);
342 }
343 if let Some(err) = normalizer.err {
344 return Err(err);
345 }
346 *if_not_exists = false;
347 }
348
349 Statement::CreateWebhookSource(CreateWebhookSourceStatement {
350 name,
351 is_table: _,
352 if_not_exists,
353 include_headers: _,
354 body_format: _,
355 validate_using: _,
356 in_cluster: _,
357 }) => {
358 *name = allocate_name(name)?;
359 *if_not_exists = false;
360 }
361
362 Statement::CreateSink(CreateSinkStatement {
363 name,
364 in_cluster: _,
365 connection: _,
366 format: _,
367 envelope: _,
368 if_not_exists,
369 ..
370 }) => {
371 if let Some(name) = name {
372 *name = allocate_name(name)?;
373 }
374 *if_not_exists = false;
375 }
376
377 Statement::CreateView(CreateViewStatement {
378 temporary,
379 if_exists,
380 definition:
381 ViewDefinition {
382 name,
383 query,
384 columns: _,
385 },
386 }) => {
387 *name = if *temporary {
388 allocate_temporary_name(name)?
389 } else {
390 allocate_name(name)?
391 };
392 {
393 let mut normalizer = QueryNormalizer::new();
394 normalizer.visit_query_mut(query);
395 if let Some(err) = normalizer.err {
396 return Err(err);
397 }
398 }
399 *if_exists = IfExistsBehavior::Error;
400 }
401
402 Statement::CreateMaterializedView(CreateMaterializedViewStatement {
403 if_exists,
404 name,
405 columns: _,
406 in_cluster: _,
407 query,
408 with_options: _,
409 as_of: _,
410 }) => {
411 *name = allocate_name(name)?;
412 {
413 let mut normalizer = QueryNormalizer::new();
414 normalizer.visit_query_mut(query);
415 if let Some(err) = normalizer.err {
416 return Err(err);
417 }
418 }
419 *if_exists = IfExistsBehavior::Error;
420 }
421
422 Statement::CreateContinualTask(CreateContinualTaskStatement {
423 name,
424 columns: _,
425 input,
426 with_options: _,
427 stmts,
428 in_cluster: _,
429 as_of: _,
430 sugar,
431 }) => {
432 let mut normalizer = QueryNormalizer::new();
433 normalizer.visit_item_name_mut(name);
434 normalizer.visit_item_name_mut(input);
435 for stmt in stmts {
436 match stmt {
437 ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
438 ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
439 }
440 }
441 match sugar {
442 Some(CreateContinualTaskSugar::Transform { transform }) => {
443 normalizer.visit_query_mut(transform)
444 }
445 Some(CreateContinualTaskSugar::Retain { retain }) => {
446 normalizer.visit_expr_mut(retain)
447 }
448 None => {}
449 }
450 if let Some(err) = normalizer.err {
451 return Err(err);
452 }
453 }
454
455 Statement::CreateIndex(CreateIndexStatement {
456 name: _,
457 in_cluster: _,
458 key_parts,
459 with_options: _,
460 if_not_exists,
461 ..
462 }) => {
463 let mut normalizer = QueryNormalizer::new();
464 if let Some(key_parts) = key_parts {
465 for key_part in key_parts {
466 normalizer.visit_expr_mut(key_part);
467 if let Some(err) = normalizer.err {
468 return Err(err);
469 }
470 }
471 }
472 *if_not_exists = false;
473 }
474
475 Statement::CreateType(CreateTypeStatement { name, as_type }) => {
476 *name = allocate_name(name)?;
477 let mut normalizer = QueryNormalizer::new();
478 normalizer.visit_create_type_as_mut(as_type);
479 if let Some(err) = normalizer.err {
480 return Err(err);
481 }
482 }
483 Statement::CreateSecret(CreateSecretStatement {
484 name,
485 if_not_exists,
486 value: _,
487 }) => {
488 *name = allocate_name(name)?;
489 *if_not_exists = false;
490 }
491 Statement::CreateConnection(CreateConnectionStatement {
492 name,
493 connection_type: _,
494 values,
495 with_options,
496 if_not_exists,
497 }) => {
498 *name = allocate_name(name)?;
499 *if_not_exists = false;
500
501 values.sort();
502
503 with_options
506 .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
507 }
508
509 _ => unreachable!(),
510 }
511
512 Ok(stmt.to_ast_string_stable())
513}
514
515macro_rules! generate_extracted_config {
540 ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty), $($tail:tt),*) => {
542 generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)], $(
543 $tail
544 ),*);
545 };
546 ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty)) => {
548 generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)]);
549 };
550 ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*) => {
552 generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)], $(
553 $tail
554 ),*);
555 };
556 ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr))) => {
558 generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)]);
559 };
560 ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*) => {
562 generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)], $(
563 $tail
564 ),*);
565 };
566 ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple)) => {
568 generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)]);
569 };
570 ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
571 paste::paste! {
572 #[derive(Debug)]
573 pub struct [<$option_ty Extracted>] {
574 pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
575 $(
576 pub [<$option_name:snake>]: generate_extracted_config!(
577 @ifty $allow_multiple,
578 Vec::<$t>,
579 $t
580 ),
581 )*
582 }
583
584 impl std::default::Default for [<$option_ty Extracted>] {
585 fn default() -> Self {
586 [<$option_ty Extracted>] {
587 seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
588 $(
589 [<$option_name:snake>]: <generate_extracted_config!(
590 @ifty $allow_multiple,
591 Vec::<$t>,
592 $t
593 )>::from($v),
594 )*
595 }
596 }
597 }
598
599 impl std::convert::TryFrom<Vec<$option_ty<Aug>>> for [<$option_ty Extracted>] {
600 type Error = $crate::plan::PlanError;
601 fn try_from(v: Vec<$option_ty<Aug>>) -> Result<[<$option_ty Extracted>], Self::Error> {
602 use [<$option_ty Name>]::*;
603 let mut extracted = [<$option_ty Extracted>]::default();
604 for option in v {
605 match option.name {
606 $(
607 $option_name => {
608 if !$allow_multiple && !extracted.seen.insert(option.name.clone()) {
609 sql_bail!("{} specified more than once", option.name.to_ast_string_simple());
610 }
611 let val: $t = $crate::plan::with_options::TryFromValue::try_from_value(option.value)
612 .map_err(|e| sql_err!("invalid {}: {}", option.name.to_ast_string_simple(), e))?;
613 generate_extracted_config!(
614 @ifexpr $allow_multiple,
615 extracted.[<$option_name:snake>].push(val),
616 extracted.[<$option_name:snake>] = val
617 );
618 }
619 )*
620 }
621 }
622 Ok(extracted)
623 }
624 }
625
626 impl [<$option_ty Extracted>] {
627 #[allow(unused)]
628 fn into_values(self, catalog: &dyn crate::catalog::SessionCatalog) -> Vec<$option_ty<Aug>> {
629 use [<$option_ty Name>]::*;
630 let mut options = Vec::new();
631 $(
632 let value = self.[<$option_name:snake>];
633 let values: Vec<_> = generate_extracted_config!(
634 @ifexpr $allow_multiple,
635 value,
636 Vec::from([value])
637 );
638 for value in values {
639 let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
643 Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
644 >>::try_into_value(value, catalog);
645 match maybe_value {
646 Some(value) => {
647 let option = $option_ty {name: $option_name, value};
648 options.push(option);
649 },
650 None => (),
651 }
652 }
653 )*
654 options
655 }
656 }
657 }
658 };
659 ($option_ty:ty, $($h:tt),+) => {
660 generate_extracted_config!{$option_ty, [], $($h),+}
661 };
662 (@ifexpr false, $lhs:expr, $rhs:expr) => {
665 $rhs
666 };
667 (@ifexpr true, $lhs:expr, $rhs:expr) => {
668 $lhs
669 };
670 (@ifty false, $lhs:ty, $rhs:ty) => {
671 $rhs
672 };
673 (@ifty true, $lhs:ty, $rhs:ty) => {
674 $lhs
675 };
676}
677
678pub(crate) use generate_extracted_config;