1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_arrow_util::builder::ArrowBuilder;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::str::StrExt;
32use mz_ore::{soft_assert_or_log, soft_panic_or_log};
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
59 CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
60 CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
61 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
62 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
63 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
64 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
65 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
66 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
67 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
68 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
69 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
70 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
71 FormatSpecifier, GlueAvroOption, GlueAvroOptionName, IcebergSinkConfigOption, Ident,
72 IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint,
73 LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption,
74 MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption,
75 NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption,
76 NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema,
77 QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue,
78 ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar,
79 SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName,
80 Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption,
81 TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName,
82 UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition,
83 WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::ReferencedConnection;
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91 SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121 SqlServerSourceExtras, Timeline,
122};
123use mz_storage_types::wire_format::WireFormat;
124use prost::Message;
125
126use crate::ast::display::AstDisplay;
127use crate::catalog::{
128 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
129 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
130};
131use crate::iceberg::IcebergSinkConfigOptionExtracted;
132use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
133use crate::names::{
134 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
135 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
136 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
137};
138use crate::normalize::{self, ident};
139use crate::plan::error::PlanError;
140use crate::plan::query::{
141 ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
155 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
156 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
157 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
158 CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
159 CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
160 CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
161 DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
162 NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
163 PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
164 VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
165 WebhookValidation, literal, plan_utils, query, transform_ast,
166};
167use crate::session::vars::{
168 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
169 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
170 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS, VarInput,
171};
172use crate::{names, parse};
173
174mod connection;
175
/// Largest number of columns a planned relation may have; `plan_create_webhook_source` rejects
/// statements that would exceed it with `PlanError::TooManyColumns`.
const MAX_NUM_COLUMNS: usize = 256;

/// Upper bound (one hour) for a Kafka topic metadata refresh interval.
// NOTE(review): not referenced in this part of the file — presumably validated where Kafka
// source options are planned; confirm against the caller.
const MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(3_600);
/// Lower bound (one second) for a Kafka topic metadata refresh interval.
const MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
184
185static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
186 std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
187
188fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
192 if partition_by.len() > desc.len() {
193 tracing::error!(
194 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
195 desc.len(),
196 partition_by.len()
197 );
198 partition_by.truncate(desc.len());
199 }
200
201 let desc_prefix = desc.iter().take(partition_by.len());
202 for (idx, ((desc_name, desc_type), partition_name)) in
203 desc_prefix.zip_eq(partition_by).enumerate()
204 {
205 let partition_name = normalize::column_name(partition_name);
206 if *desc_name != partition_name {
207 sql_bail!(
208 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
209 );
210 }
211 if !preserves_order(&desc_type.scalar_type) {
212 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
213 }
214 }
215 Ok(())
216}
217
218pub fn describe_create_database(
219 _: &StatementContext,
220 _: CreateDatabaseStatement,
221) -> Result<StatementDesc, PlanError> {
222 Ok(StatementDesc::new(None))
223}
224
225pub fn plan_create_database(
226 _: &StatementContext,
227 CreateDatabaseStatement {
228 name,
229 if_not_exists,
230 }: CreateDatabaseStatement,
231) -> Result<Plan, PlanError> {
232 Ok(Plan::CreateDatabase(CreateDatabasePlan {
233 name: normalize::ident(name.0),
234 if_not_exists,
235 }))
236}
237
238pub fn describe_create_schema(
239 _: &StatementContext,
240 _: CreateSchemaStatement,
241) -> Result<StatementDesc, PlanError> {
242 Ok(StatementDesc::new(None))
243}
244
245pub fn plan_create_schema(
246 scx: &StatementContext,
247 CreateSchemaStatement {
248 mut name,
249 if_not_exists,
250 }: CreateSchemaStatement,
251) -> Result<Plan, PlanError> {
252 if name.0.len() > 2 {
253 sql_bail!("schema name {} has more than two components", name);
254 }
255 let schema_name = normalize::ident(
256 name.0
257 .pop()
258 .expect("names always have at least one component"),
259 );
260 let database_spec = match name.0.pop() {
261 None => match scx.catalog.active_database() {
262 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
263 None => sql_bail!("no database specified and no active database"),
264 },
265 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
266 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
267 Err(_) => sql_bail!("invalid database {}", n.as_str()),
268 },
269 };
270 Ok(Plan::CreateSchema(CreateSchemaPlan {
271 database_spec,
272 schema_name,
273 if_not_exists,
274 }))
275}
276
277pub fn describe_create_table(
278 _: &StatementContext,
279 _: CreateTableStatement<Aug>,
280) -> Result<StatementDesc, PlanError> {
281 Ok(StatementDesc::new(None))
282}
283
284pub fn plan_create_table(
285 scx: &StatementContext,
286 stmt: CreateTableStatement<Aug>,
287) -> Result<Plan, PlanError> {
288 let CreateTableStatement {
289 name,
290 columns,
291 constraints,
292 if_not_exists,
293 temporary,
294 with_options,
295 } = &stmt;
296
297 let names: Vec<_> = columns
298 .iter()
299 .filter(|c| {
300 let is_versioned = c
304 .options
305 .iter()
306 .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
307 !is_versioned
308 })
309 .map(|c| normalize::column_name(c.name.clone()))
310 .collect();
311
312 if let Some(dup) = names.iter().duplicates().next() {
313 sql_bail!("column {} specified more than once", dup.quoted());
314 }
315
316 let mut column_types = Vec::with_capacity(columns.len());
319 let mut defaults = Vec::with_capacity(columns.len());
320 let mut changes = BTreeMap::new();
321 let mut keys = Vec::new();
322
323 for (i, c) in columns.into_iter().enumerate() {
324 let aug_data_type = &c.data_type;
325 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
326 let mut nullable = true;
327 let mut default = Expr::null();
328 let mut versioned = false;
329 for option in &c.options {
330 match &option.option {
331 ColumnOption::NotNull => nullable = false,
332 ColumnOption::Default(expr) => {
333 let mut expr = expr.clone();
336 transform_ast::transform(scx, &mut expr)?;
337 let _ = query::plan_default_expr(scx, &expr, &ty)?;
338 default = expr.clone();
339 }
340 ColumnOption::Unique { is_primary } => {
341 keys.push(vec![i]);
342 if *is_primary {
343 nullable = false;
344 }
345 }
346 ColumnOption::Versioned { action, version } => {
347 let version = RelationVersion::from(*version);
348 versioned = true;
349
350 let name = normalize::column_name(c.name.clone());
351 let typ = ty.clone().nullable(nullable);
352
353 changes.insert(version, (action.clone(), name, typ));
354 }
355 other => {
356 bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
357 }
358 }
359 }
360 if !versioned {
363 column_types.push(ty.nullable(nullable));
364 }
365 defaults.push(default);
366 }
367
368 let mut seen_primary = false;
369 'c: for constraint in constraints {
370 match constraint {
371 TableConstraint::Unique {
372 name: _,
373 columns,
374 is_primary,
375 nulls_not_distinct,
376 } => {
377 if seen_primary && *is_primary {
378 sql_bail!(
379 "multiple primary keys for table {} are not allowed",
380 name.to_ast_string_stable()
381 );
382 }
383 seen_primary = *is_primary || seen_primary;
384
385 let mut key = vec![];
386 for column in columns {
387 let column = normalize::column_name(column.clone());
388 match names.iter().position(|name| *name == column) {
389 None => sql_bail!("unknown column in constraint: {}", column),
390 Some(i) => {
391 let nullable = &mut column_types[i].nullable;
392 if *is_primary {
393 if *nulls_not_distinct {
394 sql_bail!(
395 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
396 );
397 }
398
399 *nullable = false;
400 } else if !(*nulls_not_distinct || !*nullable) {
401 break 'c;
404 }
405
406 key.push(i);
407 }
408 }
409 }
410
411 if *is_primary {
412 keys.insert(0, key);
413 } else {
414 keys.push(key);
415 }
416 }
417 TableConstraint::ForeignKey { .. } => {
418 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
421 }
422 TableConstraint::Check { .. } => {
423 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
426 }
427 }
428 }
429
430 if !keys.is_empty() {
431 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
434 }
435
436 let typ = SqlRelationType::new(column_types).with_keys(keys);
437
438 let temporary = *temporary;
439 let name = if temporary {
440 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
441 } else {
442 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
443 };
444
445 let full_name = scx.catalog.resolve_full_name(&name);
447 let partial_name = PartialItemName::from(full_name.clone());
448 if let (false, Ok(item)) = (
451 if_not_exists,
452 scx.catalog.resolve_item_or_type(&partial_name),
453 ) {
454 return Err(PlanError::ItemAlreadyExists {
455 name: full_name.to_string(),
456 item_type: item.item_type(),
457 });
458 }
459
460 let desc = RelationDesc::new(typ, names);
461 let mut desc = VersionedRelationDesc::new(desc);
462 for (version, (_action, name, typ)) in changes.into_iter() {
463 let new_version = desc.add_column(name, typ);
464 if version != new_version {
465 return Err(PlanError::InvalidTable {
466 name: full_name.item,
467 });
468 }
469 }
470
471 let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
472
473 let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
479 let options = plan_table_options(scx, &original_desc, with_options.clone())?;
480
481 let compaction_window = options.iter().find_map(|o| {
482 #[allow(irrefutable_let_patterns)]
483 if let crate::plan::TableOption::RetainHistory(lcw) = o {
484 Some(lcw.clone())
485 } else {
486 None
487 }
488 });
489
490 let table = Table {
491 create_sql,
492 desc,
493 temporary,
494 compaction_window,
495 data_source: TableDataSource::TableWrites { defaults },
496 };
497 Ok(Plan::CreateTable(CreateTablePlan {
498 name,
499 table,
500 if_not_exists: *if_not_exists,
501 }))
502}
503
504pub fn describe_create_table_from_source(
505 _: &StatementContext,
506 _: CreateTableFromSourceStatement<Aug>,
507) -> Result<StatementDesc, PlanError> {
508 Ok(StatementDesc::new(None))
509}
510
511pub fn describe_create_webhook_source(
512 _: &StatementContext,
513 _: CreateWebhookSourceStatement<Aug>,
514) -> Result<StatementDesc, PlanError> {
515 Ok(StatementDesc::new(None))
516}
517
518pub fn describe_create_source(
519 _: &StatementContext,
520 _: CreateSourceStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522 Ok(StatementDesc::new(None))
523}
524
525pub fn describe_create_subsource(
526 _: &StatementContext,
527 _: CreateSubsourceStatement<Aug>,
528) -> Result<StatementDesc, PlanError> {
529 Ok(StatementDesc::new(None))
530}
531
// Typed extraction of `CREATE SOURCE ... WITH (...)` options. `plan_create_source` consumes the
// result as `CreateSourceOptionExtracted { timestamp_interval, retain_history, seen }`, built
// with `CreateSourceOptionExtracted::try_from(with_options)`.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Options of a PostgreSQL source connection.
// NOTE(review): presumably expands to a `PgConfigOptionExtracted` struct analogous to the one
// above, with `Default(vec![])` supplying the value when the option is absent — the macro is
// defined outside this part of the file; confirm.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options of a MySQL source connection (same shape as the PostgreSQL ones, minus `Publication`).
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options of a SQL Server source connection (same shape as the MySQL ones).
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
559
560pub fn plan_create_webhook_source(
561 scx: &StatementContext,
562 mut stmt: CreateWebhookSourceStatement<Aug>,
563) -> Result<Plan, PlanError> {
564 if stmt.is_table {
565 scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
566 }
567
568 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
571 let create_sql =
572 normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
573
574 let CreateWebhookSourceStatement {
575 name,
576 if_not_exists,
577 body_format,
578 include_headers,
579 validate_using,
580 is_table,
581 in_cluster: _,
583 } = stmt;
584
585 let validate_using = validate_using
586 .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
587 .transpose()?;
588 if let Some(WebhookValidation { expression, .. }) = &validate_using {
589 if !expression.contains_column() {
592 return Err(PlanError::WebhookValidationDoesNotUseColumns);
593 }
594 if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
598 return Err(PlanError::WebhookValidationNonDeterministic);
599 }
600 }
601
602 let body_format = match body_format {
603 Format::Bytes => WebhookBodyFormat::Bytes,
604 Format::Json { array } => WebhookBodyFormat::Json { array },
605 Format::Text => WebhookBodyFormat::Text,
606 ty => {
608 return Err(PlanError::Unsupported {
609 feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
610 discussion_no: None,
611 });
612 }
613 };
614
615 let mut column_ty = vec![
616 SqlColumnType {
618 scalar_type: SqlScalarType::from(body_format),
619 nullable: false,
620 },
621 ];
622 let mut column_names = vec!["body".to_string()];
623
624 let mut headers = WebhookHeaders::default();
625
626 if let Some(filters) = include_headers.column {
628 column_ty.push(SqlColumnType {
629 scalar_type: SqlScalarType::Map {
630 value_type: Box::new(SqlScalarType::String),
631 custom_id: None,
632 },
633 nullable: false,
634 });
635 column_names.push("headers".to_string());
636
637 let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
638 filters.into_iter().partition_map(|filter| {
639 if filter.block {
640 itertools::Either::Right(filter.header_name)
641 } else {
642 itertools::Either::Left(filter.header_name)
643 }
644 });
645 headers.header_column = Some(WebhookHeaderFilters { allow, block });
646 }
647
648 for header in include_headers.mappings {
650 let scalar_type = header
651 .use_bytes
652 .then_some(SqlScalarType::Bytes)
653 .unwrap_or(SqlScalarType::String);
654 column_ty.push(SqlColumnType {
655 scalar_type,
656 nullable: true,
657 });
658 column_names.push(header.column_name.into_string());
659
660 let column_idx = column_ty.len() - 1;
661 assert_eq!(
663 column_idx,
664 column_names.len() - 1,
665 "header column names and types don't match"
666 );
667 headers
668 .mapped_headers
669 .insert(column_idx, (header.header_name, header.use_bytes));
670 }
671
672 let mut unique_check = HashSet::with_capacity(column_names.len());
674 for name in &column_names {
675 if !unique_check.insert(name) {
676 return Err(PlanError::AmbiguousColumn(name.clone().into()));
677 }
678 }
679 if column_names.len() > MAX_NUM_COLUMNS {
680 return Err(PlanError::TooManyColumns {
681 max_num_columns: MAX_NUM_COLUMNS,
682 req_num_columns: column_names.len(),
683 });
684 }
685
686 let typ = SqlRelationType::new(column_ty);
687 let desc = RelationDesc::new(typ, column_names);
688
689 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
691 let full_name = scx.catalog.resolve_full_name(&name);
692 let partial_name = PartialItemName::from(full_name.clone());
693 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
694 return Err(PlanError::ItemAlreadyExists {
695 name: full_name.to_string(),
696 item_type: item.item_type(),
697 });
698 }
699
700 let timeline = Timeline::EpochMilliseconds;
703
704 let plan = if is_table {
705 let data_source = DataSourceDesc::Webhook {
706 validate_using,
707 body_format,
708 headers,
709 cluster_id: Some(in_cluster.id()),
710 };
711 let data_source = TableDataSource::DataSource {
712 desc: data_source,
713 timeline,
714 };
715 Plan::CreateTable(CreateTablePlan {
716 name,
717 if_not_exists,
718 table: Table {
719 create_sql,
720 desc: VersionedRelationDesc::new(desc),
721 temporary: false,
722 compaction_window: None,
723 data_source,
724 },
725 })
726 } else {
727 let data_source = DataSourceDesc::Webhook {
728 validate_using,
729 body_format,
730 headers,
731 cluster_id: None,
733 };
734 Plan::CreateSource(CreateSourcePlan {
735 name,
736 source: Source {
737 create_sql,
738 data_source,
739 desc,
740 compaction_window: None,
741 },
742 if_not_exists,
743 timeline,
744 in_cluster: Some(in_cluster.id()),
745 })
746 };
747
748 Ok(plan)
749}
750
751pub fn plan_create_source(
752 scx: &StatementContext,
753 mut stmt: CreateSourceStatement<Aug>,
754) -> Result<Plan, PlanError> {
755 let CreateSourceStatement {
756 name,
757 in_cluster: _,
758 col_names,
759 connection: source_connection,
760 envelope,
761 if_not_exists,
762 format,
763 key_constraint,
764 include_metadata,
765 with_options,
766 external_references: referenced_subsources,
767 progress_subsource,
768 } = &stmt;
769
770 mz_ore::soft_assert_or_log!(
771 referenced_subsources.is_none(),
772 "referenced subsources must be cleared in purification"
773 );
774
775 let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
776 && scx.catalog.system_vars().force_source_table_syntax();
777
778 if force_source_table_syntax {
781 if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
782 Err(PlanError::UseTablesForSources(
783 "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
784 ))?;
785 }
786 }
787
788 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
789
790 if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
791 && include_metadata
792 .iter()
793 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
794 {
795 sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
797 }
798 if !matches!(
799 source_connection,
800 CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
801 ) && !include_metadata.is_empty()
802 {
803 bail_unsupported!("INCLUDE metadata with non-Kafka sources");
804 }
805
806 if !include_metadata.is_empty()
807 && !matches!(
808 envelope,
809 ast::SourceEnvelope::Upsert { .. }
810 | ast::SourceEnvelope::None
811 | ast::SourceEnvelope::Debezium
812 )
813 {
814 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
815 }
816
817 let external_connection =
818 plan_generic_source_connection(scx, source_connection, include_metadata)?;
819
820 let CreateSourceOptionExtracted {
821 timestamp_interval,
822 retain_history,
823 seen: _,
824 } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
825
826 let metadata_columns_desc = match external_connection {
827 GenericSourceConnection::Kafka(KafkaSourceConnection {
828 ref metadata_columns,
829 ..
830 }) => kafka_metadata_columns_desc(metadata_columns),
831 _ => vec![],
832 };
833
834 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
836 scx,
837 &envelope,
838 format,
839 Some(external_connection.default_key_desc()),
840 external_connection.default_value_desc(),
841 include_metadata,
842 metadata_columns_desc,
843 &external_connection,
844 )?;
845 plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
846
847 let names: Vec<_> = desc.iter_names().cloned().collect();
848 if let Some(dup) = names.iter().duplicates().next() {
849 sql_bail!("column {} specified more than once", dup.quoted());
850 }
851
852 if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
854 scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
857
858 let key_columns = columns
859 .into_iter()
860 .map(normalize::column_name)
861 .collect::<Vec<_>>();
862
863 let mut uniq = BTreeSet::new();
864 for col in key_columns.iter() {
865 if !uniq.insert(col) {
866 sql_bail!("Repeated column name in source key constraint: {}", col);
867 }
868 }
869
870 let key_indices = key_columns
871 .iter()
872 .map(|col| {
873 let name_idx = desc
874 .get_by_name(col)
875 .map(|(idx, _type)| idx)
876 .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
877 if desc.get_unambiguous_name(name_idx).is_none() {
878 sql_bail!("Ambiguous column in source key constraint: {}", col);
879 }
880 Ok(name_idx)
881 })
882 .collect::<Result<Vec<_>, _>>()?;
883
884 if !desc.typ().keys.is_empty() {
885 return Err(key_constraint_err(&desc, &key_columns));
886 } else {
887 desc = desc.with_key(key_indices);
888 }
889 }
890
891 let timestamp_interval = match timestamp_interval {
892 Some(duration) => {
893 if scx.pcx.is_some() {
897 let min = scx.catalog.system_vars().min_timestamp_interval();
898 let max = scx.catalog.system_vars().max_timestamp_interval();
899 if duration < min || duration > max {
900 return Err(PlanError::InvalidTimestampInterval {
901 min,
902 max,
903 requested: duration,
904 });
905 }
906 }
907 duration
908 }
909 None => scx.catalog.system_vars().default_timestamp_interval(),
910 };
911
912 let (desc, data_source) = match progress_subsource {
913 Some(name) => {
914 let DeferredItemName::Named(name) = name else {
915 sql_bail!("[internal error] progress subsource must be named during purification");
916 };
917 let ResolvedItemName::Item { id, .. } = name else {
918 sql_bail!("[internal error] invalid target id");
919 };
920
921 let details = match external_connection {
922 GenericSourceConnection::Kafka(ref c) => {
923 SourceExportDetails::Kafka(KafkaSourceExportDetails {
924 metadata_columns: c.metadata_columns.clone(),
925 })
926 }
927 GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
928 LoadGenerator::Auction
929 | LoadGenerator::Marketing
930 | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
931 LoadGenerator::Counter { .. }
932 | LoadGenerator::Clock
933 | LoadGenerator::Datums
934 | LoadGenerator::KeyValue(_) => {
935 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
936 output: LoadGeneratorOutput::Default,
937 })
938 }
939 },
940 GenericSourceConnection::Postgres(_)
941 | GenericSourceConnection::MySql(_)
942 | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
943 };
944
945 let data_source = DataSourceDesc::OldSyntaxIngestion {
946 desc: SourceDesc {
947 connection: external_connection,
948 timestamp_interval,
949 },
950 progress_subsource: *id,
951 data_config: SourceExportDataConfig {
952 encoding,
953 envelope: envelope.clone(),
954 },
955 details,
956 };
957 (desc, data_source)
958 }
959 None => {
960 let desc = external_connection.timestamp_desc();
961 let data_source = DataSourceDesc::Ingestion(SourceDesc {
962 connection: external_connection,
963 timestamp_interval,
964 });
965 (desc, data_source)
966 }
967 };
968
969 let if_not_exists = *if_not_exists;
970 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
971
972 let full_name = scx.catalog.resolve_full_name(&name);
974 let partial_name = PartialItemName::from(full_name.clone());
975 if let (false, Ok(item)) = (
978 if_not_exists,
979 scx.catalog.resolve_item_or_type(&partial_name),
980 ) {
981 return Err(PlanError::ItemAlreadyExists {
982 name: full_name.to_string(),
983 item_type: item.item_type(),
984 });
985 }
986
987 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
991
992 let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
993
994 let timeline = match envelope {
996 SourceEnvelope::CdcV2 => {
997 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
998 }
999 _ => Timeline::EpochMilliseconds,
1000 };
1001
1002 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1003
1004 let source = Source {
1005 create_sql,
1006 data_source,
1007 desc,
1008 compaction_window,
1009 };
1010
1011 Ok(Plan::CreateSource(CreateSourcePlan {
1012 name,
1013 source,
1014 if_not_exists,
1015 timeline,
1016 in_cluster: Some(in_cluster.id()),
1017 }))
1018}
1019
1020pub fn plan_generic_source_connection(
1021 scx: &StatementContext<'_>,
1022 source_connection: &CreateSourceConnection<Aug>,
1023 include_metadata: &Vec<SourceIncludeMetadata>,
1024) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1025 Ok(match source_connection {
1026 CreateSourceConnection::Kafka {
1027 connection,
1028 options,
1029 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1030 scx,
1031 connection,
1032 options,
1033 include_metadata,
1034 )?),
1035 CreateSourceConnection::Postgres {
1036 connection,
1037 options,
1038 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1039 scx, connection, options,
1040 )?),
1041 CreateSourceConnection::SqlServer {
1042 connection,
1043 options,
1044 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1045 scx, connection, options,
1046 )?),
1047 CreateSourceConnection::MySql {
1048 connection,
1049 options,
1050 } => {
1051 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1052 }
1053 CreateSourceConnection::LoadGenerator { generator, options } => {
1054 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1055 scx,
1056 generator,
1057 options,
1058 include_metadata,
1059 )?)
1060 }
1061 })
1062}
1063
1064fn plan_load_generator_source_connection(
1065 scx: &StatementContext<'_>,
1066 generator: &ast::LoadGenerator,
1067 options: &Vec<LoadGeneratorOption<Aug>>,
1068 include_metadata: &Vec<SourceIncludeMetadata>,
1069) -> Result<LoadGeneratorSourceConnection, PlanError> {
1070 let load_generator =
1071 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1072 let LoadGeneratorOptionExtracted {
1073 tick_interval,
1074 as_of,
1075 up_to,
1076 ..
1077 } = options.clone().try_into()?;
1078 let tick_micros = match tick_interval {
1079 Some(interval) => Some(interval.as_micros().try_into()?),
1080 None => None,
1081 };
1082 if up_to < as_of {
1083 sql_bail!("UP TO cannot be less than AS OF");
1084 }
1085 Ok(LoadGeneratorSourceConnection {
1086 load_generator,
1087 tick_micros,
1088 as_of,
1089 up_to,
1090 })
1091}
1092
1093fn plan_mysql_source_connection(
1094 scx: &StatementContext<'_>,
1095 connection: &ResolvedItemName,
1096 options: &Vec<MySqlConfigOption<Aug>>,
1097) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1098 let connection_item = scx.get_item_by_resolved_name(connection)?;
1099 match connection_item.connection()? {
1100 Connection::MySql(connection) => connection,
1101 _ => sql_bail!(
1102 "{} is not a MySQL connection",
1103 scx.catalog.resolve_full_name(connection_item.name())
1104 ),
1105 };
1106 let MySqlConfigOptionExtracted {
1107 details,
1108 text_columns: _,
1112 exclude_columns: _,
1113 seen: _,
1114 } = options.clone().try_into()?;
1115 let details = details
1116 .as_ref()
1117 .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1118 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1119 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1120 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1121 Ok(MySqlSourceConnection {
1122 connection: connection_item.id(),
1123 connection_id: connection_item.id(),
1124 details,
1125 })
1126}
1127
1128fn plan_sqlserver_source_connection(
1129 scx: &StatementContext<'_>,
1130 connection: &ResolvedItemName,
1131 options: &Vec<SqlServerConfigOption<Aug>>,
1132) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1133 let connection_item = scx.get_item_by_resolved_name(connection)?;
1134 match connection_item.connection()? {
1135 Connection::SqlServer(connection) => connection,
1136 _ => sql_bail!(
1137 "{} is not a SQL Server connection",
1138 scx.catalog.resolve_full_name(connection_item.name())
1139 ),
1140 };
1141 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1142 let details = details
1143 .as_ref()
1144 .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1145 let extras = hex::decode(details)
1146 .map_err(|e| sql_err!("{e}"))
1147 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1148 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1149 Ok(SqlServerSourceConnection {
1150 connection_id: connection_item.id(),
1151 connection: connection_item.id(),
1152 extras,
1153 })
1154}
1155
1156fn plan_postgres_source_connection(
1157 scx: &StatementContext<'_>,
1158 connection: &ResolvedItemName,
1159 options: &Vec<PgConfigOption<Aug>>,
1160) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1161 let connection_item = scx.get_item_by_resolved_name(connection)?;
1162 let PgConfigOptionExtracted {
1163 details,
1164 publication,
1165 text_columns: _,
1169 exclude_columns: _,
1173 seen: _,
1174 } = options.clone().try_into()?;
1175 let details = details
1176 .as_ref()
1177 .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1178 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1179 let details =
1180 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1181 let publication_details =
1182 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1183 Ok(PostgresSourceConnection {
1184 connection: connection_item.id(),
1185 connection_id: connection_item.id(),
1186 publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1188 publication_details,
1189 })
1190}
1191
1192fn plan_kafka_source_connection(
1193 scx: &StatementContext<'_>,
1194 connection_name: &ResolvedItemName,
1195 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1196 include_metadata: &Vec<SourceIncludeMetadata>,
1197) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1198 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1199 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1200 sql_bail!(
1201 "{} is not a kafka connection",
1202 scx.catalog.resolve_full_name(connection_item.name())
1203 )
1204 }
1205 let KafkaSourceConfigOptionExtracted {
1206 group_id_prefix,
1207 topic,
1208 mut topic_metadata_refresh_interval,
1209 start_timestamp: _, start_offset,
1211 seen: _,
1212 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1213 let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1215 let mut start_offsets = BTreeMap::new();
1216 if let Some(offsets) = start_offset {
1217 for (part, offset) in offsets.iter().enumerate() {
1218 if *offset < 0 {
1219 sql_bail!("START OFFSET must be a nonnegative integer");
1220 }
1221 start_offsets.insert(i32::try_from(part)?, *offset);
1222 }
1223 }
1224 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1225 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1228 }
1229 if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
1230 topic_metadata_refresh_interval = MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL;
1232 }
1233 let metadata_columns = include_metadata
1234 .into_iter()
1235 .flat_map(|item| match item {
1236 SourceIncludeMetadata::Timestamp { alias } => {
1237 let name = match alias {
1238 Some(name) => name.to_string(),
1239 None => "timestamp".to_owned(),
1240 };
1241 Some((name, KafkaMetadataKind::Timestamp))
1242 }
1243 SourceIncludeMetadata::Partition { alias } => {
1244 let name = match alias {
1245 Some(name) => name.to_string(),
1246 None => "partition".to_owned(),
1247 };
1248 Some((name, KafkaMetadataKind::Partition))
1249 }
1250 SourceIncludeMetadata::Offset { alias } => {
1251 let name = match alias {
1252 Some(name) => name.to_string(),
1253 None => "offset".to_owned(),
1254 };
1255 Some((name, KafkaMetadataKind::Offset))
1256 }
1257 SourceIncludeMetadata::Headers { alias } => {
1258 let name = match alias {
1259 Some(name) => name.to_string(),
1260 None => "headers".to_owned(),
1261 };
1262 Some((name, KafkaMetadataKind::Headers))
1263 }
1264 SourceIncludeMetadata::Header {
1265 alias,
1266 key,
1267 use_bytes,
1268 } => Some((
1269 alias.to_string(),
1270 KafkaMetadataKind::Header {
1271 key: key.clone(),
1272 use_bytes: *use_bytes,
1273 },
1274 )),
1275 SourceIncludeMetadata::Key { .. } => {
1276 None
1278 }
1279 })
1280 .collect();
1281 Ok(KafkaSourceConnection {
1282 connection: connection_item.id(),
1283 connection_id: connection_item.id(),
1284 topic,
1285 start_offsets,
1286 group_id_prefix,
1287 topic_metadata_refresh_interval,
1288 metadata_columns,
1289 })
1290}
1291
/// Applies the statement's `FORMAT` and `ENVELOPE` to a source's raw key and
/// value descriptions.
///
/// Returns the final [`RelationDesc`] of the collection, the planned
/// [`SourceEnvelope`], and the data encoding derived from `format` (`None`
/// when no format was given).
///
/// Errors when:
/// - a format is given but `value_desc` is not a single `Bytes` column,
/// - `ENVELOPE DEBEZIUM` is combined with a key envelope other than
///   `KeyEnvelope::None`, or the value description fails `typecheck_debezium`,
/// - `ENVELOPE UPSERT` has no key encoding (unless the source is a `KeyValue`
///   load generator) or uses an unsupported value decode error policy,
/// - `ENVELOPE MATERIALIZE` is used without its feature flag or with a format
///   other than bare Avro,
/// - any of the called helpers fails.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // An encoding can only be applied when the incoming value is a
            // single column of type `Bytes`.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            // With an encoding, the descriptions come from the encoding and
            // replace the ones passed in by the caller.
            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka sources with `ENVELOPE NONE`, every key column is
            // made nullable; otherwise the key description is kept as is.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // True only for `KeyValue` load generator sources. It is passed to
    // `get_key_envelope` and, below, exempts `ENVELOPE UPSERT` from requiring
    // a key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // `ENVELOPE DEBEZIUM` only accepts `KeyEnvelope::None`.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                // Surface the type error only when the value really is
                // Avro-encoded; otherwise report the missing Avro format.
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // When no key envelope was requested, fall back to the one
            // `get_unnamed_key_envelope` derives from the key encoding.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // No policy selects the default upsert style; a single inline
            // policy is feature-flagged; anything else is unsupported.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // Only a bare Avro format is accepted for this envelope.
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1470
/// Builds the [`RelationDesc`] of a source export from its column definitions
/// and table constraints.
///
/// `name` is only used in error messages.
///
/// Errors when a column name is duplicated, a column type cannot be planned,
/// a column has a default value or an unsupported column option, more than one
/// primary key constraint is given, a constraint references an unknown
/// column, a primary key is declared `NULLS NOT DISTINCT`, or a foreign key or
/// check constraint is present.
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // First pass: column types, nullability and single-column keys that are
    // declared through column options.
    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    // A primary key column is implicitly `NOT NULL`.
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    // Second pass: table-level constraints.
    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                // Primary key columns are forced to `NOT NULL`.
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A non-primary unique constraint over a
                                // nullable column is not recorded as a key
                                // unless it is `NULLS NOT DISTINCT`.
                                // NOTE(review): `break 'c` leaves the whole
                                // constraint loop, so every later constraint
                                // is skipped too (including a later PRIMARY
                                // KEY or an unsupported FOREIGN KEY/CHECK).
                                // `continue 'c` may have been intended —
                                // confirm before changing, since this affects
                                // the planned description.
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                // The primary key is placed first among the keys.
                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}
