use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

const MAX_NUM_COLUMNS: usize = 256;

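// Names of managed cluster replicas match this pattern, e.g. `r1`, `r2`.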
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

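/// Validates the `PARTITION BY` columns for a collection: they must form a
/// prefix of the relation's columns, and each must have an order-preserving
/// (`preserves_order`) type. Excess columns are logged and truncated.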
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

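/// Plans a `CREATE TABLE` statement: checks columns and constraints, applies
/// any versioned column changes, and produces a `CreateTablePlan` backed by
/// table writes.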
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
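            // Columns added at a later version of the table are filtered out
            // here; only the original columns belong to the initial relation
            // description.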
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    let mut expr = expr.clone();
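                    // Ensure the default expression can be planned and
                    // type-checks against the column's type.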
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
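        // Versioned columns are recorded in `changes` and applied later, so
        // they are not part of the initial relation type.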
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
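    // Unless `IF NOT EXISTS` was specified, creating an item whose name
    // collides with an existing item (or type) is an error.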
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

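/// Plans a `CREATE SOURCE ... FROM WEBHOOK` statement (or its table-based
/// variant), producing the body, headers, and validation configuration.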
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

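    // Optionally include a `headers` column containing a map of the request
    // headers, subject to any allow/block filters.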
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    for header in include_headers.mappings {
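        // Each explicitly mapped header becomes its own column, decoded as
        // either bytes or text.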
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

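    // Check that all columns are uniquely named and that we have not exceeded
    // the maximum column count.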
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

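    // Webhook sources live on the wall-clock `EpochMilliseconds` timeline.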
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

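/// Plans a `CREATE SOURCE` statement: plans the external connection, envelope,
/// and encoding, and validates the resulting relation description.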
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

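/// Plans the connection portion of a `CREATE SOURCE` statement by dispatching
/// on the connection type (Kafka, Postgres, SQL Server, MySQL, or load
/// generator).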
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

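/// Plans a load generator source connection, validating the tick interval and
/// the `AS OF`/`UP TO` bounds.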
fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

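/// Plans a MySQL source connection, decoding the purification-provided
/// `DETAILS` option into `MySqlSourceDetails`.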
fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

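/// Plans a SQL Server source connection, decoding the purification-provided
/// `DETAILS` option into `SqlServerSourceExtras`.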
fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

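/// Plans a Postgres source connection, decoding the purification-provided
/// `DETAILS` option into the publication details.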
fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

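/// Plans a Kafka source connection, including start offsets, the topic
/// metadata refresh interval, and any `INCLUDE <metadata>` columns.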
fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _,
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
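                // The key is surfaced via the key envelope rather than as a
                // metadata column.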
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

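/// Derives the relation description, planned envelope, and encoding for a
/// source from its envelope and format specifiers plus any metadata columns.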
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
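            // Format decoding is only sensible if the source produces a
            // single raw bytes column to decode.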
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

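            // For Kafka sources with ENVELOPE NONE, the key columns are made
            // nullable, since a record's key may be absent.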
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

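    // The key-value load generator produces keys directly, without a key
    // format, so a key envelope is permitted even when no key encoding exists.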
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

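/// Plans the relation description for a source export (subsource or
/// source-fed table) from its declared columns and constraints.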
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

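/// Plans a `CREATE SUBSOURCE` statement, which is either a progress
/// collection or an ingestion export of an existing source.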
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

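/// Plans a `CREATE TABLE ... FROM SOURCE` statement, which exports data from
/// an existing source into a read-only table.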
1735pub fn plan_create_table_from_source(
1736 scx: &StatementContext,
1737 stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739 if !scx.catalog.system_vars().enable_create_table_from_source() {
1740 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741 }
1742
1743 let CreateTableFromSourceStatement {
1744 name,
1745 columns,
1746 constraints,
1747 if_not_exists,
1748 source,
1749 external_reference,
1750 envelope,
1751 format,
1752 include_metadata,
1753 with_options,
1754 } = &stmt;
1755
1756 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758 let TableFromSourceOptionExtracted {
1759 text_columns,
1760 exclude_columns,
1761 retain_history,
1762 partition_by,
1763 details,
1764 seen: _,
1765 } = with_options.clone().try_into()?;
1766
1767 let source_item = scx.get_item_by_resolved_name(source)?;
1768 let ingestion_id = source_item.id();
1769
1770 let details = details
1773 .as_ref()
1774 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776 let details =
1777 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778 let details =
1779 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782 && include_metadata
1783 .iter()
1784 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785 {
1786 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788 }
1789 if !matches!(
1790 details,
1791 SourceExportStatementDetails::Kafka { .. }
1792 | SourceExportStatementDetails::LoadGenerator { .. }
1793 ) && !include_metadata.is_empty()
1794 {
1795 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796 }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
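                        // `INCLUDE KEY` is not a metadata column; it is
                        // handled when the envelope and encodings are applied.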
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}

generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);

impl LoadGeneratorOptionExtracted {
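    /// Ensures that only options applicable to the given load generator
    /// variant were specified.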
    pub(super) fn ensure_only_valid_options(
        &self,
        loadgen: &ast::LoadGenerator,
    ) -> Result<(), PlanError> {
        use mz_sql_parser::ast::LoadGeneratorOptionName::*;

        let mut options = self.seen.clone();

        let permitted_options: &[_] = match loadgen {
            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
            ast::LoadGenerator::KeyValue => &[
                TickInterval,
                Keys,
                SnapshotRounds,
                TransactionalSnapshot,
                ValueSize,
                Seed,
                Partitions,
                BatchSize,
            ],
        };

        for o in permitted_options {
            options.remove(o);
        }

        if !options.is_empty() {
            sql_bail!(
                "{} load generators do not support {} values",
                loadgen,
                options.iter().join(", ")
            )
        }

        Ok(())
    }
}

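/// Converts a load generator AST (and its options and `INCLUDE` metadata
/// items) into a [`LoadGenerator`].
///
/// An illustrative statement that reaches this function (names are assumed):
///
/// ```sql
/// CREATE SOURCE auction_house
///   FROM LOAD GENERATOR AUCTION (TICK INTERVAL '500ms');
/// ```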
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("BATCH SIZE cannot be larger than KEYS")
            }

            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}

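/// Checks that a Debezium-enveloped relation has the expected `before` and
/// `after` columns, returning their column indexes.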
fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
    let before = value_desc.get_by_name(&"before".into());
    let (after_idx, after_ty) = value_desc
        .get_by_name(&"after".into())
        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
    let before_idx = if let Some((before_idx, before_ty)) = before {
        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
            sql_bail!("'before' column must be of type record");
        }
        if before_ty != after_ty {
            sql_bail!("'before' column type differs from 'after' column type");
        }
        Some(before_idx)
    } else {
        None
    };
    Ok((before_idx, after_idx))
}

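/// Determines the key and value encodings requested by a `FORMAT` specifier,
/// and verifies that envelopes which need a key (`DEBEZIUM`, `UPSERT`) were
/// given one.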
fn get_encoding(
    scx: &StatementContext,
    format: &FormatSpecifier<Aug>,
    envelope: &ast::SourceEnvelope,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let encoding = match format {
        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
        FormatSpecifier::KeyValue { key, value } => {
            let key = {
                let encoding = get_encoding_inner(scx, key)?;
                Some(encoding.key.unwrap_or(encoding.value))
            };
            let value = get_encoding_inner(scx, value)?.value;
            SourceDataEncoding { key, value }
        }
    };

    let requires_keyvalue = matches!(
        envelope,
        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
    );
    let is_keyvalue = encoding.key.is_some();
    if requires_keyvalue && !is_keyvalue {
        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
    };

    Ok(encoding)
}

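/// Resolves the cluster in which a source or sink should run, defaulting to
/// the session's active cluster when no `IN CLUSTER` clause was provided, and
/// records the resolved cluster back into the statement.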
fn source_sink_cluster_config<'a, 'ctx>(
    scx: &'a StatementContext<'ctx>,
    in_cluster: &mut Option<ResolvedClusterName>,
) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
    let cluster = match in_cluster {
        None => {
            let cluster = scx.catalog.resolve_cluster(None)?;
            *in_cluster = Some(ResolvedClusterName {
                id: cluster.id(),
                print_name: None,
            });
            cluster
        }
        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
    };

    Ok(cluster)
}

generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

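/// An Avro value schema (and optional key schema), along with the schema
/// registry connection it was resolved from, if any.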
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    pub confluent_wire_format: bool,
}

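/// Determines the encoding described by a single bare `FORMAT` clause.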
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}

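/// Extracts the key envelope requested by an `INCLUDE KEY` metadata item, if
/// one is present.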
fn get_key_envelope(
    included_items: &[SourceIncludeMetadata],
    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
    key_envelope_no_encoding: bool,
) -> Result<KeyEnvelope, PlanError> {
    let key_definition = included_items
        .iter()
        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
        match (alias, encoding.and_then(|e| e.key.as_ref())) {
            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
            (Some(name), _) if key_envelope_no_encoding => {
                Ok(KeyEnvelope::Named(name.as_str().to_string()))
            }
            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
            (_, None) => {
                sql_bail!(
                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
                        got bare FORMAT"
                );
            }
        }
    } else {
        Ok(KeyEnvelope::None)
    }
}

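/// Determines the key envelope for an `INCLUDE KEY` clause without an alias:
/// composite encodings (Avro, CSV, Protobuf, regex) are flattened into their
/// constituent columns, while scalar encodings become a single column named
/// `key`.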
fn get_unnamed_key_envelope(
    key: Option<&DataEncoding<ReferencedConnection>>,
) -> Result<KeyEnvelope, PlanError> {
    let is_composite = match key {
        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
        Some(
            DataEncoding::Avro(_)
            | DataEncoding::Csv(_)
            | DataEncoding::Protobuf(_)
            | DataEncoding::Regex { .. },
        ) => true,
        None => false,
    };

    if is_composite {
        Ok(KeyEnvelope::Flattened)
    } else {
        Ok(KeyEnvelope::Named("key".to_string()))
    }
}

pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

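/// Plans the definition of a view: the underlying query, its column names,
/// and the view's qualified name.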
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}

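/// Plans a `CREATE [OR REPLACE] VIEW` statement.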
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

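/// Rejects dependencies on replacement objects: an item that is itself a
/// replacement for another item cannot be depended upon.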
fn validate_view_dependencies(
    scx: &StatementContext,
    dependencies: &BTreeSet<CatalogItemId>,
) -> Result<(), PlanError> {
    for id in dependencies {
        let item = scx.catalog.get_item(id);
        if item.replacement_target().is_some() {
            let name = scx.catalog.minimal_qualification(item.name());
            return Err(PlanError::InvalidDependency {
                name: name.to_string(),
                item_type: format!("replacement {}", item.item_type()),
            });
        }
    }

    Ok(())
}

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

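/// Plans a `CREATE MATERIALIZED VIEW` statement, including its refresh
/// schedule, non-null assertions, retain-history window, and (behind a
/// feature flag) a replacement target.
///
/// An illustrative statement (the `REFRESH` option additionally requires its
/// feature flag; names are assumed):
///
/// ```sql
/// CREATE MATERIALIZED VIEW mv
///   WITH (REFRESH EVERY '1 hour')
///   AS SELECT count(*) FROM t;
/// ```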
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        dependencies.insert(target.id());

        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);

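/// Plans a `CREATE CONTINUAL TASK` statement: each `INSERT`/`DELETE`
/// statement in the task body is planned as a query, and the results are
/// unioned (with deletes negated) into a single dataflow expression.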
pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    SqlColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    let desc = input.relation_desc().expect("item type checked above");
                    desc.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip_eq(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            replacement_target: None,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

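/// Rewrites one continual task `INSERT` or `DELETE` statement into the
/// `SELECT` query that computes the rows to insert or retract, returning
/// `None` for unsupported shapes (e.g. explicit columns, `RETURNING`, or
/// `USING`).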
fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            Some(query)
        }
    }
}

generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);

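/// Plans a `CREATE SINK` statement, first checking that no item with the
/// same name already exists.
///
/// An illustrative statement (connection and object names are assumed):
///
/// ```sql
/// CREATE SINK my_sink FROM my_view
///   INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-topic')
///   KEY (id) FORMAT JSON ENVELOPE UPSERT;
/// ```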
pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

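/// Plans the body of a sink: envelope (or Iceberg mode) validation, key and
/// header column resolution, and the connection-specific builder.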
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        mode,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match (&connection, envelope, mode) {
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
            SinkEnvelope::Debezium
        }
        (CreateSinkConnection::Kafka { .. }, None, None) => {
            sql_bail!("ENVELOPE clause is required")
        }
        (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
            sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
        }
        (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
            SinkEnvelope::Upsert
        }
        (CreateSinkConnection::Iceberg { .. }, None, None) => {
            sql_bail!("MODE clause is required")
        }
        (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
            sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
        }
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView | ContinualTask => {
                if from.replacement_target().is_some() {
                    let name = scx.catalog.minimal_qualification(from.name());
                    return Err(PlanError::InvalidSinkFrom {
                        name: name.to_string(),
                        item_type: format!("replacement {}", from.item_type()),
                    });
                }
            }
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type().to_string(),
                });
            }
        }
    }

    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }

    let desc = from.relation_desc().expect("item type checked above");
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            let indices = key_columns
                .iter()
                .map(|col| -> Result<usize, PlanError> {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

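    // Use the relation's first declared key, if any, as the default relation
    // key for the sink.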
    let relation_key_indices = desc.typ().keys.get(0).cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval,
    } = with_options.try_into()?;

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
            commit_interval,
        )?,
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
            commit_interval,
        )?,
    };

    let with_snapshot = snapshot.unwrap_or(true);
    let version = version.unwrap_or(0);

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
            commit_interval,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}

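/// Constructs the error reported when a user-declared key constraint
/// conflicts with the keys already derived for the relation.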
fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

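/// Options for the CSR (Confluent Schema Registry) config clause. The
/// `TryFrom` impl below is written by hand (rather than generated) so that
/// `DOC ON` comments can be merged across key and value schemas.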
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}

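/// Builds the storage description of an Iceberg sink, validating the catalog
/// and AWS connections and the required `TABLE`, `NAMESPACE`, and
/// `COMMIT INTERVAL` options.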
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    };

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };
    if commit_interval.is_none() {
        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
    }

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}

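/// Builds the storage description of a Kafka sink, validating the Kafka
/// connection and the Kafka-specific `WITH` options.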
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a Kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // librdkafka caps `topic.metadata.refresh.interval.ms` at one hour.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Shared validation of the CSR connection options used by the key and
    // value Avro formats.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Map the format specifier onto concrete key and value formats.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}

pub fn describe_create_index(
    _: &StatementContext,
    _: CreateIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
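
/// Plans `CREATE INDEX`.
///
/// If no key is given, the relation's default key is used and the index is
/// named `<relation>_primary_idx`; otherwise the name is derived from the
/// indexed columns and expressions, e.g. (illustrative):
///
/// ```sql
/// CREATE INDEX ON t (a, b);   -- becomes t_a_b_idx
/// CREATE INDEX ON t;          -- becomes t_primary_idx
/// ```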
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView | ContinualTask => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on.relation_desc().expect("item type checked above");

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // No keys specified: fall back to the relation's default key.
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // Default indexes get the conventional `_primary_idx` suffix.
            idx_name.item += "_primary_idx";
        } else {
            // Otherwise, derive the name from the indexed columns/expressions.
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Check for an existing object with the same name, unless IF NOT EXISTS
    // (or the planning context) asks us to tolerate the conflict.
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize the statement before recording it as the create SQL.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
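
/// Plans `CREATE TYPE ... AS LIST | MAP | (record)`.
///
/// Every referenced element, key, value, or field type must itself be a named
/// type already in the catalog, e.g. (illustrative):
///
/// ```sql
/// CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
/// CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
/// ```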
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate that the modifiers are acceptable for this type.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Check for an existing item that would conflict with the new type's name.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    pub scram_iterations: Option<NonZeroU32>,
    /// `Some(true)` when `PASSWORD NULL` was specified, meaning any existing
    /// password should be cleared.
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}
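
/// Folds a list of `RoleAttribute`s into `PlannedRoleAttributes`, rejecting
/// conflicting or redundant combinations (e.g. `LOGIN NOLOGIN`) and the
/// never-supported PostgreSQL attributes (`CREATEDB`, `CREATEROLE`,
/// `CREATECLUSTER`), whose functionality is covered by system privileges.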
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    scx: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options, scx)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}
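
/// Plans `CREATE NETWORK POLICY` (feature-gated). Each rule must supply a
/// direction, an action, and an address, e.g. (illustrative; the exact option
/// syntax is defined by the parser):
///
/// ```sql
/// CREATE NETWORK POLICY office_only (
///     RULES (office (DIRECTION 'ingress', ACTION 'allow', ADDRESS '203.0.113.0/24'))
/// );
/// ```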
pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("DIRECTION, ACTION, and ADDRESS must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}

pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("DIRECTION, ACTION, and ADDRESS must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
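
/// Plans `CREATE CLUSTER`. For managed clusters, the resulting plan is
/// round-tripped through `unplan_create_cluster` and re-planned, and planning
/// fails with `PlanError::Replan` if the two plans differ; this keeps the
/// plan/unplan pair exact inverses of one another.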
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    if let CreateClusterVariant::Managed(_) = &plan.variant {
        // Roundtrip through unplan and replan to verify the plan is stable.
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}

pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // Modern (cc) cluster sizes always provision disk, so the option
            // is only meaningful (and deprecated) for legacy sizes.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // Scheduled clusters start with no replicas; the schedule turns
            // them on (and off) as needed.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
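
/// Inverse of `plan_create_cluster_inner` for managed clusters: reconstructs
/// a `CREATE CLUSTER` statement from a plan (unmanaged clusters are not
/// supported). Optimizer overrides without a corresponding cluster feature
/// are destructured and deliberately discarded, so adding a new override
/// forces an explicit decision here.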
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                // `seen` is only meaningful when extracting options, not when
                // generating them.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // Scheduled clusters hide their replication factor, which the
            // system rather than the user manages.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
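
/// Plans replica-level options into a `ReplicaConfig`, distinguishing
/// orchestrated replicas (`SIZE`, with optional `AVAILABILITY ZONE` and
/// `BILLED AS`) from unorchestrated ones (`STORAGECTL ADDRESSES` and
/// `COMPUTECTL ADDRESSES`, behind a feature flag); mixing the two option
/// families is rejected.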
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // Modern (cc) cluster sizes always provision disk, so the
                // option is only meaningful (and deprecated) for legacy sizes.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
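
/// Derives a replica's introspection configuration. A missing
/// `INTROSPECTION INTERVAL` defaults to `DEFAULT_REPLICA_LOGGING_INTERVAL`;
/// an explicitly unset interval disables introspection entirely, which is
/// incompatible with `INTROSPECTION DEBUGGING`.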
fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}

/// Inverse of `plan_compute_replica_config`.
fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}
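
/// Converts the parsed `SCHEDULE` option into a `ClusterSchedule`. For
/// `SCHEDULE = ON REFRESH`, the `HYDRATION TIME ESTIMATE` must be a
/// non-negative interval without month-or-larger units and small enough to
/// round-trip through milliseconds, since `unplan_cluster_schedule` converts
/// it back into an interval.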
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // Without an explicit HYDRATION TIME ESTIMATE, default to 0.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        // Otherwise, validate the given interval.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                // Months have no fixed duration, so they cannot be converted
                // to a `Duration` unambiguously.
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}

/// Inverse of `plan_cluster_schedule`.
fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
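
/// Plans `CREATE SECRET`. The recorded `create_sql` replaces the secret value
/// with `'********'` so the plaintext never reaches the catalog.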
pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));
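
/// Plans `CREATE CONNECTION`. When no `VALIDATE` option is given, validation
/// defaults to the `enable_default_connection_validation` system variable;
/// for SSH connections, the generated public keys are written back into the
/// recorded statement so they can be displayed later.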
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an existing object with the same name.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, overwrite the public key options with the
    // generated keys so they are recorded in the catalog.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
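
/// Plans `DROP <object type> ...` for every object kind except functions,
/// which the parser rejects. With `IF EXISTS`, missing objects produce a
/// notice rather than an error, and the final plan includes all transitive
/// dependents of the named objects.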
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // The temporary schema can never be dropped, even by name.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema")
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema")
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            // The active default network policy cannot be dropped.
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}
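
/// Reports whether the cluster hosts objects that are restricted to a single
/// replica (sources or sinks), unless the `ENABLE_MULTI_REPLICA_SOURCES`
/// dyncfg lifts that restriction.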
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    // If multi-replica sources are enabled, no object restricts the cluster
    // to a single replica.
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        // Otherwise, sources and sinks limit the cluster to a single replica.
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}
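
/// Plans dropping a single catalog item, translating a "view vs. materialized
/// view" mismatch into the dedicated `DropViewOnMaterializedView` error and,
/// without `CASCADE`, refusing to drop an item that other objects still
/// depend on.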
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}
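
/// Reports whether a dependency of `dep`'s type blocks dropping the upstream
/// object without `CASCADE`. Indexes never block a drop; every other item
/// type does, and when the object being dropped is a type, any dependent
/// blocks the drop.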
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
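
/// Plans `DROP OWNED BY`. Collects, for each named role, the replicas,
/// clusters, items, schemas, and databases it owns (to drop) and the
/// privileges granted to it (to revoke); without `CASCADE`, the drop is
/// rejected if any owned container still holds objects owned by other roles.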
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Replicas.
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters.
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            if !cascade {
                // The cluster must not host objects owned by other roles.
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items.
    for item in scx.catalog.get_items() {
5854 if role_ids.contains(&item.owner_id()) {
5855 if !cascade {
5856 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5858 let non_owned_dependencies: Vec<_> = used_by
5859 .into_iter()
5860 .map(|item_id| scx.catalog.get_item(item_id))
5861 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5862 .filter(|item| !role_ids.contains(&item.owner_id()))
5863 .collect();
5864 if !non_owned_dependencies.is_empty() {
5865 let names: Vec<_> = non_owned_dependencies
5866 .into_iter()
5867 .map(|item| {
5868 let item_typ = item.item_type().to_string();
5869 let item_name =
5870 scx.catalog.resolve_full_name(item.name()).to_string();
5871 (item_typ, item_name)
5872 })
5873 .collect();
5874 Err(PlanError::DependentObjectsStillExist {
5875 object_type: item.item_type().to_string(),
5876 object_name: scx
5877 .catalog
5878 .resolve_full_name(item.name())
5879 .to_string()
5880 .to_string(),
5881 dependents: names,
5882 })
5883 } else {
5884 Ok(())
5885 }
5886 };
5887
5888 if let Some(id) = item.progress_id() {
5891 let progress_item = scx.catalog.get_item(&id);
5892 check_if_dependents_exist(progress_item.used_by())?;
5893 }
5894 check_if_dependents_exist(item.used_by())?;
5895 }
5896 drop_ids.push(item.id().into());
5897 }
5898 update_privilege_revokes(
5899 SystemObjectId::Object(item.id().into()),
5900 item.privileges(),
5901 &role_ids,
5902 &mut privilege_revokes,
5903 );
5904 }
5905
5906 for schema in scx.catalog.get_schemas() {
5908 if !schema.id().is_temporary() {
5909 if role_ids.contains(&schema.owner_id()) {
5910 if !cascade {
5911 let non_owned_dependencies: Vec<_> = schema
5912 .item_ids()
5913 .map(|item_id| scx.catalog.get_item(&item_id))
5914 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5915 .filter(|item| !role_ids.contains(&item.owner_id()))
5916 .collect();
5917 if !non_owned_dependencies.is_empty() {
5918 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5919 sql_bail!(
5920 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5921 full_schema_name.to_string().quoted()
5922 );
5923 }
5924 }
5925 drop_ids.push((*schema.database(), *schema.id()).into())
5926 }
5927 update_privilege_revokes(
5928 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5929 schema.privileges(),
5930 &role_ids,
5931 &mut privilege_revokes,
5932 );
5933 }
5934 }
5935
5936 for database in scx.catalog.get_databases() {
5938 if role_ids.contains(&database.owner_id()) {
5939 if !cascade {
5940 let non_owned_schemas: Vec<_> = database
5941 .schemas()
5942 .into_iter()
5943 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5944 .collect();
5945 if !non_owned_schemas.is_empty() {
5946 sql_bail!(
5947 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5948 database.name().quoted(),
5949 );
5950 }
5951 }
5952 drop_ids.push(database.id().into());
5953 }
5954 update_privilege_revokes(
5955 SystemObjectId::Object(database.id().into()),
5956 database.privileges(),
5957 &role_ids,
5958 &mut privilege_revokes,
5959 );
5960 }
5961
5962 update_privilege_revokes(
5964 SystemObjectId::System,
5965 scx.catalog.get_system_privileges(),
5966 &role_ids,
5967 &mut privilege_revokes,
5968 );
5969
5970 for (default_privilege_object, default_privilege_acl_items) in
5971 scx.catalog.get_default_privileges()
5972 {
5973 for default_privilege_acl_item in default_privilege_acl_items {
5974 if role_ids.contains(&default_privilege_object.role_id)
5975 || role_ids.contains(&default_privilege_acl_item.grantee)
5976 {
5977 default_privilege_revokes.push((
5978 default_privilege_object.clone(),
5979 default_privilege_acl_item.clone(),
5980 ));
5981 }
5982 }
5983 }
5984
5985 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5986
5987 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5988 if !system_ids.is_empty() {
5989 let mut owners = system_ids
5990 .into_iter()
5991 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5992 .collect::<BTreeSet<_>>()
5993 .into_iter()
5994 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5995 sql_bail!(
5996 "cannot drop objects owned by role {} because they are required by the database system",
5997 owners.join(", "),
5998 );
5999 }
6000
6001 Ok(Plan::DropOwned(DropOwnedPlan {
6002 role_ids: role_ids.into_iter().collect(),
6003 drop_ids,
6004 privilege_revokes,
6005 default_privilege_revokes,
6006 }))
6007}
6008
6009fn plan_retain_history_option(
6010 scx: &StatementContext,
6011 retain_history: Option<OptionalDuration>,
6012) -> Result<Option<CompactionWindow>, PlanError> {
6013 if let Some(OptionalDuration(lcw)) = retain_history {
6014 Ok(Some(plan_retain_history(scx, lcw)?))
6015 } else {
6016 Ok(None)
6017 }
6018}
6019
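/// Converts a `RETAIN HISTORY` option value into a `CompactionWindow`.
///
/// `Some(Duration::ZERO)` is treated as an internal error, because
/// `OptionalDuration` is expected to have already mapped a literal `0` to
/// `None`. A nonzero duration below the default compaction window requires
/// the unlimited-retain-history feature flag, and `None` disables compaction
/// entirely, behind the same flag.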
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

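/// Plans `ALTER INDEX ... SET/RESET (...)`. `RETAIN HISTORY` is the only
/// supported option and must appear alone, e.g. (illustrative syntax):
///
/// ```sql
/// ALTER INDEX my_index SET (RETAIN HISTORY FOR '30 days');
/// ALTER INDEX my_index RESET (RETAIN HISTORY);
/// ```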
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

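/// Plans `ALTER CLUSTER`, covering `SET`/`RESET` of cluster options as well
/// as conversion between managed and unmanaged clusters, e.g. (illustrative;
/// which options are valid depends on whether the cluster is managed):
///
/// ```sql
/// ALTER CLUSTER prod SET (REPLICATION FACTOR 2);
/// ALTER CLUSTER prod RESET (REPLICATION FACTOR);
/// ```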
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

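/// Plans `ALTER <object> ... SET CLUSTER`, which moves an object to another
/// cluster. Only materialized views are currently supported, and the
/// statement is gated behind a feature flag. Illustrative example:
///
/// ```sql
/// ALTER MATERIALIZED VIEW mv SET CLUSTER analytics;
/// ```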
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

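/// Plans `ALTER <object> RENAME TO`, dispatching on the object type named in
/// the statement, e.g. (illustrative):
///
/// ```sql
/// ALTER TABLE t RENAME TO t_old;
/// ALTER CLUSTER REPLICA prod.r1 RENAME TO r2;
/// ```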
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

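/// Plans `ALTER SCHEMA ... SWAP WITH ...`.
///
/// The swap is planned as a rename through a uniquely named temporary schema
/// (`mz_schema_swap_<suffix>`), so the catalog never sees two schemas with
/// the same name. `gen_temp_suffix` is handed a closure that reports whether
/// a candidate suffix is free.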
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            }
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        // Probe availability of the same `mz_cluster_swap_` name that is
        // generated below.
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // The temporary name is unused, so we can use it.
            Err(CatalogError::UnknownCluster(_)) => true,
            // The temporary name is taken (or resolution failed); try another.
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

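/// Plans `ALTER ... SWAP WITH ...` for schemas and clusters, gated behind the
/// `ENABLE_ALTER_SWAP` feature flag, e.g. (illustrative):
///
/// ```sql
/// ALTER SCHEMA blue SWAP WITH green;
/// ```
///
/// Temporary-suffix generation retries up to 10 times before giving up.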
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

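/// Shared implementation for the `RETAIN HISTORY` forms of `ALTER TABLE`,
/// `ALTER SOURCE`, `ALTER INDEX`, and `ALTER MATERIALIZED VIEW`. A `history`
/// of `None` resets the option to the default compaction window. Plain views
/// are rejected, since they hold no historical data.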
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

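/// Plans `ALTER CONNECTION`. `ROTATE KEYS` (SSH connections only) must appear
/// on its own; otherwise, options may be `SET` or dropped, subject to
/// per-connection-type validation. Illustrative examples:
///
/// ```sql
/// ALTER CONNECTION ssh_conn ROTATE KEYS;
/// ALTER CONNECTION kafka_conn SET (BROKER 'broker0:9092') WITH (VALIDATE = true);
/// ```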
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                // The missing object here is a connection, not a sink.
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

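/// Plans `ALTER SINK`. Changing the sink's input relation is planned by
/// re-planning the sink's original `CREATE SINK` statement with the `FROM`
/// relation swapped out and the sink version bumped, e.g. (illustrative
/// syntax):
///
/// ```sql
/// ALTER SINK my_sink SET FROM new_relation;
/// ```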
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

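/// Plans `ALTER SYSTEM SET <var> = <value>`, e.g. (illustrative; the variable
/// must be a settable system variable):
///
/// ```sql
/// ALTER SYSTEM SET max_connections = 100;
/// ```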
pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

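/// Plans `ALTER TABLE ... ADD COLUMN`, which is feature-flag gated. New
/// columns are always nullable, since existing rows have no value for them.
/// Illustrative example:
///
/// ```sql
/// ALTER TABLE t ADD COLUMN IF NOT EXISTS note text;
/// ```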
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.relation_desc().expect("table has desc").into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    let column_type = scalar_type.nullable(true);
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_alter_materialized_view_apply_replacement(
    _: &StatementContext,
    _: AlterMaterializedViewApplyReplacementStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

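/// Plans `COMMENT ON <object> IS <comment>`, e.g. (illustrative):
///
/// ```sql
/// COMMENT ON TABLE t IS 'fact table for page views';
/// COMMENT ON COLUMN t.user_id IS 'foreign key into users';
/// ```
///
/// Comments are capped at `MAX_COMMENT_LENGTH` bytes; column comments record
/// the 1-based column position as a sub-component.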
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    if let Some(c) = &comment {
        // Compare against the named constant rather than a repeated magic
        // number, so the check and the error stay in sync.
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        // `ContinualTask` is matched in the outer `match`, so
                        // it can reach this fallthrough arm on a type mismatch
                        // as well.
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => {
            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
                sql_bail!(
                    "cannot comment on schema {} because it is a temporary schema",
                    mz_repr::namespaces::MZ_TEMP_SCHEMA
                );
            }
            (
                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
                None,
            )
        }
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

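/// Resolves an item by name, using the type namespace when `object_type` is
/// `Type` and the item namespace otherwise. Returns `Ok(None)` if the item
/// does not exist and `if_exists` is set, and an error if the item exists but
/// has a different type than expected.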
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}