1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_arrow_util::builder::ArrowBuilder;
25use mz_auth::password::Password;
26use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
27use mz_expr::{CollectionPlan, UnmaterializableFunc};
28use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
29use mz_ore::cast::{CastFrom, TryCastFrom};
30use mz_ore::collections::{CollectionExt, HashSet};
31use mz_ore::num::NonNeg;
32use mz_ore::soft_panic_or_log;
33use mz_ore::str::StrExt;
34use mz_proto::RustType;
35use mz_repr::adt::interval::Interval;
36use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
37use mz_repr::network_policy_id::NetworkPolicyId;
38use mz_repr::optimize::OptimizerFeatureOverrides;
39use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
40use mz_repr::role_id::RoleId;
41use mz_repr::{
42 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
43 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
44 preserves_order, strconv,
45};
46use mz_sql_parser::ast::{
47 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
48 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
49 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
50 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
51 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
52 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
53 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
54 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
55 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
56 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
57 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
58 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
59 ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
60 CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
61 CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
62 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91 SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121 SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140 ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
141};
142use crate::plan::scope::Scope;
143use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
144use crate::plan::statement::{StatementContext, StatementDesc, scl};
145use crate::plan::typeconv::CastContext;
146use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
147use crate::plan::{
148 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
149 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
150 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
151 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
152 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
153 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
154 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
155 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
156 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
157 CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
158 CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
159 CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
160 DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
161 NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
162 PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
163 VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
164 WebhookValidation, literal, plan_utils, query, transform_ast,
165};
166use crate::session::vars::{
167 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
168 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
169 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
170};
171use crate::{names, parse};
172
173mod connection;
174
/// Maximum number of columns allowed in a user-created relation; enforced below when
/// planning webhook sources (body + headers + mapped-header columns).
const MAX_NUM_COLUMNS: usize = 256;
180
181static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
182 std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
183
184fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
188 if partition_by.len() > desc.len() {
189 tracing::error!(
190 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
191 desc.len(),
192 partition_by.len()
193 );
194 partition_by.truncate(desc.len());
195 }
196
197 let desc_prefix = desc.iter().take(partition_by.len());
198 for (idx, ((desc_name, desc_type), partition_name)) in
199 desc_prefix.zip_eq(partition_by).enumerate()
200 {
201 let partition_name = normalize::column_name(partition_name);
202 if *desc_name != partition_name {
203 sql_bail!(
204 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
205 );
206 }
207 if !preserves_order(&desc_type.scalar_type) {
208 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
209 }
210 }
211 Ok(())
212}
213
/// Describes the output of a `CREATE DATABASE` statement: it produces no rows.
pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
220
221pub fn plan_create_database(
222 _: &StatementContext,
223 CreateDatabaseStatement {
224 name,
225 if_not_exists,
226 }: CreateDatabaseStatement,
227) -> Result<Plan, PlanError> {
228 Ok(Plan::CreateDatabase(CreateDatabasePlan {
229 name: normalize::ident(name.0),
230 if_not_exists,
231 }))
232}
233
/// Describes the output of a `CREATE SCHEMA` statement: it produces no rows.
pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
240
241pub fn plan_create_schema(
242 scx: &StatementContext,
243 CreateSchemaStatement {
244 mut name,
245 if_not_exists,
246 }: CreateSchemaStatement,
247) -> Result<Plan, PlanError> {
248 if name.0.len() > 2 {
249 sql_bail!("schema name {} has more than two components", name);
250 }
251 let schema_name = normalize::ident(
252 name.0
253 .pop()
254 .expect("names always have at least one component"),
255 );
256 let database_spec = match name.0.pop() {
257 None => match scx.catalog.active_database() {
258 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
259 None => sql_bail!("no database specified and no active database"),
260 },
261 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
262 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
263 Err(_) => sql_bail!("invalid database {}", n.as_str()),
264 },
265 };
266 Ok(Plan::CreateSchema(CreateSchemaPlan {
267 database_spec,
268 schema_name,
269 if_not_exists,
270 }))
271}
272
/// Describes the output of a `CREATE TABLE` statement: it produces no rows.
pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
279
/// Plans a `CREATE TABLE` statement into a [`CreateTablePlan`].
///
/// Responsibilities visible here:
/// - collects column names/types/defaults and key sets from column options;
/// - tracks `Versioned` columns separately (they become later versions of the relation
///   via [`VersionedRelationDesc::add_column`], not part of the root version);
/// - validates table constraints (only UNIQUE/PRIMARY KEY is planned; FK and CHECK are
///   gated behind unsafe feature flags);
/// - rejects duplicate names and pre-existing items (unless `IF NOT EXISTS`);
/// - extracts the retain-history option into a compaction window.
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    // Column names of the *root* version only: columns added later via `Versioned`
    // options are filtered out here.
    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    // Map from relation version -> (action, column name, column type) for versioned columns.
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    // Plan the default expression purely for validation; the (transformed)
                    // AST itself is what gets stored.
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    // NOTE(review): `i` indexes *all* columns, including versioned ones,
                    // while `column_types`/`names` exclude them; this assumes versioned
                    // columns sort after all root-version columns — confirm upstream
                    // (purification) guarantees this ordering.
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        // Versioned columns are added later via `changes`; only root columns contribute
        // to the root relation type. Defaults are recorded for every column.
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A plain UNIQUE over a nullable column without NULLS NOT
                                // DISTINCT is not a usable key: abandon this constraint
                                // entirely (note: `break 'c` exits the *outer* loop, so
                                // any remaining constraints are skipped too).
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // Primary keys sort first among the relation's key sets.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                // Foreign keys are parse-only; gated behind an unsafe feature flag.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                // CHECK constraints are parse-only; gated behind an unsafe feature flag.
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Reject an already-existing item of the same name unless IF NOT EXISTS was given.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    // Replay versioned column additions in version order (BTreeMap iteration is sorted);
    // each stated version must match the version the desc actually assigns.
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    // Table options are planned against the root-version desc.
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}
499
/// Describes the output of a `CREATE TABLE ... FROM SOURCE` statement: it produces no rows.
pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
506
/// Describes the output of a `CREATE SOURCE ... FROM WEBHOOK` statement: it produces no rows.
pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
513
/// Describes the output of a `CREATE SOURCE` statement: it produces no rows.
pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
520
/// Describes the output of a `CREATE SUBSOURCE` statement: it produces no rows.
pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
527
// Typed extraction of `CREATE SOURCE` WITH-options (timestamp interval, retain history).
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Typed extraction of PostgreSQL connection options for `CREATE SOURCE`.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Typed extraction of MySQL connection options for `CREATE SOURCE`.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Typed extraction of SQL Server connection options for `CREATE SOURCE`.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
555
/// Plans a `CREATE SOURCE ... FROM WEBHOOK` (or its table form) statement.
///
/// Builds the relation description column-by-column: a `body` column whose type follows
/// the declared BODY FORMAT, an optional `headers` map column (with allow/block filters),
/// and one column per `INCLUDE HEADER ... AS ...` mapping. Validation expressions, if
/// present, must reference at least one column and be deterministic (only
/// `current_timestamp` is permitted among otherwise-unmaterializable functions).
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    // Resolve the cluster first; webhook sources are restricted to single-replica
    // clusters unless the multi-replica-sources dyncfg is enabled.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    // Capture the canonical SQL before consuming `stmt` below.
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A validation that reads no columns is either constant-true (useless) or
        // constant-false (source can never ingest) — reject it.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Validation must be deterministic; `current_timestamp` is the only
        // unmaterializable function allowed.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    // `column_ty` and `column_names` are built in lockstep; index i in one corresponds
    // to index i in the other.
    let mut column_ty = vec![
        // Always include a column for the body of a request.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // INCLUDE HEADERS adds a map column of all (filtered) request headers.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Each `INCLUDE HEADER 'h' AS col [BYTES]` mapping becomes its own nullable column.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        // Invariant check on the lockstep vectors above.
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Reject duplicate column names and relations wider than MAX_NUM_COLUMNS.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    // Reject an already-existing item of the same name unless IF NOT EXISTS was given.
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Webhook sources always live on the wall-clock timeline.
    let timeline = Timeline::EpochMilliseconds;

    // The same webhook description is wrapped either as a table (new syntax) or as a
    // classic source; note the cluster id is carried on the table's data source but on
    // the plan itself for the source form.
    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}