1578
// Generates `CreateSubsourceOptionExtracted`, which `plan_create_subsource`
// builds from the statement's `WITH` options via `try_into`. Options declared
// with `Default(..)` are read there as plain values (e.g. `progress` as a
// `bool`, `text_columns` as a `Vec`); the remaining options are read as
// `Option`s.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1588
1589pub fn plan_create_subsource(
1590 scx: &StatementContext,
1591 stmt: CreateSubsourceStatement<Aug>,
1592) -> Result<Plan, PlanError> {
1593 let CreateSubsourceStatement {
1594 name,
1595 columns,
1596 of_source,
1597 constraints,
1598 if_not_exists,
1599 with_options,
1600 } = &stmt;
1601
1602 let CreateSubsourceOptionExtracted {
1603 progress,
1604 retain_history,
1605 external_reference,
1606 text_columns,
1607 exclude_columns,
1608 details,
1609 seen: _,
1610 } = with_options.clone().try_into()?;
1611
1612 if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
1617 bail_internal!(
1618 "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1619 );
1620 }
1621
1622 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1623
1624 let data_source = if let Some(source_reference) = of_source {
1625 if scx.catalog.system_vars().enable_create_table_from_source()
1628 && scx.catalog.system_vars().force_source_table_syntax()
1629 {
1630 Err(PlanError::UseTablesForSources(
1631 "CREATE SUBSOURCE".to_string(),
1632 ))?;
1633 }
1634
1635 let ingestion_id = *source_reference.item_id();
1638 let external_reference = external_reference.ok_or_else(|| {
1639 sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
1640 })?;
1641
1642 let details = details
1645 .as_ref()
1646 .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
1647 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1648 let details =
1649 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1650 let details =
1651 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1652 let details = match details {
1653 SourceExportStatementDetails::Postgres { table } => {
1654 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1655 column_casts: crate::pure::postgres::generate_column_casts(
1656 scx,
1657 &table,
1658 &text_columns,
1659 )?,
1660 table,
1661 })
1662 }
1663 SourceExportStatementDetails::MySql {
1664 table,
1665 initial_gtid_set,
1666 binlog_full_metadata,
1667 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1668 table,
1669 initial_gtid_set,
1670 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1671 exclude_columns: exclude_columns
1672 .into_iter()
1673 .map(|c| c.into_string())
1674 .collect(),
1675 binlog_full_metadata,
1676 }),
1677 SourceExportStatementDetails::SqlServer {
1678 table,
1679 capture_instance,
1680 initial_lsn,
1681 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1682 capture_instance,
1683 table,
1684 initial_lsn,
1685 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1686 exclude_columns: exclude_columns
1687 .into_iter()
1688 .map(|c| c.into_string())
1689 .collect(),
1690 }),
1691 SourceExportStatementDetails::LoadGenerator { output } => {
1692 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1693 }
1694 SourceExportStatementDetails::Kafka {} => {
1695 bail_unsupported!("subsources cannot reference Kafka sources")
1696 }
1697 };
1698 DataSourceDesc::IngestionExport {
1699 ingestion_id,
1700 external_reference,
1701 details,
1702 data_config: SourceExportDataConfig {
1704 envelope: SourceEnvelope::None(NoneEnvelope {
1705 key_envelope: KeyEnvelope::None,
1706 key_arity: 0,
1707 }),
1708 encoding: None,
1709 },
1710 }
1711 } else if progress {
1712 DataSourceDesc::Progress
1713 } else {
1714 sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
1715 };
1716
1717 let if_not_exists = *if_not_exists;
1718 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1719
1720 let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1721
1722 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1723 let source = Source {
1724 create_sql,
1725 data_source,
1726 desc,
1727 compaction_window,
1728 };
1729
1730 Ok(Plan::CreateSource(CreateSourcePlan {
1731 name,
1732 source,
1733 if_not_exists,
1734 timeline: Timeline::EpochMilliseconds,
1735 in_cluster: None,
1736 }))
1737}
1738
// Generates `TableFromSourceOptionExtracted`, which
// `plan_create_table_from_source` builds from the statement's `WITH` options
// via `try_into`. `text_columns` and `exclude_columns` default to empty
// vectors; `partition_by`, `retain_history` and `details` are read there as
// `Option`s.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1747
1748pub fn plan_create_table_from_source(
1749 scx: &StatementContext,
1750 stmt: CreateTableFromSourceStatement<Aug>,
1751) -> Result<Plan, PlanError> {
1752 if !scx.catalog.system_vars().enable_create_table_from_source() {
1753 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1754 }
1755
1756 let CreateTableFromSourceStatement {
1757 name,
1758 columns,
1759 constraints,
1760 if_not_exists,
1761 source,
1762 external_reference,
1763 envelope,
1764 format,
1765 include_metadata,
1766 with_options,
1767 } = &stmt;
1768
1769 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1770
1771 let TableFromSourceOptionExtracted {
1772 text_columns,
1773 exclude_columns,
1774 retain_history,
1775 partition_by,
1776 details,
1777 seen: _,
1778 } = with_options.clone().try_into()?;
1779
1780 let source_item = scx.get_item_by_resolved_name(source)?;
1781 let ingestion_id = source_item.id();
1782
1783 let details = details
1786 .as_ref()
1787 .ok_or_else(|| internal_err!("source-export missing details"))?;
1788 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1789 let details =
1790 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1791 let details =
1792 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1793
1794 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1795 && include_metadata
1796 .iter()
1797 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1798 {
1799 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1801 }
1802 if !matches!(
1803 details,
1804 SourceExportStatementDetails::Kafka { .. }
1805 | SourceExportStatementDetails::LoadGenerator { .. }
1806 ) && !include_metadata.is_empty()
1807 {
1808 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1809 }
1810
1811 let details = match details {
1812 SourceExportStatementDetails::Postgres { table } => {
1813 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1814 column_casts: crate::pure::postgres::generate_column_casts(
1815 scx,
1816 &table,
1817 &text_columns,
1818 )?,
1819 table,
1820 })
1821 }
1822 SourceExportStatementDetails::MySql {
1823 table,
1824 initial_gtid_set,
1825 binlog_full_metadata,
1826 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1827 table,
1828 initial_gtid_set,
1829 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830 exclude_columns: exclude_columns
1831 .into_iter()
1832 .map(|c| c.into_string())
1833 .collect(),
1834 binlog_full_metadata,
1835 }),
1836 SourceExportStatementDetails::SqlServer {
1837 table,
1838 capture_instance,
1839 initial_lsn,
1840 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1841 table,
1842 capture_instance,
1843 initial_lsn,
1844 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1845 exclude_columns: exclude_columns
1846 .into_iter()
1847 .map(|c| c.into_string())
1848 .collect(),
1849 }),
1850 SourceExportStatementDetails::LoadGenerator { output } => {
1851 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1852 }
1853 SourceExportStatementDetails::Kafka {} => {
1854 if !include_metadata.is_empty()
1855 && !matches!(
1856 envelope,
1857 ast::SourceEnvelope::Upsert { .. }
1858 | ast::SourceEnvelope::None
1859 | ast::SourceEnvelope::Debezium
1860 )
1861 {
1862 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1864 }
1865
1866 let metadata_columns = include_metadata
1867 .into_iter()
1868 .flat_map(|item| match item {
1869 SourceIncludeMetadata::Timestamp { alias } => {
1870 let name = match alias {
1871 Some(name) => name.to_string(),
1872 None => "timestamp".to_owned(),
1873 };
1874 Some((name, KafkaMetadataKind::Timestamp))
1875 }
1876 SourceIncludeMetadata::Partition { alias } => {
1877 let name = match alias {
1878 Some(name) => name.to_string(),
1879 None => "partition".to_owned(),
1880 };
1881 Some((name, KafkaMetadataKind::Partition))
1882 }
1883 SourceIncludeMetadata::Offset { alias } => {
1884 let name = match alias {
1885 Some(name) => name.to_string(),
1886 None => "offset".to_owned(),
1887 };
1888 Some((name, KafkaMetadataKind::Offset))
1889 }
1890 SourceIncludeMetadata::Headers { alias } => {
1891 let name = match alias {
1892 Some(name) => name.to_string(),
1893 None => "headers".to_owned(),
1894 };
1895 Some((name, KafkaMetadataKind::Headers))
1896 }
1897 SourceIncludeMetadata::Header {
1898 alias,
1899 key,
1900 use_bytes,
1901 } => Some((
1902 alias.to_string(),
1903 KafkaMetadataKind::Header {
1904 key: key.clone(),
1905 use_bytes: *use_bytes,
1906 },
1907 )),
1908 SourceIncludeMetadata::Key { .. } => {
1909 None
1911 }
1912 })
1913 .collect();
1914
1915 SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1916 }
1917 };
1918
1919 let source_connection = &source_item
1920 .source_desc()?
1921 .ok_or_else(|| sql_err!("item is not a source"))?
1922 .connection;
1923
1924 let (key_desc, value_desc) =
1929 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1930 let columns = match columns {
1931 TableFromSourceColumns::Defined(columns) => columns,
1932 _ => bail_internal!("expected column definitions to be present"),
1933 };
1934 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1935 (None, desc)
1936 } else {
1937 let key_desc = source_connection.default_key_desc();
1938 let value_desc = source_connection.default_value_desc();
1939 (Some(key_desc), value_desc)
1940 };
1941
1942 let metadata_columns_desc = match &details {
1943 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1944 metadata_columns, ..
1945 }) => kafka_metadata_columns_desc(metadata_columns),
1946 _ => vec![],
1947 };
1948
1949 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1950 scx,
1951 &envelope,
1952 format,
1953 key_desc,
1954 value_desc,
1955 include_metadata,
1956 metadata_columns_desc,
1957 source_connection,
1958 )?;
1959 if let TableFromSourceColumns::Named(col_names) = columns {
1960 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1961 }
1962
1963 let names: Vec<_> = desc.iter_names().cloned().collect();
1964 if let Some(dup) = names.iter().duplicates().next() {
1965 sql_bail!("column {} specified more than once", dup.quoted());
1966 }
1967
1968 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1969
1970 let timeline = match envelope {
1973 SourceEnvelope::CdcV2 => {
1974 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1975 }
1976 _ => Timeline::EpochMilliseconds,
1977 };
1978
1979 if let Some(partition_by) = partition_by {
1980 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1981 check_partition_by(&desc, partition_by)?;
1982 }
1983
1984 let data_source = DataSourceDesc::IngestionExport {
1985 ingestion_id,
1986 external_reference: external_reference
1988 .as_ref()
1989 .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
1990 .clone(),
1991 details,
1992 data_config: SourceExportDataConfig { envelope, encoding },
1993 };
1994
1995 let if_not_exists = *if_not_exists;
1996
1997 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1998
1999 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2000 let table = Table {
2001 create_sql,
2002 desc: VersionedRelationDesc::new(desc),
2003 temporary: false,
2004 compaction_window,
2005 data_source: TableDataSource::DataSource {
2006 desc: data_source,
2007 timeline,
2008 },
2009 };
2010
2011 Ok(Plan::CreateTable(CreateTablePlan {
2012 name,
2013 table,
2014 if_not_exists,
2015 }))
2016}
2017
// Generates `LoadGeneratorOptionExtracted`, which the load generator planning
// code builds from a statement's options via `try_into`. `as_of` and `up_to`
// carry defaults (`0` and `u64::MAX`) and are compared there as plain `u64`s;
// the other options are read as `Option`s. The generated struct also exposes
// `seen`, which `ensure_only_valid_options` uses to find the options that were
// specified.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2033
2034impl LoadGeneratorOptionExtracted {
2035 pub(super) fn ensure_only_valid_options(
2036 &self,
2037 loadgen: &ast::LoadGenerator,
2038 ) -> Result<(), PlanError> {
2039 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2040
2041 let mut options = self.seen.clone();
2042
2043 let permitted_options: &[_] = match loadgen {
2044 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2045 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2046 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2047 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2048 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2049 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2050 ast::LoadGenerator::KeyValue => &[
2051 TickInterval,
2052 Keys,
2053 SnapshotRounds,
2054 TransactionalSnapshot,
2055 ValueSize,
2056 Seed,
2057 Partitions,
2058 BatchSize,
2059 ],
2060 };
2061
2062 for o in permitted_options {
2063 options.remove(o);
2064 }
2065
2066 if !options.is_empty() {
2067 sql_bail!(
2068 "{} load generators do not support {} values",
2069 loadgen,
2070 options.iter().join(", ")
2071 )
2072 }
2073
2074 Ok(())
2075 }
2076}
2077
2078pub(crate) fn load_generator_ast_to_generator(
2079 scx: &StatementContext,
2080 loadgen: &ast::LoadGenerator,
2081 options: &[LoadGeneratorOption<Aug>],
2082 include_metadata: &[SourceIncludeMetadata],
2083) -> Result<LoadGenerator, PlanError> {
2084 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2085 extracted.ensure_only_valid_options(loadgen)?;
2086
2087 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2088 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2089 }
2090
2091 let load_generator = match loadgen {
2092 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2093 ast::LoadGenerator::Clock => {
2094 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2095 LoadGenerator::Clock
2096 }
2097 ast::LoadGenerator::Counter => {
2098 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2099 let LoadGeneratorOptionExtracted {
2100 max_cardinality, ..
2101 } = extracted;
2102 LoadGenerator::Counter { max_cardinality }
2103 }
2104 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2105 ast::LoadGenerator::Datums => {
2106 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2107 LoadGenerator::Datums
2108 }
2109 ast::LoadGenerator::Tpch => {
2110 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2111
2112 let sf: f64 = scale_factor.unwrap_or(0.01);
2114 if !sf.is_finite() || sf < 0.0 {
2115 sql_bail!("unsupported scale factor {sf}");
2116 }
2117
2118 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2119 let total = (sf * multiplier).floor();
2120 let mut i = i64::try_cast_from(total)
2121 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2122 if i < 1 {
2123 i = 1;
2124 }
2125 Ok(i)
2126 };
2127
2128 let count_supplier = f_to_i(10_000f64)?;
2131 let count_part = f_to_i(200_000f64)?;
2132 let count_customer = f_to_i(150_000f64)?;
2133 let count_orders = f_to_i(150_000f64 * 10f64)?;
2134 let count_clerk = f_to_i(1_000f64)?;
2135
2136 LoadGenerator::Tpch {
2137 count_supplier,
2138 count_part,
2139 count_customer,
2140 count_orders,
2141 count_clerk,
2142 }
2143 }
2144 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2145 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2146 let LoadGeneratorOptionExtracted {
2147 keys,
2148 snapshot_rounds,
2149 transactional_snapshot,
2150 value_size,
2151 tick_interval,
2152 seed,
2153 partitions,
2154 batch_size,
2155 ..
2156 } = extracted;
2157
2158 let mut include_offset = None;
2159 for im in include_metadata {
2160 match im {
2161 SourceIncludeMetadata::Offset { alias } => {
2162 include_offset = match alias {
2163 Some(alias) => Some(alias.to_string()),
2164 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2165 }
2166 }
2167 SourceIncludeMetadata::Key { .. } => continue,
2168
2169 _ => {
2170 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2171 }
2172 };
2173 }
2174
2175 let lgkv = KeyValueLoadGenerator {
2176 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2177 snapshot_rounds: snapshot_rounds
2178 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2179 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2181 value_size: value_size
2182 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2183 partitions: partitions
2184 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2185 tick_interval,
2186 batch_size: batch_size
2187 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2188 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2189 include_offset,
2190 };
2191
2192 if lgkv.keys == 0
2193 || lgkv.partitions == 0
2194 || lgkv.value_size == 0
2195 || lgkv.batch_size == 0
2196 {
2197 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2198 }
2199
2200 if lgkv.keys % lgkv.partitions != 0 {
2201 sql_bail!("KEYS must be a multiple of PARTITIONS")
2202 }
2203
2204 if lgkv.batch_size > lgkv.keys {
2205 sql_bail!("KEYS must be larger than BATCH SIZE")
2206 }
2207
2208 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2211 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2212 }
2213
2214 if lgkv.snapshot_rounds == 0 {
2215 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2216 }
2217
2218 LoadGenerator::KeyValue(lgkv)
2219 }
2220 };
2221
2222 Ok(load_generator)
2223}
2224
2225fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2226 let before = value_desc.get_by_name(&"before".into());
2227 let (after_idx, after_ty) = value_desc
2228 .get_by_name(&"after".into())
2229 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2230 let before_idx = if let Some((before_idx, before_ty)) = before {
2231 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2232 sql_bail!("'before' column must be of type record");
2233 }
2234 if before_ty != after_ty {
2235 sql_bail!("'before' type differs from 'after' column");
2236 }
2237 Some(before_idx)
2238 } else {
2239 None
2240 };
2241 Ok((before_idx, after_idx))
2242}
2243
2244fn get_encoding(
2245 scx: &StatementContext,
2246 format: &FormatSpecifier<Aug>,
2247 envelope: &ast::SourceEnvelope,
2248) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2249 let encoding = match format {
2250 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2251 FormatSpecifier::KeyValue { key, value } => {
2252 let key = {
2253 let encoding = get_encoding_inner(scx, key)?;
2254 Some(encoding.key.unwrap_or(encoding.value))
2255 };
2256 let value = get_encoding_inner(scx, value)?.value;
2257 SourceDataEncoding { key, value }
2258 }
2259 };
2260
2261 let requires_keyvalue = matches!(
2262 envelope,
2263 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2264 );
2265 let is_keyvalue = encoding.key.is_some();
2266 if requires_keyvalue && !is_keyvalue {
2267 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2268 };
2269
2270 Ok(encoding)
2271}
2272
2273fn source_sink_cluster_config<'a, 'ctx>(
2279 scx: &'a StatementContext<'ctx>,
2280 in_cluster: &mut Option<ResolvedClusterName>,
2281) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2282 let cluster = match in_cluster {
2283 None => {
2284 let cluster = scx.catalog.resolve_cluster(None)?;
2285 *in_cluster = Some(ResolvedClusterName {
2286 id: cluster.id(),
2287 print_name: None,
2288 });
2289 cluster
2290 }
2291 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2292 };
2293
2294 Ok(cluster)
2295}
2296
// Options accepted on an inline Avro schema. The macro produces
// `AvroSchemaOptionExtracted`, whose `confluent_wire_format` flag is read by
// `get_encoding_inner`; the flag defaults to `true` when the option is absent.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2298
// Options accepted on an AWS Glue Avro schema: a single string-valued
// `SchemaName` option.
// NOTE(review): presumably this generates `GlueAvroOptionExtracted` in the same
// way as the other invocations in this file — confirm against the macro.
generate_extracted_config!(GlueAvroOption, (SchemaName, String));
2300
/// Schema information gathered for an Avro format before it is turned into
/// key and value encodings by `get_encoding_inner`.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, if there is one. When present, a key encoding is
    /// produced alongside the value encoding.
    pub key_schema: Option<String>,
    /// Schema for the value.
    pub value_schema: String,
    /// Reference schemas passed along with the key schema.
    // NOTE(review): presumably the schemas the key schema refers to — confirm
    // ordering/semantics against the Avro encoding consumer.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas passed along with the value schema.
    pub value_reference_schemas: Vec<String>,
    /// Wire format shared by the key and value encodings.
    pub wire_format: WireFormat<ReferencedConnection>,
}
2314
/// Plans a single `Format` into a [`SourceDataEncoding`].
///
/// Most formats only describe a value, in which case the result has
/// `key: None`. Avro and Protobuf schemas obtained from a schema registry may
/// additionally carry a key schema; in that case the function returns early
/// with both a key and a value encoding.
///
/// # Errors
///
/// Fails if a referenced connection is of the wrong kind, if registry seed
/// resolution has not been performed, if options or schemas cannot be parsed,
/// if the regex is invalid, if the CSV delimiter does not fit in a byte, or
/// if the format is `JSON ARRAY`.
fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            // First normalize every Avro schema source into a `Schema`.
            let Schema {
                key_schema,
                value_schema,
                key_reference_schemas,
                value_reference_schemas,
                wire_format,
            } = match schema {
                // Inline schema: value only, no registry connection.
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;
                    let wire_format = if confluent_wire_format {
                        WireFormat::Confluent { registry: None }
                    } else {
                        WireFormat::None
                    };
                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        wire_format,
                    }
                }
                // Confluent Schema Registry: the schemas come from the seed.
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    // The referenced item must be a CSR connection.
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a Confluent Schema Registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            key_reference_schemas: seed.key_reference_schemas.clone(),
                            value_reference_schemas: seed.value_reference_schemas.clone(),
                            wire_format: WireFormat::Confluent {
                                registry: Some(csr_connection),
                            },
                        }
                    } else {
                        // Planning cannot proceed without a seed.
                        sql_bail!("Avro CSR seed resolution has not been performed")
                    }
                }
                // AWS Glue Schema Registry: only a value schema is taken from
                // the seed; the statement's options are not consulted here.
                AvroSchema::Glue {
                    connection,
                    with_options: _,
                    seed,
                } => {
                    // The referenced item must be a Glue Schema Registry connection.
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let glue_connection = match item.connection()? {
                        Connection::GlueSchemaRegistry(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not an AWS Glue Schema Registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    let Some(seed) = seed else {
                        sql_bail!("Avro Glue seed resolution has not been performed");
                    };

                    Schema {
                        key_schema: None,
                        value_schema: seed.value_schema.clone(),
                        key_reference_schemas: vec![],
                        value_reference_schemas: vec![],
                        wire_format: WireFormat::Glue {
                            registry: Some(glue_connection),
                        },
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                // A key schema is available: return both encodings directly,
                // sharing the same wire format.
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        reference_schemas: key_reference_schemas,
                        wire_format: wire_format.clone(),
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        reference_schemas: value_reference_schemas,
                        wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    reference_schemas: value_reference_schemas,
                    wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    // Only the connection's kind is validated; the connection
                    // itself is not recorded in the Protobuf encoding.
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    // Registry-backed Protobuf always uses the Confluent wire format.
                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        // The seed also describes a key: return both encodings.
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    sql_bail!("Protobuf CSR seed resolution has not been performed")
                }
            }
            // Inline Protobuf schema: value only, without the Confluent wire format.
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            // NOTE(review): the `false` argument is presumably the
            // case-insensitivity flag — confirm against `Regex::new`.
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    // An empty header list at this point is treated as an
                    // internal error rather than a user error.
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                // NOTE(review): if `delimiter` is a `char`, `u8::try_from`
                // succeeds for every code point up to U+00FF, so non-ASCII
                // Latin-1 characters pass this check despite the wording of
                // the error message — confirm whether that is intended.
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    // Every path that reaches this point produced a value-only encoding.
    Ok(SourceDataEncoding { key: None, value })
}
2540
2541fn get_key_envelope(
2543 included_items: &[SourceIncludeMetadata],
2544 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2545 key_envelope_no_encoding: bool,
2546) -> Result<KeyEnvelope, PlanError> {
2547 let key_definition = included_items
2548 .iter()
2549 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2550 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2551 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2552 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2553 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2554 (Some(name), _) if key_envelope_no_encoding => {
2555 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2556 }
2557 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2558 (_, None) => {
2559 sql_bail!(
2563 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2564 got bare FORMAT"
2565 );
2566 }
2567 }
2568 } else {
2569 Ok(KeyEnvelope::None)
2570 }
2571}
2572
2573fn get_unnamed_key_envelope(
2576 key: Option<&DataEncoding<ReferencedConnection>>,
2577) -> Result<KeyEnvelope, PlanError> {
2578 let is_composite = match key {
2582 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2583 Some(
2584 DataEncoding::Avro(_)
2585 | DataEncoding::Csv(_)
2586 | DataEncoding::Protobuf(_)
2587 | DataEncoding::Regex { .. },
2588 ) => true,
2589 None => false,
2590 };
2591
2592 if is_composite {
2593 Ok(KeyEnvelope::Flattened)
2594 } else {
2595 Ok(KeyEnvelope::Named("key".to_string()))
2596 }
2597}
2598
/// Describes a `CREATE VIEW` statement.
///
/// Both arguments are ignored; the description is always built with
/// `StatementDesc::new(None)`.
// NOTE(review): `None` presumably means the statement produces no result
// relation — confirm against `StatementDesc::new`.
pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2605
2606pub fn plan_view(
2607 scx: &StatementContext,
2608 def: &mut ViewDefinition<Aug>,
2609 temporary: bool,
2610) -> Result<(QualifiedItemName, View), PlanError> {
2611 let create_sql = normalize::create_statement(
2612 scx,
2613 Statement::CreateView(CreateViewStatement {
2614 if_exists: IfExistsBehavior::Error,
2615 temporary,
2616 definition: def.clone(),
2617 }),
2618 )?;
2619
2620 let ViewDefinition {
2621 name,
2622 columns,
2623 query,
2624 } = def;
2625
2626 let query::PlannedRootQuery {
2627 expr,
2628 mut desc,
2629 finishing,
2630 scope: _,
2631 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2632 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2638 &finishing,
2639 expr.arity()
2640 ));
2641 if expr.contains_parameters()? {
2642 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2643 }
2644
2645 let dependencies = expr
2646 .depends_on()
2647 .into_iter()
2648 .map(|gid| scx.catalog.resolve_item_id(&gid))
2649 .collect();
2650
2651 let name = if temporary {
2652 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2653 } else {
2654 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2655 };
2656
2657 plan_utils::maybe_rename_columns(
2658 format!("view {}", scx.catalog.resolve_full_name(&name)),
2659 &mut desc,
2660 columns,
2661 )?;
2662 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2663
2664 if let Some(dup) = names.iter().duplicates().next() {
2665 sql_bail!("column {} specified more than once", dup.quoted());
2666 }
2667
2668 let view = View {
2669 create_sql,
2670 expr,
2671 dependencies,
2672 column_names: names,
2673 temporary,
2674 };
2675
2676 Ok((name, view))
2677}
2678
2679pub fn plan_create_view(
2680 scx: &StatementContext,
2681 mut stmt: CreateViewStatement<Aug>,
2682) -> Result<Plan, PlanError> {
2683 let CreateViewStatement {
2684 temporary,
2685 if_exists,
2686 definition,
2687 } = &mut stmt;
2688 let (name, view) = plan_view(scx, definition, *temporary)?;
2689
2690 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2693
2694 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2695 let if_exists = true;
2696 let cascade = false;
2697 let maybe_item_to_drop = plan_drop_item(
2698 scx,
2699 ObjectType::View,
2700 if_exists,
2701 definition.name.clone(),
2702 cascade,
2703 )?;
2704
2705 if let Some(id) = maybe_item_to_drop {
2707 let dependencies = view.expr.depends_on();
2708 let invalid_drop = scx
2709 .get_item(&id)
2710 .global_ids()
2711 .any(|gid| dependencies.contains(&gid));
2712 if invalid_drop {
2713 let item = scx.catalog.get_item(&id);
2714 sql_bail!(
2715 "cannot replace view {0}: depended upon by new {0} definition",
2716 scx.catalog.resolve_full_name(item.name())
2717 );
2718 }
2719
2720 Some(id)
2721 } else {
2722 None
2723 }
2724 } else {
2725 None
2726 };
2727 let drop_ids = replace
2728 .map(|id| {
2729 scx.catalog
2730 .item_dependents(id)
2731 .into_iter()
2732 .map(|id| id.unwrap_item_id())
2733 .collect()
2734 })
2735 .unwrap_or_default();
2736
2737 validate_view_dependencies(scx, &view.dependencies.0)?;
2738
2739 let full_name = scx.catalog.resolve_full_name(&name);
2741 let partial_name = PartialItemName::from(full_name.clone());
2742 if let (Ok(item), IfExistsBehavior::Error, false) = (
2745 scx.catalog.resolve_item_or_type(&partial_name),
2746 *if_exists,
2747 ignore_if_exists_errors,
2748 ) {
2749 return Err(PlanError::ItemAlreadyExists {
2750 name: full_name.to_string(),
2751 item_type: item.item_type(),
2752 });
2753 }
2754
2755 Ok(Plan::CreateView(CreateViewPlan {
2756 name,
2757 view,
2758 replace,
2759 drop_ids,
2760 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2761 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2762 }))
2763}
2764
2765fn validate_view_dependencies(
2767 scx: &StatementContext,
2768 dependencies: &BTreeSet<CatalogItemId>,
2769) -> Result<(), PlanError> {
2770 for id in dependencies {
2771 let item = scx.catalog.get_item(id);
2772 if item.replacement_target().is_some() {
2773 let name = scx.catalog.minimal_qualification(item.name());
2774 return Err(PlanError::InvalidDependency {
2775 name: name.to_string(),
2776 item_type: format!("replacement {}", item.item_type()),
2777 });
2778 }
2779 }
2780
2781 Ok(())
2782}
2783
/// Describes a `CREATE MATERIALIZED VIEW` statement.
///
/// Both arguments are ignored; the description is always built with
/// `StatementDesc::new(None)`.
// NOTE(review): `None` presumably means the statement produces no result
// relation — confirm against `StatementDesc::new`.
pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2790
/// Describes a `CREATE NETWORK POLICY` statement.
///
/// Both arguments are ignored; the description is always built with
/// `StatementDesc::new(None)`.
// NOTE(review): `None` presumably means the statement produces no result
// relation — confirm against `StatementDesc::new`.
pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2797
/// Describes an `ALTER NETWORK POLICY` statement.
///
/// Both arguments are ignored; the description is always built with
/// `StatementDesc::new(None)`.
// NOTE(review): `None` presumably means the statement produces no result
// relation — confirm against `StatementDesc::new`.
pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
2804
/// Plans a `CREATE MATERIALIZED VIEW` statement into a
/// [`Plan::CreateMaterializedView`].
///
/// In order, this:
///
/// 1. resolves the target cluster (and, optionally, a target replica),
/// 2. normalizes the statement into `create_sql`,
/// 3. plans the query and applies any column renames,
/// 4. extracts and validates the `WITH` options (`ASSERT NOT NULL`,
///    `PARTITION BY`, `RETAIN HISTORY`, `REFRESH`),
/// 5. handles `OR REPLACE` / `IF NOT EXISTS`,
/// 6. validates an optional replacement target, and
/// 7. checks for a name collision with an existing item or type.
///
/// # Errors
///
/// Returns a [`PlanError`] if any of the steps above fails, including when a
/// required feature flag is disabled.
///
/// # Panics
///
/// Panics if the planned query comes back with a non-trivial row set
/// finishing.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Resolve the cluster and write it back into the statement before the
    // statement is normalized into `create_sql` below.
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Targeting a specific replica is feature-gated, and the replica must
    // exist in the resolved cluster.
    let target_replica = match &stmt.in_cluster_replica {
        Some(replica_name) => {
            scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;

            let cluster = scx.catalog.get_cluster(cluster_id);
            let replica_id = cluster
                .replica_ids()
                .get(replica_name.as_str())
                .copied()
                .ok_or_else(|| {
                    CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
                })?;
            Some(replica_id)
        }
        None => None,
    };

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // The finishing is expected to be trivial by the time the planned query
    // is returned.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    // `PARTITION BY` is feature-gated and checked against the view's columns.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Fold all `REFRESH` options into a single schedule. The result is `None`
    // when nothing other than `ON COMMIT` (or nothing at all) was specified.
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            // Every refresh option other than `ON COMMIT` is feature-gated.
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    bail_internal!("REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    // Plan the expression in an empty scope with aggregates,
                    // subqueries, parameters, and window functions disallowed.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // The expression must reduce to a literal timestamp.
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    // Validate the interval: strictly positive, no month
                    // component, representable after conversion, and at least
                    // one millisecond long.
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err()
                        || Interval::from_duration(&interval).is_err()
                    {
                        sql_bail!("REFRESH interval too large");
                    }
                    if interval.as_micros() < 1000 {
                        sql_bail!("REFRESH interval must be at least 1 ms")
                    }

                    // A missing `ALIGNED TO` at this point is treated as an
                    // internal error.
                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    // Same restricted expression context as for `REFRESH AT`.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // The expression must reduce to a literal timestamp.
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        // `ON COMMIT` cannot be combined with any `AT` / `EVERY` entries.
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Translate each `ASSERT NOT NULL` column name into its column index.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // The plan context can force `Skip` behavior regardless of what the
    // statement says; otherwise the statement's own behavior is used.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            // The new definition must not depend on the item it would replace.
            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Everything that depends on the replaced item is dropped along with it.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    // Translate the global ids the expression depends on into catalog item ids.
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Optional replacement target: the new materialized view is planned as a
    // replacement for an existing one.
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        // The target must be a non-system materialized view.
        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // Reject the replacement if the new definition depends on anything
        // that depends on the target, since that would create a cycle.
        for dependent in scx.catalog.item_dependents(target.id()) {
            if let ObjectId::Item(id) = dependent
                && dependencies.contains(&id)
            {
                sql_bail!(
                    "replacement would cause {} to depend on itself",
                    scx.catalog.minimal_qualification(target.name()),
                );
            }
        }

        // The replacement is recorded as depending on its target.
        dependencies.insert(target.id());

        // A target may have at most one replacement at a time.
        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Reject the statement if an item *or* type of the same name exists and
    // the effective behavior is `Error`.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            target_replica,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
3169
// `WITH` options accepted by `CREATE MATERIALIZED VIEW`. The macro produces
// `MaterializedViewOptionExtracted`, which `plan_create_materialized_view`
// destructures into `assert_not_null`, `partition_by`, `retain_history`, and
// `refresh`. `AssertNotNull` and `Refresh` may each be given more than once.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3177
/// Describes a `CREATE SINK` statement.
///
/// Both arguments are ignored; the description is always built with
/// `StatementDesc::new(None)`.
// NOTE(review): `None` presumably means the statement produces no result
// relation — confirm against `StatementDesc::new`.
pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}
3184
// `WITH` options accepted by `CREATE SINK`: `Snapshot`, `PartitionStrategy`,
// `Version`, and `CommitInterval`, none of which declares a default here.
// NOTE(review): presumably this generates `CreateSinkOptionExtracted` in the
// same way as the other invocations in this file — confirm against the macro.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3192
3193pub fn plan_create_sink(
3194 scx: &StatementContext,
3195 stmt: CreateSinkStatement<Aug>,
3196) -> Result<Plan, PlanError> {
3197 let Some(name) = stmt.name.clone() else {
3199 return Err(PlanError::MissingName(CatalogItemType::Sink));
3200 };
3201 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3202 let full_name = scx.catalog.resolve_full_name(&name);
3203 let partial_name = PartialItemName::from(full_name.clone());
3204 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3205 return Err(PlanError::ItemAlreadyExists {
3206 name: full_name.to_string(),
3207 item_type: item.item_type(),
3208 });
3209 }
3210
3211 plan_sink(scx, stmt)
3212}
3213
3214fn plan_sink(
3219 scx: &StatementContext,
3220 mut stmt: CreateSinkStatement<Aug>,
3221) -> Result<Plan, PlanError> {
3222 let CreateSinkStatement {
3223 name,
3224 in_cluster: _,
3225 from,
3226 connection,
3227 format,
3228 envelope,
3229 mode,
3230 if_not_exists,
3231 with_options,
3232 } = stmt.clone();
3233
3234 let Some(name) = name else {
3235 return Err(PlanError::MissingName(CatalogItemType::Sink));
3236 };
3237 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3238
3239 let envelope = match (&connection, envelope, mode) {
3240 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3242 SinkEnvelope::Upsert
3243 }
3244 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3245 SinkEnvelope::Debezium
3246 }
3247 (CreateSinkConnection::Kafka { .. }, None, None) => {
3248 sql_bail!("ENVELOPE clause is required")
3249 }
3250 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3251 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3252 }
3253 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3255 SinkEnvelope::Upsert
3256 }
3257 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3258 SinkEnvelope::Append
3259 }
3260 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3261 sql_bail!("MODE clause is required")
3262 }
3263 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3264 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3265 }
3266 };
3267
3268 let from_name = &from;
3269 let from = scx.get_item_by_resolved_name(&from)?;
3270
3271 {
3272 use CatalogItemType::*;
3273 match from.item_type() {
3274 Table | Source | MaterializedView => {
3275 if from.replacement_target().is_some() {
3276 let name = scx.catalog.minimal_qualification(from.name());
3277 return Err(PlanError::InvalidSinkFrom {
3278 name: name.to_string(),
3279 item_type: format!("replacement {}", from.item_type()),
3280 });
3281 }
3282 }
3283 Sink | View | Index | Type | Func | Secret | Connection => {
3284 let name = scx.catalog.minimal_qualification(from.name());
3285 return Err(PlanError::InvalidSinkFrom {
3286 name: name.to_string(),
3287 item_type: from.item_type().to_string(),
3288 });
3289 }
3290 }
3291 }
3292
3293 if from.id().is_system() {
3294 bail_unsupported!("creating a sink directly on a catalog object");
3295 }
3296
3297 let desc = from
3298 .relation_desc()
3299 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
3300 let key_indices = match &connection {
3301 CreateSinkConnection::Kafka { key: Some(key), .. }
3302 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3303 let key_columns = key
3304 .key_columns
3305 .clone()
3306 .into_iter()
3307 .map(normalize::column_name)
3308 .collect::<Vec<_>>();
3309 let mut uniq = BTreeSet::new();
3310 for col in key_columns.iter() {
3311 if !uniq.insert(col) {
3312 sql_bail!("duplicate column referenced in KEY: {}", col);
3313 }
3314 }
3315 let indices = key_columns
3316 .iter()
3317 .map(|col| {
3318 let name_idx =
3319 desc.get_by_name(col)
3320 .map(|(idx, _type)| idx)
3321 .ok_or_else(|| {
3322 sql_err!("column referenced in KEY does not exist: {}", col)
3323 })?;
3324 if desc.get_unambiguous_name(name_idx).is_none() {
3325 sql_bail!("column referenced in KEY is ambiguous: {}", col);
3326 }
3327 Ok(name_idx)
3328 })
3329 .collect::<Result<Vec<_>, _>>()?;
3330
3331 if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3334 let cols: Vec<_> = desc.iter().collect();
3335 for &idx in &indices {
3336 let (col_name, col_type) = cols[idx];
3337 let scalar = &col_type.scalar_type;
3338 let is_valid = matches!(
3339 scalar,
3340 SqlScalarType::Bool
3342 | SqlScalarType::Int16
3343 | SqlScalarType::Int32
3344 | SqlScalarType::Int64
3345 | SqlScalarType::UInt16
3346 | SqlScalarType::UInt32
3347 | SqlScalarType::UInt64
3348 | SqlScalarType::Numeric { .. }
3350 | SqlScalarType::Date
3352 | SqlScalarType::Time
3353 | SqlScalarType::Timestamp { .. }
3354 | SqlScalarType::TimestampTz { .. }
3355 | SqlScalarType::Interval
3356 | SqlScalarType::MzTimestamp
3357 | SqlScalarType::String
3359 | SqlScalarType::Char { .. }
3360 | SqlScalarType::VarChar { .. }
3361 | SqlScalarType::PgLegacyChar
3362 | SqlScalarType::PgLegacyName
3363 | SqlScalarType::Bytes
3364 | SqlScalarType::Jsonb
3365 | SqlScalarType::Uuid
3367 | SqlScalarType::Oid
3368 | SqlScalarType::RegProc
3369 | SqlScalarType::RegType
3370 | SqlScalarType::RegClass
3371 | SqlScalarType::MzAclItem
3372 | SqlScalarType::AclItem
3373 | SqlScalarType::Int2Vector
3374 );
3375 if !is_valid {
3376 return Err(PlanError::IcebergSinkUnsupportedKeyType {
3377 column: col_name.to_string(),
3378 column_type: format!("{:?}", scalar),
3379 });
3380 }
3381 }
3382 }
3383
3384 let is_valid_key = desc
3385 .typ()
3386 .keys
3387 .iter()
3388 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3389
3390 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3391 if key.not_enforced {
3392 scx.catalog
3393 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3394 key: key_columns.clone(),
3395 name: name.item.clone(),
3396 })
3397 } else {
3398 return Err(PlanError::UpsertSinkWithInvalidKey {
3399 name: from_name.full_name_str(),
3400 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3401 valid_keys: desc
3402 .typ()
3403 .keys
3404 .iter()
3405 .map(|key| {
3406 key.iter()
3407 .map(|col| desc.get_name(*col).as_str().into())
3408 .collect()
3409 })
3410 .collect(),
3411 });
3412 }
3413 }
3414 Some(indices)
3415 }
3416 CreateSinkConnection::Kafka { key: None, .. }
3417 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3418 };
3419
3420 if key_indices.is_some() && envelope == SinkEnvelope::Append {
3421 sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3422 }
3423
3424 if envelope == SinkEnvelope::Append {
3426 if let CreateSinkConnection::Iceberg { .. } = &connection {
3427 use mz_storage_types::sinks::{
3428 ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3429 };
3430 for (col_name, _) in desc.iter() {
3431 if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3432 || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3433 {
3434 sql_bail!(
3435 "column {} conflicts with the system column that MODE APPEND \
3436 adds to the Iceberg table",
3437 col_name.quoted()
3438 );
3439 }
3440 }
3441 }
3442 }
3443
3444 let headers_index = match &connection {
3445 CreateSinkConnection::Kafka {
3446 headers: Some(headers),
3447 ..
3448 } => {
3449 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3450
3451 match envelope {
3452 SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3453 SinkEnvelope::Debezium => {
3454 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3455 }
3456 };
3457
3458 let headers = normalize::column_name(headers.clone());
3459 let (idx, ty) = desc
3460 .get_by_name(&headers)
3461 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3462
3463 if desc.get_unambiguous_name(idx).is_none() {
3464 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3465 }
3466
3467 match &ty.scalar_type {
3468 SqlScalarType::Map { value_type, .. }
3469 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3470 _ => sql_bail!(
3471 "HEADERS column must have type map[text => text] or map[text => bytea]"
3472 ),
3473 }
3474
3475 Some(idx)
3476 }
3477 _ => None,
3478 };
3479
3480 let relation_key_indices = desc.typ().keys.get(0).cloned();
3482
3483 let key_desc_and_indices = key_indices.map(|key_indices| {
3484 let cols = desc
3485 .iter()
3486 .map(|(name, ty)| (name.clone(), ty.clone()))
3487 .collect::<Vec<_>>();
3488 let (names, types): (Vec<_>, Vec<_>) =
3489 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3490 let typ = SqlRelationType::new(types);
3491 (RelationDesc::new(typ, names), key_indices)
3492 });
3493
3494 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3495 return Err(PlanError::UpsertSinkWithoutKey);
3496 }
3497
3498 let CreateSinkOptionExtracted {
3499 snapshot,
3500 version,
3501 partition_strategy: _,
3502 seen: _,
3503 commit_interval,
3504 } = with_options.try_into()?;
3505
3506 let connection_builder = match connection {
3507 CreateSinkConnection::Kafka {
3508 connection,
3509 options,
3510 ..
3511 } => kafka_sink_builder(
3512 scx,
3513 connection,
3514 options,
3515 format,
3516 relation_key_indices,
3517 key_desc_and_indices,
3518 headers_index,
3519 desc.into_owned(),
3520 envelope,
3521 from.id(),
3522 commit_interval,
3523 )?,
3524 CreateSinkConnection::Iceberg {
3525 catalog_connection,
3526 aws_connection,
3527 options,
3528 ..
3529 } => iceberg_sink_builder(
3530 scx,
3531 catalog_connection,
3532 aws_connection,
3533 options,
3534 relation_key_indices,
3535 key_desc_and_indices,
3536 commit_interval,
3537 &desc,
3538 )?,
3539 };
3540
3541 let with_snapshot = snapshot.unwrap_or(true);
3543 let version = version.unwrap_or(0);
3545
3546 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3550 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3551
3552 Ok(Plan::CreateSink(CreateSinkPlan {
3553 name,
3554 sink: Sink {
3555 create_sql,
3556 from: from.global_id(),
3557 connection: connection_builder,
3558 envelope,
3559 version,
3560 commit_interval,
3561 },
3562 with_snapshot,
3563 if_not_exists,
3564 in_cluster: in_cluster.id(),
3565 }))
3566}
3567
3568fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3569 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3570
3571 let existing_keys = desc
3572 .typ()
3573 .keys
3574 .iter()
3575 .map(|key_columns| {
3576 key_columns
3577 .iter()
3578 .map(|col| desc.get_name(*col).as_str())
3579 .join(", ")
3580 })
3581 .join(", ");
3582
3583 sql_err!(
3584 "Key constraint ({}) conflicts with existing key ({})",
3585 user_keys,
3586 existing_keys
3587 )
3588}
3589
/// Options extracted from a list of `CsrConfigOption<Aug>` values.
///
/// Built by the `TryFrom<Vec<CsrConfigOption<Aug>>>` implementation for this type. Every
/// field starts at its `Default` value and is overwritten when the matching option is
/// present in the input.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    // Names of the options encountered so far; used to reject an option given twice.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    // Value of the `AvroKeyFullname` option, if given.
    pub(crate) avro_key_fullname: Option<String>,
    // Value of the `AvroValueFullname` option, if given.
    pub(crate) avro_value_fullname: Option<String>,
    // Value of the `NullDefaults` option; `false` when the option is absent.
    pub(crate) null_defaults: bool,
    // Doc comments for the value schema, including `DocOnSchema::All` comments that were
    // not overridden by a value-only comment on the same target.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    // Doc comments for the key schema, including `DocOnSchema::All` comments that were
    // not overridden by a key-only comment on the same target.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    // Value of the `KeyCompatibilityLevel` option, if given.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    // Value of the `ValueCompatibilityLevel` option, if given.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3603
3604impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3605 type Error = crate::plan::PlanError;
3606 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3607 let mut extracted = CsrConfigOptionExtracted::default();
3608 let mut common_doc_comments = BTreeMap::new();
3609 for option in v {
3610 if !extracted.seen.insert(option.name.clone()) {
3611 return Err(PlanError::Unstructured({
3612 format!("{} specified more than once", option.name)
3613 }));
3614 }
3615 let option_name = option.name.clone();
3616 let option_name_str = option_name.to_ast_string_simple();
3617 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3618 option_name: option_name.to_ast_string_simple(),
3619 err: e.into(),
3620 };
3621 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3622 val.map(|s| match s {
3623 WithOptionValue::Value(Value::String(s)) => {
3624 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3625 }
3626 _ => Err("must be a string".to_string()),
3627 })
3628 .transpose()
3629 .map_err(PlanError::Unstructured)
3630 .map_err(better_error)
3631 };
3632 match option.name {
3633 CsrConfigOptionName::AvroKeyFullname => {
3634 extracted.avro_key_fullname =
3635 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3636 }
3637 CsrConfigOptionName::AvroValueFullname => {
3638 extracted.avro_value_fullname =
3639 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3640 }
3641 CsrConfigOptionName::NullDefaults => {
3642 extracted.null_defaults =
3643 <bool>::try_from_value(option.value).map_err(better_error)?;
3644 }
3645 CsrConfigOptionName::AvroDocOn(doc_on) => {
3646 let value = String::try_from_value(option.value.ok_or_else(|| {
3647 PlanError::InvalidOptionValue {
3648 option_name: option_name_str,
3649 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3650 }
3651 })?)
3652 .map_err(better_error)?;
3653 let key = match doc_on.identifier {
3654 DocOnIdentifier::Column(ast::ColumnName {
3655 relation: ResolvedItemName::Item { id, .. },
3656 column: ResolvedColumnReference::Column { name, index: _ },
3657 }) => DocTarget::Field {
3658 object_id: id,
3659 column_name: name,
3660 },
3661 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3662 DocTarget::Type(id)
3663 }
3664 _ => sql_bail!("invalid DOC ON identifier"),
3665 };
3666
3667 match doc_on.for_schema {
3668 DocOnSchema::KeyOnly => {
3669 extracted.key_doc_options.insert(key, value);
3670 }
3671 DocOnSchema::ValueOnly => {
3672 extracted.value_doc_options.insert(key, value);
3673 }
3674 DocOnSchema::All => {
3675 common_doc_comments.insert(key, value);
3676 }
3677 }
3678 }
3679 CsrConfigOptionName::KeyCompatibilityLevel => {
3680 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3681 }
3682 CsrConfigOptionName::ValueCompatibilityLevel => {
3683 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3684 }
3685 }
3686 }
3687
3688 for (key, value) in common_doc_comments {
3689 if !extracted.key_doc_options.contains_key(&key) {
3690 extracted.key_doc_options.insert(key.clone(), value.clone());
3691 }
3692 if !extracted.value_doc_options.contains_key(&key) {
3693 extracted.value_doc_options.insert(key, value);
3694 }
3695 }
3696 Ok(extracted)
3697 }
3698}
3699
3700fn iceberg_sink_builder(
3701 scx: &StatementContext,
3702 catalog_connection: ResolvedItemName,
3703 storage_connection: Option<ResolvedItemName>,
3704 options: Vec<IcebergSinkConfigOption<Aug>>,
3705 relation_key_indices: Option<Vec<usize>>,
3706 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3707 commit_interval: Option<Duration>,
3708 desc: &RelationDesc,
3709) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3710 ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3714 .map_err(|e| sql_err!("{}", e))?;
3715
3716 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3717 let catalog_connection_id = catalog_connection_item.id();
3718 if !matches!(
3719 catalog_connection_item.connection()?,
3720 Connection::IcebergCatalog(_)
3721 ) {
3722 sql_bail!(
3723 "{} is not an iceberg catalog connection",
3724 scx.catalog
3725 .resolve_full_name(catalog_connection_item.name())
3726 .to_string()
3727 .quoted()
3728 );
3729 };
3730
3731 let storage_connection_item = storage_connection
3732 .map(|c| scx.get_item_by_resolved_name(&c))
3733 .transpose()?;
3734 let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id());
3735 if let Some(c) = &storage_connection_item
3736 && !matches!(c.connection()?, Connection::Aws(_))
3737 {
3738 sql_bail!(
3739 "{} is not an AWS connection",
3740 scx.catalog.resolve_full_name(c.name()).to_string().quoted()
3741 );
3742 }
3743
3744 let IcebergSinkConfigOptionExtracted {
3745 table,
3746 namespace,
3747 seen: _,
3748 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3749
3750 let Some(table) = table else {
3751 sql_bail!("Iceberg sink must specify TABLE");
3752 };
3753 let Some(namespace) = namespace else {
3754 sql_bail!("Iceberg sink must specify NAMESPACE");
3755 };
3756 if commit_interval.is_none() {
3757 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3758 }
3759
3760 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3761 catalog_connection_id,
3762 catalog_connection: catalog_connection_id,
3763 storage_connection_id,
3764 storage_connection: storage_connection_id,
3765 table,
3766 namespace,
3767 relation_key_indices,
3768 key_desc_and_indices,
3769 }))
3770}
3771
/// Validates the Kafka-specific parts of a sink definition and assembles the resulting
/// `StorageSinkConnection::Kafka`.
///
/// * `connection` must resolve to a Kafka connection.
/// * `options` are the Kafka sink config options; `TOPIC` is required.
/// * `format` is the sink's format specifier; a sink without a format is rejected.
/// * `key_desc_and_indices` is the description and column indices of the sink key, if any;
///   when present a key format is planned in addition to the value format.
/// * `value_desc` describes the value columns and is the scope in which `PARTITION BY` is
///   planned.
/// * `commit_interval` must be `None`; it is not supported for Kafka sinks.
///
/// Returns a `PlanError` for any unsupported or inconsistent combination of options.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // Resolve the referenced connection and make sure it is a Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        mut topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS and an explicit prefix are mutually exclusive; without LEGACY IDS the
    // (possibly absent) prefix is used.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same rule as for the transactional id above.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    // An interval above the maximum is an error; an interval below the minimum is
    // silently raised to the minimum.
    if topic_metadata_refresh_interval > MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    } else if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        topic_metadata_refresh_interval = MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL;
    }

    // Rejects zero and negative values, then converts the remaining value to `NonNeg`.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates a schema registry connection clause for use in a sink and returns the id
    // of the registry connection together with its extracted options.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        // A key fullname is meaningless when the sink has no key.
        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key, the key and value fullnames must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Converts an AST format into the sink format for either the key (`is_key == true`)
    // or the value of the sink, whose columns are described by `desc`.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT are only supported for a single column.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // The key and value schemas are generated with their own doc comments and
            // fullname; the fullname defaults to "row" for keys and "envelope" for values.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                wire_format: WireFormat::Confluent {
                    registry: Some(csr_connection),
                },
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the PARTITION BY expression, if any, against the value columns and cast it to
    // `UInt64`.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // With ENVELOPE DEBEZIUM, referencing any column that is not part of
                    // the sink key produces an error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // A key format is only planned when the sink has a key. A bare format applies to
    // both the key and the value.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4063