753
/// Plans a `CREATE SOURCE` statement into a [`CreateSourcePlan`].
///
/// High-level flow: validate INCLUDE/ENVELOPE/FORMAT combinations, plan the external
/// connection, apply envelope + encoding to derive the relation description, apply any
/// PRIMARY KEY NOT ENFORCED constraint, validate the timestamp interval, and finally
/// choose between the old-syntax ingestion shape (with a progress subsource) and the
/// new one. Expects purification to have already run (e.g. external references cleared,
/// progress subsource resolved to a named item).
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // When the new table-from-source syntax is forced, the legacy ENVELOPE/FORMAT/
    // INCLUDE clauses on CREATE SOURCE are rejected outright.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // INCLUDE HEADERS is Kafka-only; other INCLUDE metadata is Kafka or load-generator only.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    // INCLUDE metadata only makes sense under these envelopes.
    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka sources contribute extra metadata columns to the relation desc.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // Derive the source's relation description from envelope + format + metadata.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // PRIMARY KEY NOT ENFORCED: declare a key on the desc without any enforcement.
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        // A user key may not be combined with a key the envelope already established.
        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            // Bounds are only checked when a plan context is present — presumably so
            // catalog replay of existing sources is not rejected by newer limits
            // (NOTE(review): confirm this is the intent of the `pcx` gate).
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    // With a progress subsource we emit the old-syntax ingestion (data lands in the
    // source relation itself); without one, the source relation is the progress/timestamp
    // relation and data flows through tables.
    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            // Per-connection export details for the primary (default) output.
            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    // Multi-output generators have no default-output data.
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Reject an already-existing item of the same name unless IF NOT EXISTS was given.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Resolve the cluster (mutates stmt.in_cluster) before normalizing the statement.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // CDCv2 sources carry their own timeline; everything else is wall-clock.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1022
1023pub fn plan_generic_source_connection(
1024 scx: &StatementContext<'_>,
1025 source_connection: &CreateSourceConnection<Aug>,
1026 include_metadata: &Vec<SourceIncludeMetadata>,
1027) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1028 Ok(match source_connection {
1029 CreateSourceConnection::Kafka {
1030 connection,
1031 options,
1032 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1033 scx,
1034 connection,
1035 options,
1036 include_metadata,
1037 )?),
1038 CreateSourceConnection::Postgres {
1039 connection,
1040 options,
1041 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1042 scx, connection, options,
1043 )?),
1044 CreateSourceConnection::SqlServer {
1045 connection,
1046 options,
1047 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1048 scx, connection, options,
1049 )?),
1050 CreateSourceConnection::MySql {
1051 connection,
1052 options,
1053 } => {
1054 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1055 }
1056 CreateSourceConnection::LoadGenerator { generator, options } => {
1057 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1058 scx,
1059 generator,
1060 options,
1061 include_metadata,
1062 )?)
1063 }
1064 })
1065}
1066
1067fn plan_load_generator_source_connection(
1068 scx: &StatementContext<'_>,
1069 generator: &ast::LoadGenerator,
1070 options: &Vec<LoadGeneratorOption<Aug>>,
1071 include_metadata: &Vec<SourceIncludeMetadata>,
1072) -> Result<LoadGeneratorSourceConnection, PlanError> {
1073 let load_generator =
1074 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1075 let LoadGeneratorOptionExtracted {
1076 tick_interval,
1077 as_of,
1078 up_to,
1079 ..
1080 } = options.clone().try_into()?;
1081 let tick_micros = match tick_interval {
1082 Some(interval) => Some(interval.as_micros().try_into()?),
1083 None => None,
1084 };
1085 if up_to < as_of {
1086 sql_bail!("UP TO cannot be less than AS OF");
1087 }
1088 Ok(LoadGeneratorSourceConnection {
1089 load_generator,
1090 tick_micros,
1091 as_of,
1092 up_to,
1093 })
1094}
1095
1096fn plan_mysql_source_connection(
1097 scx: &StatementContext<'_>,
1098 connection: &ResolvedItemName,
1099 options: &Vec<MySqlConfigOption<Aug>>,
1100) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1101 let connection_item = scx.get_item_by_resolved_name(connection)?;
1102 match connection_item.connection()? {
1103 Connection::MySql(connection) => connection,
1104 _ => sql_bail!(
1105 "{} is not a MySQL connection",
1106 scx.catalog.resolve_full_name(connection_item.name())
1107 ),
1108 };
1109 let MySqlConfigOptionExtracted {
1110 details,
1111 text_columns: _,
1115 exclude_columns: _,
1116 seen: _,
1117 } = options.clone().try_into()?;
1118 let details = details
1119 .as_ref()
1120 .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1121 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1122 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1123 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1124 Ok(MySqlSourceConnection {
1125 connection: connection_item.id(),
1126 connection_id: connection_item.id(),
1127 details,
1128 })
1129}
1130
1131fn plan_sqlserver_source_connection(
1132 scx: &StatementContext<'_>,
1133 connection: &ResolvedItemName,
1134 options: &Vec<SqlServerConfigOption<Aug>>,
1135) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1136 let connection_item = scx.get_item_by_resolved_name(connection)?;
1137 match connection_item.connection()? {
1138 Connection::SqlServer(connection) => connection,
1139 _ => sql_bail!(
1140 "{} is not a SQL Server connection",
1141 scx.catalog.resolve_full_name(connection_item.name())
1142 ),
1143 };
1144 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1145 let details = details
1146 .as_ref()
1147 .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1148 let extras = hex::decode(details)
1149 .map_err(|e| sql_err!("{e}"))
1150 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1151 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1152 Ok(SqlServerSourceConnection {
1153 connection_id: connection_item.id(),
1154 connection: connection_item.id(),
1155 extras,
1156 })
1157}
1158
1159fn plan_postgres_source_connection(
1160 scx: &StatementContext<'_>,
1161 connection: &ResolvedItemName,
1162 options: &Vec<PgConfigOption<Aug>>,
1163) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1164 let connection_item = scx.get_item_by_resolved_name(connection)?;
1165 let PgConfigOptionExtracted {
1166 details,
1167 publication,
1168 text_columns: _,
1172 exclude_columns: _,
1176 seen: _,
1177 } = options.clone().try_into()?;
1178 let details = details
1179 .as_ref()
1180 .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1181 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1182 let details =
1183 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1184 let publication_details =
1185 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1186 Ok(PostgresSourceConnection {
1187 connection: connection_item.id(),
1188 connection_id: connection_item.id(),
1189 publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1191 publication_details,
1192 })
1193}
1194
1195fn plan_kafka_source_connection(
1196 scx: &StatementContext<'_>,
1197 connection_name: &ResolvedItemName,
1198 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1199 include_metadata: &Vec<SourceIncludeMetadata>,
1200) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1201 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1202 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1203 sql_bail!(
1204 "{} is not a kafka connection",
1205 scx.catalog.resolve_full_name(connection_item.name())
1206 )
1207 }
1208 let KafkaSourceConfigOptionExtracted {
1209 group_id_prefix,
1210 topic,
1211 topic_metadata_refresh_interval,
1212 start_timestamp: _, start_offset,
1214 seen: _,
1215 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1216 let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1218 let mut start_offsets = BTreeMap::new();
1219 if let Some(offsets) = start_offset {
1220 for (part, offset) in offsets.iter().enumerate() {
1221 if *offset < 0 {
1222 sql_bail!("START OFFSET must be a nonnegative integer");
1223 }
1224 start_offsets.insert(i32::try_from(part)?, *offset);
1225 }
1226 }
1227 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1228 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1231 }
1232 let metadata_columns = include_metadata
1233 .into_iter()
1234 .flat_map(|item| match item {
1235 SourceIncludeMetadata::Timestamp { alias } => {
1236 let name = match alias {
1237 Some(name) => name.to_string(),
1238 None => "timestamp".to_owned(),
1239 };
1240 Some((name, KafkaMetadataKind::Timestamp))
1241 }
1242 SourceIncludeMetadata::Partition { alias } => {
1243 let name = match alias {
1244 Some(name) => name.to_string(),
1245 None => "partition".to_owned(),
1246 };
1247 Some((name, KafkaMetadataKind::Partition))
1248 }
1249 SourceIncludeMetadata::Offset { alias } => {
1250 let name = match alias {
1251 Some(name) => name.to_string(),
1252 None => "offset".to_owned(),
1253 };
1254 Some((name, KafkaMetadataKind::Offset))
1255 }
1256 SourceIncludeMetadata::Headers { alias } => {
1257 let name = match alias {
1258 Some(name) => name.to_string(),
1259 None => "headers".to_owned(),
1260 };
1261 Some((name, KafkaMetadataKind::Headers))
1262 }
1263 SourceIncludeMetadata::Header {
1264 alias,
1265 key,
1266 use_bytes,
1267 } => Some((
1268 alias.to_string(),
1269 KafkaMetadataKind::Header {
1270 key: key.clone(),
1271 use_bytes: *use_bytes,
1272 },
1273 )),
1274 SourceIncludeMetadata::Key { .. } => {
1275 None
1277 }
1278 })
1279 .collect();
1280 Ok(KafkaSourceConnection {
1281 connection: connection_item.id(),
1282 connection_id: connection_item.id(),
1283 topic,
1284 start_offsets,
1285 group_id_prefix,
1286 topic_metadata_refresh_interval,
1287 metadata_columns,
1288 })
1289}
1290
/// Combines the declared envelope and format with the connection's key/value
/// schemas to produce the final relation description, the planned
/// `SourceEnvelope`, and the (optional) data encoding.
///
/// When a format is given, the connection's value schema must be a single
/// `bytes` column (the raw payload to decode); the decoded key/value schemas
/// then come from the encoding itself.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // Format decoding consumes a raw byte payload, so the source's
            // value schema must be exactly one `bytes` column.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka with ENVELOPE NONE, force every key column nullable.
            // NOTE(review): presumably because records may arrive without a
            // key in this configuration — confirm against the Kafka source
            // envelope documentation.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        // No format: keep the schemas supplied by the connection.
        None => (key_desc, value_desc),
    };

    // The key-value load generator supplies keys without a declared key
    // format, so key-envelope handling must tolerate a missing encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // INCLUDE KEY is rejected under ENVELOPE DEBEZIUM.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            // Debezium values must typecheck as before/after change events.
            // Surface the type error only when the value format was actually
            // Avro; otherwise report that Avro is required.
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            // UPSERT needs a key: either a declared key format, or a
            // connection (key-value load generator) that supplies keys
            // without one.
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // When no INCLUDE KEY was written, derive an unnamed key
            // envelope from the key encoding.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    // Feature-gated: surface value decode errors in an inline
                    // column, named by the alias or "error" by default.
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // ENVELOPE MATERIALIZE only supports a bare Avro format.
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Fold the INCLUDE metadata columns into the final relation description.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1469
1470fn plan_source_export_desc(
1473 scx: &StatementContext,
1474 name: &UnresolvedItemName,
1475 columns: &Vec<ColumnDef<Aug>>,
1476 constraints: &Vec<TableConstraint<Aug>>,
1477) -> Result<RelationDesc, PlanError> {
1478 let names: Vec<_> = columns
1479 .iter()
1480 .map(|c| normalize::column_name(c.name.clone()))
1481 .collect();
1482
1483 if let Some(dup) = names.iter().duplicates().next() {
1484 sql_bail!("column {} specified more than once", dup.quoted());
1485 }
1486
1487 let mut column_types = Vec::with_capacity(columns.len());
1490 let mut keys = Vec::new();
1491
1492 for (i, c) in columns.into_iter().enumerate() {
1493 let aug_data_type = &c.data_type;
1494 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1495 let mut nullable = true;
1496 for option in &c.options {
1497 match &option.option {
1498 ColumnOption::NotNull => nullable = false,
1499 ColumnOption::Default(_) => {
1500 bail_unsupported!("Source export with default value")
1501 }
1502 ColumnOption::Unique { is_primary } => {
1503 keys.push(vec![i]);
1504 if *is_primary {
1505 nullable = false;
1506 }
1507 }
1508 other => {
1509 bail_unsupported!(format!("Source export with column constraint: {}", other))
1510 }
1511 }
1512 }
1513 column_types.push(ty.nullable(nullable));
1514 }
1515
1516 let mut seen_primary = false;
1517 'c: for constraint in constraints {
1518 match constraint {
1519 TableConstraint::Unique {
1520 name: _,
1521 columns,
1522 is_primary,
1523 nulls_not_distinct,
1524 } => {
1525 if seen_primary && *is_primary {
1526 sql_bail!(
1527 "multiple primary keys for source export {} are not allowed",
1528 name.to_ast_string_stable()
1529 );
1530 }
1531 seen_primary = *is_primary || seen_primary;
1532
1533 let mut key = vec![];
1534 for column in columns {
1535 let column = normalize::column_name(column.clone());
1536 match names.iter().position(|name| *name == column) {
1537 None => sql_bail!("unknown column in constraint: {}", column),
1538 Some(i) => {
1539 let nullable = &mut column_types[i].nullable;
1540 if *is_primary {
1541 if *nulls_not_distinct {
1542 sql_bail!(
1543 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1544 );
1545 }
1546 *nullable = false;
1547 } else if !(*nulls_not_distinct || !*nullable) {
1548 break 'c;
1551 }
1552
1553 key.push(i);
1554 }
1555 }
1556 }
1557
1558 if *is_primary {
1559 keys.insert(0, key);
1560 } else {
1561 keys.push(key);
1562 }
1563 }
1564 TableConstraint::ForeignKey { .. } => {
1565 bail_unsupported!("Source export with a foreign key")
1566 }
1567 TableConstraint::Check { .. } => {
1568 bail_unsupported!("Source export with a check constraint")
1569 }
1570 }
1571 }
1572
1573 let typ = SqlRelationType::new(column_types).with_keys(keys);
1574 let desc = RelationDesc::new(typ, names);
1575 Ok(desc)
1576}
1577
// WITH-option schema for `CREATE SUBSOURCE`; `generate_extracted_config!`
// derives `CreateSubsourceOptionExtracted` with typed fields and defaults.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1587
/// Plans `CREATE SUBSOURCE`.
///
/// A subsource is either the progress collection of a source (`PROGRESS`
/// option) or an export of a specific external reference from an existing
/// source (`OF SOURCE ... EXTERNAL REFERENCE ...`). Exactly one of the two
/// forms must be present; violations are internal errors because these
/// statements are machine-generated.
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // XOR: either this is a progress subsource, or it references an export of
    // a source — never both, never neither.
    if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
        bail_internal!(
            "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
        );
    }

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // When the source-table migration is forced, new export subsources are
        // rejected in favor of `CREATE TABLE ... FROM SOURCE`.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.ok_or_else(|| {
            sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
        })?;

        // DETAILS is a hex-encoded protobuf blob; decode it into
        // per-connection export details.
        let details = details
            .as_ref()
            .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources carry a trivial envelope and no encoding.
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}
1735
// WITH-option schema for `CREATE TABLE ... FROM SOURCE`;
// `generate_extracted_config!` derives `TableFromSourceOptionExtracted`.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1744
/// Plans `CREATE TABLE ... FROM SOURCE`.
///
/// Produces a `CreateTablePlan` whose data source is an ingestion export of
/// the referenced source: the statement's `DETAILS` option (hex-encoded
/// protobuf, stamped in before planning) is decoded into per-connection export
/// details, and the declared envelope/format/INCLUDE clauses are applied to
/// derive the table's relation description.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // An omitted envelope defaults to ENVELOPE NONE.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode the DETAILS blob; its absence is an internal invariant
    // violation rather than a user error.
    let details = details
        .as_ref()
        .ok_or_else(|| internal_err!("source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // INCLUDE HEADERS is Kafka-only; other INCLUDE clauses are permitted for
    // Kafka and load-generator exports only.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            // Kafka metadata columns are only meaningful for these envelopes.
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Resolve each INCLUDE clause to a (column name, metadata kind)
            // pair; the key is handled by the envelope, not as metadata.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item
        .source_desc()?
        .ok_or_else(|| sql_err!("item is not a source"))?
        .connection;

    // If the statement defines columns/constraints, the schema comes from
    // them; otherwise fall back to the connection's default key/value schemas.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => bail_internal!("expected column definitions to be present"),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // Bare column names (no types) only rename the derived columns.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2012
// WITH-option schema shared by every load generator;
// `generate_extracted_config!` derives `LoadGeneratorOptionExtracted`. Which
// options are legal for a given generator is enforced separately by
// `ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2028
2029impl LoadGeneratorOptionExtracted {
2030 pub(super) fn ensure_only_valid_options(
2031 &self,
2032 loadgen: &ast::LoadGenerator,
2033 ) -> Result<(), PlanError> {
2034 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2035
2036 let mut options = self.seen.clone();
2037
2038 let permitted_options: &[_] = match loadgen {
2039 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2040 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2041 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2042 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2043 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2044 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2045 ast::LoadGenerator::KeyValue => &[
2046 TickInterval,
2047 Keys,
2048 SnapshotRounds,
2049 TransactionalSnapshot,
2050 ValueSize,
2051 Seed,
2052 Partitions,
2053 BatchSize,
2054 ],
2055 };
2056
2057 for o in permitted_options {
2058 options.remove(o);
2059 }
2060
2061 if !options.is_empty() {
2062 sql_bail!(
2063 "{} load generators do not support {} values",
2064 loadgen,
2065 options.iter().join(", ")
2066 )
2067 }
2068
2069 Ok(())
2070 }
2071}
2072
2073pub(crate) fn load_generator_ast_to_generator(
2074 scx: &StatementContext,
2075 loadgen: &ast::LoadGenerator,
2076 options: &[LoadGeneratorOption<Aug>],
2077 include_metadata: &[SourceIncludeMetadata],
2078) -> Result<LoadGenerator, PlanError> {
2079 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2080 extracted.ensure_only_valid_options(loadgen)?;
2081
2082 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2083 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2084 }
2085
2086 let load_generator = match loadgen {
2087 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2088 ast::LoadGenerator::Clock => {
2089 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2090 LoadGenerator::Clock
2091 }
2092 ast::LoadGenerator::Counter => {
2093 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2094 let LoadGeneratorOptionExtracted {
2095 max_cardinality, ..
2096 } = extracted;
2097 LoadGenerator::Counter { max_cardinality }
2098 }
2099 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2100 ast::LoadGenerator::Datums => {
2101 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2102 LoadGenerator::Datums
2103 }
2104 ast::LoadGenerator::Tpch => {
2105 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2106
2107 let sf: f64 = scale_factor.unwrap_or(0.01);
2109 if !sf.is_finite() || sf < 0.0 {
2110 sql_bail!("unsupported scale factor {sf}");
2111 }
2112
2113 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2114 let total = (sf * multiplier).floor();
2115 let mut i = i64::try_cast_from(total)
2116 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2117 if i < 1 {
2118 i = 1;
2119 }
2120 Ok(i)
2121 };
2122
2123 let count_supplier = f_to_i(10_000f64)?;
2126 let count_part = f_to_i(200_000f64)?;
2127 let count_customer = f_to_i(150_000f64)?;
2128 let count_orders = f_to_i(150_000f64 * 10f64)?;
2129 let count_clerk = f_to_i(1_000f64)?;
2130
2131 LoadGenerator::Tpch {
2132 count_supplier,
2133 count_part,
2134 count_customer,
2135 count_orders,
2136 count_clerk,
2137 }
2138 }
2139 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2140 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2141 let LoadGeneratorOptionExtracted {
2142 keys,
2143 snapshot_rounds,
2144 transactional_snapshot,
2145 value_size,
2146 tick_interval,
2147 seed,
2148 partitions,
2149 batch_size,
2150 ..
2151 } = extracted;
2152
2153 let mut include_offset = None;
2154 for im in include_metadata {
2155 match im {
2156 SourceIncludeMetadata::Offset { alias } => {
2157 include_offset = match alias {
2158 Some(alias) => Some(alias.to_string()),
2159 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2160 }
2161 }
2162 SourceIncludeMetadata::Key { .. } => continue,
2163
2164 _ => {
2165 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2166 }
2167 };
2168 }
2169
2170 let lgkv = KeyValueLoadGenerator {
2171 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2172 snapshot_rounds: snapshot_rounds
2173 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2174 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2176 value_size: value_size
2177 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2178 partitions: partitions
2179 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2180 tick_interval,
2181 batch_size: batch_size
2182 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2183 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2184 include_offset,
2185 };
2186
2187 if lgkv.keys == 0
2188 || lgkv.partitions == 0
2189 || lgkv.value_size == 0
2190 || lgkv.batch_size == 0
2191 {
2192 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2193 }
2194
2195 if lgkv.keys % lgkv.partitions != 0 {
2196 sql_bail!("KEYS must be a multiple of PARTITIONS")
2197 }
2198
2199 if lgkv.batch_size > lgkv.keys {
2200 sql_bail!("KEYS must be larger than BATCH SIZE")
2201 }
2202
2203 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2206 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2207 }
2208
2209 if lgkv.snapshot_rounds == 0 {
2210 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2211 }
2212
2213 LoadGenerator::KeyValue(lgkv)
2214 }
2215 };
2216
2217 Ok(load_generator)
2218}
2219
2220fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2221 let before = value_desc.get_by_name(&"before".into());
2222 let (after_idx, after_ty) = value_desc
2223 .get_by_name(&"after".into())
2224 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2225 let before_idx = if let Some((before_idx, before_ty)) = before {
2226 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2227 sql_bail!("'before' column must be of type record");
2228 }
2229 if before_ty != after_ty {
2230 sql_bail!("'before' type differs from 'after' column");
2231 }
2232 Some(before_idx)
2233 } else {
2234 None
2235 };
2236 Ok((before_idx, after_idx))
2237}
2238
2239fn get_encoding(
2240 scx: &StatementContext,
2241 format: &FormatSpecifier<Aug>,
2242 envelope: &ast::SourceEnvelope,
2243) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2244 let encoding = match format {
2245 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2246 FormatSpecifier::KeyValue { key, value } => {
2247 let key = {
2248 let encoding = get_encoding_inner(scx, key)?;
2249 Some(encoding.key.unwrap_or(encoding.value))
2250 };
2251 let value = get_encoding_inner(scx, value)?.value;
2252 SourceDataEncoding { key, value }
2253 }
2254 };
2255
2256 let requires_keyvalue = matches!(
2257 envelope,
2258 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2259 );
2260 let is_keyvalue = encoding.key.is_some();
2261 if requires_keyvalue && !is_keyvalue {
2262 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2263 };
2264
2265 Ok(encoding)
2266}
2267
2268fn source_sink_cluster_config<'a, 'ctx>(
2274 scx: &'a StatementContext<'ctx>,
2275 in_cluster: &mut Option<ResolvedClusterName>,
2276) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2277 let cluster = match in_cluster {
2278 None => {
2279 let cluster = scx.catalog.resolve_cluster(None)?;
2280 *in_cluster = Some(ResolvedClusterName {
2281 id: cluster.id(),
2282 print_name: None,
2283 });
2284 cluster
2285 }
2286 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2287 };
2288
2289 Ok(cluster)
2290}
2291
// Typed extraction of the options accepted on an inline Avro schema; messages
// are assumed to use the Confluent wire format unless overridden.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2293
/// Schemas extracted from a `FORMAT` clause, together with the schema
/// registry connection they were resolved from, if any.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, when the format defines one.
    pub key_schema: Option<String>,
    /// Schema for the value.
    pub value_schema: String,
    /// Additional schemas referenced by the key schema.
    pub key_reference_schemas: Vec<String>,
    /// Additional schemas referenced by the value schema.
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection the schemas came from, if any.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether messages are framed with the Confluent wire format.
    pub confluent_wire_format: bool,
}
2305
/// Resolves a single `FORMAT` clause to data encodings.
///
/// Most formats describe only a value encoding (returned with `key: None` at
/// the bottom). Avro and Protobuf schemas resolved through a schema registry
/// may carry a key schema as well, in which case this returns early with both
/// key and value encodings populated.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                // An inline schema has no key schema and no registry
                // connection; the wire-format flag comes from WITH options.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    // The referenced item must be a schema registry
                    // connection.
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    // Purification is expected to have fetched the schemas
                    // and recorded them in `seed`; a missing seed is an
                    // internal error.
                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        sql_bail!("Avro CSR seed resolution has not been performed")
                    }
                }
            };

            // If the registry supplied a key schema, both encodings are
            // known; return them immediately.
            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    // Validate the connection type even though the resolved
                    // connection itself is unused here.
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    // A key schema in the seed yields a key/value pair.
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    sql_bail!("Protobuf CSR seed resolution has not been performed")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // Purification fills in header names; empty names here
                    // indicate an internal error.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}