4064pub fn describe_create_index(
4065 _: &StatementContext,
4066 _: CreateIndexStatement<Aug>,
4067) -> Result<StatementDesc, PlanError> {
4068 Ok(StatementDesc::new(None))
4069}
4070
/// Plans a `CREATE INDEX` statement into a `Plan::CreateIndex`.
///
/// Validates that the target can carry an index, fills in default key parts and a default
/// index name when the statement omits them, resolves the cluster, and writes the resolved
/// name, key parts and cluster back into `stmt` before it is normalized into `create_sql`.
///
/// Returns a `PlanError` if the target is not indexable, has no relation description,
/// the key expressions cannot be planned, or an item with the chosen name already exists
/// (unless `IF NOT EXISTS` or the planning context's `ignore_if_exists_errors` applies).
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Destructure by mutable reference: several fields are overwritten further down so
    // that the normalized statement contains the resolved values.
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    {
        use CatalogItemType::*;
        match on.item_type() {
            // Relation-like items are indexable unless they are replacement objects.
            Table | Source | View | MaterializedView => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on
        .relation_desc()
        .ok_or_else(|| sql_err!("item does not have a relation description"))?;

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // No key was given: index on the relation's default key. A column is referenced
            // by name only when that name occurs exactly once in the relation; otherwise it
            // is referenced by its 1-based position.
            let mut name_counts = BTreeMap::new();
            for name in on_desc.iter_names() {
                *name_counts.entry(name).or_insert(0usize) += 1;
            }
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| {
                    let name = on_desc.get_name(*i);
                    if name_counts.get(name).copied() == Some(1) {
                        Expr::Identifier(vec![name.clone().into()])
                    } else {
                        Expr::Value(Value::Number((i + 1).to_string()))
                    }
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    // An explicit name is placed in the same qualifiers as the indexed item; otherwise a
    // name is derived from the indexed item's name.
    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // Default-key index: `<item>_primary_idx`.
            idx_name.item += "_primary_idx";
        } else {
            // Explicit key: `<item>_<part>_..._idx`, where each part is the column's
            // unambiguous name, else the name attached to the column expression, else its
            // 1-based position; non-column expressions contribute `expr`.
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        // Without IF NOT EXISTS the derived name is passed through `find_available_name`
        // (presumably to avoid colliding with an existing item — confirm in the catalog).
        // With IF NOT EXISTS the derived name is kept as is.
        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Report a conflict only when an item with this name resolves and neither
    // IF NOT EXISTS nor the planning context's `ignore_if_exists_errors` is set.
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    // Record the resolved cluster, name and key parts in the statement so that the
    // normalized `create_sql` below contains them explicitly.
    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    // Copy the flag out before `stmt` is moved into the normalized statement.
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // The compaction window is taken from the first RETAIN HISTORY option, if any.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4237
4238pub fn describe_create_type(
4239 _: &StatementContext,
4240 _: CreateTypeStatement<Aug>,
4241) -> Result<StatementDesc, PlanError> {
4242 Ok(StatementDesc::new(None))
4243}
4244
4245pub fn plan_create_type(
4246 scx: &StatementContext,
4247 stmt: CreateTypeStatement<Aug>,
4248) -> Result<Plan, PlanError> {
4249 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4250 let CreateTypeStatement { name, as_type, .. } = stmt;
4251
4252 fn validate_data_type(
4253 scx: &StatementContext,
4254 data_type: ResolvedDataType,
4255 as_type: &str,
4256 key: &str,
4257 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4258 let (id, modifiers) = match data_type {
4259 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4260 _ => sql_bail!(
4261 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4262 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4263 as_type,
4264 key,
4265 data_type.human_readable_name(),
4266 ),
4267 };
4268
4269 let item = scx.catalog.get_item(&id);
4270 match item.type_details() {
4271 None => sql_bail!(
4272 "{} must be of class type, but received {} which is of class {}",
4273 key,
4274 scx.catalog.resolve_full_name(item.name()),
4275 item.item_type()
4276 ),
4277 Some(CatalogTypeDetails {
4278 typ: CatalogType::Char,
4279 ..
4280 }) => {
4281 bail_unsupported!("embedding char type in a list or map")
4282 }
4283 _ => {
4284 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4286
4287 Ok((id, modifiers))
4288 }
4289 }
4290 }
4291
4292 let inner = match as_type {
4293 CreateTypeAs::List { options } => {
4294 let CreateTypeListOptionExtracted {
4295 element_type,
4296 seen: _,
4297 } = CreateTypeListOptionExtracted::try_from(options)?;
4298 let element_type =
4299 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4300 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4301 CatalogType::List {
4302 element_reference: id,
4303 element_modifiers: modifiers,
4304 }
4305 }
4306 CreateTypeAs::Map { options } => {
4307 let CreateTypeMapOptionExtracted {
4308 key_type,
4309 value_type,
4310 seen: _,
4311 } = CreateTypeMapOptionExtracted::try_from(options)?;
4312 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4313 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4314 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4315 let (value_id, value_modifiers) =
4316 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4317 CatalogType::Map {
4318 key_reference: key_id,
4319 key_modifiers,
4320 value_reference: value_id,
4321 value_modifiers,
4322 }
4323 }
4324 CreateTypeAs::Record { column_defs } => {
4325 let mut fields = vec![];
4326 for column_def in column_defs {
4327 let data_type = column_def.data_type;
4328 let key = ident(column_def.name.clone());
4329 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4330 fields.push(CatalogRecordField {
4331 name: ColumnName::from(key.clone()),
4332 type_reference: id,
4333 type_modifiers: modifiers,
4334 });
4335 }
4336 CatalogType::Record { fields }
4337 }
4338 };
4339
4340 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4341
4342 let full_name = scx.catalog.resolve_full_name(&name);
4344 let partial_name = PartialItemName::from(full_name.clone());
4345 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4348 if item.item_type().conflicts_with_type() {
4349 return Err(PlanError::ItemAlreadyExists {
4350 name: full_name.to_string(),
4351 item_type: item.item_type(),
4352 });
4353 }
4354 }
4355
4356 Ok(Plan::CreateType(CreateTypePlan {
4357 name,
4358 typ: Type { create_sql, inner },
4359 }))
4360}
4361
// Options accepted by `CREATE TYPE ... AS LIST`. `plan_create_type` consumes the result
// as `CreateTypeListOptionExtracted { element_type, seen }` via `TryFrom`.
// NOTE(review): the exact shape of the generated type is defined by the macro — confirm there.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Options accepted by `CREATE TYPE ... AS MAP`. `plan_create_type` consumes the result
// as `CreateTypeMapOptionExtracted { key_type, value_type, seen }` via `TryFrom`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4369
/// A planned `ALTER ROLE` option: either a set of role attributes or a role variable.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Planned role attributes.
    Attributes(PlannedRoleAttributes),
    /// A planned role variable.
    Variable(PlannedRoleVariable),
}
4375
/// Role attributes as planned from a list of `RoleAttribute`s.
///
/// `plan_role_attributes` starts with every field set to `None`, so `None` means the
/// corresponding attribute was not mentioned.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `Inherit`, `Some(false)` for `NoInherit`.
    pub inherit: Option<bool>,
    /// The password, when the password attribute carried a value.
    pub password: Option<Password>,
    /// Set from the `scram_iterations` system variable whenever `password` is set.
    pub scram_iterations: Option<NonZeroU32>,
    /// `Some(true)` when the password attribute was given without a value.
    pub nopassword: Option<bool>,
    /// Whether the superuser attribute was requested.
    /// NOTE(review): exact semantics are set in `plan_role_attributes` — confirm there.
    pub superuser: Option<bool>,
    /// Whether the login attribute was requested.
    /// NOTE(review): exact semantics are set in `plan_role_attributes` — confirm there.
    pub login: Option<bool>,
}
4388
4389fn plan_role_attributes(
4390 options: Vec<RoleAttribute>,
4391 scx: &StatementContext,
4392) -> Result<PlannedRoleAttributes, PlanError> {
4393 let mut planned_attributes = PlannedRoleAttributes {
4394 inherit: None,
4395 password: None,
4396 scram_iterations: None,
4397 superuser: None,
4398 login: None,
4399 nopassword: None,
4400 };
4401
4402 for option in options {
4403 match option {
4404 RoleAttribute::Inherit | RoleAttribute::NoInherit
4405 if planned_attributes.inherit.is_some() =>
4406 {
4407 sql_bail!("conflicting or redundant options");
4408 }
4409 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4410 bail_never_supported!(
4411 "CREATECLUSTER attribute",
4412 "sql/create-role/#details",
4413 "Use system privileges instead."
4414 );
4415 }
4416 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4417 bail_never_supported!(
4418 "CREATEDB attribute",
4419 "sql/create-role/#details",
4420 "Use system privileges instead."
4421 );
4422 }
4423 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4424 bail_never_supported!(
4425 "CREATEROLE attribute",
4426 "sql/create-role/#details",
4427 "Use system privileges instead."
4428 );
4429 }
4430 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4431 sql_bail!("conflicting or redundant options");
4432 }
4433
4434 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4435 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4436 RoleAttribute::Password(password) => {
4437 if let Some(password) = password {
4438 planned_attributes.password = Some(password.into());
4439 planned_attributes.scram_iterations =
4440 Some(scx.catalog.system_vars().scram_iterations())
4441 } else {
4442 planned_attributes.nopassword = Some(true);
4443 }
4444 }
4445 RoleAttribute::SuperUser => {
4446 if planned_attributes.superuser == Some(false) {
4447 sql_bail!("conflicting or redundant options");
4448 }
4449 planned_attributes.superuser = Some(true);
4450 }
4451 RoleAttribute::NoSuperUser => {
4452 if planned_attributes.superuser == Some(true) {
4453 sql_bail!("conflicting or redundant options");
4454 }
4455 planned_attributes.superuser = Some(false);
4456 }
4457 RoleAttribute::Login => {
4458 if planned_attributes.login == Some(false) {
4459 sql_bail!("conflicting or redundant options");
4460 }
4461 planned_attributes.login = Some(true);
4462 }
4463 RoleAttribute::NoLogin => {
4464 if planned_attributes.login == Some(true) {
4465 sql_bail!("conflicting or redundant options");
4466 }
4467 planned_attributes.login = Some(false);
4468 }
4469 }
4470 }
4471 if planned_attributes.inherit == Some(false) {
4472 bail_unsupported!("non inherit roles");
4473 }
4474
4475 Ok(planned_attributes)
4476}
4477
/// A planned change to a role-level variable.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Assign `value` to the variable called `name`.
    Set { name: String, value: VariableValue },
    /// Reset the variable called `name`.
    Reset { name: String },
}
4483
4484impl PlannedRoleVariable {
4485 pub fn name(&self) -> &str {
4486 match self {
4487 PlannedRoleVariable::Set { name, .. } => name,
4488 PlannedRoleVariable::Reset { name } => name,
4489 }
4490 }
4491}
4492
4493fn plan_role_variable(
4494 scx: &StatementContext,
4495 variable: SetRoleVar,
4496) -> Result<PlannedRoleVariable, PlanError> {
4497 let plan = match variable {
4498 SetRoleVar::Set { name, value } => {
4499 let name = name.to_string();
4500 let value = scl::plan_set_variable_to(value)?;
4501 if let VariableValue::Values(values) = &value {
4504 vars::check_transaction_isolation_feature_flag(
4505 &name,
4506 VarInput::SqlSet(values),
4507 scx.catalog.system_vars(),
4508 )?;
4509 }
4510 PlannedRoleVariable::Set { name, value }
4511 }
4512 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4513 name: name.to_string(),
4514 },
4515 };
4516 Ok(plan)
4517}
4518
4519pub fn describe_create_role(
4520 _: &StatementContext,
4521 _: CreateRoleStatement,
4522) -> Result<StatementDesc, PlanError> {
4523 Ok(StatementDesc::new(None))
4524}
4525
4526pub fn plan_create_role(
4527 scx: &StatementContext,
4528 CreateRoleStatement { name, options }: CreateRoleStatement,
4529) -> Result<Plan, PlanError> {
4530 let attributes = plan_role_attributes(options, scx)?;
4531 Ok(Plan::CreateRole(CreateRolePlan {
4532 name: normalize::ident(name),
4533 attributes: attributes.into(),
4534 }))
4535}
4536
4537pub fn plan_create_network_policy(
4538 ctx: &StatementContext,
4539 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4540) -> Result<Plan, PlanError> {
4541 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4542 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4543
4544 let Some(rule_defs) = policy_options.rules else {
4545 sql_bail!("RULES must be specified when creating network policies.");
4546 };
4547
4548 let mut rules = vec![];
4549 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4550 let NetworkPolicyRuleOptionExtracted {
4551 seen: _,
4552 direction,
4553 action,
4554 address,
4555 } = options.try_into()?;
4556 let (direction, action, address) = match (direction, action, address) {
4557 (Some(direction), Some(action), Some(address)) => (
4558 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4559 NetworkPolicyRuleAction::try_from(action.as_str())?,
4560 PolicyAddress::try_from(address.as_str())?,
4561 ),
4562 (_, _, _) => {
4563 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4564 }
4565 };
4566 rules.push(NetworkPolicyRule {
4567 name: normalize::ident(name),
4568 direction,
4569 action,
4570 address,
4571 });
4572 }
4573
4574 if rules.len()
4575 > ctx
4576 .catalog
4577 .system_vars()
4578 .max_rules_per_network_policy()
4579 .try_into()?
4580 {
4581 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4582 }
4583
4584 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4585 name: normalize::ident(name),
4586 rules,
4587 }))
4588}
4589
4590pub fn plan_alter_network_policy(
4591 ctx: &StatementContext,
4592 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4593) -> Result<Plan, PlanError> {
4594 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4595
4596 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4597 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4598
4599 let Some(rule_defs) = policy_options.rules else {
4600 sql_bail!("RULES must be specified when creating network policies.");
4601 };
4602
4603 let mut rules = vec![];
4604 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4605 let NetworkPolicyRuleOptionExtracted {
4606 seen: _,
4607 direction,
4608 action,
4609 address,
4610 } = options.try_into()?;
4611
4612 let (direction, action, address) = match (direction, action, address) {
4613 (Some(direction), Some(action), Some(address)) => (
4614 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4615 NetworkPolicyRuleAction::try_from(action.as_str())?,
4616 PolicyAddress::try_from(address.as_str())?,
4617 ),
4618 (_, _, _) => {
4619 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4620 }
4621 };
4622 rules.push(NetworkPolicyRule {
4623 name: normalize::ident(name),
4624 direction,
4625 action,
4626 address,
4627 });
4628 }
4629 if rules.len()
4630 > ctx
4631 .catalog
4632 .system_vars()
4633 .max_rules_per_network_policy()
4634 .try_into()?
4635 {
4636 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4637 }
4638
4639 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4640 id: policy.id(),
4641 name: normalize::ident(name),
4642 rules,
4643 }))
4644}
4645
4646pub fn describe_create_cluster(
4647 _: &StatementContext,
4648 _: CreateClusterStatement<Aug>,
4649) -> Result<StatementDesc, PlanError> {
4650 Ok(StatementDesc::new(None))
4651}
4652
// Cluster-level options accepted by `CREATE CLUSTER`; consumed as
// `ClusterOptionExtracted` in `plan_create_cluster_inner`.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Network policy options: the list of rule definitions.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Per-rule options of a network policy. All three are extracted as strings
// and parsed into their typed forms by the network policy planners.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// Options for altering a cluster: a single `Wait` option.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// Sub-options `Timeout` and `OnTimeout`.
// NOTE(review): presumably these belong to the "until ready" form of the
// `Wait` option above — confirm against the ALTER CLUSTER planner.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Per-cluster optimizer feature overrides. Each is an optional boolean that
// defaults to `None`; `plan_create_cluster_inner` copies them into
// `OptimizerFeatureOverrides`.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4706
4707pub fn plan_create_cluster(
4711 scx: &StatementContext,
4712 stmt: CreateClusterStatement<Aug>,
4713) -> Result<Plan, PlanError> {
4714 let plan = plan_create_cluster_inner(scx, stmt)?;
4715
4716 if let CreateClusterVariant::Managed(_) = &plan.variant {
4718 let stmt = unplan_create_cluster(scx, plan.clone())
4719 .map_err(|e| PlanError::Replan(e.to_string()))?;
4720 let create_sql = stmt.to_ast_string_stable();
4721 let stmt = parse::parse(&create_sql)
4722 .map_err(|e| PlanError::Replan(e.to_string()))?
4723 .into_element()
4724 .ast;
4725 let (stmt, _resolved_ids) =
4726 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4727 let stmt = match stmt {
4728 Statement::CreateCluster(stmt) => stmt,
4729 stmt => {
4730 return Err(PlanError::Replan(format!(
4731 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4732 )));
4733 }
4734 };
4735 let replan =
4736 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4737 if plan != replan {
4738 return Err(PlanError::Replan(format!(
4739 "replan does not match: plan={plan:?}, replan={replan:?}"
4740 )));
4741 }
4742 }
4743
4744 Ok(Plan::CreateCluster(plan))
4745}
4746
/// Plans a `CREATE CLUSTER` statement into a [`CreateClusterPlan`].
///
/// The cluster is treated as managed when `MANAGED` is given explicitly, or
/// otherwise when no `REPLICAS` option is present. Managed and unmanaged
/// clusters accept disjoint option sets; an option that does not apply to the
/// chosen kind is rejected with an error naming that option.
///
/// # Errors
///
/// Besides option extraction failures, this returns an error when:
/// * a non-system role supplies `FEATURES` or `WORKLOAD CLASS`;
/// * a managed cluster lacks `SIZE`, specifies `REPLICAS`, specifies `DISK`
///   for a size for which `is_cluster_size_cc` is true, or combines
///   `REPLICATION FACTOR` with a non-manual schedule;
/// * an unmanaged cluster lacks `REPLICAS` or specifies any managed-only
///   option;
/// * a required feature flag is not enabled.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Without an explicit MANAGED option, the presence of REPLICAS decides.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system roles.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // Reject DISK outright for "cc" sizes; otherwise accept it but
            // emit a deprecation notice.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            // Manual schedule: use the given factor or the system default.
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // Non-manual schedules are planned with a replication factor of 0.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Destructure every extracted feature so that each one is carried
        // over into the optimizer overrides below.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        // Every managed-only option is rejected individually so that the
        // error names the offending option.
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        // Plan each replica definition individually.
        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4913