2495
2496fn get_key_envelope(
2498 included_items: &[SourceIncludeMetadata],
2499 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2500 key_envelope_no_encoding: bool,
2501) -> Result<KeyEnvelope, PlanError> {
2502 let key_definition = included_items
2503 .iter()
2504 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2505 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2506 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2507 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2508 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2509 (Some(name), _) if key_envelope_no_encoding => {
2510 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2511 }
2512 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2513 (_, None) => {
2514 sql_bail!(
2518 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2519 got bare FORMAT"
2520 );
2521 }
2522 }
2523 } else {
2524 Ok(KeyEnvelope::None)
2525 }
2526}
2527
2528fn get_unnamed_key_envelope(
2531 key: Option<&DataEncoding<ReferencedConnection>>,
2532) -> Result<KeyEnvelope, PlanError> {
2533 let is_composite = match key {
2537 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2538 Some(
2539 DataEncoding::Avro(_)
2540 | DataEncoding::Csv(_)
2541 | DataEncoding::Protobuf(_)
2542 | DataEncoding::Regex { .. },
2543 ) => true,
2544 None => false,
2545 };
2546
2547 if is_composite {
2548 Ok(KeyEnvelope::Flattened)
2549 } else {
2550 Ok(KeyEnvelope::Named("key".to_string()))
2551 }
2552}
2553
/// `CREATE VIEW` returns no rows, so its statement description is empty.
pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2560
/// Plans the parts shared by `CREATE [TEMPORARY] VIEW` variants: normalizes
/// the statement text, plans the defining query, resolves the view's
/// qualified name, applies column renames, and packages the result.
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    // Capture the normalized SQL text of the statement for the catalog.
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    // Planning with `QueryLifetime::View` is expected to leave no non-trivial
    // row-set finishing (e.g. top-level ORDER BY/LIMIT) behind.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Views may not reference query parameters.
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    // Resolve the query's global-id dependencies to catalog item ids.
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Temporary views live in the temporary schema; others are fully
    // qualified in the current database/schema.
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    // Apply the optional column-name list from the statement.
    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}
2633
/// Plans `CREATE [OR REPLACE] VIEW`, including the drop of any item being
/// replaced and name-collision checks.
pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    // The planning context may ask to ignore IF EXISTS errors, in which case
    // OR REPLACE and the already-exists check below are both skipped.
    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            // Refuse to replace a view that the new definition depends on:
            // dropping it would invalidate the new view.
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    // Dropping the replaced item also drops its transitive dependents.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    validate_view_dependencies(scx, &view.dependencies.0)?;

    // Fail on a name collision unless IF NOT EXISTS / OR REPLACE applies.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
2719
2720fn validate_view_dependencies(
2722 scx: &StatementContext,
2723 dependencies: &BTreeSet<CatalogItemId>,
2724) -> Result<(), PlanError> {
2725 for id in dependencies {
2726 let item = scx.catalog.get_item(id);
2727 if item.replacement_target().is_some() {
2728 let name = scx.catalog.minimal_qualification(item.name());
2729 return Err(PlanError::InvalidDependency {
2730 name: name.to_string(),
2731 item_type: format!("replacement {}", item.item_type()),
2732 });
2733 }
2734 }
2735
2736 Ok(())
2737}
2738
/// `CREATE MATERIALIZED VIEW` returns no rows, so its statement description
/// is empty.
pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2745
/// `CREATE NETWORK POLICY` returns no rows, so its statement description is
/// empty.
pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2752
/// `ALTER NETWORK POLICY` returns no rows, so its statement description is
/// empty.
pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2759
/// Plans `CREATE [OR REPLACE] MATERIALIZED VIEW`: resolves the target
/// cluster (and optional replica), plans the defining query, validates the
/// WITH options (ASSERT NOT NULL, PARTITION BY, RETAIN HISTORY, REFRESH),
/// handles OR REPLACE / IF NOT EXISTS semantics and the optional
/// `REPLACEMENT FOR` target, and assembles the final plan.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Resolve the cluster and pin the choice back into the statement so the
    // normalized SQL records it.
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Replica-targeted materialized views are feature-flag gated; the named
    // replica must exist on the resolved cluster.
    let target_replica = match &stmt.in_cluster_replica {
        Some(replica_name) => {
            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;

            let cluster = scx.catalog.get_cluster(cluster_id);
            let replica_id = cluster
                .replica_ids()
                .get(replica_name.as_str())
                .copied()
                .ok_or_else(|| {
                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
                })?;
            Some(replica_id)
        }
        None => None,
    };

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // Planning with `QueryLifetime::MaterializedView` is expected to leave no
    // non-trivial row-set finishing behind.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    // Materialized views may not reference query parameters.
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    // Apply the optional column-name list from the statement.
    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    // Extract the WITH (...) options.
    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Fold all REFRESH options into a single schedule. `None` means only the
    // default (ON COMMIT) behavior was requested.
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            // Anything other than ON COMMIT is feature-flag gated.
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    // Purification rewrites REFRESH AT CREATION; seeing it
                    // here is an internal error.
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    // REFRESH AT must evaluate to a literal mz_timestamp.
                    transform_ast::transform(scx, &mut time)?; let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    // The interval must be positive, month-free (months have
                    // variable length), and fit in a u64 of milliseconds.
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    // Purification fills in a default ALIGNED TO; a missing
                    // value here is an internal error.
                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    // ALIGNED TO must likewise evaluate to a literal
                    // mz_timestamp.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Translate ASSERT NOT NULL column names into column indices.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // When the planning context asks to ignore IF EXISTS errors, downgrade
    // the behavior to Skip.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    // For OR REPLACE, plan the drop of the existing item, refusing when the
    // new definition depends on it.
    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Dropping the replaced item also drops its transitive dependents.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    // Resolve the query's global-id dependencies to catalog item ids.
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // If this materialized view is declared as a replacement for another,
    // validate the target and record it as an extra dependency.
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // Reject cycles: a dependent of the target must not also be a
        // dependency of the replacement.
        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        dependencies.insert(target.id());

        // A target may have at most one replacement at a time.
        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Fail on a name collision unless IF NOT EXISTS / OR REPLACE applies.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            target_replica,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
3115
// Typed extraction of the WITH options accepted by CREATE MATERIALIZED VIEW.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3123
/// `CREATE SINK` returns no rows, so its statement description is empty.
pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
3130
// Typed extraction of the WITH options accepted by CREATE SINK.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3138
3139pub fn plan_create_sink(
3140 scx: &StatementContext,
3141 stmt: CreateSinkStatement<Aug>,
3142) -> Result<Plan, PlanError> {
3143 let Some(name) = stmt.name.clone() else {
3145 return Err(PlanError::MissingName(CatalogItemType::Sink));
3146 };
3147 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3148 let full_name = scx.catalog.resolve_full_name(&name);
3149 let partial_name = PartialItemName::from(full_name.clone());
3150 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3151 return Err(PlanError::ItemAlreadyExists {
3152 name: full_name.to_string(),
3153 item_type: item.item_type(),
3154 });
3155 }
3156
3157 plan_sink(scx, stmt)
3158}
3159
3160fn plan_sink(
3165 scx: &StatementContext,
3166 mut stmt: CreateSinkStatement<Aug>,
3167) -> Result<Plan, PlanError> {
3168 let CreateSinkStatement {
3169 name,
3170 in_cluster: _,
3171 from,
3172 connection,
3173 format,
3174 envelope,
3175 mode,
3176 if_not_exists,
3177 with_options,
3178 } = stmt.clone();
3179
3180 let Some(name) = name else {
3181 return Err(PlanError::MissingName(CatalogItemType::Sink));
3182 };
3183 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3184
3185 let envelope = match (&connection, envelope, mode) {
3186 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3188 SinkEnvelope::Upsert
3189 }
3190 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3191 SinkEnvelope::Debezium
3192 }
3193 (CreateSinkConnection::Kafka { .. }, None, None) => {
3194 sql_bail!("ENVELOPE clause is required")
3195 }
3196 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3197 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3198 }
3199 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3201 SinkEnvelope::Upsert
3202 }
3203 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3204 SinkEnvelope::Append
3205 }
3206 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3207 sql_bail!("MODE clause is required")
3208 }
3209 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3210 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3211 }
3212 };
3213
3214 let from_name = &from;
3215 let from = scx.get_item_by_resolved_name(&from)?;
3216
3217 {
3218 use CatalogItemType::*;
3219 match from.item_type() {
3220 Table | Source | MaterializedView => {
3221 if from.replacement_target().is_some() {
3222 let name = scx.catalog.minimal_qualification(from.name());
3223 return Err(PlanError::InvalidSinkFrom {
3224 name: name.to_string(),
3225 item_type: format!("replacement {}", from.item_type()),
3226 });
3227 }
3228 }
3229 Sink | View | Index | Type | Func | Secret | Connection => {
3230 let name = scx.catalog.minimal_qualification(from.name());
3231 return Err(PlanError::InvalidSinkFrom {
3232 name: name.to_string(),
3233 item_type: from.item_type().to_string(),
3234 });
3235 }
3236 }
3237 }
3238
3239 if from.id().is_system() {
3240 bail_unsupported!("creating a sink directly on a catalog object");
3241 }
3242
3243 let desc = from
3244 .relation_desc()
3245 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
3246 let key_indices = match &connection {
3247 CreateSinkConnection::Kafka { key: Some(key), .. }
3248 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3249 let key_columns = key
3250 .key_columns
3251 .clone()
3252 .into_iter()
3253 .map(normalize::column_name)
3254 .collect::<Vec<_>>();
3255 let mut uniq = BTreeSet::new();
3256 for col in key_columns.iter() {
3257 if !uniq.insert(col) {
3258 sql_bail!("duplicate column referenced in KEY: {}", col);
3259 }
3260 }
3261 let indices = key_columns
3262 .iter()
3263 .map(|col| -> anyhow::Result<usize> {
3264 let name_idx =
3265 desc.get_by_name(col)
3266 .map(|(idx, _type)| idx)
3267 .ok_or_else(|| {
3268 sql_err!("column referenced in KEY does not exist: {}", col)
3269 })?;
3270 if desc.get_unambiguous_name(name_idx).is_none() {
3271 sql_err!("column referenced in KEY is ambiguous: {}", col);
3272 }
3273 Ok(name_idx)
3274 })
3275 .collect::<Result<Vec<_>, _>>()?;
3276
3277 if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3280 let cols: Vec<_> = desc.iter().collect();
3281 for &idx in &indices {
3282 let (col_name, col_type) = cols[idx];
3283 let scalar = &col_type.scalar_type;
3284 let is_valid = matches!(
3285 scalar,
3286 SqlScalarType::Bool
3288 | SqlScalarType::Int16
3289 | SqlScalarType::Int32
3290 | SqlScalarType::Int64
3291 | SqlScalarType::UInt16
3292 | SqlScalarType::UInt32
3293 | SqlScalarType::UInt64
3294 | SqlScalarType::Numeric { .. }
3296 | SqlScalarType::Date
3298 | SqlScalarType::Time
3299 | SqlScalarType::Timestamp { .. }
3300 | SqlScalarType::TimestampTz { .. }
3301 | SqlScalarType::Interval
3302 | SqlScalarType::MzTimestamp
3303 | SqlScalarType::String
3305 | SqlScalarType::Char { .. }
3306 | SqlScalarType::VarChar { .. }
3307 | SqlScalarType::PgLegacyChar
3308 | SqlScalarType::PgLegacyName
3309 | SqlScalarType::Bytes
3310 | SqlScalarType::Jsonb
3311 | SqlScalarType::Uuid
3313 | SqlScalarType::Oid
3314 | SqlScalarType::RegProc
3315 | SqlScalarType::RegType
3316 | SqlScalarType::RegClass
3317 | SqlScalarType::MzAclItem
3318 | SqlScalarType::AclItem
3319 | SqlScalarType::Int2Vector
3320 | SqlScalarType::Range { .. }
3322 );
3323 if !is_valid {
3324 return Err(PlanError::IcebergSinkUnsupportedKeyType {
3325 column: col_name.to_string(),
3326 column_type: format!("{:?}", scalar),
3327 });
3328 }
3329 }
3330 }
3331
3332 let is_valid_key = desc
3333 .typ()
3334 .keys
3335 .iter()
3336 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3337
3338 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3339 if key.not_enforced {
3340 scx.catalog
3341 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3342 key: key_columns.clone(),
3343 name: name.item.clone(),
3344 })
3345 } else {
3346 return Err(PlanError::UpsertSinkWithInvalidKey {
3347 name: from_name.full_name_str(),
3348 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3349 valid_keys: desc
3350 .typ()
3351 .keys
3352 .iter()
3353 .map(|key| {
3354 key.iter()
3355 .map(|col| desc.get_name(*col).as_str().into())
3356 .collect()
3357 })
3358 .collect(),
3359 });
3360 }
3361 }
3362 Some(indices)
3363 }
3364 CreateSinkConnection::Kafka { key: None, .. }
3365 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3366 };
3367
3368 if key_indices.is_some() && envelope == SinkEnvelope::Append {
3369 sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3370 }
3371
3372 if envelope == SinkEnvelope::Append {
3374 if let CreateSinkConnection::Iceberg { .. } = &connection {
3375 use mz_storage_types::sinks::{
3376 ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3377 };
3378 for (col_name, _) in desc.iter() {
3379 if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3380 || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3381 {
3382 sql_bail!(
3383 "column {} conflicts with the system column that MODE APPEND \
3384 adds to the Iceberg table",
3385 col_name.quoted()
3386 );
3387 }
3388 }
3389 }
3390 }
3391
3392 let headers_index = match &connection {
3393 CreateSinkConnection::Kafka {
3394 headers: Some(headers),
3395 ..
3396 } => {
3397 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3398
3399 match envelope {
3400 SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3401 SinkEnvelope::Debezium => {
3402 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3403 }
3404 };
3405
3406 let headers = normalize::column_name(headers.clone());
3407 let (idx, ty) = desc
3408 .get_by_name(&headers)
3409 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3410
3411 if desc.get_unambiguous_name(idx).is_none() {
3412 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3413 }
3414
3415 match &ty.scalar_type {
3416 SqlScalarType::Map { value_type, .. }
3417 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3418 _ => sql_bail!(
3419 "HEADERS column must have type map[text => text] or map[text => bytea]"
3420 ),
3421 }
3422
3423 Some(idx)
3424 }
3425 _ => None,
3426 };
3427
3428 let relation_key_indices = desc.typ().keys.get(0).cloned();
3430
3431 let key_desc_and_indices = key_indices.map(|key_indices| {
3432 let cols = desc
3433 .iter()
3434 .map(|(name, ty)| (name.clone(), ty.clone()))
3435 .collect::<Vec<_>>();
3436 let (names, types): (Vec<_>, Vec<_>) =
3437 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3438 let typ = SqlRelationType::new(types);
3439 (RelationDesc::new(typ, names), key_indices)
3440 });
3441
3442 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3443 return Err(PlanError::UpsertSinkWithoutKey);
3444 }
3445
3446 let CreateSinkOptionExtracted {
3447 snapshot,
3448 version,
3449 partition_strategy: _,
3450 seen: _,
3451 commit_interval,
3452 } = with_options.try_into()?;
3453
3454 let connection_builder = match connection {
3455 CreateSinkConnection::Kafka {
3456 connection,
3457 options,
3458 ..
3459 } => kafka_sink_builder(
3460 scx,
3461 connection,
3462 options,
3463 format,
3464 relation_key_indices,
3465 key_desc_and_indices,
3466 headers_index,
3467 desc.into_owned(),
3468 envelope,
3469 from.id(),
3470 commit_interval,
3471 )?,
3472 CreateSinkConnection::Iceberg {
3473 connection,
3474 aws_connection,
3475 options,
3476 ..
3477 } => iceberg_sink_builder(
3478 scx,
3479 connection,
3480 aws_connection,
3481 options,
3482 relation_key_indices,
3483 key_desc_and_indices,
3484 commit_interval,
3485 &desc,
3486 )?,
3487 };
3488
3489 let with_snapshot = snapshot.unwrap_or(true);
3491 let version = version.unwrap_or(0);
3493
3494 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3498 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3499
3500 Ok(Plan::CreateSink(CreateSinkPlan {
3501 name,
3502 sink: Sink {
3503 create_sql,
3504 from: from.global_id(),
3505 connection: connection_builder,
3506 envelope,
3507 version,
3508 commit_interval,
3509 },
3510 with_snapshot,
3511 if_not_exists,
3512 in_cluster: in_cluster.id(),
3513 }))
3514}
3515
3516fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3517 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3518
3519 let existing_keys = desc
3520 .typ()
3521 .keys
3522 .iter()
3523 .map(|key_columns| {
3524 key_columns
3525 .iter()
3526 .map(|col| desc.get_name(*col).as_str())
3527 .join(", ")
3528 })
3529 .join(", ");
3530
3531 sql_err!(
3532 "Key constraint ({}) conflicts with existing key ({})",
3533 user_keys,
3534 existing_keys
3535 )
3536}
3537
/// Confluent Schema Registry configuration extracted from the user-supplied
/// CSR connection options of an Avro-formatted sink.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    // Option names already consumed; used to reject duplicate options.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    /// Value of `AVRO KEY FULLNAME`: the fully-qualified name to use for the
    /// generated key schema.
    pub(crate) avro_key_fullname: Option<String>,
    /// Value of `AVRO VALUE FULLNAME`: the fully-qualified name to use for the
    /// generated value schema.
    pub(crate) avro_value_fullname: Option<String>,
    /// Value of the `NULL DEFAULTS` option; passed to the Avro schema generator.
    pub(crate) null_defaults: bool,
    /// `DOC ON` comments that apply to the value schema.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    /// `DOC ON` comments that apply to the key schema.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    /// Parsed `KEY COMPATIBILITY LEVEL` option, if present.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    /// Parsed `VALUE COMPATIBILITY LEVEL` option, if present.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3551