/// Converts a [`CreateClusterPlan`] back into a `CREATE CLUSTER` statement.
///
/// Only managed clusters are supported; an unmanaged plan yields an
/// "unsupported" error. This is the inverse used by `plan_create_cluster` for
/// its plan/replan round-trip check.
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Exhaustive destructuring: fields bound to `_` are not emitted
            // as cluster features. Listing every field means a newly added
            // override forces a compile error here until it is handled.
            let OptimizerFeatureOverrides {
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
                enable_case_literal_transform: _,
                enable_simplify_quantified_comparisons: _,
                enable_coalesce_case_transform: _,
                enable_will_distinct_propagation: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // An empty zone list maps back to "option not specified".
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // REPLICATION FACTOR is only emitted for manual schedules;
            // planning rejects it alongside any other schedule.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    soft_assert_or_log!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1 with a refresh schedule"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
5022
// Per-replica options; consumed as `ReplicaOptionExtracted` in
// `plan_replica_config`. `Internal` and `IntrospectionDebugging` default to
// `false`; all other options are optional.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5038
/// Plans the options of a single replica into a [`ReplicaConfig`].
///
/// A replica is either orchestrated (identified by `SIZE`, optionally with an
/// availability zone and `BILLED AS`) or unorchestrated (identified by
/// `STORAGECTL ADDRESSES` and `COMPUTECTL ADDRESSES`). Mixing the two option
/// families is an error. Extracted options not named in the destructuring
/// below are ignored here.
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // Arm order matters: the first arm catches "nothing identifying given"
    // before the unorchestrated arm can match it.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // Neither SIZE nor any controller addresses.
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // Orchestrated replica.
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // Reject DISK outright for "cc" sizes; otherwise accept it
                // but emit a deprecation notice.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        // Unorchestrated replica: gated behind an unsafe feature flag.
        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        // Any other combination mixes the two option families.
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5131
5132fn plan_compute_replica_config(
5136 introspection_interval: Option<OptionalDuration>,
5137 introspection_debugging: bool,
5138) -> Result<ComputeReplicaConfig, PlanError> {
5139 let introspection_interval = introspection_interval
5140 .map(|OptionalDuration(i)| i)
5141 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5142 let introspection = match introspection_interval {
5143 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5144 interval,
5145 debugging: introspection_debugging,
5146 }),
5147 None if introspection_debugging => {
5148 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5149 }
5150 None => None,
5151 };
5152 let compute = ComputeReplicaConfig { introspection };
5153 Ok(compute)
5154}
5155
5156fn unplan_compute_replica_config(
5160 compute_replica_config: ComputeReplicaConfig,
5161) -> (Option<OptionalDuration>, bool) {
5162 match compute_replica_config.introspection {
5163 Some(ComputeReplicaIntrospectionConfig {
5164 debugging,
5165 interval,
5166 }) => (Some(OptionalDuration(Some(interval))), debugging),
5167 None => (Some(OptionalDuration(None)), false),
5168 }
5169}
5170
5171fn plan_cluster_schedule(
5175 schedule: ClusterScheduleOptionValue,
5176) -> Result<ClusterSchedule, PlanError> {
5177 Ok(match schedule {
5178 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5179 ClusterScheduleOptionValue::Refresh {
5181 hydration_time_estimate: None,
5182 } => ClusterSchedule::Refresh {
5183 hydration_time_estimate: Duration::from_millis(0),
5184 },
5185 ClusterScheduleOptionValue::Refresh {
5187 hydration_time_estimate: Some(interval_value),
5188 } => {
5189 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5190 if interval.as_microseconds() < 0 {
5191 sql_bail!(
5192 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5193 interval
5194 );
5195 }
5196 if interval.months != 0 {
5197 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5201 }
5202 let duration = interval.duration()?;
5203 if u64::try_from(duration.as_millis()).is_err()
5204 || Interval::from_duration(&duration).is_err()
5205 {
5206 sql_bail!("HYDRATION TIME ESTIMATE too large");
5207 }
5208 ClusterSchedule::Refresh {
5209 hydration_time_estimate: duration,
5210 }
5211 }
5212 })
5213}
5214
5215fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5219 match schedule {
5220 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5221 ClusterSchedule::Refresh {
5222 hydration_time_estimate,
5223 } => {
5224 let interval = Interval::from_duration(&hydration_time_estimate)
5225 .expect("planning ensured that this is convertible back to Interval");
5226 let interval_value = literal::unplan_interval(&interval);
5227 ClusterScheduleOptionValue::Refresh {
5228 hydration_time_estimate: Some(interval_value),
5229 }
5230 }
5231 }
5232}
5233
5234pub fn describe_create_cluster_replica(
5235 _: &StatementContext,
5236 _: CreateClusterReplicaStatement<Aug>,
5237) -> Result<StatementDesc, PlanError> {
5238 Ok(StatementDesc::new(None))
5239}
5240
5241pub fn plan_create_cluster_replica(
5242 scx: &StatementContext,
5243 CreateClusterReplicaStatement {
5244 definition: ReplicaDefinition { name, options },
5245 of_cluster,
5246 }: CreateClusterReplicaStatement<Aug>,
5247) -> Result<Plan, PlanError> {
5248 let cluster = scx
5249 .catalog
5250 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5251
5252 let config = plan_replica_config(scx, options)?;
5253
5254 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5255 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5256 return Err(PlanError::MangedReplicaName(name.into_string()));
5257 }
5258 } else {
5259 ensure_cluster_is_not_managed(scx, cluster.id())?;
5260 }
5261
5262 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5263 name: normalize::ident(name),
5264 cluster_id: cluster.id(),
5265 config,
5266 }))
5267}
5268
5269pub fn describe_create_secret(
5270 _: &StatementContext,
5271 _: CreateSecretStatement<Aug>,
5272) -> Result<StatementDesc, PlanError> {
5273 Ok(StatementDesc::new(None))
5274}
5275
5276pub fn plan_create_secret(
5277 scx: &StatementContext,
5278 stmt: CreateSecretStatement<Aug>,
5279) -> Result<Plan, PlanError> {
5280 let CreateSecretStatement {
5281 name,
5282 if_not_exists,
5283 value,
5284 } = &stmt;
5285
5286 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5287 let mut create_sql_statement = stmt.clone();
5288 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5289 let create_sql =
5290 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5291 let secret_as = query::plan_secret_as(scx, value.clone())?;
5292
5293 let secret = Secret {
5294 create_sql,
5295 secret_as,
5296 };
5297
5298 Ok(Plan::CreateSecret(CreateSecretPlan {
5299 name,
5300 secret,
5301 if_not_exists: *if_not_exists,
5302 }))
5303}
5304
5305pub fn describe_create_connection(
5306 _: &StatementContext,
5307 _: CreateConnectionStatement<Aug>,
5308) -> Result<StatementDesc, PlanError> {
5309 Ok(StatementDesc::new(None))
5310}
5311
// `WITH` options of `CREATE CONNECTION`: a single boolean `Validate` option,
// consumed as `CreateConnectionOptionExtracted` in `plan_create_connection`.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5313
5314pub fn plan_create_connection(
5315 scx: &StatementContext,
5316 mut stmt: CreateConnectionStatement<Aug>,
5317) -> Result<Plan, PlanError> {
5318 let CreateConnectionStatement {
5319 name,
5320 connection_type,
5321 values,
5322 if_not_exists,
5323 with_options,
5324 } = stmt.clone();
5325 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5326 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5327 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5328
5329 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5330 if options.validate.is_some() {
5331 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5332 }
5333 let validate = match options.validate {
5334 Some(val) => val,
5335 None => {
5336 scx.catalog
5337 .system_vars()
5338 .enable_default_connection_validation()
5339 && details.to_connection().validate_by_default()
5340 }
5341 };
5342
5343 let full_name = scx.catalog.resolve_full_name(&name);
5345 let partial_name = PartialItemName::from(full_name.clone());
5346 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5347 return Err(PlanError::ItemAlreadyExists {
5348 name: full_name.to_string(),
5349 item_type: item.item_type(),
5350 });
5351 }
5352
5353 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5356 stmt.values.retain(|v| {
5357 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5358 });
5359 stmt.values.push(ConnectionOption {
5360 name: ConnectionOptionName::PublicKey1,
5361 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5362 });
5363 stmt.values.push(ConnectionOption {
5364 name: ConnectionOptionName::PublicKey2,
5365 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5366 });
5367 }
5368 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5369
5370 let plan = CreateConnectionPlan {
5371 name,
5372 if_not_exists,
5373 connection: crate::plan::Connection {
5374 create_sql,
5375 details,
5376 },
5377 validate,
5378 };
5379 Ok(Plan::CreateConnection(plan))
5380}
5381
5382fn plan_drop_database(
5383 scx: &StatementContext,
5384 if_exists: bool,
5385 name: &UnresolvedDatabaseName,
5386 cascade: bool,
5387) -> Result<Option<DatabaseId>, PlanError> {
5388 Ok(match resolve_database(scx, name, if_exists)? {
5389 Some(database) => {
5390 if !cascade && database.has_schemas() {
5391 sql_bail!(
5392 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5393 name,
5394 );
5395 }
5396 Some(database.id())
5397 }
5398 None => None,
5399 })
5400}
5401
5402pub fn describe_drop_objects(
5403 _: &StatementContext,
5404 _: DropObjectsStatement,
5405) -> Result<StatementDesc, PlanError> {
5406 Ok(StatementDesc::new(None))
5407}
5408
/// Plans a `DROP <object type> ...` statement over one or more names.
///
/// Each name is resolved by the planner matching its kind. Names that resolve
/// to nothing produce an `ObjectDoesNotExist` notice instead of an error.
/// Without `CASCADE`, every directly named item is checked with
/// `ensure_no_blocking_dependents`. The resulting plan carries both the
/// directly referenced IDs and the full set returned by
/// `object_dependents`.
///
/// # Errors
///
/// `DROP FUNCTION` is unsupported. Errors from the per-kind planners and from
/// the dependency check are propagated.
pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    if object_type == mz_sql_parser::ast::ObjectType::Func {
        bail_unsupported!("DROP FUNCTION");
    }
    // Convert from the parser's object type to the planner's.
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        // Dispatch on the kind of name; each helper returns `None` when the
        // object was not found.
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item_name(scx, object_type, if_exists, name.clone())?.map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }

    if !cascade {
        // Collect all items named in this statement first, then check each
        // one against that full set.
        // NOTE(review): presumably dependents that are themselves being
        // dropped are not treated as blocking — confirm in
        // `ensure_no_blocking_dependents`.
        let dropped_items: BTreeSet<CatalogItemId> = referenced_ids
            .iter()
            .filter_map(|id| match id {
                ObjectId::Item(id) => Some(*id),
                _ => None,
            })
            .collect();
        for id in &dropped_items {
            let catalog_item = scx.catalog.get_item(id);
            ensure_no_blocking_dependents(scx, object_type, catalog_item, &dropped_items)?;
        }
    }

    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}
5485
5486fn plan_drop_schema(
5487 scx: &StatementContext,
5488 if_exists: bool,
5489 name: &UnresolvedSchemaName,
5490 cascade: bool,
5491) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5492 let normalized = normalize::unresolved_schema_name(name.clone())?;
5496 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5497 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5498 }
5499
5500 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5501 Some((database_spec, schema_spec)) => {
5502 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5503 sql_bail!(
5504 "cannot drop schema {name} because it is required by the database system",
5505 );
5506 }
5507 if let SchemaSpecifier::Temporary = schema_spec {
5508 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5509 }
5510 let schema = scx.get_schema(&database_spec, &schema_spec);
5511 if !cascade && schema.has_items() {
5512 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5513 sql_bail!(
5514 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5515 full_schema_name
5516 );
5517 }
5518 Some((database_spec, schema_spec))
5519 }
5520 None => None,
5521 })
5522}
5523
5524fn plan_drop_role(
5525 scx: &StatementContext,
5526 if_exists: bool,
5527 name: &Ident,
5528) -> Result<Option<RoleId>, PlanError> {
5529 match scx.catalog.resolve_role(name.as_str()) {
5530 Ok(role) => {
5531 let id = role.id();
5532 if &id == scx.catalog.active_role_id() {
5533 sql_bail!("current role cannot be dropped");
5534 }
5535 for role in scx.catalog.get_roles() {
5536 for (member_id, grantor_id) in role.membership() {
5537 if &id == grantor_id {
5538 let member_role = scx.catalog.get_role(member_id);
5539 sql_bail!(
5540 "cannot drop role {}: still depended up by membership of role {} in role {}",
5541 name.as_str(),
5542 role.name(),
5543 member_role.name()
5544 );
5545 }
5546 }
5547 }
5548 Ok(Some(role.id()))
5549 }
5550 Err(_) if if_exists => Ok(None),
5551 Err(e) => Err(e.into()),
5552 }
5553}
5554
5555fn plan_drop_cluster(
5556 scx: &StatementContext,
5557 if_exists: bool,
5558 name: &Ident,
5559 cascade: bool,
5560) -> Result<Option<ClusterId>, PlanError> {
5561 Ok(match resolve_cluster(scx, name, if_exists)? {
5562 Some(cluster) => {
5563 if !cascade && !cluster.bound_objects().is_empty() {
5564 return Err(PlanError::DependentObjectsStillExist {
5565 object_type: "cluster".to_string(),
5566 object_name: cluster.name().to_string(),
5567 dependents: Vec::new(),
5568 });
5569 }
5570 Some(cluster.id())
5571 }
5572 None => None,
5573 })
5574}
5575
5576fn plan_drop_network_policy(
5577 scx: &StatementContext,
5578 if_exists: bool,
5579 name: &Ident,
5580) -> Result<Option<NetworkPolicyId>, PlanError> {
5581 match scx.catalog.resolve_network_policy(name.as_str()) {
5582 Ok(policy) => {
5583 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5586 Err(PlanError::NetworkPolicyInUse)
5587 } else {
5588 Ok(Some(policy.id()))
5589 }
5590 }
5591 Err(_) if if_exists => Ok(None),
5592 Err(e) => Err(e.into()),
5593 }
5594}
5595
5596fn plan_drop_cluster_replica(
5597 scx: &StatementContext,
5598 if_exists: bool,
5599 name: &QualifiedReplica,
5600) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5601 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5602 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5603}
5604
5605fn plan_drop_item(
5607 scx: &StatementContext,
5608 object_type: ObjectType,
5609 if_exists: bool,
5610 name: UnresolvedItemName,
5611 cascade: bool,
5612) -> Result<Option<CatalogItemId>, PlanError> {
5613 let Some(id) = plan_drop_item_name(scx, object_type, if_exists, name)? else {
5614 return Ok(None);
5615 };
5616 if !cascade {
5617 let catalog_item = scx.catalog.get_item(&id);
5618 ensure_no_blocking_dependents(scx, object_type, catalog_item, &BTreeSet::new())?;
5619 }
5620 Ok(Some(id))
5621}
5622
5623fn plan_drop_item_name(
5627 scx: &StatementContext,
5628 object_type: ObjectType,
5629 if_exists: bool,
5630 name: UnresolvedItemName,
5631) -> Result<Option<CatalogItemId>, PlanError> {
5632 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5633 Ok(r) => r,
5634 Err(PlanError::MismatchedObjectType {
5636 name,
5637 is_type: ObjectType::MaterializedView,
5638 expected_type: ObjectType::View,
5639 }) => {
5640 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5641 }
5642 e => e?,
5643 };
5644
5645 Ok(match resolved {
5646 Some(catalog_item) => {
5647 if catalog_item.id().is_system() {
5648 sql_bail!(
5649 "cannot drop {} {} because it is required by the database system",
5650 catalog_item.item_type(),
5651 scx.catalog.minimal_qualification(catalog_item.name()),
5652 );
5653 }
5654 Some(catalog_item.id())
5655 }
5656 None => None,
5657 })
5658}
5659
5660fn ensure_no_blocking_dependents(
5665 scx: &StatementContext,
5666 object_type: ObjectType,
5667 catalog_item: &dyn CatalogItem,
5668 also_dropped: &BTreeSet<CatalogItemId>,
5669) -> Result<(), PlanError> {
5670 for id in catalog_item.used_by() {
5671 if also_dropped.contains(id) {
5672 continue;
5673 }
5674 let dep = scx.catalog.get_item(id);
5675 if dependency_prevents_drop(object_type, dep) {
5676 return Err(PlanError::DependentObjectsStillExist {
5677 object_type: catalog_item.item_type().to_string(),
5678 object_name: scx
5679 .catalog
5680 .minimal_qualification(catalog_item.name())
5681 .to_string(),
5682 dependents: vec![(
5683 dep.item_type().to_string(),
5684 scx.catalog.minimal_qualification(dep.name()).to_string(),
5685 )],
5686 });
5687 }
5688 }
5689 Ok(())
5692}
5693
5694fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5696 match object_type {
5697 ObjectType::Type => true,
5698 ObjectType::Table
5699 | ObjectType::View
5700 | ObjectType::MaterializedView
5701 | ObjectType::Source
5702 | ObjectType::Sink
5703 | ObjectType::Index
5704 | ObjectType::Role
5705 | ObjectType::Cluster
5706 | ObjectType::ClusterReplica
5707 | ObjectType::Secret
5708 | ObjectType::Connection
5709 | ObjectType::Database
5710 | ObjectType::Schema
5711 | ObjectType::Func
5712 | ObjectType::NetworkPolicy => match dep.item_type() {
5713 CatalogItemType::Func
5714 | CatalogItemType::Table
5715 | CatalogItemType::Source
5716 | CatalogItemType::View
5717 | CatalogItemType::MaterializedView
5718 | CatalogItemType::Sink
5719 | CatalogItemType::Type
5720 | CatalogItemType::Secret
5721 | CatalogItemType::Connection => true,
5722 CatalogItemType::Index => false,
5723 },
5724 }
5725}
5726
5727pub fn describe_alter_index_options(
5728 _: &StatementContext,
5729 _: AlterIndexStatement<Aug>,
5730) -> Result<StatementDesc, PlanError> {
5731 Ok(StatementDesc::new(None))
5732}
5733
5734pub fn describe_drop_owned(
5735 _: &StatementContext,
5736 _: DropOwnedStatement<Aug>,
5737) -> Result<StatementDesc, PlanError> {
5738 Ok(StatementDesc::new(None))
5739}
5740
5741pub fn plan_drop_owned(
5742 scx: &StatementContext,
5743 drop: DropOwnedStatement<Aug>,
5744) -> Result<Plan, PlanError> {
5745 let cascade = drop.cascade();
5746 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5747 let mut drop_ids = Vec::new();
5748 let mut privilege_revokes = Vec::new();
5749 let mut default_privilege_revokes = Vec::new();
5750
5751 fn update_privilege_revokes(
5752 object_id: SystemObjectId,
5753 privileges: &PrivilegeMap,
5754 role_ids: &BTreeSet<RoleId>,
5755 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5756 ) {
5757 privilege_revokes.extend(iter::zip(
5758 iter::repeat(object_id),
5759 privileges
5760 .all_values()
5761 .filter(|privilege| role_ids.contains(&privilege.grantee))
5762 .cloned(),
5763 ));
5764 }
5765
5766 for replica in scx.catalog.get_cluster_replicas() {
5768 if role_ids.contains(&replica.owner_id()) {
5769 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5770 }
5771 }
5772
5773 for cluster in scx.catalog.get_clusters() {
5775 if role_ids.contains(&cluster.owner_id()) {
5776 if !cascade {
5778 let non_owned_bound_objects: Vec<_> = cluster
5779 .bound_objects()
5780 .into_iter()
5781 .map(|item_id| scx.catalog.get_item(item_id))
5782 .filter(|item| !role_ids.contains(&item.owner_id()))
5783 .collect();
5784 if !non_owned_bound_objects.is_empty() {
5785 let names: Vec<_> = non_owned_bound_objects
5786 .into_iter()
5787 .map(|item| {
5788 (
5789 item.item_type().to_string(),
5790 scx.catalog.resolve_full_name(item.name()).to_string(),
5791 )
5792 })
5793 .collect();
5794 return Err(PlanError::DependentObjectsStillExist {
5795 object_type: "cluster".to_string(),
5796 object_name: cluster.name().to_string(),
5797 dependents: names,
5798 });
5799 }
5800 }
5801 drop_ids.push(cluster.id().into());
5802 }
5803 update_privilege_revokes(
5804 SystemObjectId::Object(cluster.id().into()),
5805 cluster.privileges(),
5806 &role_ids,
5807 &mut privilege_revokes,
5808 );
5809 }
5810
5811 for item in scx.catalog.get_items() {
5813 if role_ids.contains(&item.owner_id()) {
5814 if !cascade {
5815 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5817 let non_owned_dependencies: Vec<_> = used_by
5818 .into_iter()
5819 .map(|item_id| scx.catalog.get_item(item_id))
5820 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5821 .filter(|item| !role_ids.contains(&item.owner_id()))
5822 .collect();
5823 if !non_owned_dependencies.is_empty() {
5824 let names: Vec<_> = non_owned_dependencies
5825 .into_iter()
5826 .map(|item| {
5827 let item_typ = item.item_type().to_string();
5828 let item_name =
5829 scx.catalog.resolve_full_name(item.name()).to_string();
5830 (item_typ, item_name)
5831 })
5832 .collect();
5833 Err(PlanError::DependentObjectsStillExist {
5834 object_type: item.item_type().to_string(),
5835 object_name: scx
5836 .catalog
5837 .resolve_full_name(item.name())
5838 .to_string()
5839 .to_string(),
5840 dependents: names,
5841 })
5842 } else {
5843 Ok(())
5844 }
5845 };
5846
5847 if let Some(id) = item.progress_id() {
5850 let progress_item = scx.catalog.get_item(&id);
5851 check_if_dependents_exist(progress_item.used_by())?;
5852 }
5853 check_if_dependents_exist(item.used_by())?;
5854 }
5855 drop_ids.push(item.id().into());
5856 }
5857 update_privilege_revokes(
5858 SystemObjectId::Object(item.id().into()),
5859 item.privileges(),
5860 &role_ids,
5861 &mut privilege_revokes,
5862 );
5863 }
5864
5865 for schema in scx.catalog.get_schemas() {
5867 if !schema.id().is_temporary() {
5868 if role_ids.contains(&schema.owner_id()) {
5869 if !cascade {
5870 let non_owned_dependencies: Vec<_> = schema
5871 .item_ids()
5872 .map(|item_id| scx.catalog.get_item(&item_id))
5873 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5874 .filter(|item| !role_ids.contains(&item.owner_id()))
5875 .collect();
5876 if !non_owned_dependencies.is_empty() {
5877 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5878 sql_bail!(
5879 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5880 full_schema_name.to_string().quoted()
5881 );
5882 }
5883 }
5884 drop_ids.push((*schema.database(), *schema.id()).into())
5885 }
5886 update_privilege_revokes(
5887 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5888 schema.privileges(),
5889 &role_ids,
5890 &mut privilege_revokes,
5891 );
5892 }
5893 }
5894
5895 for database in scx.catalog.get_databases() {
5897 if role_ids.contains(&database.owner_id()) {
5898 if !cascade {
5899 let non_owned_schemas: Vec<_> = database
5900 .schemas()
5901 .into_iter()
5902 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5903 .collect();
5904 if !non_owned_schemas.is_empty() {
5905 sql_bail!(
5906 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5907 database.name().quoted(),
5908 );
5909 }
5910 }
5911 drop_ids.push(database.id().into());
5912 }
5913 update_privilege_revokes(
5914 SystemObjectId::Object(database.id().into()),
5915 database.privileges(),
5916 &role_ids,
5917 &mut privilege_revokes,
5918 );
5919 }
5920
5921 for network_policy in scx.catalog.get_network_policies() {
5923 if role_ids.contains(&network_policy.owner_id()) {
5924 drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5925 }
5926 update_privilege_revokes(
5927 SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5928 network_policy.privileges(),
5929 &role_ids,
5930 &mut privilege_revokes,
5931 );
5932 }
5933
5934 update_privilege_revokes(
5936 SystemObjectId::System,
5937 scx.catalog.get_system_privileges(),
5938 &role_ids,
5939 &mut privilege_revokes,
5940 );
5941
5942 for (default_privilege_object, default_privilege_acl_items) in
5943 scx.catalog.get_default_privileges()
5944 {
5945 for default_privilege_acl_item in default_privilege_acl_items {
5946 if role_ids.contains(&default_privilege_object.role_id)
5947 || role_ids.contains(&default_privilege_acl_item.grantee)
5948 {
5949 default_privilege_revokes.push((
5950 default_privilege_object.clone(),
5951 default_privilege_acl_item.clone(),
5952 ));
5953 }
5954 }
5955 }
5956
5957 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5958
5959 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5960 if !system_ids.is_empty() {
5961 let mut owners = system_ids
5962 .into_iter()
5963 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5964 .collect::<BTreeSet<_>>()
5965 .into_iter()
5966 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5967 sql_bail!(
5968 "cannot drop objects owned by role {} because they are required by the database system",
5969 owners.join(", "),
5970 );
5971 }
5972
5973 Ok(Plan::DropOwned(DropOwnedPlan {
5974 role_ids: role_ids.into_iter().collect(),
5975 drop_ids,
5976 privilege_revokes,
5977 default_privilege_revokes,
5978 }))
5979}
5980
5981fn plan_retain_history_option(
5982 scx: &StatementContext,
5983 retain_history: Option<OptionalDuration>,
5984) -> Result<Option<CompactionWindow>, PlanError> {
5985 if let Some(OptionalDuration(lcw)) = retain_history {
5986 Ok(Some(plan_retain_history(scx, lcw)?))
5987 } else {
5988 Ok(None)
5989 }
5990}
5991
5992fn plan_retain_history(
5998 scx: &StatementContext,
5999 lcw: Option<Duration>,
6000) -> Result<CompactionWindow, PlanError> {
6001 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
6002 match lcw {
6003 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
6008 option_name: "RETAIN HISTORY".to_string(),
6009 err: Box::new(PlanError::Unstructured(
6010 "internal error: unexpectedly zero".to_string(),
6011 )),
6012 }),
6013 Some(duration) => {
6014 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
6017 && scx
6018 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6019 .is_err()
6020 {
6021 return Err(PlanError::RetainHistoryLow {
6022 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
6023 });
6024 }
6025 Ok(duration.try_into()?)
6026 }
6027 None => {
6030 if scx
6031 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6032 .is_err()
6033 {
6034 Err(PlanError::RetainHistoryRequired)
6035 } else {
6036 Ok(CompactionWindow::DisableCompaction)
6037 }
6038 }
6039 }
6040}
6041
6042generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6043
6044fn plan_index_options(
6045 scx: &StatementContext,
6046 with_opts: Vec<IndexOption<Aug>>,
6047) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6048 if !with_opts.is_empty() {
6049 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6051 }
6052
6053 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6054
6055 let mut out = Vec::with_capacity(1);
6056 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6057 out.push(crate::plan::IndexOption::RetainHistory(cw));
6058 }
6059 Ok(out)
6060}
6061
6062generate_extracted_config!(
6063 TableOption,
6064 (PartitionBy, Vec<Ident>),
6065 (RetainHistory, OptionalDuration),
6066 (RedactedTest, String)
6067);
6068
6069fn plan_table_options(
6070 scx: &StatementContext,
6071 desc: &RelationDesc,
6072 with_opts: Vec<TableOption<Aug>>,
6073) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6074 let TableOptionExtracted {
6075 partition_by,
6076 retain_history,
6077 redacted_test,
6078 ..
6079 }: TableOptionExtracted = with_opts.try_into()?;
6080
6081 if let Some(partition_by) = partition_by {
6082 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6083 check_partition_by(desc, partition_by)?;
6084 }
6085
6086 if redacted_test.is_some() {
6087 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6088 }
6089
6090 let mut out = Vec::with_capacity(1);
6091 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6092 out.push(crate::plan::TableOption::RetainHistory(cw));
6093 }
6094 Ok(out)
6095}
6096
6097pub fn plan_alter_index_options(
6098 scx: &mut StatementContext,
6099 AlterIndexStatement {
6100 index_name,
6101 if_exists,
6102 action,
6103 }: AlterIndexStatement<Aug>,
6104) -> Result<Plan, PlanError> {
6105 let object_type = ObjectType::Index;
6106 match action {
6107 AlterIndexAction::ResetOptions(options) => {
6108 let mut options = options.into_iter();
6109 if let Some(opt) = options.next() {
6110 match opt {
6111 IndexOptionName::RetainHistory => {
6112 if options.next().is_some() {
6113 sql_bail!("RETAIN HISTORY must be only option");
6114 }
6115 return alter_retain_history(
6116 scx,
6117 object_type,
6118 if_exists,
6119 UnresolvedObjectName::Item(index_name),
6120 None,
6121 );
6122 }
6123 }
6124 }
6125 sql_bail!("expected option");
6126 }
6127 AlterIndexAction::SetOptions(options) => {
6128 let mut options = options.into_iter();
6129 if let Some(opt) = options.next() {
6130 match opt.name {
6131 IndexOptionName::RetainHistory => {
6132 if options.next().is_some() {
6133 sql_bail!("RETAIN HISTORY must be only option");
6134 }
6135 return alter_retain_history(
6136 scx,
6137 object_type,
6138 if_exists,
6139 UnresolvedObjectName::Item(index_name),
6140 opt.value,
6141 );
6142 }
6143 }
6144 }
6145 sql_bail!("expected option");
6146 }
6147 }
6148}
6149
6150pub fn describe_alter_cluster_set_options(
6151 _: &StatementContext,
6152 _: AlterClusterStatement<Aug>,
6153) -> Result<StatementDesc, PlanError> {
6154 Ok(StatementDesc::new(None))
6155}
6156
6157pub fn plan_alter_cluster(
6158 scx: &mut StatementContext,
6159 AlterClusterStatement {
6160 name,
6161 action,
6162 if_exists,
6163 }: AlterClusterStatement<Aug>,
6164) -> Result<Plan, PlanError> {
6165 let cluster = match resolve_cluster(scx, &name, if_exists)? {
6166 Some(entry) => entry,
6167 None => {
6168 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6169 name: name.to_ast_string_simple(),
6170 object_type: ObjectType::Cluster,
6171 });
6172
6173 return Ok(Plan::AlterNoop(AlterNoopPlan {
6174 object_type: ObjectType::Cluster,
6175 }));
6176 }
6177 };
6178
6179 let mut options: PlanClusterOption = Default::default();
6180 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6181
6182 match action {
6183 AlterClusterAction::SetOptions {
6184 options: set_options,
6185 with_options,
6186 } => {
6187 let ClusterOptionExtracted {
6188 availability_zones,
6189 introspection_debugging,
6190 introspection_interval,
6191 managed,
6192 replicas: replica_defs,
6193 replication_factor,
6194 seen: _,
6195 size,
6196 disk,
6197 schedule,
6198 workload_class,
6199 }: ClusterOptionExtracted = set_options.try_into()?;
6200
6201 if !scx.catalog.active_role_id().is_system() {
6202 if workload_class.is_some() {
6203 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6204 }
6205 }
6206
6207 match managed.unwrap_or_else(|| cluster.is_managed()) {
6208 true => {
6209 let alter_strategy_extracted =
6210 ClusterAlterOptionExtracted::try_from(with_options)?;
6211 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6212
6213 match alter_strategy {
6214 AlterClusterPlanStrategy::None => {}
6215 _ => {
6216 scx.require_feature_flag(
6217 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6218 )?;
6219 }
6220 }
6221
6222 if replica_defs.is_some() {
6223 sql_bail!("REPLICAS not supported for managed clusters");
6224 }
6225 if schedule.is_some()
6226 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6227 {
6228 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6229
6230 if replication_factor.is_none()
6239 && cluster.replication_factor().is_some_and(|rf| rf > 1)
6240 {
6241 sql_bail!(
6242 "SCHEDULE cannot be set to anything other than MANUAL while the \
6243 cluster's REPLICATION FACTOR is greater than 1; \
6244 set the REPLICATION FACTOR to 1 first"
6245 );
6246 }
6247 }
6248
6249 if replication_factor.is_some() {
6250 if schedule.is_some()
6251 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6252 {
6253 sql_bail!(
6254 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6255 );
6256 }
6257 if let Some(current_schedule) = cluster.schedule() {
6258 if !matches!(current_schedule, ClusterSchedule::Manual) {
6259 sql_bail!(
6260 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6261 );
6262 }
6263 }
6264 }
6265 }
6266 false => {
6267 if !alter_strategy.is_none() {
6268 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6269 }
6270 if availability_zones.is_some() {
6271 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6272 }
6273 if replication_factor.is_some() {
6274 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6275 }
6276 if introspection_debugging.is_some() {
6277 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6278 }
6279 if introspection_interval.is_some() {
6280 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6281 }
6282 if size.is_some() {
6283 sql_bail!("SIZE not supported for unmanaged clusters");
6284 }
6285 if disk.is_some() {
6286 sql_bail!("DISK not supported for unmanaged clusters");
6287 }
6288 if schedule.is_some()
6289 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6290 {
6291 sql_bail!(
6292 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6293 );
6294 }
6295 if let Some(current_schedule) = cluster.schedule() {
6296 if !matches!(current_schedule, ClusterSchedule::Manual)
6297 && schedule.is_none()
6298 {
6299 sql_bail!(
6300 "when switching a cluster to unmanaged, if the managed \
6301 cluster's SCHEDULE is anything other than MANUAL, you have to \
6302 explicitly set the SCHEDULE to MANUAL"
6303 );
6304 }
6305 }
6306 }
6307 }
6308
6309 let mut replicas = vec![];
6310 for ReplicaDefinition { name, options } in
6311 replica_defs.into_iter().flat_map(Vec::into_iter)
6312 {
6313 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6314 }
6315
6316 if let Some(managed) = managed {
6317 options.managed = AlterOptionParameter::Set(managed);
6318 }
6319 if let Some(replication_factor) = replication_factor {
6320 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6321 }
6322 if let Some(size) = &size {
6323 options.size = AlterOptionParameter::Set(size.clone());
6324 }
6325 if let Some(availability_zones) = availability_zones {
6326 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6327 }
6328 if let Some(introspection_debugging) = introspection_debugging {
6329 options.introspection_debugging =
6330 AlterOptionParameter::Set(introspection_debugging);
6331 }
6332 if let Some(introspection_interval) = introspection_interval {
6333 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6334 }
6335 if disk.is_some() {
6336 let size = match size.as_deref() {
6340 Some(s) => s,
6341 None => cluster
6342 .managed_size()
6343 .ok_or_else(|| sql_err!("cluster is not managed"))?,
6344 };
6345 if scx.catalog.is_cluster_size_cc(size) {
6346 sql_bail!(
6347 "DISK option not supported for modern cluster sizes because disk is always enabled"
6348 );
6349 }
6350
6351 scx.catalog
6352 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6353 }
6354 if !replicas.is_empty() {
6355 options.replicas = AlterOptionParameter::Set(replicas);
6356 }
6357 if let Some(schedule) = schedule {
6358 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6359 }
6360 if let Some(workload_class) = workload_class {
6361 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6362 }
6363 }
6364 AlterClusterAction::ResetOptions(reset_options) => {
6365 use AlterOptionParameter::Reset;
6366 use ClusterOptionName::*;
6367
6368 if !scx.catalog.active_role_id().is_system() {
6369 if reset_options.contains(&WorkloadClass) {
6370 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6371 }
6372 }
6373
6374 for option in reset_options {
6375 match option {
6376 AvailabilityZones => options.availability_zones = Reset,
6377 Disk => scx
6378 .catalog
6379 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6380 IntrospectionInterval => options.introspection_interval = Reset,
6381 IntrospectionDebugging => options.introspection_debugging = Reset,
6382 Managed => options.managed = Reset,
6383 Replicas => options.replicas = Reset,
6384 ReplicationFactor => options.replication_factor = Reset,
6385 Size => options.size = Reset,
6386 Schedule => options.schedule = Reset,
6387 WorkloadClass => options.workload_class = Reset,
6388 }
6389 }
6390 }
6391 }
6392 Ok(Plan::AlterCluster(AlterClusterPlan {
6393 id: cluster.id(),
6394 name: cluster.name().to_string(),
6395 options,
6396 strategy: alter_strategy,
6397 }))
6398}
6399
6400pub fn describe_alter_set_cluster(
6401 _: &StatementContext,
6402 _: AlterSetClusterStatement<Aug>,
6403) -> Result<StatementDesc, PlanError> {
6404 Ok(StatementDesc::new(None))
6405}
6406
6407pub fn plan_alter_item_set_cluster(
6408 scx: &StatementContext,
6409 AlterSetClusterStatement {
6410 if_exists,
6411 set_cluster: in_cluster_name,
6412 name,
6413 object_type,
6414 }: AlterSetClusterStatement<Aug>,
6415) -> Result<Plan, PlanError> {
6416 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6417
6418 let object_type = object_type.into();
6419
6420 match object_type {
6422 ObjectType::MaterializedView => {}
6423 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6424 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6425 }
6426 ObjectType::Table
6427 | ObjectType::View
6428 | ObjectType::Type
6429 | ObjectType::Role
6430 | ObjectType::Cluster
6431 | ObjectType::ClusterReplica
6432 | ObjectType::Secret
6433 | ObjectType::Connection
6434 | ObjectType::Database
6435 | ObjectType::Schema
6436 | ObjectType::Func
6437 | ObjectType::NetworkPolicy => {
6438 bail_never_supported!(
6439 format!("ALTER {object_type} SET CLUSTER"),
6440 "sql/alter-set-cluster/",
6441 format!("{object_type} has no associated cluster")
6442 )
6443 }
6444 }
6445
6446 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6447
6448 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6449 Some(entry) => {
6450 let current_cluster = entry.cluster_id();
6451 let Some(current_cluster) = current_cluster else {
6452 sql_bail!("No cluster associated with {name}");
6453 };
6454
6455 if current_cluster == in_cluster.id() {
6456 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6457 } else {
6458 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6459 id: entry.id(),
6460 set_cluster: in_cluster.id(),
6461 }))
6462 }
6463 }
6464 None => {
6465 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6466 name: name.to_ast_string_simple(),
6467 object_type,
6468 });
6469
6470 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6471 }
6472 }
6473}
6474
6475pub fn describe_alter_object_rename(
6476 _: &StatementContext,
6477 _: AlterObjectRenameStatement,
6478) -> Result<StatementDesc, PlanError> {
6479 Ok(StatementDesc::new(None))
6480}
6481
6482pub fn plan_alter_object_rename(
6483 scx: &mut StatementContext,
6484 AlterObjectRenameStatement {
6485 name,
6486 object_type,
6487 to_item_name,
6488 if_exists,
6489 }: AlterObjectRenameStatement,
6490) -> Result<Plan, PlanError> {
6491 let object_type = object_type.into();
6492 match (object_type, name) {
6493 (
6494 ObjectType::View
6495 | ObjectType::MaterializedView
6496 | ObjectType::Table
6497 | ObjectType::Source
6498 | ObjectType::Index
6499 | ObjectType::Sink
6500 | ObjectType::Secret
6501 | ObjectType::Connection,
6502 UnresolvedObjectName::Item(name),
6503 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6504 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6505 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6506 }
6507 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6508 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6509 }
6510 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6511 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6512 }
6513 (object_type, name) => {
6514 bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6517 }
6518 }
6519}
6520
6521pub fn plan_alter_schema_rename(
6522 scx: &mut StatementContext,
6523 name: UnresolvedSchemaName,
6524 to_schema_name: Ident,
6525 if_exists: bool,
6526) -> Result<Plan, PlanError> {
6527 let normalized = normalize::unresolved_schema_name(name.clone())?;
6531 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6532 sql_bail!(
6533 "cannot rename schemas in the ambient database: {:?}",
6534 mz_repr::namespaces::MZ_TEMP_SCHEMA
6535 );
6536 }
6537
6538 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6539 let object_type = ObjectType::Schema;
6540 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6541 name: name.to_ast_string_simple(),
6542 object_type,
6543 });
6544 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6545 };
6546
6547 if scx
6549 .resolve_schema_in_database(&db_spec, &to_schema_name)
6550 .is_ok()
6551 {
6552 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6553 to_schema_name.clone().into_string(),
6554 )));
6555 }
6556
6557 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6559 if schema.id().is_system() {
6560 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6561 }
6562
6563 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6564 cur_schema_spec: (db_spec, schema_spec),
6565 new_schema_name: to_schema_name.into_string(),
6566 }))
6567}
6568
6569pub fn plan_alter_schema_swap<F>(
6570 scx: &mut StatementContext,
6571 name_a: UnresolvedSchemaName,
6572 name_b: Ident,
6573 if_exists: bool,
6574 gen_temp_suffix: F,
6575) -> Result<Plan, PlanError>
6576where
6577 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6578{
6579 let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6583 if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6584 {
6585 sql_bail!("cannot swap schemas that are in the ambient database");
6586 }
6587 let name_b_str = normalize::ident_ref(&name_b);
6589 if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6590 sql_bail!("cannot swap schemas that are in the ambient database");
6591 }
6592
6593 let schema_a = match scx.resolve_schema(name_a.clone()) {
6594 Ok(schema) => schema,
6595 Err(_) if if_exists => {
6596 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6597 name: name_a.to_ast_string_simple(),
6598 object_type: ObjectType::Schema,
6599 });
6600 return Ok(Plan::AlterNoop(AlterNoopPlan {
6601 object_type: ObjectType::Schema,
6602 }));
6603 }
6604 Err(e) => return Err(e),
6605 };
6606
6607 let db_spec = schema_a.database().clone();
6608 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6609 sql_bail!("cannot swap schemas that are in the ambient database");
6610 };
6611 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6612
6613 if schema_a.id().is_system() || schema_b.id().is_system() {
6615 bail_never_supported!("swapping a system schema".to_string())
6616 }
6617
6618 const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
6622 let check = |temp_suffix: &str| {
6623 let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
6624 temp_name.append_lossy(temp_suffix);
6625 scx.resolve_schema_in_database(&db_spec, &temp_name)
6626 .is_err()
6627 };
6628 let temp_suffix = gen_temp_suffix(&check)?;
6629 let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");
6630
6631 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6632 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6633 schema_a_name: schema_a.name().schema.to_string(),
6634 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6635 schema_b_name: schema_b.name().schema.to_string(),
6636 name_temp,
6637 }))
6638}
6639
6640pub fn plan_alter_item_rename(
6641 scx: &mut StatementContext,
6642 object_type: ObjectType,
6643 name: UnresolvedItemName,
6644 to_item_name: Ident,
6645 if_exists: bool,
6646) -> Result<Plan, PlanError> {
6647 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6648 Ok(r) => r,
6649 Err(PlanError::MismatchedObjectType {
6651 name,
6652 is_type: ObjectType::MaterializedView,
6653 expected_type: ObjectType::View,
6654 }) => {
6655 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6656 }
6657 e => e?,
6658 };
6659
6660 match resolved {
6661 Some(entry) => {
6662 let full_name = scx.catalog.resolve_full_name(entry.name());
6663 let item_type = entry.item_type();
6664
6665 let proposed_name = QualifiedItemName {
6666 qualifiers: entry.name().qualifiers.clone(),
6667 item: to_item_name.clone().into_string(),
6668 };
6669
6670 let conflicting_type_exists;
6674 let conflicting_item_exists;
6675 if item_type == CatalogItemType::Type {
6676 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6677 conflicting_item_exists = scx
6678 .catalog
6679 .get_item_by_name(&proposed_name)
6680 .map(|item| item.item_type().conflicts_with_type())
6681 .unwrap_or(false);
6682 } else {
6683 conflicting_type_exists = item_type.conflicts_with_type()
6684 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6685 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6686 };
6687 if conflicting_type_exists || conflicting_item_exists {
6688 sql_bail!("catalog item '{}' already exists", to_item_name);
6689 }
6690
6691 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6692 id: entry.id(),
6693 current_full_name: full_name,
6694 to_name: normalize::ident(to_item_name),
6695 object_type,
6696 }))
6697 }
6698 None => {
6699 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6700 name: name.to_ast_string_simple(),
6701 object_type,
6702 });
6703
6704 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6705 }
6706 }
6707}
6708
6709pub fn plan_alter_cluster_rename(
6710 scx: &mut StatementContext,
6711 object_type: ObjectType,
6712 name: Ident,
6713 to_name: Ident,
6714 if_exists: bool,
6715) -> Result<Plan, PlanError> {
6716 match resolve_cluster(scx, &name, if_exists)? {
6717 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6718 id: entry.id(),
6719 name: entry.name().to_string(),
6720 to_name: ident(to_name),
6721 })),
6722 None => {
6723 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6724 name: name.to_ast_string_simple(),
6725 object_type,
6726 });
6727
6728 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6729 }
6730 }
6731}
6732
6733pub fn plan_alter_cluster_swap<F>(
6734 scx: &mut StatementContext,
6735 name_a: Ident,
6736 name_b: Ident,
6737 if_exists: bool,
6738 gen_temp_suffix: F,
6739) -> Result<Plan, PlanError>
6740where
6741 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6742{
6743 let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6744 Ok(cluster) => cluster,
6745 Err(_) if if_exists => {
6746 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6747 name: name_a.to_ast_string_simple(),
6748 object_type: ObjectType::Cluster,
6749 });
6750 return Ok(Plan::AlterNoop(AlterNoopPlan {
6751 object_type: ObjectType::Cluster,
6752 }));
6753 }
6754 Err(e) => return Err(e),
6755 };
6756 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6757
6758 const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6759 let check = |temp_suffix: &str| {
6760 let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6761 temp_name.append_lossy(temp_suffix);
6762 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6763 Err(CatalogError::UnknownCluster(_)) => true,
6765 Ok(_) | Err(_) => false,
6767 }
6768 };
6769 let temp_suffix = gen_temp_suffix(&check)?;
6770 let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6771
6772 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6773 id_a: cluster_a.id(),
6774 id_b: cluster_b.id(),
6775 name_a: name_a.into_string(),
6776 name_b: name_b.into_string(),
6777 name_temp,
6778 }))
6779}
6780
6781pub fn plan_alter_cluster_replica_rename(
6782 scx: &mut StatementContext,
6783 object_type: ObjectType,
6784 name: QualifiedReplica,
6785 to_item_name: Ident,
6786 if_exists: bool,
6787) -> Result<Plan, PlanError> {
6788 match resolve_cluster_replica(scx, &name, if_exists)? {
6789 Some((cluster, replica)) => {
6790 ensure_cluster_is_not_managed(scx, cluster.id())?;
6791 Ok(Plan::AlterClusterReplicaRename(
6792 AlterClusterReplicaRenamePlan {
6793 cluster_id: cluster.id(),
6794 replica_id: replica,
6795 name: QualifiedReplica {
6796 cluster: Ident::new(cluster.name())?,
6797 replica: name.replica,
6798 },
6799 to_name: normalize::ident(to_item_name),
6800 },
6801 ))
6802 }
6803 None => {
6804 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6805 name: name.to_ast_string_simple(),
6806 object_type,
6807 });
6808
6809 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6810 }
6811 }
6812}
6813
6814pub fn describe_alter_object_swap(
6815 _: &StatementContext,
6816 _: AlterObjectSwapStatement,
6817) -> Result<StatementDesc, PlanError> {
6818 Ok(StatementDesc::new(None))
6819}
6820
6821pub fn plan_alter_object_swap(
6822 scx: &mut StatementContext,
6823 stmt: AlterObjectSwapStatement,
6824) -> Result<Plan, PlanError> {
6825 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6826
6827 let AlterObjectSwapStatement {
6828 object_type,
6829 if_exists,
6830 name_a,
6831 name_b,
6832 } = stmt;
6833 let object_type = object_type.into();
6834
6835 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6837 let mut attempts = 0;
6838 let name_temp = loop {
6839 attempts += 1;
6840 if attempts > 10 {
6841 tracing::warn!("Unable to generate temp id for swapping");
6842 sql_bail!("unable to swap!");
6843 }
6844
6845 let short_id = mz_ore::id_gen::temp_id();
6847 if check_fn(&short_id) {
6848 break short_id;
6849 }
6850 };
6851
6852 Ok(name_temp)
6853 };
6854
6855 match (object_type, name_a, name_b) {
6856 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6857 plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6858 }
6859 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6860 plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6861 }
6862 (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6863 bail_internal!("name type does not match object type for ALTER SWAP")
6864 }
6865 (
6866 ObjectType::Table
6867 | ObjectType::View
6868 | ObjectType::MaterializedView
6869 | ObjectType::Source
6870 | ObjectType::Sink
6871 | ObjectType::Index
6872 | ObjectType::Type
6873 | ObjectType::Role
6874 | ObjectType::ClusterReplica
6875 | ObjectType::Secret
6876 | ObjectType::Connection
6877 | ObjectType::Database
6878 | ObjectType::Func
6879 | ObjectType::NetworkPolicy,
6880 _,
6881 _,
6882 ) => Err(PlanError::Unsupported {
6883 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6884 discussion_no: None,
6885 }),
6886 }
6887}
6888
6889pub fn describe_alter_retain_history(
6890 _: &StatementContext,
6891 _: AlterRetainHistoryStatement<Aug>,
6892) -> Result<StatementDesc, PlanError> {
6893 Ok(StatementDesc::new(None))
6894}
6895
6896pub fn plan_alter_retain_history(
6897 scx: &StatementContext,
6898 AlterRetainHistoryStatement {
6899 object_type,
6900 if_exists,
6901 name,
6902 history,
6903 }: AlterRetainHistoryStatement<Aug>,
6904) -> Result<Plan, PlanError> {
6905 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6906}
6907
6908fn alter_retain_history(
6909 scx: &StatementContext,
6910 object_type: ObjectType,
6911 if_exists: bool,
6912 name: UnresolvedObjectName,
6913 history: Option<WithOptionValue<Aug>>,
6914) -> Result<Plan, PlanError> {
6915 let name = match (object_type, name) {
6916 (
6917 ObjectType::View
6919 | ObjectType::MaterializedView
6920 | ObjectType::Table
6921 | ObjectType::Source
6922 | ObjectType::Index,
6923 UnresolvedObjectName::Item(name),
6924 ) => name,
6925 (object_type, _) => {
6926 bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
6927 }
6928 };
6929 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6930 Some(entry) => {
6931 let full_name = scx.catalog.resolve_full_name(entry.name());
6932 let item_type = entry.item_type();
6933
6934 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6936 return Err(PlanError::AlterViewOnMaterializedView(
6937 full_name.to_string(),
6938 ));
6939 } else if object_type == ObjectType::View {
6940 sql_bail!("{object_type} does not support RETAIN HISTORY")
6941 } else if object_type != item_type {
6942 sql_bail!(
6943 "\"{}\" is a {} not a {}",
6944 full_name,
6945 entry.item_type(),
6946 format!("{object_type}").to_lowercase()
6947 )
6948 }
6949
6950 let (value, lcw) = match &history {
6952 Some(WithOptionValue::RetainHistoryFor(value)) => {
6953 let window = OptionalDuration::try_from_value(value.clone())?;
6954 (Some(value.clone()), window.0)
6955 }
6956 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6958 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6959 };
6960 let window = plan_retain_history(scx, lcw)?;
6961
6962 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6963 id: entry.id(),
6964 value,
6965 window,
6966 object_type,
6967 }))
6968 }
6969 None => {
6970 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6971 name: name.to_ast_string_simple(),
6972 object_type,
6973 });
6974
6975 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6976 }
6977 }
6978}
6979
6980fn alter_source_timestamp_interval(
6981 scx: &StatementContext,
6982 if_exists: bool,
6983 source_name: UnresolvedItemName,
6984 value: Option<WithOptionValue<Aug>>,
6985) -> Result<Plan, PlanError> {
6986 let object_type = ObjectType::Source;
6987 match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6988 Some(entry) => {
6989 let full_name = scx.catalog.resolve_full_name(entry.name());
6990 if entry.item_type() != CatalogItemType::Source {
6991 sql_bail!(
6992 "\"{}\" is a {} not a {}",
6993 full_name,
6994 entry.item_type(),
6995 format!("{object_type}").to_lowercase()
6996 )
6997 }
6998
6999 match value {
7000 Some(val) => {
7001 let val = match val {
7002 WithOptionValue::Value(v) => v,
7003 _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
7004 };
7005 let duration = Duration::try_from_value(val.clone())?;
7006
7007 let min = scx.catalog.system_vars().min_timestamp_interval();
7008 let max = scx.catalog.system_vars().max_timestamp_interval();
7009 if duration < min || duration > max {
7010 return Err(PlanError::InvalidTimestampInterval {
7011 min,
7012 max,
7013 requested: duration,
7014 });
7015 }
7016
7017 Ok(Plan::AlterSourceTimestampInterval(
7018 AlterSourceTimestampIntervalPlan {
7019 id: entry.id(),
7020 value: Some(val),
7021 interval: duration,
7022 },
7023 ))
7024 }
7025 None => {
7026 let interval = scx.catalog.system_vars().default_timestamp_interval();
7027 Ok(Plan::AlterSourceTimestampInterval(
7028 AlterSourceTimestampIntervalPlan {
7029 id: entry.id(),
7030 value: None,
7031 interval,
7032 },
7033 ))
7034 }
7035 }
7036 }
7037 None => {
7038 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7039 name: source_name.to_ast_string_simple(),
7040 object_type,
7041 });
7042
7043 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
7044 }
7045 }
7046}
7047
7048pub fn describe_alter_secret_options(
7049 _: &StatementContext,
7050 _: AlterSecretStatement<Aug>,
7051) -> Result<StatementDesc, PlanError> {
7052 Ok(StatementDesc::new(None))
7053}
7054
7055pub fn plan_alter_secret(
7056 scx: &mut StatementContext,
7057 stmt: AlterSecretStatement<Aug>,
7058) -> Result<Plan, PlanError> {
7059 let AlterSecretStatement {
7060 name,
7061 if_exists,
7062 value,
7063 } = stmt;
7064 let object_type = ObjectType::Secret;
7065 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7066 Some(entry) => entry.id(),
7067 None => {
7068 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7069 name: name.to_string(),
7070 object_type,
7071 });
7072
7073 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7074 }
7075 };
7076
7077 let secret_as = query::plan_secret_as(scx, value)?;
7078
7079 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7080}
7081
7082pub fn describe_alter_connection(
7083 _: &StatementContext,
7084 _: AlterConnectionStatement<Aug>,
7085) -> Result<StatementDesc, PlanError> {
7086 Ok(StatementDesc::new(None))
7087}
7088
// Generates `AlterConnectionOptionExtracted`, whose optional `validate` field
// is read by `plan_alter_connection`.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7090
/// Plans an `AlterConnectionStatement`.
///
/// Two shapes are handled:
///
/// * `ROTATE KEYS`, which must be the only action, takes no `WITH` options,
///   and is only accepted for SSH connections.
/// * Any mix of option SET and DROP actions, which produces an `AlterOptions`
///   action carrying the options to set, the options to drop, and whether the
///   connection should be validated.
///
/// If the connection cannot be resolved and `if_exists` is set, an
/// `ObjectDoesNotExist` notice is recorded and an `AlterNoop` plan is
/// returned.
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        // With IF EXISTS, any resolution error is reported as a notice.
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    // ROTATE KEYS is planned on its own and returns early.
    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    // An explicitly written VALIDATE option is gated behind a feature flag.
    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    // Without an explicit VALIDATE, validate only when both the system
    // variable and the connection itself opt in.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Gcp(_) => CreateConnectionType::Gcp,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::GlueSchemaRegistry(_) => CreateConnectionType::GlueSchemaRegistry,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Every option name mentioned by any action, whether set or dropped.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
            AlterConnectionAction::DropOption(name) => Ok(name.clone()),
            // The ROTATE KEYS branch above returns before reaching this point.
            AlterConnectionAction::RotateKeys => {
                Err(internal_err!("RotateKeys is handled separately above"))
            }
        })
        .collect::<Result<_, PlanError>>()?;

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Split the actions into options to set and option names to drop.
    let mut set_options_vec: Vec<_> = Vec::new();
    let mut drop_options: BTreeSet<_> = BTreeSet::new();
    for action in actions {
        match action {
            AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
            AlterConnectionAction::DropOption(name) => {
                drop_options.insert(name);
            }
            AlterConnectionAction::RotateKeys => {
                bail_internal!("RotateKeys is handled separately above")
            }
        }
    }

    // The vector is cloned because it is consumed twice: once for this
    // name-to-value map and once by the extraction below.
    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // NOTE(review): presumably this conversion also type checks the values and
    // rejects repeated options — confirm against `ConnectionOptionExtracted`.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    // Options that appear both as SET and as DROP/RESET in the same statement.
    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Setting one member of a group while dropping another is rejected.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // Once any member of the group is touched, every member of the group
        // is added to the drop set (including a member that is being set).
        // NOTE(review): presumably this is how previously stored values of the
        // other members get cleared when the plan is applied — confirm.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}