// Parses the raw list of CSR connection options into a
// `CsrConfigOptionExtracted`, rejecting duplicate options and invalid values.
impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        // `DOC ON` options without a KEY/VALUE qualifier apply to both
        // schemas. They are buffered here and merged in after the loop, so
        // that schema-specific `DOC ON ... KEY`/`... VALUE` options win.
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            // Each option may be specified at most once.
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            // Wraps a value-parsing error with the option name for context.
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            // Parses an optional string option value into a CSR compatibility
            // level (case-insensitively); non-string values are rejected.
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    // `DOC ON` requires a non-empty string value.
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    // The doc comment targets either a specific column or a
                    // whole type; anything else is malformed.
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => sql_bail!("invalid DOC ON identifier"),
                    };

                    // Route the comment to the key schema, the value schema,
                    // or (for `All`) the deferred common bucket.
                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        // Apply the unqualified `DOC ON` comments to both schemas, without
        // overwriting any schema-specific comment set above.
        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}
3647
3648fn iceberg_sink_builder(
3649 scx: &StatementContext,
3650 catalog_connection: ResolvedItemName,
3651 aws_connection: ResolvedItemName,
3652 options: Vec<IcebergSinkConfigOption<Aug>>,
3653 relation_key_indices: Option<Vec<usize>>,
3654 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3655 commit_interval: Option<Duration>,
3656 desc: &RelationDesc,
3657) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3658 scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3659
3660 ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3664 .map_err(|e| sql_err!("{}", e))?;
3665 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3666 let catalog_connection_id = catalog_connection_item.id();
3667 let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3668 let aws_connection_id = aws_connection_item.id();
3669 if !matches!(
3670 catalog_connection_item.connection()?,
3671 Connection::IcebergCatalog(_)
3672 ) {
3673 sql_bail!(
3674 "{} is not an iceberg catalog connection",
3675 scx.catalog
3676 .resolve_full_name(catalog_connection_item.name())
3677 .to_string()
3678 .quoted()
3679 );
3680 };
3681
3682 if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3683 sql_bail!(
3684 "{} is not an AWS connection",
3685 scx.catalog
3686 .resolve_full_name(aws_connection_item.name())
3687 .to_string()
3688 .quoted()
3689 );
3690 }
3691
3692 let IcebergSinkConfigOptionExtracted {
3693 table,
3694 namespace,
3695 seen: _,
3696 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3697
3698 let Some(table) = table else {
3699 sql_bail!("Iceberg sink must specify TABLE");
3700 };
3701 let Some(namespace) = namespace else {
3702 sql_bail!("Iceberg sink must specify NAMESPACE");
3703 };
3704 if commit_interval.is_none() {
3705 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3706 }
3707
3708 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3709 catalog_connection_id,
3710 catalog_connection: catalog_connection_id,
3711 aws_connection_id,
3712 aws_connection: aws_connection_id,
3713 table,
3714 namespace,
3715 relation_key_indices,
3716 key_desc_and_indices,
3717 }))
3718}
3719
/// Plans the connection portion of a Kafka sink.
///
/// Validates the referenced connection, extracts and validates the Kafka
/// option set (topic, id styles, topic creation parameters), plans the
/// `PARTITION BY` expression (if any), resolves the key/value formats
/// (including generated Avro schemas), and assembles the final
/// `KafkaSinkConnection`.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // The referenced connection must exist and be a Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // COMMIT INTERVAL is an Iceberg-sink option; reject it here.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS is mutually exclusive with an explicit id prefix; with
    // neither specified, fall through to a (possibly `None`) prefix style.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    // Cap the metadata refresh interval at one hour.
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Validates that an optional topic-creation parameter, when present, is a
    // strictly positive integer, converting it to `NonNeg<i32>`.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Resolves a CSR connection reference for Avro formats, rejecting
    // source-only options (SEED, KEY/VALUE STRATEGY) and validating the
    // AVRO ... FULLNAME option combination against the presence of a key.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // Custom fullnames must be given for both schemas or neither.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps a FORMAT clause plus a relation description to the concrete sink
    // format type, generating Avro schemas where required. `is_key` selects
    // between key- and value-schema options.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression against the value relation.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // Under ENVELOPE DEBEZIUM, PARTITION BY may only reference
                    // key columns; mark every non-key column so that
                    // referencing it produces a dedicated error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression must be castable (assignment context) to uint8.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Resolve the key/value formats. A bare FORMAT clause applies the same
    // format to both key (if present) and value; a key format is only planned
    // when the sink actually has a key.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4005
4006pub fn describe_create_index(
4007 _: &StatementContext,
4008 _: CreateIndexStatement<Aug>,
4009) -> Result<StatementDesc, PlanError> {
4010 Ok(StatementDesc::new(None))
4011}
4012
/// Plans a `CREATE INDEX` statement.
///
/// Validates that the target item can be indexed, fills in defaulted pieces
/// of the statement (key expressions, index name, cluster), rewrites `stmt`
/// in place so the normalized `create_sql` records the fully resolved form,
/// and produces the `CreateIndexPlan`.
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    // Indexes may only be built on relational, non-replacement items.
    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;

    // With no explicit key, index the relation's default key. Each default
    // key column is referenced by name when that name is unambiguous, and by
    // 1-based ordinal position otherwise.
    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            let mut name_counts = BTreeMap::new();
            for name in on_desc.iter_names() {
                *name_counts.entry(name).or_insert(0usize) += 1;
            }
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| {
                    let name = on_desc.get_name(*i);
                    if name_counts.get(name).copied() == Some(1) {
                        Expr::Identifier(vec![name.clone().into()])
                    } else {
                        Expr::Value(Value::Number((i + 1).to_string()))
                    }
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    // Use the user-provided name, or derive one from the indexed item plus
    // the key columns (`<item>_primary_idx` for the default key, otherwise
    // `<item>_<col1>_<col2>..._idx`).
    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            idx_name.item += "_primary_idx";
        } else {
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        // For derived names, avoid collisions unless IF NOT EXISTS was given.
        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Error on an existing item with this name, unless IF NOT EXISTS was
    // specified or the planning context suppresses such errors.
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    // Write the resolved cluster, name, and key parts back into the
    // statement so `create_sql` below records the fully explicit form.
    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // RETAIN HISTORY, if present, determines the compaction window.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4179
4180pub fn describe_create_type(
4181 _: &StatementContext,
4182 _: CreateTypeStatement<Aug>,
4183) -> Result<StatementDesc, PlanError> {
4184 Ok(StatementDesc::new(None))
4185}
4186
/// Plans a `CREATE TYPE ... AS LIST | MAP | (record)` statement, validating
/// that every referenced element/key/value/field type is a named catalog
/// type of class "type" (and not `char`, which cannot be embedded).
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    // Checks that `data_type` is a named catalog type usable as a component
    // of the new type, returning its id and type modifiers. `as_type` and
    // `key` are only used to phrase error messages (e.g. "LIST "/"ELEMENT
    // TYPE").
    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate the modifiers by attempting the scalar-type
                // conversion; the result itself is discarded.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    // Build the catalog representation of the new type from the AS clause.
    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Reject the new name if an existing item of a conflicting kind already
    // uses it.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}
4303
// Extractor for the `ELEMENT TYPE` option of `CREATE TYPE ... AS LIST`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Extractor for the `KEY TYPE` and `VALUE TYPE` options of
// `CREATE TYPE ... AS MAP`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4311
/// The planned effect of an `ALTER ROLE` statement: either a change to the
/// role's attributes or to one of its session-variable defaults.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Update the role's attributes (e.g. `INHERIT`, `PASSWORD`, `SUPERUSER`).
    Attributes(PlannedRoleAttributes),
    /// Set or reset a role-level session variable default.
    Variable(PlannedRoleVariable),
}
4317
/// Role attributes extracted from `CREATE ROLE`/`ALTER ROLE` options.
///
/// Each field is `None` when the corresponding attribute was not mentioned,
/// so consumers can distinguish "leave unchanged" from an explicit setting.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    // `INHERIT`/`NOINHERIT`; only `Some(true)` is currently supported.
    pub inherit: Option<bool>,
    // Password supplied via `PASSWORD '...'`.
    pub password: Option<Password>,
    // SCRAM iteration count, taken from the system vars when a password is set.
    pub scram_iterations: Option<NonZeroU32>,
    // `Some(true)` when `PASSWORD NULL` was specified (presumably clears any
    // stored password — confirm against the ALTER ROLE execution path).
    pub nopassword: Option<bool>,
    // `SUPERUSER`/`NOSUPERUSER`.
    pub superuser: Option<bool>,
    // `LOGIN`/`NOLOGIN`.
    pub login: Option<bool>,
}
4330
/// Plans the attribute options of `CREATE ROLE`/`ALTER ROLE`.
///
/// Rejects conflicting or redundant combinations, and the Postgres
/// attributes Materialize never supports (`CREATECLUSTER`, `CREATEDB`,
/// `CREATEROLE` — system privileges replace them).
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    // Start with every attribute unset; only options that appear become
    // `Some(..)`.
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        // NOTE: arm order matters — the guard arms for duplicate
        // INHERIT/PASSWORD must precede the recording arms below.
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    // A concrete password also pins the SCRAM iteration count
                    // from the current system configuration.
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    // `PASSWORD NULL` requests no password.
                    planned_attributes.nopassword = Some(true);
                }
            }
            // SUPERUSER/NOSUPERUSER and LOGIN/NOLOGIN each reject only a
            // direct contradiction; repeating the same flag is allowed.
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    // NOINHERIT parses but is not yet implemented.
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}
4419
/// A planned change to one of a role's default session variables.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// `ALTER ROLE ... SET name = value`.
    Set { name: String, value: VariableValue },
    /// `ALTER ROLE ... RESET name`.
    Reset { name: String },
}
4425
4426impl PlannedRoleVariable {
4427 pub fn name(&self) -> &str {
4428 match self {
4429 PlannedRoleVariable::Set { name, .. } => name,
4430 PlannedRoleVariable::Reset { name } => name,
4431 }
4432 }
4433}
4434
4435fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4436 let plan = match variable {
4437 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4438 name: name.to_string(),
4439 value: scl::plan_set_variable_to(value)?,
4440 },
4441 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4442 name: name.to_string(),
4443 },
4444 };
4445 Ok(plan)
4446}
4447
4448pub fn describe_create_role(
4449 _: &StatementContext,
4450 _: CreateRoleStatement,
4451) -> Result<StatementDesc, PlanError> {
4452 Ok(StatementDesc::new(None))
4453}
4454
4455pub fn plan_create_role(
4456 scx: &StatementContext,
4457 CreateRoleStatement { name, options }: CreateRoleStatement,
4458) -> Result<Plan, PlanError> {
4459 let attributes = plan_role_attributes(options, scx)?;
4460 Ok(Plan::CreateRole(CreateRolePlan {
4461 name: normalize::ident(name),
4462 attributes: attributes.into(),
4463 }))
4464}
4465
4466pub fn plan_create_network_policy(
4467 ctx: &StatementContext,
4468 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4469) -> Result<Plan, PlanError> {
4470 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4471 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4472
4473 let Some(rule_defs) = policy_options.rules else {
4474 sql_bail!("RULES must be specified when creating network policies.");
4475 };
4476
4477 let mut rules = vec![];
4478 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4479 let NetworkPolicyRuleOptionExtracted {
4480 seen: _,
4481 direction,
4482 action,
4483 address,
4484 } = options.try_into()?;
4485 let (direction, action, address) = match (direction, action, address) {
4486 (Some(direction), Some(action), Some(address)) => (
4487 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4488 NetworkPolicyRuleAction::try_from(action.as_str())?,
4489 PolicyAddress::try_from(address.as_str())?,
4490 ),
4491 (_, _, _) => {
4492 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4493 }
4494 };
4495 rules.push(NetworkPolicyRule {
4496 name: normalize::ident(name),
4497 direction,
4498 action,
4499 address,
4500 });
4501 }
4502
4503 if rules.len()
4504 > ctx
4505 .catalog
4506 .system_vars()
4507 .max_rules_per_network_policy()
4508 .try_into()?
4509 {
4510 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4511 }
4512
4513 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4514 name: normalize::ident(name),
4515 rules,
4516 }))
4517}
4518
4519pub fn plan_alter_network_policy(
4520 ctx: &StatementContext,
4521 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4522) -> Result<Plan, PlanError> {
4523 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4524
4525 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4526 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4527
4528 let Some(rule_defs) = policy_options.rules else {
4529 sql_bail!("RULES must be specified when creating network policies.");
4530 };
4531
4532 let mut rules = vec![];
4533 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4534 let NetworkPolicyRuleOptionExtracted {
4535 seen: _,
4536 direction,
4537 action,
4538 address,
4539 } = options.try_into()?;
4540
4541 let (direction, action, address) = match (direction, action, address) {
4542 (Some(direction), Some(action), Some(address)) => (
4543 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4544 NetworkPolicyRuleAction::try_from(action.as_str())?,
4545 PolicyAddress::try_from(address.as_str())?,
4546 ),
4547 (_, _, _) => {
4548 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4549 }
4550 };
4551 rules.push(NetworkPolicyRule {
4552 name: normalize::ident(name),
4553 direction,
4554 action,
4555 address,
4556 });
4557 }
4558 if rules.len()
4559 > ctx
4560 .catalog
4561 .system_vars()
4562 .max_rules_per_network_policy()
4563 .try_into()?
4564 {
4565 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4566 }
4567
4568 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4569 id: policy.id(),
4570 name: normalize::ident(name),
4571 rules,
4572 }))
4573}
4574
4575pub fn describe_create_cluster(
4576 _: &StatementContext,
4577 _: CreateClusterStatement<Aug>,
4578) -> Result<StatementDesc, PlanError> {
4579 Ok(StatementDesc::new(None))
4580}
4581
// Generates `ClusterOptionExtracted`, the typed option struct consumed by
// `plan_create_cluster_inner` and rebuilt by `unplan_create_cluster`.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Generates `NetworkPolicyOptionExtracted` (the `RULES` list of a network
// policy statement).
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Generates `NetworkPolicyRuleOptionExtracted`: the per-rule DIRECTION /
// ACTION / ADDRESS options, all parsed as strings and validated later.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Generates `ClusterAlterOptionExtracted` (the `WAIT` option).
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Generates `ClusterAlterUntilReadyOptionExtracted` (TIMEOUT / ON TIMEOUT).
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Generates `ClusterFeatureExtracted`: per-cluster optimizer feature
// overrides. All default to `None`, meaning "no override".
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4635
/// Plans `CREATE CLUSTER`, then — for managed clusters — verifies that the
/// plan round-trips: unplanning to SQL, reparsing, resolving, and replanning
/// must reproduce an identical plan. Any mismatch is surfaced as
/// `PlanError::Replan`.
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    // Round-trip check: only the managed variant can be unplanned
    // (`unplan_create_cluster` bails on unmanaged clusters).
    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        // The reparsed statement must still be CREATE CLUSTER.
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}
4675
/// Plans `CREATE CLUSTER` without the round-trip check performed by
/// `plan_create_cluster`.
///
/// A cluster is "managed" when `MANAGED` is given explicitly or when no
/// `REPLICAS` option is present; the two modes accept disjoint option sets
/// and each rejects the other's options.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Default: managed unless the user supplied explicit REPLICAS.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system users.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        // DISK is deprecated: rejected for "cc" sizes (always on there),
        // and only a deprecation notice otherwise.
        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        // With a non-MANUAL schedule the scheduler owns the replica count,
        // so an explicit REPLICATION FACTOR is rejected and 0 is recorded.
        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        // Unmanaged cluster: REPLICAS is required and every managed-only
        // option is rejected explicitly for a precise error message.
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4842
/// Reverses `plan_create_cluster_inner` for managed clusters, producing a
/// `CREATE CLUSTER` statement that replans to the same plan (this property
/// is checked by `plan_create_cluster`). Unmanaged clusters are unsupported.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustively destructure so adding an override field forces a
            // decision here about whether it round-trips into FEATURES.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
                enable_case_literal_transform: _,
                enable_simplify_quantified_comparisons: _,
                enable_coalesce_case_transform: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // Empty AZ list unplans to "option absent" (mirrors
            // `unwrap_or_default` in planning).
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // Non-MANUAL schedules carry no explicit REPLICATION FACTOR
            // (planning rejects that combination).
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
4946
// Generates `ReplicaOptionExtracted`, the typed replica option struct
// consumed by `plan_replica_config`.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
4962
/// Plans a replica's options into a `ReplicaConfig`.
///
/// Dispatches on which options were provided: SIZE (with optional AZ and
/// BILLED AS) selects the orchestrated form; STORAGECTL/COMPUTECTL addresses
/// select the feature-gated unorchestrated form; mixtures are rejected.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Nothing that identifies either mode was given.
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // Orchestrated replica: SIZE given, no manual addresses.
        (Some(size), availability_zone, billed_as, None, None) => {
            // DISK is deprecated; see the matching logic in
            // `plan_create_cluster_inner`.
            if disk.is_some() {
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        // Unorchestrated replica: explicit process addresses, behind an
        // unsafe feature flag.
        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            // One storagectl address per computectl address is required.
            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5055
5056fn plan_compute_replica_config(
5060 introspection_interval: Option<OptionalDuration>,
5061 introspection_debugging: bool,
5062) -> Result<ComputeReplicaConfig, PlanError> {
5063 let introspection_interval = introspection_interval
5064 .map(|OptionalDuration(i)| i)
5065 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5066 let introspection = match introspection_interval {
5067 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5068 interval,
5069 debugging: introspection_debugging,
5070 }),
5071 None if introspection_debugging => {
5072 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5073 }
5074 None => None,
5075 };
5076 let compute = ComputeReplicaConfig { introspection };
5077 Ok(compute)
5078}
5079
5080fn unplan_compute_replica_config(
5084 compute_replica_config: ComputeReplicaConfig,
5085) -> (Option<OptionalDuration>, bool) {
5086 match compute_replica_config.introspection {
5087 Some(ComputeReplicaIntrospectionConfig {
5088 debugging,
5089 interval,
5090 }) => (Some(OptionalDuration(Some(interval))), debugging),
5091 None => (Some(OptionalDuration(None)), false),
5092 }
5093}
5094
/// Plans a cluster `SCHEDULE` option value into a `ClusterSchedule`,
/// validating any HYDRATION TIME ESTIMATE interval.
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // REFRESH without an estimate defaults to a zero hydration time.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            // Month-denominated intervals have no fixed duration.
            if interval.months != 0 {
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            // Must fit in u64 millis AND convert back to an Interval, so
            // `unplan_cluster_schedule` can round-trip it.
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}
5138
5139fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5143 match schedule {
5144 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5145 ClusterSchedule::Refresh {
5146 hydration_time_estimate,
5147 } => {
5148 let interval = Interval::from_duration(&hydration_time_estimate)
5149 .expect("planning ensured that this is convertible back to Interval");
5150 let interval_value = literal::unplan_interval(&interval);
5151 ClusterScheduleOptionValue::Refresh {
5152 hydration_time_estimate: Some(interval_value),
5153 }
5154 }
5155 }
5156}
5157
5158pub fn describe_create_cluster_replica(
5159 _: &StatementContext,
5160 _: CreateClusterReplicaStatement<Aug>,
5161) -> Result<StatementDesc, PlanError> {
5162 Ok(StatementDesc::new(None))
5163}
5164
/// Plans `CREATE CLUSTER REPLICA`.
///
/// Rejects adding a replica when the cluster hosts objects that require a
/// single replica, and rejects names matching the managed-replica naming
/// pattern for internal orchestrated replicas.
pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    // Sources/sinks (when multi-replica sources are disabled) pin the
    // cluster to at most one replica.
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        // Internal replicas must not collide with managed replica names.
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        // Non-internal replicas may only be added to unmanaged clusters.
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}
5201
5202pub fn describe_create_secret(
5203 _: &StatementContext,
5204 _: CreateSecretStatement<Aug>,
5205) -> Result<StatementDesc, PlanError> {
5206 Ok(StatementDesc::new(None))
5207}
5208
5209pub fn plan_create_secret(
5210 scx: &StatementContext,
5211 stmt: CreateSecretStatement<Aug>,
5212) -> Result<Plan, PlanError> {
5213 let CreateSecretStatement {
5214 name,
5215 if_not_exists,
5216 value,
5217 } = &stmt;
5218
5219 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5220 let mut create_sql_statement = stmt.clone();
5221 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5222 let create_sql =
5223 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5224 let secret_as = query::plan_secret_as(scx, value.clone())?;
5225
5226 let secret = Secret {
5227 create_sql,
5228 secret_as,
5229 };
5230
5231 Ok(Plan::CreateSecret(CreateSecretPlan {
5232 name,
5233 secret,
5234 if_not_exists: *if_not_exists,
5235 }))
5236}
5237
5238pub fn describe_create_connection(
5239 _: &StatementContext,
5240 _: CreateConnectionStatement<Aug>,
5241) -> Result<StatementDesc, PlanError> {
5242 Ok(StatementDesc::new(None))
5243}
5244
// Generates `CreateConnectionOptionExtracted` (the WITH (VALIDATE ...)
// option of CREATE CONNECTION).
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5246
/// Plans `CREATE CONNECTION`.
///
/// Determines whether the connection should be validated (explicit
/// `VALIDATE` option, else the system default combined with the connection
/// type's preference), checks for name collisions, and — for SSH
/// connections — rewrites the stored statement to embed the generated
/// public keys.
pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    // Explicit VALIDATE is gated behind a feature flag.
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Reject duplicate names unless IF NOT EXISTS was given.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, replace any user-supplied PUBLIC KEY options with
    // the generated key pair so the persisted statement reflects reality.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}
5314
5315fn plan_drop_database(
5316 scx: &StatementContext,
5317 if_exists: bool,
5318 name: &UnresolvedDatabaseName,
5319 cascade: bool,
5320) -> Result<Option<DatabaseId>, PlanError> {
5321 Ok(match resolve_database(scx, name, if_exists)? {
5322 Some(database) => {
5323 if !cascade && database.has_schemas() {
5324 sql_bail!(
5325 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5326 name,
5327 );
5328 }
5329 Some(database.id())
5330 }
5331 None => None,
5332 })
5333}
5334
5335pub fn describe_drop_objects(
5336 _: &StatementContext,
5337 _: DropObjectsStatement,
5338) -> Result<StatementDesc, PlanError> {
5339 Ok(StatementDesc::new(None))
5340}
5341
/// Plans a generic `DROP <object type> <names>` statement.
///
/// Resolves each named object (emitting a notice for names that are absent
/// when `IF EXISTS` permits it), then expands the referenced ids to the full
/// set of dependent objects to drop.
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    if object_type == mz_sql_parser::ast::ObjectType::Func {
        bail_unsupported!("DROP FUNCTION");
    }
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        // Each object-name kind has its own resolution and precondition
        // checks; all return `None` when IF EXISTS swallowed a miss.
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            // Missing object under IF EXISTS: surface a notice, not an error.
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    // Expand to everything transitively dependent on the named objects.
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}
5398
/// Plans dropping a schema, returning `None` when the schema does not exist
/// and `if_exists` is set.
///
/// Temporary schemas and ambient (system) schemas may never be dropped, and
/// RESTRICT refuses to drop a schema that still contains objects.
fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // Catch an unqualified reference to the temp schema by name before
    // resolution, so the error mentions the name the user wrote.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}