7284
7285pub fn describe_alter_sink(
7286 _: &StatementContext,
7287 _: AlterSinkStatement<Aug>,
7288) -> Result<StatementDesc, PlanError> {
7289 Ok(StatementDesc::new(None))
7290}
7291
7292pub fn plan_alter_sink(
7293 scx: &mut StatementContext,
7294 stmt: AlterSinkStatement<Aug>,
7295) -> Result<Plan, PlanError> {
7296 let AlterSinkStatement {
7297 sink_name,
7298 if_exists,
7299 action,
7300 } = stmt;
7301
7302 let object_type = ObjectType::Sink;
7303 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7304
7305 let Some(item) = item else {
7306 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7307 name: sink_name.to_string(),
7308 object_type,
7309 });
7310
7311 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7312 };
7313 let item = item.at_version(RelationVersionSelector::Latest);
7315
7316 match action {
7317 AlterSinkAction::ChangeRelation(new_from) => {
7318 let create_sql = item.create_sql();
7320 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7321 let [stmt]: [StatementParseResult; 1] = stmts
7322 .try_into()
7323 .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
7324 let Statement::CreateSink(stmt) = stmt.ast else {
7325 bail_internal!("create SQL of sink is not a CREATE SINK statement");
7326 };
7327
7328 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7330 stmt.from = new_from;
7331
7332 let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7334 bail_internal!("plan_sink did not produce a CreateSink plan");
7335 };
7336
7337 plan.sink.version += 1;
7338
7339 Ok(Plan::AlterSink(AlterSinkPlan {
7340 item_id: item.id(),
7341 global_id: item.global_id(),
7342 sink: plan.sink,
7343 with_snapshot: plan.with_snapshot,
7344 in_cluster: plan.in_cluster,
7345 }))
7346 }
7347 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7348 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7349 }
7350}
7351
7352pub fn describe_alter_source(
7353 _: &StatementContext,
7354 _: AlterSourceStatement<Aug>,
7355) -> Result<StatementDesc, PlanError> {
7356 Ok(StatementDesc::new(None))
7358}
7359
// Declares the extracted configuration for `AlterSourceAddSubsourceOption`:
// `TextColumns` and `ExcludeColumns` are lists of item names that default to
// empty, and `Details` is a `String` with no default given here.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7366
7367pub fn plan_alter_source(
7368 scx: &mut StatementContext,
7369 stmt: AlterSourceStatement<Aug>,
7370) -> Result<Plan, PlanError> {
7371 let AlterSourceStatement {
7372 source_name,
7373 if_exists,
7374 action,
7375 } = stmt;
7376 let object_type = ObjectType::Source;
7377
7378 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7379 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7380 name: source_name.to_string(),
7381 object_type,
7382 });
7383
7384 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7385 }
7386
7387 match action {
7388 AlterSourceAction::SetOptions(options) => {
7389 let mut options = options.into_iter();
7390 let option = options
7391 .next()
7392 .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
7393 if option.name == CreateSourceOptionName::RetainHistory {
7394 if options.next().is_some() {
7395 sql_bail!("RETAIN HISTORY must be only option");
7396 }
7397 return alter_retain_history(
7398 scx,
7399 object_type,
7400 if_exists,
7401 UnresolvedObjectName::Item(source_name),
7402 option.value,
7403 );
7404 }
7405 if option.name == CreateSourceOptionName::TimestampInterval {
7406 if options.next().is_some() {
7407 sql_bail!("TIMESTAMP INTERVAL must be only option");
7408 }
7409 return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
7410 }
7411 sql_bail!(
7414 "Cannot modify the {} of a SOURCE.",
7415 option.name.to_ast_string_simple()
7416 );
7417 }
7418 AlterSourceAction::ResetOptions(reset) => {
7419 let mut options = reset.into_iter();
7420 let option = options
7421 .next()
7422 .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
7423 if option == CreateSourceOptionName::RetainHistory {
7424 if options.next().is_some() {
7425 sql_bail!("RETAIN HISTORY must be only option");
7426 }
7427 return alter_retain_history(
7428 scx,
7429 object_type,
7430 if_exists,
7431 UnresolvedObjectName::Item(source_name),
7432 None,
7433 );
7434 }
7435 if option == CreateSourceOptionName::TimestampInterval {
7436 if options.next().is_some() {
7437 sql_bail!("TIMESTAMP INTERVAL must be only option");
7438 }
7439 return alter_source_timestamp_interval(scx, if_exists, source_name, None);
7440 }
7441 sql_bail!(
7442 "Cannot modify the {} of a SOURCE.",
7443 option.to_ast_string_simple()
7444 );
7445 }
7446 AlterSourceAction::DropSubsources { .. } => {
7447 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7448 }
7449 AlterSourceAction::AddSubsources { .. } => {
7450 sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
7451 }
7452 AlterSourceAction::RefreshReferences => {
7453 sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
7454 }
7455 };
7456}
7457
7458pub fn describe_alter_system_set(
7459 _: &StatementContext,
7460 _: AlterSystemSetStatement,
7461) -> Result<StatementDesc, PlanError> {
7462 Ok(StatementDesc::new(None))
7463}
7464
7465pub fn plan_alter_system_set(
7466 _: &StatementContext,
7467 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7468) -> Result<Plan, PlanError> {
7469 let name = name.to_string();
7470 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7471 name,
7472 value: scl::plan_set_variable_to(to)?,
7473 }))
7474}
7475
7476pub fn describe_alter_system_reset(
7477 _: &StatementContext,
7478 _: AlterSystemResetStatement,
7479) -> Result<StatementDesc, PlanError> {
7480 Ok(StatementDesc::new(None))
7481}
7482
7483pub fn plan_alter_system_reset(
7484 _: &StatementContext,
7485 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7486) -> Result<Plan, PlanError> {
7487 let name = name.to_string();
7488 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7489}
7490
7491pub fn describe_alter_system_reset_all(
7492 _: &StatementContext,
7493 _: AlterSystemResetAllStatement,
7494) -> Result<StatementDesc, PlanError> {
7495 Ok(StatementDesc::new(None))
7496}
7497
7498pub fn plan_alter_system_reset_all(
7499 _: &StatementContext,
7500 _: AlterSystemResetAllStatement,
7501) -> Result<Plan, PlanError> {
7502 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7503}
7504
7505pub fn describe_alter_role(
7506 _: &StatementContext,
7507 _: AlterRoleStatement<Aug>,
7508) -> Result<StatementDesc, PlanError> {
7509 Ok(StatementDesc::new(None))
7510}
7511
7512pub fn plan_alter_role(
7513 scx: &StatementContext,
7514 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7515) -> Result<Plan, PlanError> {
7516 let option = match option {
7517 AlterRoleOption::Attributes(attrs) => {
7518 let attrs = plan_role_attributes(attrs, scx)?;
7519 PlannedAlterRoleOption::Attributes(attrs)
7520 }
7521 AlterRoleOption::Variable(variable) => {
7522 let var = plan_role_variable(scx, variable)?;
7523 PlannedAlterRoleOption::Variable(var)
7524 }
7525 };
7526
7527 Ok(Plan::AlterRole(AlterRolePlan {
7528 id: name.id,
7529 name: name.name,
7530 option,
7531 }))
7532}
7533
7534pub fn describe_alter_table_add_column(
7535 _: &StatementContext,
7536 _: AlterTableAddColumnStatement<Aug>,
7537) -> Result<StatementDesc, PlanError> {
7538 Ok(StatementDesc::new(None))
7539}
7540
7541pub fn plan_alter_table_add_column(
7542 scx: &StatementContext,
7543 stmt: AlterTableAddColumnStatement<Aug>,
7544) -> Result<Plan, PlanError> {
7545 let AlterTableAddColumnStatement {
7546 if_exists,
7547 name,
7548 if_col_not_exist,
7549 column_name,
7550 data_type,
7551 } = stmt;
7552 let object_type = ObjectType::Table;
7553
7554 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7555
7556 let (relation_id, item_name, desc) =
7557 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7558 Some(item) => {
7559 let item_name = scx.catalog.resolve_full_name(item.name());
7561 let item = item.at_version(RelationVersionSelector::Latest);
7562 let desc = item
7563 .relation_desc()
7564 .ok_or_else(|| sql_err!("item does not have a relation description"))?
7565 .into_owned();
7566 (item.id(), item_name, desc)
7567 }
7568 None => {
7569 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7570 name: name.to_ast_string_simple(),
7571 object_type,
7572 });
7573 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7574 }
7575 };
7576
7577 let column_name = ColumnName::from(column_name.as_str());
7578 if desc.get_by_name(&column_name).is_some() {
7579 if if_col_not_exist {
7580 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7581 column_name: column_name.to_string(),
7582 object_name: item_name.item,
7583 });
7584 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7585 } else {
7586 return Err(PlanError::ColumnAlreadyExists {
7587 column_name,
7588 object_name: item_name.item,
7589 });
7590 }
7591 }
7592
7593 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7594 let column_type = scalar_type.nullable(true);
7596 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7598
7599 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7600 relation_id,
7601 column_name,
7602 column_type,
7603 raw_sql_type,
7604 }))
7605}
7606
7607pub fn describe_alter_materialized_view_apply_replacement(
7608 _: &StatementContext,
7609 _: AlterMaterializedViewApplyReplacementStatement,
7610) -> Result<StatementDesc, PlanError> {
7611 Ok(StatementDesc::new(None))
7612}
7613
7614pub fn plan_alter_materialized_view_apply_replacement(
7615 scx: &StatementContext,
7616 stmt: AlterMaterializedViewApplyReplacementStatement,
7617) -> Result<Plan, PlanError> {
7618 let AlterMaterializedViewApplyReplacementStatement {
7619 if_exists,
7620 name,
7621 replacement_name,
7622 } = stmt;
7623
7624 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7625
7626 let object_type = ObjectType::MaterializedView;
7627 let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7628 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7629 name: name.to_ast_string_simple(),
7630 object_type,
7631 });
7632 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7633 };
7634
7635 let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7636 .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7637
7638 if replacement.replacement_target() != Some(mv.id()) {
7639 return Err(PlanError::InvalidReplacement {
7640 item_type: mv.item_type(),
7641 item_name: scx.catalog.minimal_qualification(mv.name()),
7642 replacement_type: replacement.item_type(),
7643 replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7644 });
7645 }
7646
7647 Ok(Plan::AlterMaterializedViewApplyReplacement(
7648 AlterMaterializedViewApplyReplacementPlan {
7649 id: mv.id(),
7650 replacement_id: replacement.id(),
7651 },
7652 ))
7653}
7654
7655pub fn describe_comment(
7656 _: &StatementContext,
7657 _: CommentStatement<Aug>,
7658) -> Result<StatementDesc, PlanError> {
7659 Ok(StatementDesc::new(None))
7660}
7661
7662pub fn plan_comment(
7663 scx: &mut StatementContext,
7664 stmt: CommentStatement<Aug>,
7665) -> Result<Plan, PlanError> {
7666 const MAX_COMMENT_LENGTH: usize = 1024;
7667
7668 let CommentStatement { object, comment } = stmt;
7669
7670 if let Some(c) = &comment {
7672 if c.len() > 1024 {
7673 return Err(PlanError::CommentTooLong {
7674 length: c.len(),
7675 max_size: MAX_COMMENT_LENGTH,
7676 });
7677 }
7678 }
7679
7680 let (object_id, column_pos) = match &object {
7681 com_ty @ CommentObjectType::Table { name }
7682 | com_ty @ CommentObjectType::View { name }
7683 | com_ty @ CommentObjectType::MaterializedView { name }
7684 | com_ty @ CommentObjectType::Index { name }
7685 | com_ty @ CommentObjectType::Func { name }
7686 | com_ty @ CommentObjectType::Connection { name }
7687 | com_ty @ CommentObjectType::Source { name }
7688 | com_ty @ CommentObjectType::Sink { name }
7689 | com_ty @ CommentObjectType::Secret { name } => {
7690 let item = scx.get_item_by_resolved_name(name)?;
7691 match (com_ty, item.item_type()) {
7692 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7693 (CommentObjectId::Table(item.id()), None)
7694 }
7695 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7696 (CommentObjectId::View(item.id()), None)
7697 }
7698 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7699 (CommentObjectId::MaterializedView(item.id()), None)
7700 }
7701 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7702 (CommentObjectId::Index(item.id()), None)
7703 }
7704 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7705 (CommentObjectId::Func(item.id()), None)
7706 }
7707 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7708 (CommentObjectId::Connection(item.id()), None)
7709 }
7710 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7711 (CommentObjectId::Source(item.id()), None)
7712 }
7713 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7714 (CommentObjectId::Sink(item.id()), None)
7715 }
7716 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7717 (CommentObjectId::Secret(item.id()), None)
7718 }
7719 (com_ty, cat_ty) => {
7720 let expected_type = match com_ty {
7721 CommentObjectType::Table { .. } => ObjectType::Table,
7722 CommentObjectType::View { .. } => ObjectType::View,
7723 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7724 CommentObjectType::Index { .. } => ObjectType::Index,
7725 CommentObjectType::Func { .. } => ObjectType::Func,
7726 CommentObjectType::Connection { .. } => ObjectType::Connection,
7727 CommentObjectType::Source { .. } => ObjectType::Source,
7728 CommentObjectType::Sink { .. } => ObjectType::Sink,
7729 CommentObjectType::Secret { .. } => ObjectType::Secret,
7730 _ => sql_bail!("cannot comment on this object type"),
7731 };
7732
7733 return Err(PlanError::InvalidObjectType {
7734 expected_type: SystemObjectType::Object(expected_type),
7735 actual_type: SystemObjectType::Object(cat_ty.into()),
7736 object_name: item.name().item.clone(),
7737 });
7738 }
7739 }
7740 }
7741 CommentObjectType::Type { ty } => match ty {
7742 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7743 sql_bail!("cannot comment on anonymous list or map type");
7744 }
7745 ResolvedDataType::Named { id, modifiers, .. } => {
7746 if !modifiers.is_empty() {
7747 sql_bail!("cannot comment on type with modifiers");
7748 }
7749 (CommentObjectId::Type(*id), None)
7750 }
7751 ResolvedDataType::Error => bail_internal!("unresolved data type"),
7752 },
7753 CommentObjectType::Column { name } => {
7754 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7755 match item.item_type() {
7756 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7757 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7758 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7759 CatalogItemType::MaterializedView => {
7760 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7761 }
7762 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7763 r => {
7764 return Err(PlanError::Unsupported {
7765 feature: format!("Specifying comments on a column of {r}"),
7766 discussion_no: None,
7767 });
7768 }
7769 }
7770 }
7771 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7772 CommentObjectType::Database { name } => {
7773 (CommentObjectId::Database(*name.database_id()), None)
7774 }
7775 CommentObjectType::Schema { name } => {
7776 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7780 sql_bail!(
7781 "cannot comment on schema {} because it is a temporary schema",
7782 mz_repr::namespaces::MZ_TEMP_SCHEMA
7783 );
7784 }
7785 (
7786 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7787 None,
7788 )
7789 }
7790 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7791 CommentObjectType::ClusterReplica { name } => {
7792 let replica = scx.catalog.resolve_cluster_replica(name)?;
7793 (
7794 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7795 None,
7796 )
7797 }
7798 CommentObjectType::NetworkPolicy { name } => {
7799 (CommentObjectId::NetworkPolicy(name.id), None)
7800 }
7801 };
7802
7803 if let Some(p) = column_pos {
7809 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7810 max_num_columns: MAX_NUM_COLUMNS,
7811 req_num_columns: p,
7812 })?;
7813 }
7814
7815 Ok(Plan::Comment(CommentPlan {
7816 object_id,
7817 sub_component: column_pos,
7818 comment,
7819 }))
7820}
7821
7822pub(crate) fn resolve_cluster<'a>(
7823 scx: &'a StatementContext,
7824 name: &'a Ident,
7825 if_exists: bool,
7826) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7827 match scx.resolve_cluster(Some(name)) {
7828 Ok(cluster) => Ok(Some(cluster)),
7829 Err(_) if if_exists => Ok(None),
7830 Err(e) => Err(e),
7831 }
7832}
7833
7834pub(crate) fn resolve_cluster_replica<'a>(
7835 scx: &'a StatementContext,
7836 name: &QualifiedReplica,
7837 if_exists: bool,
7838) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7839 match scx.resolve_cluster(Some(&name.cluster)) {
7840 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7841 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7842 None if if_exists => Ok(None),
7843 None => Err(sql_err!(
7844 "CLUSTER {} has no CLUSTER REPLICA named {}",
7845 cluster.name(),
7846 name.replica.as_str().quoted(),
7847 )),
7848 },
7849 Err(_) if if_exists => Ok(None),
7850 Err(e) => Err(e),
7851 }
7852}
7853
7854pub(crate) fn resolve_database<'a>(
7855 scx: &'a StatementContext,
7856 name: &'a UnresolvedDatabaseName,
7857 if_exists: bool,
7858) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7859 match scx.resolve_database(name) {
7860 Ok(database) => Ok(Some(database)),
7861 Err(_) if if_exists => Ok(None),
7862 Err(e) => Err(e),
7863 }
7864}
7865
7866pub(crate) fn resolve_schema<'a>(
7867 scx: &'a StatementContext,
7868 name: UnresolvedSchemaName,
7869 if_exists: bool,
7870) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7871 match scx.resolve_schema(name) {
7872 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7873 Err(_) if if_exists => Ok(None),
7874 Err(e) => Err(e),
7875 }
7876}
7877
7878pub(crate) fn resolve_network_policy<'a>(
7879 scx: &'a StatementContext,
7880 name: Ident,
7881 if_exists: bool,
7882) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7883 match scx.catalog.resolve_network_policy(&name.to_string()) {
7884 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7885 id: policy.id(),
7886 name: policy.name().to_string(),
7887 })),
7888 Err(_) if if_exists => Ok(None),
7889 Err(e) => Err(e.into()),
7890 }
7891}
7892
7893pub(crate) fn resolve_item_or_type<'a>(
7894 scx: &'a StatementContext,
7895 object_type: ObjectType,
7896 name: UnresolvedItemName,
7897 if_exists: bool,
7898) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7899 let name = normalize::unresolved_item_name(name)?;
7900 let catalog_item = match object_type {
7901 ObjectType::Type => scx.catalog.resolve_type(&name),
7902 ObjectType::Table
7903 | ObjectType::View
7904 | ObjectType::MaterializedView
7905 | ObjectType::Source
7906 | ObjectType::Sink
7907 | ObjectType::Index
7908 | ObjectType::Role
7909 | ObjectType::Cluster
7910 | ObjectType::ClusterReplica
7911 | ObjectType::Secret
7912 | ObjectType::Connection
7913 | ObjectType::Database
7914 | ObjectType::Schema
7915 | ObjectType::Func
7916 | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7917 };
7918
7919 match catalog_item {
7920 Ok(item) => {
7921 let is_type = ObjectType::from(item.item_type());
7922 if object_type == is_type {
7923 Ok(Some(item))
7924 } else {
7925 Err(PlanError::MismatchedObjectType {
7926 name: scx.catalog.minimal_qualification(item.name()),
7927 is_type,
7928 expected_type: object_type,
7929 })
7930 }
7931 }
7932 Err(_) if if_exists => Ok(None),
7933 Err(e) => Err(e.into()),
7934 }
7935}
7936
7937fn ensure_cluster_is_not_managed(
7939 scx: &StatementContext,
7940 cluster_id: ClusterId,
7941) -> Result<(), PlanError> {
7942 let cluster = scx.catalog.get_cluster(cluster_id);
7943 if cluster.is_managed() {
7944 Err(PlanError::ManagedCluster {
7945 cluster_name: cluster.name().to_string(),
7946 })
7947 } else {
7948 Ok(())
7949 }
7950}