5436
5437fn plan_drop_role(
5438 scx: &StatementContext,
5439 if_exists: bool,
5440 name: &Ident,
5441) -> Result<Option<RoleId>, PlanError> {
5442 match scx.catalog.resolve_role(name.as_str()) {
5443 Ok(role) => {
5444 let id = role.id();
5445 if &id == scx.catalog.active_role_id() {
5446 sql_bail!("current role cannot be dropped");
5447 }
5448 for role in scx.catalog.get_roles() {
5449 for (member_id, grantor_id) in role.membership() {
5450 if &id == grantor_id {
5451 let member_role = scx.catalog.get_role(member_id);
5452 sql_bail!(
5453 "cannot drop role {}: still depended up by membership of role {} in role {}",
5454 name.as_str(),
5455 role.name(),
5456 member_role.name()
5457 );
5458 }
5459 }
5460 }
5461 Ok(Some(role.id()))
5462 }
5463 Err(_) if if_exists => Ok(None),
5464 Err(e) => Err(e.into()),
5465 }
5466}
5467
5468fn plan_drop_cluster(
5469 scx: &StatementContext,
5470 if_exists: bool,
5471 name: &Ident,
5472 cascade: bool,
5473) -> Result<Option<ClusterId>, PlanError> {
5474 Ok(match resolve_cluster(scx, name, if_exists)? {
5475 Some(cluster) => {
5476 if !cascade && !cluster.bound_objects().is_empty() {
5477 return Err(PlanError::DependentObjectsStillExist {
5478 object_type: "cluster".to_string(),
5479 object_name: cluster.name().to_string(),
5480 dependents: Vec::new(),
5481 });
5482 }
5483 Some(cluster.id())
5484 }
5485 None => None,
5486 })
5487}
5488
5489fn plan_drop_network_policy(
5490 scx: &StatementContext,
5491 if_exists: bool,
5492 name: &Ident,
5493) -> Result<Option<NetworkPolicyId>, PlanError> {
5494 match scx.catalog.resolve_network_policy(name.as_str()) {
5495 Ok(policy) => {
5496 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5499 Err(PlanError::NetworkPolicyInUse)
5500 } else {
5501 Ok(Some(policy.id()))
5502 }
5503 }
5504 Err(_) if if_exists => Ok(None),
5505 Err(e) => Err(e.into()),
5506 }
5507}
5508
5509fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5512 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5514 false
5515 } else {
5516 cluster.bound_objects().iter().any(|id| {
5518 let item = scx.catalog.get_item(id);
5519 matches!(
5520 item.item_type(),
5521 CatalogItemType::Sink | CatalogItemType::Source
5522 )
5523 })
5524 }
5525}
5526
5527fn plan_drop_cluster_replica(
5528 scx: &StatementContext,
5529 if_exists: bool,
5530 name: &QualifiedReplica,
5531) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5532 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5533 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5534}
5535
5536fn plan_drop_item(
5538 scx: &StatementContext,
5539 object_type: ObjectType,
5540 if_exists: bool,
5541 name: UnresolvedItemName,
5542 cascade: bool,
5543) -> Result<Option<CatalogItemId>, PlanError> {
5544 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5545 Ok(r) => r,
5546 Err(PlanError::MismatchedObjectType {
5548 name,
5549 is_type: ObjectType::MaterializedView,
5550 expected_type: ObjectType::View,
5551 }) => {
5552 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5553 }
5554 e => e?,
5555 };
5556
5557 Ok(match resolved {
5558 Some(catalog_item) => {
5559 if catalog_item.id().is_system() {
5560 sql_bail!(
5561 "cannot drop {} {} because it is required by the database system",
5562 catalog_item.item_type(),
5563 scx.catalog.minimal_qualification(catalog_item.name()),
5564 );
5565 }
5566
5567 if !cascade {
5568 for id in catalog_item.used_by() {
5569 let dep = scx.catalog.get_item(id);
5570 if dependency_prevents_drop(object_type, dep) {
5571 return Err(PlanError::DependentObjectsStillExist {
5572 object_type: catalog_item.item_type().to_string(),
5573 object_name: scx
5574 .catalog
5575 .minimal_qualification(catalog_item.name())
5576 .to_string(),
5577 dependents: vec![(
5578 dep.item_type().to_string(),
5579 scx.catalog.minimal_qualification(dep.name()).to_string(),
5580 )],
5581 });
5582 }
5583 }
5584 }
5587 Some(catalog_item.id())
5588 }
5589 None => None,
5590 })
5591}
5592
/// Returns whether a dependency of kind `dep` blocks a non-CASCADE drop of an
/// object of kind `object_type`.
///
/// Both matches are deliberately exhaustive (no `_` arm) so that adding a new
/// `ObjectType` or `CatalogItemType` variant forces this policy to be revisited.
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        // Any dependency on a type blocks dropping the type.
        ObjectType::Type => true,
        ObjectType::Table
        | ObjectType::View
        | ObjectType::MaterializedView
        | ObjectType::Source
        | ObjectType::Sink
        | ObjectType::Index
        | ObjectType::Role
        | ObjectType::Cluster
        | ObjectType::ClusterReplica
        | ObjectType::Secret
        | ObjectType::Connection
        | ObjectType::Database
        | ObjectType::Schema
        | ObjectType::Func
        | ObjectType::NetworkPolicy => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection => true,
            // Indexes are dropped along with their parent object and so never
            // block a drop on their own.
            CatalogItemType::Index => false,
        },
    }
}
5625
5626pub fn describe_alter_index_options(
5627 _: &StatementContext,
5628 _: AlterIndexStatement<Aug>,
5629) -> Result<StatementDesc, PlanError> {
5630 Ok(StatementDesc::new(None))
5631}
5632
5633pub fn describe_drop_owned(
5634 _: &StatementContext,
5635 _: DropOwnedStatement<Aug>,
5636) -> Result<StatementDesc, PlanError> {
5637 Ok(StatementDesc::new(None))
5638}
5639
/// Plans `DROP OWNED BY ...`: collects every object owned by the listed roles
/// (replicas, clusters, items, schemas, databases, network policies), every
/// privilege granted to those roles, and every default privilege involving
/// them, so the coordinator can drop/revoke everything in one operation.
///
/// Without CASCADE, the drop is refused whenever it would take down an object
/// that is *not* owned by one of the listed roles.
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    // Records, for `object_id`, every privilege whose grantee is one of the
    // target roles, pairing it with the object it was granted on.
    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Cluster replicas owned by the roles are dropped unconditionally.
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters: without CASCADE, refuse if a cluster hosts objects owned by
    // someone else — dropping the cluster would take those down too.
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        // Privilege revokes apply to every cluster, dropped or not.
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items: without CASCADE, refuse if any dependent that would block a drop
    // (per `dependency_prevents_drop`) is owned by someone else.
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // The error below names `item` (the object being dropped),
                // while the closure parameters shadow it for the dependents.
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            // NOTE(review): the second `.to_string()` is
                            // redundant — `to_string` on a `String` is a copy.
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string()
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // Sources with a progress collection: its dependents block the
                // drop as well.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas: temporary schemas are skipped entirely. Without CASCADE, refuse
    // if a schema contains drop-blocking objects owned by someone else.
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases: without CASCADE, refuse if the database contains schemas
    // owned by someone else.
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Network policies owned by the roles are dropped unconditionally.
    for network_policy in scx.catalog.get_network_policies() {
        if role_ids.contains(&network_policy.owner_id()) {
            drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
        }
        update_privilege_revokes(
            SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
            network_policy.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System-wide privileges granted to the roles are revoked too.
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    // Default privileges either created by or granted to the roles.
    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    // Expand to the transitive closure of dependent objects.
    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    // If the closure touches any system object, the whole drop is refused.
    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}
5879
5880fn plan_retain_history_option(
5881 scx: &StatementContext,
5882 retain_history: Option<OptionalDuration>,
5883) -> Result<Option<CompactionWindow>, PlanError> {
5884 if let Some(OptionalDuration(lcw)) = retain_history {
5885 Ok(Some(plan_retain_history(scx, lcw)?))
5886 } else {
5887 Ok(None)
5888 }
5889}
5890
/// Plans a `RETAIN HISTORY` duration as a [`CompactionWindow`].
///
/// Requires the `ENABLE_LOGICAL_COMPACTION_WINDOW` feature flag. A duration
/// below the default compaction window, or a `None` duration (disabling
/// compaction entirely), additionally requires `ENABLE_UNLIMITED_RETAIN_HISTORY`.
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // A zero duration should have been rejected/normalized earlier;
        // reaching here with ZERO indicates an internal error.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Durations below the default window are only allowed with the
            // unlimited-retain-history flag.
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // No duration means "retain history forever", i.e. disable compaction.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}
5940
5941generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5942
5943fn plan_index_options(
5944 scx: &StatementContext,
5945 with_opts: Vec<IndexOption<Aug>>,
5946) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5947 if !with_opts.is_empty() {
5948 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5950 }
5951
5952 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5953
5954 let mut out = Vec::with_capacity(1);
5955 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5956 out.push(crate::plan::IndexOption::RetainHistory(cw));
5957 }
5958 Ok(out)
5959}
5960
// Generates `TableOptionExtracted`, used below to pull the typed
// `PARTITION BY`, `RETAIN HISTORY`, and redacted-test values out of a table's
// option list.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
5967
5968fn plan_table_options(
5969 scx: &StatementContext,
5970 desc: &RelationDesc,
5971 with_opts: Vec<TableOption<Aug>>,
5972) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5973 let TableOptionExtracted {
5974 partition_by,
5975 retain_history,
5976 redacted_test,
5977 ..
5978 }: TableOptionExtracted = with_opts.try_into()?;
5979
5980 if let Some(partition_by) = partition_by {
5981 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5982 check_partition_by(desc, partition_by)?;
5983 }
5984
5985 if redacted_test.is_some() {
5986 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5987 }
5988
5989 let mut out = Vec::with_capacity(1);
5990 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5991 out.push(crate::plan::TableOption::RetainHistory(cw));
5992 }
5993 Ok(out)
5994}
5995
5996pub fn plan_alter_index_options(
5997 scx: &mut StatementContext,
5998 AlterIndexStatement {
5999 index_name,
6000 if_exists,
6001 action,
6002 }: AlterIndexStatement<Aug>,
6003) -> Result<Plan, PlanError> {
6004 let object_type = ObjectType::Index;
6005 match action {
6006 AlterIndexAction::ResetOptions(options) => {
6007 let mut options = options.into_iter();
6008 if let Some(opt) = options.next() {
6009 match opt {
6010 IndexOptionName::RetainHistory => {
6011 if options.next().is_some() {
6012 sql_bail!("RETAIN HISTORY must be only option");
6013 }
6014 return alter_retain_history(
6015 scx,
6016 object_type,
6017 if_exists,
6018 UnresolvedObjectName::Item(index_name),
6019 None,
6020 );
6021 }
6022 }
6023 }
6024 sql_bail!("expected option");
6025 }
6026 AlterIndexAction::SetOptions(options) => {
6027 let mut options = options.into_iter();
6028 if let Some(opt) = options.next() {
6029 match opt.name {
6030 IndexOptionName::RetainHistory => {
6031 if options.next().is_some() {
6032 sql_bail!("RETAIN HISTORY must be only option");
6033 }
6034 return alter_retain_history(
6035 scx,
6036 object_type,
6037 if_exists,
6038 UnresolvedObjectName::Item(index_name),
6039 opt.value,
6040 );
6041 }
6042 }
6043 }
6044 sql_bail!("expected option");
6045 }
6046 }
6047}
6048
6049pub fn describe_alter_cluster_set_options(
6050 _: &StatementContext,
6051 _: AlterClusterStatement<Aug>,
6052) -> Result<StatementDesc, PlanError> {
6053 Ok(StatementDesc::new(None))
6054}
6055
/// Plans `ALTER CLUSTER ... {SET|RESET} (...)`.
///
/// Validates the requested option changes against the cluster's current
/// managed/unmanaged state and schedule, then emits an [`AlterClusterPlan`]
/// carrying the accumulated option deltas and the chosen alter strategy.
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    // With IF EXISTS, a missing cluster becomes a notice plus a no-op plan.
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            // WORKLOAD CLASS is reserved for system users.
            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            // Validate against the *target* managed state: an explicit MANAGED
            // option wins, otherwise the cluster's current state.
            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    // Non-default alter strategies (zero-downtime
                    // reconfiguration) are feature-gated.
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    // Non-MANUAL schedules are feature-gated.
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        // REPLICATION FACTOR is incompatible with any
                        // non-MANUAL schedule, whether newly requested or
                        // already configured.
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        // Count what the replica total would become; clusters
                        // hosting sources/sinks may not exceed one replica.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // Zero-downtime strategies temporarily double the
                        // replicas, which single-replica objects forbid.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    // Unmanaged clusters accept none of the managed-cluster
                    // options.
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    // Converting to unmanaged requires explicitly resetting a
                    // non-MANUAL schedule.
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            // Plan any inline replica definitions (unmanaged clusters only;
            // managed clusters bailed above).
            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            // Record each explicitly provided option as a Set delta.
            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            // DISK is deprecated: reject it for modern ("cc") sizes, where
            // disk is always enabled, and emit a deprecation notice otherwise.
            if disk.is_some() {
                let size = match size.as_deref() {
                    Some(s) => s,
                    None => cluster
                        .managed_size()
                        .ok_or_else(|| sql_err!("cluster is not managed"))?,
                };
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            // WORKLOAD CLASS is reserved for system users, for RESET too.
            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    // NOTE(review): RESET (DISK) only emits the deprecation
                    // notice and resets nothing — presumably intentional since
                    // DISK is deprecated; confirm.
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}
6311
6312pub fn describe_alter_set_cluster(
6313 _: &StatementContext,
6314 _: AlterSetClusterStatement<Aug>,
6315) -> Result<StatementDesc, PlanError> {
6316 Ok(StatementDesc::new(None))
6317}
6318
/// Plans `ALTER <object> ... SET CLUSTER <cluster>`, moving an item to another
/// cluster. Currently only materialized views are supported; the statement is
/// additionally gated behind a feature flag.
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Exhaustive dispatch on object type: supported, not-yet-supported
    // (indexes/sinks/sources — 29606 is presumably the tracking issue number),
    // and never-supported (objects with no associated cluster).
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        ObjectType::Table
        | ObjectType::View
        | ObjectType::Type
        | ObjectType::Role
        | ObjectType::Cluster
        | ObjectType::ClusterReplica
        | ObjectType::Secret
        | ObjectType::Connection
        | ObjectType::Database
        | ObjectType::Schema
        | ObjectType::Func
        | ObjectType::NetworkPolicy => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            // Moving to the cluster the item is already on is a no-op.
            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        // With IF EXISTS, a missing item becomes a notice plus a no-op plan.
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6386
6387pub fn describe_alter_object_rename(
6388 _: &StatementContext,
6389 _: AlterObjectRenameStatement,
6390) -> Result<StatementDesc, PlanError> {
6391 Ok(StatementDesc::new(None))
6392}
6393
/// Plans `ALTER <object type> ... RENAME TO ...` by dispatching on the pair of
/// object type and name variant to the matching specialized planner.
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        // Catalog items share one rename path.
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        // Mismatched type/name combinations indicate a parser bug, hence an
        // internal error rather than a user-facing one.
        (object_type, name) => {
            bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
        }
    }
}
6432
6433pub fn plan_alter_schema_rename(
6434 scx: &mut StatementContext,
6435 name: UnresolvedSchemaName,
6436 to_schema_name: Ident,
6437 if_exists: bool,
6438) -> Result<Plan, PlanError> {
6439 let normalized = normalize::unresolved_schema_name(name.clone())?;
6443 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6444 sql_bail!(
6445 "cannot rename schemas in the ambient database: {:?}",
6446 mz_repr::namespaces::MZ_TEMP_SCHEMA
6447 );
6448 }
6449
6450 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6451 let object_type = ObjectType::Schema;
6452 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6453 name: name.to_ast_string_simple(),
6454 object_type,
6455 });
6456 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6457 };
6458
6459 if scx
6461 .resolve_schema_in_database(&db_spec, &to_schema_name)
6462 .is_ok()
6463 {
6464 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6465 to_schema_name.clone().into_string(),
6466 )));
6467 }
6468
6469 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6471 if schema.id().is_system() {
6472 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6473 }
6474
6475 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6476 cur_schema_spec: (db_spec, schema_spec),
6477 new_schema_name: to_schema_name.into_string(),
6478 }))
6479}
6480
/// Plans `ALTER SCHEMA ... SWAP WITH ...`.
///
/// The swap is performed via a temporary third name; `gen_temp_suffix`
/// produces a suffix for which the supplied predicate (name is unused) holds,
/// keeping name generation testable/injectable.
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    if_exists: bool,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // Neither side of the swap may be the ambient temporary schema.
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    // With IF EXISTS, a missing schema A becomes a notice plus a no-op plan.
    // Note that schema B is resolved unconditionally below.
    let schema_a = match scx.resolve_schema(name_a.clone()) {
        Ok(schema) => schema,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name_a.to_ast_string_simple(),
                object_type: ObjectType::Schema,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Schema,
            }));
        }
        Err(e) => return Err(e),
    };

    // Schema B is resolved in schema A's database, so both sides live in the
    // same (non-ambient) database.
    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Find a temporary name that does not collide with any existing schema in
    // the database; the suffix generator retries against this check.
    const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}
6551
/// Plans `ALTER <item> ... RENAME TO ...` for catalog items.
///
/// Besides resolving the item, checks that the proposed name does not collide
/// with an existing item or type in the same qualified namespace (types and
/// certain item kinds share a name space — see `conflicts_with_type`).
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    // `ALTER VIEW` aimed at a materialized view gets a tailored error.
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // The new name keeps the item's existing qualifiers.
            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Conflict detection is direction-dependent: renaming a type
            // checks against all types and against items whose kind conflicts
            // with types; renaming an item checks against all items and —
            // only if its own kind conflicts with types — against types.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        // With IF EXISTS, a missing item becomes a notice plus a no-op plan.
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6620
6621pub fn plan_alter_cluster_rename(
6622 scx: &mut StatementContext,
6623 object_type: ObjectType,
6624 name: Ident,
6625 to_name: Ident,
6626 if_exists: bool,
6627) -> Result<Plan, PlanError> {
6628 match resolve_cluster(scx, &name, if_exists)? {
6629 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6630 id: entry.id(),
6631 name: entry.name().to_string(),
6632 to_name: ident(to_name),
6633 })),
6634 None => {
6635 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6636 name: name.to_ast_string_simple(),
6637 object_type,
6638 });
6639
6640 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6641 }
6642 }
6643}
6644
/// Plans `ALTER CLUSTER ... SWAP WITH ...`.
///
/// `gen_temp_suffix` produces the suffix for the temporary cluster name used
/// while performing the swap; it receives a check closure that returns `true`
/// when a candidate suffix yields an unused name.
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    if_exists: bool,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // Missing cluster A with `IF EXISTS`: emit a notice and no-op.
    let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
        Ok(cluster) => cluster,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name_a.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
        Err(e) => return Err(e),
    };
    // NOTE: cluster B is not covered by `IF EXISTS`; its absence is an error.
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
    // A suffix is usable only when no cluster already has the resulting name.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // Unknown cluster means the name is free.
            Err(CatalogError::UnknownCluster(_)) => true,
            // A hit, or any other error, means the name is unusable.
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}
6692
/// Plans `ALTER CLUSTER REPLICA ... RENAME TO ...`.
///
/// Renaming is rejected for replicas of managed clusters, whose replicas are
/// controlled by the system.
pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            // Users may only rename replicas of unmanaged clusters.
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            // Replica is absent and `IF EXISTS` was given: warn and no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6725
6726pub fn describe_alter_object_swap(
6727 _: &StatementContext,
6728 _: AlterObjectSwapStatement,
6729) -> Result<StatementDesc, PlanError> {
6730 Ok(StatementDesc::new(None))
6731}
6732
/// Plans `ALTER ... SWAP WITH ...`, which atomically exchanges the names of
/// two objects. Only schemas and clusters are currently supported; all other
/// object types return an `Unsupported` error.
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        if_exists,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // The swap is implemented via a temporary name; this closure generates a
    // random suffix, retrying a bounded number of times until `check_fn`
    // reports that the resulting name is unused.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
        }
        // The parser should pair schema/cluster object types with the
        // matching name variant; anything else is an internal invariant
        // violation.
        (ObjectType::Schema | ObjectType::Cluster, _, _) => {
            bail_internal!("name type does not match object type for ALTER SWAP")
        }
        // Exhaustively list the unsupported types so adding a new object
        // type forces a decision here.
        (
            ObjectType::Table
            | ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Source
            | ObjectType::Sink
            | ObjectType::Index
            | ObjectType::Type
            | ObjectType::Role
            | ObjectType::ClusterReplica
            | ObjectType::Secret
            | ObjectType::Connection
            | ObjectType::Database
            | ObjectType::Func
            | ObjectType::NetworkPolicy,
            _,
            _,
        ) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}
6800
6801pub fn describe_alter_retain_history(
6802 _: &StatementContext,
6803 _: AlterRetainHistoryStatement<Aug>,
6804) -> Result<StatementDesc, PlanError> {
6805 Ok(StatementDesc::new(None))
6806}
6807
6808pub fn plan_alter_retain_history(
6809 scx: &StatementContext,
6810 AlterRetainHistoryStatement {
6811 object_type,
6812 if_exists,
6813 name,
6814 history,
6815 }: AlterRetainHistoryStatement<Aug>,
6816) -> Result<Plan, PlanError> {
6817 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6818}
6819
/// Shared implementation of `ALTER ... SET (RETAIN HISTORY ...)` and
/// `ALTER ... RESET (RETAIN HISTORY)`.
///
/// A `history` of `None` means RESET, which restores the default logical
/// compaction window.
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    // RETAIN HISTORY only applies to relation-like items; `View` is allowed
    // through here so the checks below can produce a tailored error.
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // `ALTER VIEW <materialized view>` gets a dedicated error; plain
            // views never support RETAIN HISTORY; anything else must match
            // the statement's object type exactly.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // `value` preserves the raw option value for the catalog, while
            // `lcw` is the parsed compaction window (None value = RESET, so
            // fall back to the default window).
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            // Item is absent and `IF EXISTS` was given: warn and no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6891
/// Plans `ALTER SOURCE ... SET/RESET (TIMESTAMP INTERVAL ...)`.
///
/// A `value` of `None` means RESET, which restores the system default
/// timestamp interval.
fn alter_source_timestamp_interval(
    scx: &StatementContext,
    if_exists: bool,
    source_name: UnresolvedItemName,
    value: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Source;
    match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            if entry.item_type() != CatalogItemType::Source {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            match value {
                Some(val) => {
                    let val = match val {
                        WithOptionValue::Value(v) => v,
                        _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
                    };
                    let duration = Duration::try_from_value(val.clone())?;

                    // Reject intervals outside the system-configured bounds.
                    let min = scx.catalog.system_vars().min_timestamp_interval();
                    let max = scx.catalog.system_vars().max_timestamp_interval();
                    if duration < min || duration > max {
                        return Err(PlanError::InvalidTimestampInterval {
                            min,
                            max,
                            requested: duration,
                        });
                    }

                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: Some(val),
                            interval: duration,
                        },
                    ))
                }
                None => {
                    // RESET: fall back to the default timestamp interval.
                    let interval = scx.catalog.system_vars().default_timestamp_interval();
                    Ok(Plan::AlterSourceTimestampInterval(
                        AlterSourceTimestampIntervalPlan {
                            id: entry.id(),
                            value: None,
                            interval,
                        },
                    ))
                }
            }
        }
        None => {
            // Source is absent and `IF EXISTS` was given: warn and no-op.
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: source_name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
6959
6960pub fn describe_alter_secret_options(
6961 _: &StatementContext,
6962 _: AlterSecretStatement<Aug>,
6963) -> Result<StatementDesc, PlanError> {
6964 Ok(StatementDesc::new(None))
6965}
6966
6967pub fn plan_alter_secret(
6968 scx: &mut StatementContext,
6969 stmt: AlterSecretStatement<Aug>,
6970) -> Result<Plan, PlanError> {
6971 let AlterSecretStatement {
6972 name,
6973 if_exists,
6974 value,
6975 } = stmt;
6976 let object_type = ObjectType::Secret;
6977 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6978 Some(entry) => entry.id(),
6979 None => {
6980 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6981 name: name.to_string(),
6982 object_type,
6983 });
6984
6985 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6986 }
6987 };
6988
6989 let secret_as = query::plan_secret_as(scx, value)?;
6990
6991 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6992}
6993
6994pub fn describe_alter_connection(
6995 _: &StatementContext,
6996 _: AlterConnectionStatement<Aug>,
6997) -> Result<StatementDesc, PlanError> {
6998 Ok(StatementDesc::new(None))
6999}
7000
// Generates `AlterConnectionOptionExtracted`, which pulls the `VALIDATE
// <bool>` option out of `ALTER CONNECTION ... WITH (...)`.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7002
7003pub fn plan_alter_connection(
7004 scx: &StatementContext,
7005 stmt: AlterConnectionStatement<Aug>,
7006) -> Result<Plan, PlanError> {
7007 let AlterConnectionStatement {
7008 name,
7009 if_exists,
7010 actions,
7011 with_options,
7012 } = stmt;
7013 let conn_name = normalize::unresolved_item_name(name)?;
7014 let entry = match scx.catalog.resolve_item(&conn_name) {
7015 Ok(entry) => entry,
7016 Err(_) if if_exists => {
7017 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7018 name: conn_name.to_string(),
7019 object_type: ObjectType::Sink,
7020 });
7021
7022 return Ok(Plan::AlterNoop(AlterNoopPlan {
7023 object_type: ObjectType::Connection,
7024 }));
7025 }
7026 Err(e) => return Err(e.into()),
7027 };
7028
7029 let connection = entry.connection()?;
7030
7031 if actions
7032 .iter()
7033 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7034 {
7035 if actions.len() > 1 {
7036 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7037 }
7038
7039 if !with_options.is_empty() {
7040 sql_bail!(
7041 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7042 with_options
7043 .iter()
7044 .map(|o| o.to_ast_string_simple())
7045 .join(", ")
7046 );
7047 }
7048
7049 if !matches!(connection, Connection::Ssh(_)) {
7050 sql_bail!(
7051 "{} is not an SSH connection",
7052 scx.catalog.resolve_full_name(entry.name())
7053 )
7054 }
7055
7056 return Ok(Plan::AlterConnection(AlterConnectionPlan {
7057 id: entry.id(),
7058 action: crate::plan::AlterConnectionAction::RotateKeys,
7059 }));
7060 }
7061
7062 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7063 if options.validate.is_some() {
7064 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7065 }
7066
7067 let validate = match options.validate {
7068 Some(val) => val,
7069 None => {
7070 scx.catalog
7071 .system_vars()
7072 .enable_default_connection_validation()
7073 && connection.validate_by_default()
7074 }
7075 };
7076
7077 let connection_type = match connection {
7078 Connection::Aws(_) => CreateConnectionType::Aws,
7079 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7080 Connection::Kafka(_) => CreateConnectionType::Kafka,
7081 Connection::Csr(_) => CreateConnectionType::Csr,
7082 Connection::Postgres(_) => CreateConnectionType::Postgres,
7083 Connection::Ssh(_) => CreateConnectionType::Ssh,
7084 Connection::MySql(_) => CreateConnectionType::MySql,
7085 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7086 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7087 };
7088
7089 let specified_options: BTreeSet<_> = actions
7091 .iter()
7092 .map(|action: &AlterConnectionAction<Aug>| match action {
7093 AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
7094 AlterConnectionAction::DropOption(name) => Ok(name.clone()),
7095 AlterConnectionAction::RotateKeys => {
7096 Err(internal_err!("RotateKeys is handled separately above"))
7097 }
7098 })
7099 .collect::<Result<_, PlanError>>()?;
7100
7101 for invalid in INALTERABLE_OPTIONS {
7102 if specified_options.contains(invalid) {
7103 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7104 }
7105 }
7106
7107 connection::validate_options_per_connection_type(connection_type, specified_options)?;
7108
7109 let mut set_options_vec: Vec<_> = Vec::new();
7111 let mut drop_options: BTreeSet<_> = BTreeSet::new();
7112 for action in actions {
7113 match action {
7114 AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
7115 AlterConnectionAction::DropOption(name) => {
7116 drop_options.insert(name);
7117 }
7118 AlterConnectionAction::RotateKeys => {
7119 bail_internal!("RotateKeys is handled separately above")
7120 }
7121 }
7122 }
7123
7124 let set_options: BTreeMap<_, _> = set_options_vec
7125 .clone()
7126 .into_iter()
7127 .map(|option| (option.name, option.value))
7128 .collect();
7129
7130 let connection_options_extracted =
7134 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7135
7136 let duplicates: Vec<_> = connection_options_extracted
7137 .seen
7138 .intersection(&drop_options)
7139 .collect();
7140
7141 if !duplicates.is_empty() {
7142 sql_bail!(
7143 "cannot both SET and DROP/RESET options {}",
7144 duplicates
7145 .iter()
7146 .map(|option| option.to_string())
7147 .join(", ")
7148 )
7149 }
7150
7151 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7152 let set_options_count = mutually_exclusive_options
7153 .iter()
7154 .filter(|o| set_options.contains_key(o))
7155 .count();
7156 let drop_options_count = mutually_exclusive_options
7157 .iter()
7158 .filter(|o| drop_options.contains(o))
7159 .count();
7160
7161 if set_options_count > 0 && drop_options_count > 0 {
7163 sql_bail!(
7164 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7165 connection_type,
7166 mutually_exclusive_options
7167 .iter()
7168 .map(|option| option.to_string())
7169 .join(", ")
7170 )
7171 }
7172
7173 if set_options_count > 0 || drop_options_count > 0 {
7178 drop_options.extend(mutually_exclusive_options.iter().cloned());
7179 }
7180
7181 }
7184
7185 Ok(Plan::AlterConnection(AlterConnectionPlan {
7186 id: entry.id(),
7187 action: crate::plan::AlterConnectionAction::AlterOptions {
7188 set_options,
7189 drop_options,
7190 validate,
7191 },
7192 }))
7193}
7194
7195pub fn describe_alter_sink(
7196 _: &StatementContext,
7197 _: AlterSinkStatement<Aug>,
7198) -> Result<StatementDesc, PlanError> {
7199 Ok(StatementDesc::new(None))
7200}
7201
/// Plans `ALTER SINK`.
///
/// `SET FROM` is implemented by re-parsing the sink's original `CREATE SINK`
/// SQL, substituting the new source relation, and replanning the statement.
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    // Sink is absent and `IF EXISTS` was given: warn and no-op.
    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always operate on the latest version of the sink's definition.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Parse the statement that created the sink; it must be exactly
            // one `CREATE SINK`.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
            let Statement::CreateSink(stmt) = stmt.ast else {
                bail_internal!("create SQL of sink is not a CREATE SINK statement");
            };

            // Swap in the new upstream relation and replan from scratch.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                bail_internal!("plan_sink did not produce a CreateSink plan");
            };

            // Bump the version so downstream machinery sees the new definition.
            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
7261
7262pub fn describe_alter_source(
7263 _: &StatementContext,
7264 _: AlterSourceStatement<Aug>,
7265) -> Result<StatementDesc, PlanError> {
7266 Ok(StatementDesc::new(None))
7268}
7269
// Generates `AlterSourceAddSubsourceOptionExtracted`, which pulls the
// TEXT COLUMNS / EXCLUDE COLUMNS / DETAILS options out of
// `ALTER SOURCE ... ADD SUBSOURCE ...`.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7276
/// Plans `ALTER SOURCE`.
///
/// Only single-option `SET`/`RESET` of `RETAIN HISTORY` or
/// `TIMESTAMP INTERVAL` is supported here; subsource-related actions must be
/// handled (purified) before planning.
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    // Source is absent and `IF EXISTS` was given: warn and no-op.
    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options
                .next()
                .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
            // RETAIN HISTORY and TIMESTAMP INTERVAL are each only valid as
            // the sole option; anything else is unalterable.
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            if option.name == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options
                .next()
                .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
            // RESET mirrors SET: the same two options, each alone, with a
            // `None` value meaning "restore the default".
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            if option == CreateSourceOptionName::TimestampInterval {
                if options.next().is_some() {
                    sql_bail!("TIMESTAMP INTERVAL must be only option");
                }
                return alter_source_timestamp_interval(scx, if_exists, source_name, None);
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
        }
        AlterSourceAction::RefreshReferences => {
            sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
        }
    };
}
7367
7368pub fn describe_alter_system_set(
7369 _: &StatementContext,
7370 _: AlterSystemSetStatement,
7371) -> Result<StatementDesc, PlanError> {
7372 Ok(StatementDesc::new(None))
7373}
7374
7375pub fn plan_alter_system_set(
7376 _: &StatementContext,
7377 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7378) -> Result<Plan, PlanError> {
7379 let name = name.to_string();
7380 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7381 name,
7382 value: scl::plan_set_variable_to(to)?,
7383 }))
7384}
7385
7386pub fn describe_alter_system_reset(
7387 _: &StatementContext,
7388 _: AlterSystemResetStatement,
7389) -> Result<StatementDesc, PlanError> {
7390 Ok(StatementDesc::new(None))
7391}
7392
7393pub fn plan_alter_system_reset(
7394 _: &StatementContext,
7395 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7396) -> Result<Plan, PlanError> {
7397 let name = name.to_string();
7398 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7399}
7400
7401pub fn describe_alter_system_reset_all(
7402 _: &StatementContext,
7403 _: AlterSystemResetAllStatement,
7404) -> Result<StatementDesc, PlanError> {
7405 Ok(StatementDesc::new(None))
7406}
7407
7408pub fn plan_alter_system_reset_all(
7409 _: &StatementContext,
7410 _: AlterSystemResetAllStatement,
7411) -> Result<Plan, PlanError> {
7412 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7413}
7414
7415pub fn describe_alter_role(
7416 _: &StatementContext,
7417 _: AlterRoleStatement<Aug>,
7418) -> Result<StatementDesc, PlanError> {
7419 Ok(StatementDesc::new(None))
7420}
7421
7422pub fn plan_alter_role(
7423 scx: &StatementContext,
7424 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7425) -> Result<Plan, PlanError> {
7426 let option = match option {
7427 AlterRoleOption::Attributes(attrs) => {
7428 let attrs = plan_role_attributes(attrs, scx)?;
7429 PlannedAlterRoleOption::Attributes(attrs)
7430 }
7431 AlterRoleOption::Variable(variable) => {
7432 let var = plan_role_variable(variable)?;
7433 PlannedAlterRoleOption::Variable(var)
7434 }
7435 };
7436
7437 Ok(Plan::AlterRole(AlterRolePlan {
7438 id: name.id,
7439 name: name.name,
7440 option,
7441 }))
7442}
7443
7444pub fn describe_alter_table_add_column(
7445 _: &StatementContext,
7446 _: AlterTableAddColumnStatement<Aug>,
7447) -> Result<StatementDesc, PlanError> {
7448 Ok(StatementDesc::new(None))
7449}
7450
/// Plans `ALTER TABLE ... ADD COLUMN ...` (feature-flagged).
///
/// New columns are always nullable. Duplicate column names either produce a
/// notice plus no-op (`IF NOT EXISTS`) or an error.
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    // Resolve the table and grab its latest relation description so we can
    // check for column-name collisions below.
    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item
                    .relation_desc()
                    .ok_or_else(|| sql_err!("item does not have a relation description"))?
                    .into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                // Table is absent and `IF EXISTS` was given: warn and no-op.
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            // `IF NOT EXISTS`: existing column is a notice, not an error.
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // Added columns must be nullable: existing rows have no value for them.
    let column_type = scalar_type.nullable(true);
    // Re-parse the stably-printed type so the plan carries a raw AST form.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}
7516
7517pub fn describe_alter_materialized_view_apply_replacement(
7518 _: &StatementContext,
7519 _: AlterMaterializedViewApplyReplacementStatement,
7520) -> Result<StatementDesc, PlanError> {
7521 Ok(StatementDesc::new(None))
7522}
7523
/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT ...` (feature-flagged).
///
/// The named replacement must exist and must have been created as a
/// replacement targeting the materialized view being altered.
pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    // Target MV is absent and `IF EXISTS` was given: warn and no-op.
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // The replacement itself is never subject to IF EXISTS.
    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;

    // Reject replacements that were not declared for this exact MV.
    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}
7564
7565pub fn describe_comment(
7566 _: &StatementContext,
7567 _: CommentStatement<Aug>,
7568) -> Result<StatementDesc, PlanError> {
7569 Ok(StatementDesc::new(None))
7570}
7571
7572pub fn plan_comment(
7573 scx: &mut StatementContext,
7574 stmt: CommentStatement<Aug>,
7575) -> Result<Plan, PlanError> {
7576 const MAX_COMMENT_LENGTH: usize = 1024;
7577
7578 let CommentStatement { object, comment } = stmt;
7579
7580 if let Some(c) = &comment {
7582 if c.len() > 1024 {
7583 return Err(PlanError::CommentTooLong {
7584 length: c.len(),
7585 max_size: MAX_COMMENT_LENGTH,
7586 });
7587 }
7588 }
7589
7590 let (object_id, column_pos) = match &object {
7591 com_ty @ CommentObjectType::Table { name }
7592 | com_ty @ CommentObjectType::View { name }
7593 | com_ty @ CommentObjectType::MaterializedView { name }
7594 | com_ty @ CommentObjectType::Index { name }
7595 | com_ty @ CommentObjectType::Func { name }
7596 | com_ty @ CommentObjectType::Connection { name }
7597 | com_ty @ CommentObjectType::Source { name }
7598 | com_ty @ CommentObjectType::Sink { name }
7599 | com_ty @ CommentObjectType::Secret { name } => {
7600 let item = scx.get_item_by_resolved_name(name)?;
7601 match (com_ty, item.item_type()) {
7602 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7603 (CommentObjectId::Table(item.id()), None)
7604 }
7605 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7606 (CommentObjectId::View(item.id()), None)
7607 }
7608 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7609 (CommentObjectId::MaterializedView(item.id()), None)
7610 }
7611 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7612 (CommentObjectId::Index(item.id()), None)
7613 }
7614 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7615 (CommentObjectId::Func(item.id()), None)
7616 }
7617 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7618 (CommentObjectId::Connection(item.id()), None)
7619 }
7620 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7621 (CommentObjectId::Source(item.id()), None)
7622 }
7623 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7624 (CommentObjectId::Sink(item.id()), None)
7625 }
7626 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7627 (CommentObjectId::Secret(item.id()), None)
7628 }
7629 (com_ty, cat_ty) => {
7630 let expected_type = match com_ty {
7631 CommentObjectType::Table { .. } => ObjectType::Table,
7632 CommentObjectType::View { .. } => ObjectType::View,
7633 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7634 CommentObjectType::Index { .. } => ObjectType::Index,
7635 CommentObjectType::Func { .. } => ObjectType::Func,
7636 CommentObjectType::Connection { .. } => ObjectType::Connection,
7637 CommentObjectType::Source { .. } => ObjectType::Source,
7638 CommentObjectType::Sink { .. } => ObjectType::Sink,
7639 CommentObjectType::Secret { .. } => ObjectType::Secret,
7640 _ => sql_bail!("cannot comment on this object type"),
7641 };
7642
7643 return Err(PlanError::InvalidObjectType {
7644 expected_type: SystemObjectType::Object(expected_type),
7645 actual_type: SystemObjectType::Object(cat_ty.into()),
7646 object_name: item.name().item.clone(),
7647 });
7648 }
7649 }
7650 }
7651 CommentObjectType::Type { ty } => match ty {
7652 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7653 sql_bail!("cannot comment on anonymous list or map type");
7654 }
7655 ResolvedDataType::Named { id, modifiers, .. } => {
7656 if !modifiers.is_empty() {
7657 sql_bail!("cannot comment on type with modifiers");
7658 }
7659 (CommentObjectId::Type(*id), None)
7660 }
7661 ResolvedDataType::Error => bail_internal!("unresolved data type"),
7662 },
7663 CommentObjectType::Column { name } => {
7664 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7665 match item.item_type() {
7666 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7667 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7668 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7669 CatalogItemType::MaterializedView => {
7670 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7671 }
7672 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7673 r => {
7674 return Err(PlanError::Unsupported {
7675 feature: format!("Specifying comments on a column of {r}"),
7676 discussion_no: None,
7677 });
7678 }
7679 }
7680 }
7681 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7682 CommentObjectType::Database { name } => {
7683 (CommentObjectId::Database(*name.database_id()), None)
7684 }
7685 CommentObjectType::Schema { name } => {
7686 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7690 sql_bail!(
7691 "cannot comment on schema {} because it is a temporary schema",
7692 mz_repr::namespaces::MZ_TEMP_SCHEMA
7693 );
7694 }
7695 (
7696 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7697 None,
7698 )
7699 }
7700 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7701 CommentObjectType::ClusterReplica { name } => {
7702 let replica = scx.catalog.resolve_cluster_replica(name)?;
7703 (
7704 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7705 None,
7706 )
7707 }
7708 CommentObjectType::NetworkPolicy { name } => {
7709 (CommentObjectId::NetworkPolicy(name.id), None)
7710 }
7711 };
7712
7713 if let Some(p) = column_pos {
7719 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7720 max_num_columns: MAX_NUM_COLUMNS,
7721 req_num_columns: p,
7722 })?;
7723 }
7724
7725 Ok(Plan::Comment(CommentPlan {
7726 object_id,
7727 sub_component: column_pos,
7728 comment,
7729 }))
7730}
7731
7732pub(crate) fn resolve_cluster<'a>(
7733 scx: &'a StatementContext,
7734 name: &'a Ident,
7735 if_exists: bool,
7736) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7737 match scx.resolve_cluster(Some(name)) {
7738 Ok(cluster) => Ok(Some(cluster)),
7739 Err(_) if if_exists => Ok(None),
7740 Err(e) => Err(e),
7741 }
7742}
7743
7744pub(crate) fn resolve_cluster_replica<'a>(
7745 scx: &'a StatementContext,
7746 name: &QualifiedReplica,
7747 if_exists: bool,
7748) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7749 match scx.resolve_cluster(Some(&name.cluster)) {
7750 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7751 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7752 None if if_exists => Ok(None),
7753 None => Err(sql_err!(
7754 "CLUSTER {} has no CLUSTER REPLICA named {}",
7755 cluster.name(),
7756 name.replica.as_str().quoted(),
7757 )),
7758 },
7759 Err(_) if if_exists => Ok(None),
7760 Err(e) => Err(e),
7761 }
7762}
7763
7764pub(crate) fn resolve_database<'a>(
7765 scx: &'a StatementContext,
7766 name: &'a UnresolvedDatabaseName,
7767 if_exists: bool,
7768) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7769 match scx.resolve_database(name) {
7770 Ok(database) => Ok(Some(database)),
7771 Err(_) if if_exists => Ok(None),
7772 Err(e) => Err(e),
7773 }
7774}
7775
7776pub(crate) fn resolve_schema<'a>(
7777 scx: &'a StatementContext,
7778 name: UnresolvedSchemaName,
7779 if_exists: bool,
7780) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7781 match scx.resolve_schema(name) {
7782 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7783 Err(_) if if_exists => Ok(None),
7784 Err(e) => Err(e),
7785 }
7786}
7787
7788pub(crate) fn resolve_network_policy<'a>(
7789 scx: &'a StatementContext,
7790 name: Ident,
7791 if_exists: bool,
7792) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7793 match scx.catalog.resolve_network_policy(&name.to_string()) {
7794 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7795 id: policy.id(),
7796 name: policy.name().to_string(),
7797 })),
7798 Err(_) if if_exists => Ok(None),
7799 Err(e) => Err(e.into()),
7800 }
7801}
7802
7803pub(crate) fn resolve_item_or_type<'a>(
7804 scx: &'a StatementContext,
7805 object_type: ObjectType,
7806 name: UnresolvedItemName,
7807 if_exists: bool,
7808) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7809 let name = normalize::unresolved_item_name(name)?;
7810 let catalog_item = match object_type {
7811 ObjectType::Type => scx.catalog.resolve_type(&name),
7812 ObjectType::Table
7813 | ObjectType::View
7814 | ObjectType::MaterializedView
7815 | ObjectType::Source
7816 | ObjectType::Sink
7817 | ObjectType::Index
7818 | ObjectType::Role
7819 | ObjectType::Cluster
7820 | ObjectType::ClusterReplica
7821 | ObjectType::Secret
7822 | ObjectType::Connection
7823 | ObjectType::Database
7824 | ObjectType::Schema
7825 | ObjectType::Func
7826 | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7827 };
7828
7829 match catalog_item {
7830 Ok(item) => {
7831 let is_type = ObjectType::from(item.item_type());
7832 if object_type == is_type {
7833 Ok(Some(item))
7834 } else {
7835 Err(PlanError::MismatchedObjectType {
7836 name: scx.catalog.minimal_qualification(item.name()),
7837 is_type,
7838 expected_type: object_type,
7839 })
7840 }
7841 }
7842 Err(_) if if_exists => Ok(None),
7843 Err(e) => Err(e.into()),
7844 }
7845}
7846
7847fn ensure_cluster_is_not_managed(
7849 scx: &StatementContext,
7850 cluster_id: ClusterId,
7851) -> Result<(), PlanError> {
7852 let cluster = scx.catalog.get_cluster(cluster_id);
7853 if cluster.is_managed() {
7854 Err(PlanError::ManagedCluster {
7855 cluster_name: cluster.name().to_string(),
7856 })
7857 } else {
7858 Ok(())
7859 }
7860}