1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_arrow_util::builder::ArrowBuilder;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::str::StrExt;
32use mz_ore::{soft_assert_or_log, soft_panic_or_log};
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
59 CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
60 CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
61 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
62 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
63 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
64 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
65 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
66 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
67 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
68 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
69 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
70 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
71 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
72 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
73 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
74 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
75 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
76 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
77 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
78 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
79 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
80 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
81 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
82 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
83};
84use mz_sql_parser::ident;
85use mz_sql_parser::parser::StatementParseResult;
86use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
87use mz_storage_types::connections::{Connection, KafkaTopicOptions};
88use mz_storage_types::sinks::{
89 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
90 SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
91};
92use mz_storage_types::sources::encoding::{
93 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
94 SourceDataEncoding, included_column_desc,
95};
96use mz_storage_types::sources::envelope::{
97 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
98};
99use mz_storage_types::sources::kafka::{
100 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
101};
102use mz_storage_types::sources::load_generator::{
103 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
104 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
105};
106use mz_storage_types::sources::mysql::{
107 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
108};
109use mz_storage_types::sources::postgres::{
110 PostgresSourceConnection, PostgresSourcePublicationDetails,
111 ProtoPostgresSourcePublicationDetails,
112};
113use mz_storage_types::sources::sql_server::{
114 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
115};
116use mz_storage_types::sources::{
117 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
118 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
119 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
120 SqlServerSourceExtras, Timeline,
121};
122use mz_storage_types::wire_format::WireFormat;
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140 ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
141};
142use crate::plan::scope::Scope;
143use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
144use crate::plan::statement::{StatementContext, StatementDesc, scl};
145use crate::plan::typeconv::CastContext;
146use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
147use crate::plan::{
148 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
149 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
150 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
151 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
152 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
153 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
154 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
155 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
156 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
157 CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
158 CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
159 CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
160 DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
161 NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
162 PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
163 VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
164 WebhookValidation, literal, plan_utils, query, transform_ast,
165};
166use crate::session::vars::{
167 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
168 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
169 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS, VarInput,
170};
171use crate::{names, parse};
172
173mod connection;
174
/// Upper bound on the number of columns a planned relation may declare.
///
/// In this part of the file it is enforced while planning webhook sources,
/// which fail with `PlanError::TooManyColumns` when they exceed it.
const MAX_NUM_COLUMNS: usize = 256;
180
/// Upper bound for the Kafka topic metadata refresh interval: one hour.
///
/// NOTE(review): the code that enforces this bound is outside this excerpt.
const MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Lower bound for the Kafka topic metadata refresh interval: one second.
///
/// NOTE(review): the code that enforces this bound is outside this excerpt.
const MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
183
184static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
185 std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
186
187fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
191 if partition_by.len() > desc.len() {
192 tracing::error!(
193 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
194 desc.len(),
195 partition_by.len()
196 );
197 partition_by.truncate(desc.len());
198 }
199
200 let desc_prefix = desc.iter().take(partition_by.len());
201 for (idx, ((desc_name, desc_type), partition_name)) in
202 desc_prefix.zip_eq(partition_by).enumerate()
203 {
204 let partition_name = normalize::column_name(partition_name);
205 if *desc_name != partition_name {
206 sql_bail!(
207 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
208 );
209 }
210 if !preserves_order(&desc_type.scalar_type) {
211 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
212 }
213 }
214 Ok(())
215}
216
217pub fn describe_create_database(
218 _: &StatementContext,
219 _: CreateDatabaseStatement,
220) -> Result<StatementDesc, PlanError> {
221 Ok(StatementDesc::new(None))
222}
223
224pub fn plan_create_database(
225 _: &StatementContext,
226 CreateDatabaseStatement {
227 name,
228 if_not_exists,
229 }: CreateDatabaseStatement,
230) -> Result<Plan, PlanError> {
231 Ok(Plan::CreateDatabase(CreateDatabasePlan {
232 name: normalize::ident(name.0),
233 if_not_exists,
234 }))
235}
236
237pub fn describe_create_schema(
238 _: &StatementContext,
239 _: CreateSchemaStatement,
240) -> Result<StatementDesc, PlanError> {
241 Ok(StatementDesc::new(None))
242}
243
244pub fn plan_create_schema(
245 scx: &StatementContext,
246 CreateSchemaStatement {
247 mut name,
248 if_not_exists,
249 }: CreateSchemaStatement,
250) -> Result<Plan, PlanError> {
251 if name.0.len() > 2 {
252 sql_bail!("schema name {} has more than two components", name);
253 }
254 let schema_name = normalize::ident(
255 name.0
256 .pop()
257 .expect("names always have at least one component"),
258 );
259 let database_spec = match name.0.pop() {
260 None => match scx.catalog.active_database() {
261 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
262 None => sql_bail!("no database specified and no active database"),
263 },
264 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
265 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
266 Err(_) => sql_bail!("invalid database {}", n.as_str()),
267 },
268 };
269 Ok(Plan::CreateSchema(CreateSchemaPlan {
270 database_spec,
271 schema_name,
272 if_not_exists,
273 }))
274}
275
276pub fn describe_create_table(
277 _: &StatementContext,
278 _: CreateTableStatement<Aug>,
279) -> Result<StatementDesc, PlanError> {
280 Ok(StatementDesc::new(None))
281}
282
283pub fn plan_create_table(
284 scx: &StatementContext,
285 stmt: CreateTableStatement<Aug>,
286) -> Result<Plan, PlanError> {
287 let CreateTableStatement {
288 name,
289 columns,
290 constraints,
291 if_not_exists,
292 temporary,
293 with_options,
294 } = &stmt;
295
296 let names: Vec<_> = columns
297 .iter()
298 .filter(|c| {
299 let is_versioned = c
303 .options
304 .iter()
305 .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
306 !is_versioned
307 })
308 .map(|c| normalize::column_name(c.name.clone()))
309 .collect();
310
311 if let Some(dup) = names.iter().duplicates().next() {
312 sql_bail!("column {} specified more than once", dup.quoted());
313 }
314
315 let mut column_types = Vec::with_capacity(columns.len());
318 let mut defaults = Vec::with_capacity(columns.len());
319 let mut changes = BTreeMap::new();
320 let mut keys = Vec::new();
321
322 for (i, c) in columns.into_iter().enumerate() {
323 let aug_data_type = &c.data_type;
324 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
325 let mut nullable = true;
326 let mut default = Expr::null();
327 let mut versioned = false;
328 for option in &c.options {
329 match &option.option {
330 ColumnOption::NotNull => nullable = false,
331 ColumnOption::Default(expr) => {
332 let mut expr = expr.clone();
335 transform_ast::transform(scx, &mut expr)?;
336 let _ = query::plan_default_expr(scx, &expr, &ty)?;
337 default = expr.clone();
338 }
339 ColumnOption::Unique { is_primary } => {
340 keys.push(vec![i]);
341 if *is_primary {
342 nullable = false;
343 }
344 }
345 ColumnOption::Versioned { action, version } => {
346 let version = RelationVersion::from(*version);
347 versioned = true;
348
349 let name = normalize::column_name(c.name.clone());
350 let typ = ty.clone().nullable(nullable);
351
352 changes.insert(version, (action.clone(), name, typ));
353 }
354 other => {
355 bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
356 }
357 }
358 }
359 if !versioned {
362 column_types.push(ty.nullable(nullable));
363 }
364 defaults.push(default);
365 }
366
367 let mut seen_primary = false;
368 'c: for constraint in constraints {
369 match constraint {
370 TableConstraint::Unique {
371 name: _,
372 columns,
373 is_primary,
374 nulls_not_distinct,
375 } => {
376 if seen_primary && *is_primary {
377 sql_bail!(
378 "multiple primary keys for table {} are not allowed",
379 name.to_ast_string_stable()
380 );
381 }
382 seen_primary = *is_primary || seen_primary;
383
384 let mut key = vec![];
385 for column in columns {
386 let column = normalize::column_name(column.clone());
387 match names.iter().position(|name| *name == column) {
388 None => sql_bail!("unknown column in constraint: {}", column),
389 Some(i) => {
390 let nullable = &mut column_types[i].nullable;
391 if *is_primary {
392 if *nulls_not_distinct {
393 sql_bail!(
394 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
395 );
396 }
397
398 *nullable = false;
399 } else if !(*nulls_not_distinct || !*nullable) {
400 break 'c;
403 }
404
405 key.push(i);
406 }
407 }
408 }
409
410 if *is_primary {
411 keys.insert(0, key);
412 } else {
413 keys.push(key);
414 }
415 }
416 TableConstraint::ForeignKey { .. } => {
417 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
420 }
421 TableConstraint::Check { .. } => {
422 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
425 }
426 }
427 }
428
429 if !keys.is_empty() {
430 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
433 }
434
435 let typ = SqlRelationType::new(column_types).with_keys(keys);
436
437 let temporary = *temporary;
438 let name = if temporary {
439 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
440 } else {
441 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
442 };
443
444 let full_name = scx.catalog.resolve_full_name(&name);
446 let partial_name = PartialItemName::from(full_name.clone());
447 if let (false, Ok(item)) = (
450 if_not_exists,
451 scx.catalog.resolve_item_or_type(&partial_name),
452 ) {
453 return Err(PlanError::ItemAlreadyExists {
454 name: full_name.to_string(),
455 item_type: item.item_type(),
456 });
457 }
458
459 let desc = RelationDesc::new(typ, names);
460 let mut desc = VersionedRelationDesc::new(desc);
461 for (version, (_action, name, typ)) in changes.into_iter() {
462 let new_version = desc.add_column(name, typ);
463 if version != new_version {
464 return Err(PlanError::InvalidTable {
465 name: full_name.item,
466 });
467 }
468 }
469
470 let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
471
472 let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
478 let options = plan_table_options(scx, &original_desc, with_options.clone())?;
479
480 let compaction_window = options.iter().find_map(|o| {
481 #[allow(irrefutable_let_patterns)]
482 if let crate::plan::TableOption::RetainHistory(lcw) = o {
483 Some(lcw.clone())
484 } else {
485 None
486 }
487 });
488
489 let table = Table {
490 create_sql,
491 desc,
492 temporary,
493 compaction_window,
494 data_source: TableDataSource::TableWrites { defaults },
495 };
496 Ok(Plan::CreateTable(CreateTablePlan {
497 name,
498 table,
499 if_not_exists: *if_not_exists,
500 }))
501}
502
503pub fn describe_create_table_from_source(
504 _: &StatementContext,
505 _: CreateTableFromSourceStatement<Aug>,
506) -> Result<StatementDesc, PlanError> {
507 Ok(StatementDesc::new(None))
508}
509
510pub fn describe_create_webhook_source(
511 _: &StatementContext,
512 _: CreateWebhookSourceStatement<Aug>,
513) -> Result<StatementDesc, PlanError> {
514 Ok(StatementDesc::new(None))
515}
516
517pub fn describe_create_source(
518 _: &StatementContext,
519 _: CreateSourceStatement<Aug>,
520) -> Result<StatementDesc, PlanError> {
521 Ok(StatementDesc::new(None))
522}
523
524pub fn describe_create_subsource(
525 _: &StatementContext,
526 _: CreateSubsourceStatement<Aug>,
527) -> Result<StatementDesc, PlanError> {
528 Ok(StatementDesc::new(None))
529}
530
531generate_extracted_config!(
532 CreateSourceOption,
533 (TimestampInterval, Duration),
534 (RetainHistory, OptionalDuration)
535);
536
537generate_extracted_config!(
538 PgConfigOption,
539 (Details, String),
540 (Publication, String),
541 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
542 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
543);
544
545generate_extracted_config!(
546 MySqlConfigOption,
547 (Details, String),
548 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
549 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
550);
551
552generate_extracted_config!(
553 SqlServerConfigOption,
554 (Details, String),
555 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
556 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
557);
558
559pub fn plan_create_webhook_source(
560 scx: &StatementContext,
561 mut stmt: CreateWebhookSourceStatement<Aug>,
562) -> Result<Plan, PlanError> {
563 if stmt.is_table {
564 scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
565 }
566
567 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
570 let create_sql =
571 normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
572
573 let CreateWebhookSourceStatement {
574 name,
575 if_not_exists,
576 body_format,
577 include_headers,
578 validate_using,
579 is_table,
580 in_cluster: _,
582 } = stmt;
583
584 let validate_using = validate_using
585 .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
586 .transpose()?;
587 if let Some(WebhookValidation { expression, .. }) = &validate_using {
588 if !expression.contains_column() {
591 return Err(PlanError::WebhookValidationDoesNotUseColumns);
592 }
593 if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
597 return Err(PlanError::WebhookValidationNonDeterministic);
598 }
599 }
600
601 let body_format = match body_format {
602 Format::Bytes => WebhookBodyFormat::Bytes,
603 Format::Json { array } => WebhookBodyFormat::Json { array },
604 Format::Text => WebhookBodyFormat::Text,
605 ty => {
607 return Err(PlanError::Unsupported {
608 feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
609 discussion_no: None,
610 });
611 }
612 };
613
614 let mut column_ty = vec![
615 SqlColumnType {
617 scalar_type: SqlScalarType::from(body_format),
618 nullable: false,
619 },
620 ];
621 let mut column_names = vec!["body".to_string()];
622
623 let mut headers = WebhookHeaders::default();
624
625 if let Some(filters) = include_headers.column {
627 column_ty.push(SqlColumnType {
628 scalar_type: SqlScalarType::Map {
629 value_type: Box::new(SqlScalarType::String),
630 custom_id: None,
631 },
632 nullable: false,
633 });
634 column_names.push("headers".to_string());
635
636 let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
637 filters.into_iter().partition_map(|filter| {
638 if filter.block {
639 itertools::Either::Right(filter.header_name)
640 } else {
641 itertools::Either::Left(filter.header_name)
642 }
643 });
644 headers.header_column = Some(WebhookHeaderFilters { allow, block });
645 }
646
647 for header in include_headers.mappings {
649 let scalar_type = header
650 .use_bytes
651 .then_some(SqlScalarType::Bytes)
652 .unwrap_or(SqlScalarType::String);
653 column_ty.push(SqlColumnType {
654 scalar_type,
655 nullable: true,
656 });
657 column_names.push(header.column_name.into_string());
658
659 let column_idx = column_ty.len() - 1;
660 assert_eq!(
662 column_idx,
663 column_names.len() - 1,
664 "header column names and types don't match"
665 );
666 headers
667 .mapped_headers
668 .insert(column_idx, (header.header_name, header.use_bytes));
669 }
670
671 let mut unique_check = HashSet::with_capacity(column_names.len());
673 for name in &column_names {
674 if !unique_check.insert(name) {
675 return Err(PlanError::AmbiguousColumn(name.clone().into()));
676 }
677 }
678 if column_names.len() > MAX_NUM_COLUMNS {
679 return Err(PlanError::TooManyColumns {
680 max_num_columns: MAX_NUM_COLUMNS,
681 req_num_columns: column_names.len(),
682 });
683 }
684
685 let typ = SqlRelationType::new(column_ty);
686 let desc = RelationDesc::new(typ, column_names);
687
688 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
690 let full_name = scx.catalog.resolve_full_name(&name);
691 let partial_name = PartialItemName::from(full_name.clone());
692 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
693 return Err(PlanError::ItemAlreadyExists {
694 name: full_name.to_string(),
695 item_type: item.item_type(),
696 });
697 }
698
699 let timeline = Timeline::EpochMilliseconds;
702
703 let plan = if is_table {
704 let data_source = DataSourceDesc::Webhook {
705 validate_using,
706 body_format,
707 headers,
708 cluster_id: Some(in_cluster.id()),
709 };
710 let data_source = TableDataSource::DataSource {
711 desc: data_source,
712 timeline,
713 };
714 Plan::CreateTable(CreateTablePlan {
715 name,
716 if_not_exists,
717 table: Table {
718 create_sql,
719 desc: VersionedRelationDesc::new(desc),
720 temporary: false,
721 compaction_window: None,
722 data_source,
723 },
724 })
725 } else {
726 let data_source = DataSourceDesc::Webhook {
727 validate_using,
728 body_format,
729 headers,
730 cluster_id: None,
732 };
733 Plan::CreateSource(CreateSourcePlan {
734 name,
735 source: Source {
736 create_sql,
737 data_source,
738 desc,
739 compaction_window: None,
740 },
741 if_not_exists,
742 timeline,
743 in_cluster: Some(in_cluster.id()),
744 })
745 };
746
747 Ok(plan)
748}
749
750pub fn plan_create_source(
751 scx: &StatementContext,
752 mut stmt: CreateSourceStatement<Aug>,
753) -> Result<Plan, PlanError> {
754 let CreateSourceStatement {
755 name,
756 in_cluster: _,
757 col_names,
758 connection: source_connection,
759 envelope,
760 if_not_exists,
761 format,
762 key_constraint,
763 include_metadata,
764 with_options,
765 external_references: referenced_subsources,
766 progress_subsource,
767 } = &stmt;
768
769 mz_ore::soft_assert_or_log!(
770 referenced_subsources.is_none(),
771 "referenced subsources must be cleared in purification"
772 );
773
774 let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
775 && scx.catalog.system_vars().force_source_table_syntax();
776
777 if force_source_table_syntax {
780 if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
781 Err(PlanError::UseTablesForSources(
782 "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
783 ))?;
784 }
785 }
786
787 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
788
789 if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
790 && include_metadata
791 .iter()
792 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
793 {
794 sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
796 }
797 if !matches!(
798 source_connection,
799 CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
800 ) && !include_metadata.is_empty()
801 {
802 bail_unsupported!("INCLUDE metadata with non-Kafka sources");
803 }
804
805 if !include_metadata.is_empty()
806 && !matches!(
807 envelope,
808 ast::SourceEnvelope::Upsert { .. }
809 | ast::SourceEnvelope::None
810 | ast::SourceEnvelope::Debezium
811 )
812 {
813 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
814 }
815
816 let external_connection =
817 plan_generic_source_connection(scx, source_connection, include_metadata)?;
818
819 let CreateSourceOptionExtracted {
820 timestamp_interval,
821 retain_history,
822 seen: _,
823 } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
824
825 let metadata_columns_desc = match external_connection {
826 GenericSourceConnection::Kafka(KafkaSourceConnection {
827 ref metadata_columns,
828 ..
829 }) => kafka_metadata_columns_desc(metadata_columns),
830 _ => vec![],
831 };
832
833 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
835 scx,
836 &envelope,
837 format,
838 Some(external_connection.default_key_desc()),
839 external_connection.default_value_desc(),
840 include_metadata,
841 metadata_columns_desc,
842 &external_connection,
843 )?;
844 plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
845
846 let names: Vec<_> = desc.iter_names().cloned().collect();
847 if let Some(dup) = names.iter().duplicates().next() {
848 sql_bail!("column {} specified more than once", dup.quoted());
849 }
850
851 if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
853 scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
856
857 let key_columns = columns
858 .into_iter()
859 .map(normalize::column_name)
860 .collect::<Vec<_>>();
861
862 let mut uniq = BTreeSet::new();
863 for col in key_columns.iter() {
864 if !uniq.insert(col) {
865 sql_bail!("Repeated column name in source key constraint: {}", col);
866 }
867 }
868
869 let key_indices = key_columns
870 .iter()
871 .map(|col| {
872 let name_idx = desc
873 .get_by_name(col)
874 .map(|(idx, _type)| idx)
875 .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
876 if desc.get_unambiguous_name(name_idx).is_none() {
877 sql_bail!("Ambiguous column in source key constraint: {}", col);
878 }
879 Ok(name_idx)
880 })
881 .collect::<Result<Vec<_>, _>>()?;
882
883 if !desc.typ().keys.is_empty() {
884 return Err(key_constraint_err(&desc, &key_columns));
885 } else {
886 desc = desc.with_key(key_indices);
887 }
888 }
889
890 let timestamp_interval = match timestamp_interval {
891 Some(duration) => {
892 if scx.pcx.is_some() {
896 let min = scx.catalog.system_vars().min_timestamp_interval();
897 let max = scx.catalog.system_vars().max_timestamp_interval();
898 if duration < min || duration > max {
899 return Err(PlanError::InvalidTimestampInterval {
900 min,
901 max,
902 requested: duration,
903 });
904 }
905 }
906 duration
907 }
908 None => scx.catalog.system_vars().default_timestamp_interval(),
909 };
910
911 let (desc, data_source) = match progress_subsource {
912 Some(name) => {
913 let DeferredItemName::Named(name) = name else {
914 sql_bail!("[internal error] progress subsource must be named during purification");
915 };
916 let ResolvedItemName::Item { id, .. } = name else {
917 sql_bail!("[internal error] invalid target id");
918 };
919
920 let details = match external_connection {
921 GenericSourceConnection::Kafka(ref c) => {
922 SourceExportDetails::Kafka(KafkaSourceExportDetails {
923 metadata_columns: c.metadata_columns.clone(),
924 })
925 }
926 GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
927 LoadGenerator::Auction
928 | LoadGenerator::Marketing
929 | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
930 LoadGenerator::Counter { .. }
931 | LoadGenerator::Clock
932 | LoadGenerator::Datums
933 | LoadGenerator::KeyValue(_) => {
934 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
935 output: LoadGeneratorOutput::Default,
936 })
937 }
938 },
939 GenericSourceConnection::Postgres(_)
940 | GenericSourceConnection::MySql(_)
941 | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
942 };
943
944 let data_source = DataSourceDesc::OldSyntaxIngestion {
945 desc: SourceDesc {
946 connection: external_connection,
947 timestamp_interval,
948 },
949 progress_subsource: *id,
950 data_config: SourceExportDataConfig {
951 encoding,
952 envelope: envelope.clone(),
953 },
954 details,
955 };
956 (desc, data_source)
957 }
958 None => {
959 let desc = external_connection.timestamp_desc();
960 let data_source = DataSourceDesc::Ingestion(SourceDesc {
961 connection: external_connection,
962 timestamp_interval,
963 });
964 (desc, data_source)
965 }
966 };
967
968 let if_not_exists = *if_not_exists;
969 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
970
971 let full_name = scx.catalog.resolve_full_name(&name);
973 let partial_name = PartialItemName::from(full_name.clone());
974 if let (false, Ok(item)) = (
977 if_not_exists,
978 scx.catalog.resolve_item_or_type(&partial_name),
979 ) {
980 return Err(PlanError::ItemAlreadyExists {
981 name: full_name.to_string(),
982 item_type: item.item_type(),
983 });
984 }
985
986 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
990
991 let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
992
993 let timeline = match envelope {
995 SourceEnvelope::CdcV2 => {
996 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
997 }
998 _ => Timeline::EpochMilliseconds,
999 };
1000
1001 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1002
1003 let source = Source {
1004 create_sql,
1005 data_source,
1006 desc,
1007 compaction_window,
1008 };
1009
1010 Ok(Plan::CreateSource(CreateSourcePlan {
1011 name,
1012 source,
1013 if_not_exists,
1014 timeline,
1015 in_cluster: Some(in_cluster.id()),
1016 }))
1017}
1018
1019pub fn plan_generic_source_connection(
1020 scx: &StatementContext<'_>,
1021 source_connection: &CreateSourceConnection<Aug>,
1022 include_metadata: &Vec<SourceIncludeMetadata>,
1023) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1024 Ok(match source_connection {
1025 CreateSourceConnection::Kafka {
1026 connection,
1027 options,
1028 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1029 scx,
1030 connection,
1031 options,
1032 include_metadata,
1033 )?),
1034 CreateSourceConnection::Postgres {
1035 connection,
1036 options,
1037 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1038 scx, connection, options,
1039 )?),
1040 CreateSourceConnection::SqlServer {
1041 connection,
1042 options,
1043 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1044 scx, connection, options,
1045 )?),
1046 CreateSourceConnection::MySql {
1047 connection,
1048 options,
1049 } => {
1050 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1051 }
1052 CreateSourceConnection::LoadGenerator { generator, options } => {
1053 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1054 scx,
1055 generator,
1056 options,
1057 include_metadata,
1058 )?)
1059 }
1060 })
1061}
1062
1063fn plan_load_generator_source_connection(
1064 scx: &StatementContext<'_>,
1065 generator: &ast::LoadGenerator,
1066 options: &Vec<LoadGeneratorOption<Aug>>,
1067 include_metadata: &Vec<SourceIncludeMetadata>,
1068) -> Result<LoadGeneratorSourceConnection, PlanError> {
1069 let load_generator =
1070 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1071 let LoadGeneratorOptionExtracted {
1072 tick_interval,
1073 as_of,
1074 up_to,
1075 ..
1076 } = options.clone().try_into()?;
1077 let tick_micros = match tick_interval {
1078 Some(interval) => Some(interval.as_micros().try_into()?),
1079 None => None,
1080 };
1081 if up_to < as_of {
1082 sql_bail!("UP TO cannot be less than AS OF");
1083 }
1084 Ok(LoadGeneratorSourceConnection {
1085 load_generator,
1086 tick_micros,
1087 as_of,
1088 up_to,
1089 })
1090}
1091
1092fn plan_mysql_source_connection(
1093 scx: &StatementContext<'_>,
1094 connection: &ResolvedItemName,
1095 options: &Vec<MySqlConfigOption<Aug>>,
1096) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1097 let connection_item = scx.get_item_by_resolved_name(connection)?;
1098 match connection_item.connection()? {
1099 Connection::MySql(connection) => connection,
1100 _ => sql_bail!(
1101 "{} is not a MySQL connection",
1102 scx.catalog.resolve_full_name(connection_item.name())
1103 ),
1104 };
1105 let MySqlConfigOptionExtracted {
1106 details,
1107 text_columns: _,
1111 exclude_columns: _,
1112 seen: _,
1113 } = options.clone().try_into()?;
1114 let details = details
1115 .as_ref()
1116 .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1117 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1118 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1119 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1120 Ok(MySqlSourceConnection {
1121 connection: connection_item.id(),
1122 connection_id: connection_item.id(),
1123 details,
1124 })
1125}
1126
1127fn plan_sqlserver_source_connection(
1128 scx: &StatementContext<'_>,
1129 connection: &ResolvedItemName,
1130 options: &Vec<SqlServerConfigOption<Aug>>,
1131) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1132 let connection_item = scx.get_item_by_resolved_name(connection)?;
1133 match connection_item.connection()? {
1134 Connection::SqlServer(connection) => connection,
1135 _ => sql_bail!(
1136 "{} is not a SQL Server connection",
1137 scx.catalog.resolve_full_name(connection_item.name())
1138 ),
1139 };
1140 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1141 let details = details
1142 .as_ref()
1143 .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1144 let extras = hex::decode(details)
1145 .map_err(|e| sql_err!("{e}"))
1146 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1147 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1148 Ok(SqlServerSourceConnection {
1149 connection_id: connection_item.id(),
1150 connection: connection_item.id(),
1151 extras,
1152 })
1153}
1154
1155fn plan_postgres_source_connection(
1156 scx: &StatementContext<'_>,
1157 connection: &ResolvedItemName,
1158 options: &Vec<PgConfigOption<Aug>>,
1159) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1160 let connection_item = scx.get_item_by_resolved_name(connection)?;
1161 let PgConfigOptionExtracted {
1162 details,
1163 publication,
1164 text_columns: _,
1168 exclude_columns: _,
1172 seen: _,
1173 } = options.clone().try_into()?;
1174 let details = details
1175 .as_ref()
1176 .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1177 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1178 let details =
1179 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1180 let publication_details =
1181 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1182 Ok(PostgresSourceConnection {
1183 connection: connection_item.id(),
1184 connection_id: connection_item.id(),
1185 publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1187 publication_details,
1188 })
1189}
1190
1191fn plan_kafka_source_connection(
1192 scx: &StatementContext<'_>,
1193 connection_name: &ResolvedItemName,
1194 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1195 include_metadata: &Vec<SourceIncludeMetadata>,
1196) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1197 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1198 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1199 sql_bail!(
1200 "{} is not a kafka connection",
1201 scx.catalog.resolve_full_name(connection_item.name())
1202 )
1203 }
1204 let KafkaSourceConfigOptionExtracted {
1205 group_id_prefix,
1206 topic,
1207 topic_metadata_refresh_interval,
1208 start_timestamp: _, start_offset,
1210 seen: _,
1211 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1212 let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1214 let mut start_offsets = BTreeMap::new();
1215 if let Some(offsets) = start_offset {
1216 for (part, offset) in offsets.iter().enumerate() {
1217 if *offset < 0 {
1218 sql_bail!("START OFFSET must be a nonnegative integer");
1219 }
1220 start_offsets.insert(i32::try_from(part)?, *offset);
1221 }
1222 }
1223 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1224 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1227 }
1228 if topic_metadata_refresh_interval < Duration::from_secs(1) {
1229 sql_bail!("TOPIC METADATA REFRESH INTERVAL must be at least 1 second");
1232 }
1233 let metadata_columns = include_metadata
1234 .into_iter()
1235 .flat_map(|item| match item {
1236 SourceIncludeMetadata::Timestamp { alias } => {
1237 let name = match alias {
1238 Some(name) => name.to_string(),
1239 None => "timestamp".to_owned(),
1240 };
1241 Some((name, KafkaMetadataKind::Timestamp))
1242 }
1243 SourceIncludeMetadata::Partition { alias } => {
1244 let name = match alias {
1245 Some(name) => name.to_string(),
1246 None => "partition".to_owned(),
1247 };
1248 Some((name, KafkaMetadataKind::Partition))
1249 }
1250 SourceIncludeMetadata::Offset { alias } => {
1251 let name = match alias {
1252 Some(name) => name.to_string(),
1253 None => "offset".to_owned(),
1254 };
1255 Some((name, KafkaMetadataKind::Offset))
1256 }
1257 SourceIncludeMetadata::Headers { alias } => {
1258 let name = match alias {
1259 Some(name) => name.to_string(),
1260 None => "headers".to_owned(),
1261 };
1262 Some((name, KafkaMetadataKind::Headers))
1263 }
1264 SourceIncludeMetadata::Header {
1265 alias,
1266 key,
1267 use_bytes,
1268 } => Some((
1269 alias.to_string(),
1270 KafkaMetadataKind::Header {
1271 key: key.clone(),
1272 use_bytes: *use_bytes,
1273 },
1274 )),
1275 SourceIncludeMetadata::Key { .. } => {
1276 None
1278 }
1279 })
1280 .collect();
1281 Ok(KafkaSourceConnection {
1282 connection: connection_item.id(),
1283 connection_id: connection_item.id(),
1284 topic,
1285 start_offsets,
1286 group_id_prefix,
1287 topic_metadata_refresh_interval,
1288 metadata_columns,
1289 })
1290}
1291
/// Combines the `FORMAT` and `ENVELOPE` of a source (or source-fed table)
/// with the key/value descriptions it produces.
///
/// On success returns, in order: the final relation description, the planned
/// envelope, and the encoding derived from `format` (`None` when no format
/// was given).
///
/// # Errors
///
/// Fails when:
/// * the encoding cannot be derived from `format`;
/// * a format is given but `value_desc` is not exactly one `bytes` column;
/// * `INCLUDE KEY` is combined with `ENVELOPE DEBEZIUM`;
/// * `ENVELOPE DEBEZIUM` is used and the value description does not
///   typecheck as a Debezium record;
/// * `ENVELOPE UPSERT` has no key encoding (unless the source is a
///   `KEY VALUE` load generator) or uses an unsupported value decode error
///   policy;
/// * `ENVELOPE MATERIALIZE` is used without its feature flag or with a
///   format other than bare Avro;
/// * the envelope cannot compute its output description.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // An encoding can only be applied when the incoming value is a
            // single column of type bytes.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            // From here on the descriptions reported by the encoding replace
            // the ones passed in by the caller.
            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka sources with `ENVELOPE NONE`, every key column is
            // marked nullable; other combinations keep the key description
            // as reported by the encoding.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // True only for `KEY VALUE` load generator sources. It is passed to
    // `get_key_envelope` and lets `ENVELOPE UPSERT` proceed below without a
    // key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // `ENVELOPE DEBEZIUM` is only accepted when no key envelope was requested.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                // Surface the typecheck error only for Avro values; for any
                // other (or no) value encoding report that Avro is required.
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // When no key envelope was requested explicitly, derive one from
            // the key encoding.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // An empty policy list selects the default upsert style; a single
            // inline policy (feature-flagged) adds an error column, named
            // "error" unless an alias is given.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // Only a bare Avro format is accepted with this envelope.
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1470
1471fn plan_source_export_desc(
1474 scx: &StatementContext,
1475 name: &UnresolvedItemName,
1476 columns: &Vec<ColumnDef<Aug>>,
1477 constraints: &Vec<TableConstraint<Aug>>,
1478) -> Result<RelationDesc, PlanError> {
1479 let names: Vec<_> = columns
1480 .iter()
1481 .map(|c| normalize::column_name(c.name.clone()))
1482 .collect();
1483
1484 if let Some(dup) = names.iter().duplicates().next() {
1485 sql_bail!("column {} specified more than once", dup.quoted());
1486 }
1487
1488 let mut column_types = Vec::with_capacity(columns.len());
1491 let mut keys = Vec::new();
1492
1493 for (i, c) in columns.into_iter().enumerate() {
1494 let aug_data_type = &c.data_type;
1495 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1496 let mut nullable = true;
1497 for option in &c.options {
1498 match &option.option {
1499 ColumnOption::NotNull => nullable = false,
1500 ColumnOption::Default(_) => {
1501 bail_unsupported!("Source export with default value")
1502 }
1503 ColumnOption::Unique { is_primary } => {
1504 keys.push(vec![i]);
1505 if *is_primary {
1506 nullable = false;
1507 }
1508 }
1509 other => {
1510 bail_unsupported!(format!("Source export with column constraint: {}", other))
1511 }
1512 }
1513 }
1514 column_types.push(ty.nullable(nullable));
1515 }
1516
1517 let mut seen_primary = false;
1518 'c: for constraint in constraints {
1519 match constraint {
1520 TableConstraint::Unique {
1521 name: _,
1522 columns,
1523 is_primary,
1524 nulls_not_distinct,
1525 } => {
1526 if seen_primary && *is_primary {
1527 sql_bail!(
1528 "multiple primary keys for source export {} are not allowed",
1529 name.to_ast_string_stable()
1530 );
1531 }
1532 seen_primary = *is_primary || seen_primary;
1533
1534 let mut key = vec![];
1535 for column in columns {
1536 let column = normalize::column_name(column.clone());
1537 match names.iter().position(|name| *name == column) {
1538 None => sql_bail!("unknown column in constraint: {}", column),
1539 Some(i) => {
1540 let nullable = &mut column_types[i].nullable;
1541 if *is_primary {
1542 if *nulls_not_distinct {
1543 sql_bail!(
1544 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1545 );
1546 }
1547 *nullable = false;
1548 } else if !(*nulls_not_distinct || !*nullable) {
1549 break 'c;
1552 }
1553
1554 key.push(i);
1555 }
1556 }
1557 }
1558
1559 if *is_primary {
1560 keys.insert(0, key);
1561 } else {
1562 keys.push(key);
1563 }
1564 }
1565 TableConstraint::ForeignKey { .. } => {
1566 bail_unsupported!("Source export with a foreign key")
1567 }
1568 TableConstraint::Check { .. } => {
1569 bail_unsupported!("Source export with a check constraint")
1570 }
1571 }
1572 }
1573
1574 let typ = SqlRelationType::new(column_types).with_keys(keys);
1575 let desc = RelationDesc::new(typ, names);
1576 Ok(desc)
1577}
1578
// Typed extraction of the options of `CREATE SUBSOURCE`. The generated
// `CreateSubsourceOptionExtracted` is destructured in `plan_create_subsource`.
// `Progress` defaults to `false` and the two column lists default to empty;
// the remaining options carry no default here.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1588
1589pub fn plan_create_subsource(
1590 scx: &StatementContext,
1591 stmt: CreateSubsourceStatement<Aug>,
1592) -> Result<Plan, PlanError> {
1593 let CreateSubsourceStatement {
1594 name,
1595 columns,
1596 of_source,
1597 constraints,
1598 if_not_exists,
1599 with_options,
1600 } = &stmt;
1601
1602 let CreateSubsourceOptionExtracted {
1603 progress,
1604 retain_history,
1605 external_reference,
1606 text_columns,
1607 exclude_columns,
1608 details,
1609 seen: _,
1610 } = with_options.clone().try_into()?;
1611
1612 if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
1617 bail_internal!(
1618 "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1619 );
1620 }
1621
1622 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1623
1624 let data_source = if let Some(source_reference) = of_source {
1625 if scx.catalog.system_vars().enable_create_table_from_source()
1628 && scx.catalog.system_vars().force_source_table_syntax()
1629 {
1630 Err(PlanError::UseTablesForSources(
1631 "CREATE SUBSOURCE".to_string(),
1632 ))?;
1633 }
1634
1635 let ingestion_id = *source_reference.item_id();
1638 let external_reference = external_reference.ok_or_else(|| {
1639 sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
1640 })?;
1641
1642 let details = details
1645 .as_ref()
1646 .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
1647 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1648 let details =
1649 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1650 let details =
1651 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1652 let details = match details {
1653 SourceExportStatementDetails::Postgres { table } => {
1654 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1655 column_casts: crate::pure::postgres::generate_column_casts(
1656 scx,
1657 &table,
1658 &text_columns,
1659 )?,
1660 table,
1661 })
1662 }
1663 SourceExportStatementDetails::MySql {
1664 table,
1665 initial_gtid_set,
1666 binlog_full_metadata,
1667 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1668 table,
1669 initial_gtid_set,
1670 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1671 exclude_columns: exclude_columns
1672 .into_iter()
1673 .map(|c| c.into_string())
1674 .collect(),
1675 binlog_full_metadata,
1676 }),
1677 SourceExportStatementDetails::SqlServer {
1678 table,
1679 capture_instance,
1680 initial_lsn,
1681 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1682 capture_instance,
1683 table,
1684 initial_lsn,
1685 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1686 exclude_columns: exclude_columns
1687 .into_iter()
1688 .map(|c| c.into_string())
1689 .collect(),
1690 }),
1691 SourceExportStatementDetails::LoadGenerator { output } => {
1692 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1693 }
1694 SourceExportStatementDetails::Kafka {} => {
1695 bail_unsupported!("subsources cannot reference Kafka sources")
1696 }
1697 };
1698 DataSourceDesc::IngestionExport {
1699 ingestion_id,
1700 external_reference,
1701 details,
1702 data_config: SourceExportDataConfig {
1704 envelope: SourceEnvelope::None(NoneEnvelope {
1705 key_envelope: KeyEnvelope::None,
1706 key_arity: 0,
1707 }),
1708 encoding: None,
1709 },
1710 }
1711 } else if progress {
1712 DataSourceDesc::Progress
1713 } else {
1714 sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
1715 };
1716
1717 let if_not_exists = *if_not_exists;
1718 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1719
1720 let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1721
1722 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1723 let source = Source {
1724 create_sql,
1725 data_source,
1726 desc,
1727 compaction_window,
1728 };
1729
1730 Ok(Plan::CreateSource(CreateSourcePlan {
1731 name,
1732 source,
1733 if_not_exists,
1734 timeline: Timeline::EpochMilliseconds,
1735 in_cluster: None,
1736 }))
1737}
1738
// Typed extraction of the options of `CREATE TABLE ... FROM SOURCE`. The
// generated `TableFromSourceOptionExtracted` is destructured in
// `plan_create_table_from_source`. The two column lists default to empty;
// the remaining options carry no default here.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1747
/// Plans a `CREATE TABLE ... FROM SOURCE` statement.
///
/// The table is planned as an export of the referenced source: the hex
/// encoded `DETAILS` option is decoded and converted into the matching
/// [`SourceExportDetails`] variant, the envelope and format are applied via
/// `apply_source_envelope_encoding`, and the result is wrapped in a
/// [`Plan::CreateTable`] whose data source is an ingestion export.
///
/// # Errors
///
/// Fails when:
/// * `CREATE TABLE ... FROM SOURCE` is not enabled;
/// * the options cannot be extracted, or `DETAILS` is missing or cannot be
///   decoded;
/// * the referenced item cannot be resolved or is not a source;
/// * `INCLUDE` metadata is used with a source kind or envelope that does not
///   accept it;
/// * the column definitions, envelope or encoding cannot be planned;
/// * the resulting description has duplicate column names;
/// * `PARTITION BY` is used without its feature flag or does not pass
///   `check_partition_by`;
/// * the external reference is missing.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // A missing `ENVELOPE` clause is treated as `ENVELOPE NONE`.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode one layer at a time: hex text, protobuf bytes, Rust type.
    let details = details
        .as_ref()
        .ok_or_else(|| internal_err!("source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // `INCLUDE HEADERS` is only accepted for Kafka exports.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    // Any other `INCLUDE` metadata is only accepted for Kafka and load
    // generator exports.
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
            binlog_full_metadata,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
            binlog_full_metadata,
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            // Metadata columns are only accepted with the NONE, UPSERT and
            // DEBEZIUM envelopes.
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Each included metadata item becomes a named column; the alias
            // wins over the default name when one was given. `INCLUDE KEY`
            // contributes no metadata column here.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item
        .source_desc()?
        .ok_or_else(|| sql_err!("item is not a source"))?
        .connection;

    // When the statement defines columns (or constraints), the description is
    // planned from them and there is no key description. Otherwise the
    // connection's default key and value descriptions are used.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => bail_internal!("expected column definitions to be present"),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    // Only Kafka exports contribute metadata column descriptions.
    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // A bare list of column names renames the columns of the planned
    // description.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // The CdcV2 envelope gets an external timeline named after the table's
    // full name; every other envelope uses `EpochMilliseconds`.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2017
// Typed extraction of the options of a `LOAD GENERATOR` source. `AsOf`
// defaults to 0 and `UpTo` to `u64::MAX`; the remaining options carry no
// default here. Which options a given generator accepts is enforced
// separately by `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2033
2034impl LoadGeneratorOptionExtracted {
2035 pub(super) fn ensure_only_valid_options(
2036 &self,
2037 loadgen: &ast::LoadGenerator,
2038 ) -> Result<(), PlanError> {
2039 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2040
2041 let mut options = self.seen.clone();
2042
2043 let permitted_options: &[_] = match loadgen {
2044 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2045 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2046 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2047 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2048 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2049 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2050 ast::LoadGenerator::KeyValue => &[
2051 TickInterval,
2052 Keys,
2053 SnapshotRounds,
2054 TransactionalSnapshot,
2055 ValueSize,
2056 Seed,
2057 Partitions,
2058 BatchSize,
2059 ],
2060 };
2061
2062 for o in permitted_options {
2063 options.remove(o);
2064 }
2065
2066 if !options.is_empty() {
2067 sql_bail!(
2068 "{} load generators do not support {} values",
2069 loadgen,
2070 options.iter().join(", ")
2071 )
2072 }
2073
2074 Ok(())
2075 }
2076}
2077
2078pub(crate) fn load_generator_ast_to_generator(
2079 scx: &StatementContext,
2080 loadgen: &ast::LoadGenerator,
2081 options: &[LoadGeneratorOption<Aug>],
2082 include_metadata: &[SourceIncludeMetadata],
2083) -> Result<LoadGenerator, PlanError> {
2084 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2085 extracted.ensure_only_valid_options(loadgen)?;
2086
2087 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2088 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2089 }
2090
2091 let load_generator = match loadgen {
2092 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2093 ast::LoadGenerator::Clock => {
2094 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2095 LoadGenerator::Clock
2096 }
2097 ast::LoadGenerator::Counter => {
2098 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2099 let LoadGeneratorOptionExtracted {
2100 max_cardinality, ..
2101 } = extracted;
2102 LoadGenerator::Counter { max_cardinality }
2103 }
2104 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2105 ast::LoadGenerator::Datums => {
2106 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2107 LoadGenerator::Datums
2108 }
2109 ast::LoadGenerator::Tpch => {
2110 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2111
2112 let sf: f64 = scale_factor.unwrap_or(0.01);
2114 if !sf.is_finite() || sf < 0.0 {
2115 sql_bail!("unsupported scale factor {sf}");
2116 }
2117
2118 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2119 let total = (sf * multiplier).floor();
2120 let mut i = i64::try_cast_from(total)
2121 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2122 if i < 1 {
2123 i = 1;
2124 }
2125 Ok(i)
2126 };
2127
2128 let count_supplier = f_to_i(10_000f64)?;
2131 let count_part = f_to_i(200_000f64)?;
2132 let count_customer = f_to_i(150_000f64)?;
2133 let count_orders = f_to_i(150_000f64 * 10f64)?;
2134 let count_clerk = f_to_i(1_000f64)?;
2135
2136 LoadGenerator::Tpch {
2137 count_supplier,
2138 count_part,
2139 count_customer,
2140 count_orders,
2141 count_clerk,
2142 }
2143 }
2144 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2145 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2146 let LoadGeneratorOptionExtracted {
2147 keys,
2148 snapshot_rounds,
2149 transactional_snapshot,
2150 value_size,
2151 tick_interval,
2152 seed,
2153 partitions,
2154 batch_size,
2155 ..
2156 } = extracted;
2157
2158 let mut include_offset = None;
2159 for im in include_metadata {
2160 match im {
2161 SourceIncludeMetadata::Offset { alias } => {
2162 include_offset = match alias {
2163 Some(alias) => Some(alias.to_string()),
2164 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2165 }
2166 }
2167 SourceIncludeMetadata::Key { .. } => continue,
2168
2169 _ => {
2170 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2171 }
2172 };
2173 }
2174
2175 let lgkv = KeyValueLoadGenerator {
2176 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2177 snapshot_rounds: snapshot_rounds
2178 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2179 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2181 value_size: value_size
2182 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2183 partitions: partitions
2184 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2185 tick_interval,
2186 batch_size: batch_size
2187 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2188 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2189 include_offset,
2190 };
2191
2192 if lgkv.keys == 0
2193 || lgkv.partitions == 0
2194 || lgkv.value_size == 0
2195 || lgkv.batch_size == 0
2196 {
2197 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2198 }
2199
2200 if lgkv.keys % lgkv.partitions != 0 {
2201 sql_bail!("KEYS must be a multiple of PARTITIONS")
2202 }
2203
2204 if lgkv.batch_size > lgkv.keys {
2205 sql_bail!("KEYS must be larger than BATCH SIZE")
2206 }
2207
2208 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2211 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2212 }
2213
2214 if lgkv.snapshot_rounds == 0 {
2215 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2216 }
2217
2218 LoadGenerator::KeyValue(lgkv)
2219 }
2220 };
2221
2222 Ok(load_generator)
2223}
2224
2225fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2226 let before = value_desc.get_by_name(&"before".into());
2227 let (after_idx, after_ty) = value_desc
2228 .get_by_name(&"after".into())
2229 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2230 let before_idx = if let Some((before_idx, before_ty)) = before {
2231 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2232 sql_bail!("'before' column must be of type record");
2233 }
2234 if before_ty != after_ty {
2235 sql_bail!("'before' type differs from 'after' column");
2236 }
2237 Some(before_idx)
2238 } else {
2239 None
2240 };
2241 Ok((before_idx, after_idx))
2242}
2243
2244fn get_encoding(
2245 scx: &StatementContext,
2246 format: &FormatSpecifier<Aug>,
2247 envelope: &ast::SourceEnvelope,
2248) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2249 let encoding = match format {
2250 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2251 FormatSpecifier::KeyValue { key, value } => {
2252 let key = {
2253 let encoding = get_encoding_inner(scx, key)?;
2254 Some(encoding.key.unwrap_or(encoding.value))
2255 };
2256 let value = get_encoding_inner(scx, value)?.value;
2257 SourceDataEncoding { key, value }
2258 }
2259 };
2260
2261 let requires_keyvalue = matches!(
2262 envelope,
2263 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2264 );
2265 let is_keyvalue = encoding.key.is_some();
2266 if requires_keyvalue && !is_keyvalue {
2267 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2268 };
2269
2270 Ok(encoding)
2271}
2272
2273fn source_sink_cluster_config<'a, 'ctx>(
2279 scx: &'a StatementContext<'ctx>,
2280 in_cluster: &mut Option<ResolvedClusterName>,
2281) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2282 let cluster = match in_cluster {
2283 None => {
2284 let cluster = scx.catalog.resolve_cluster(None)?;
2285 *in_cluster = Some(ResolvedClusterName {
2286 id: cluster.id(),
2287 print_name: None,
2288 });
2289 cluster
2290 }
2291 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2292 };
2293
2294 Ok(cluster)
2295}
2296
// Declares the extracted form of `AvroSchemaOption`. `get_encoding_inner`
// destructures an `AvroSchemaOptionExtracted` with a `confluent_wire_format`
// field, which presumably comes from this invocation (defaulting to `true`).
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2298
/// The pieces of an Avro schema specification gathered while planning a
/// `FORMAT AVRO` clause, whether given inline or seeded from a schema
/// registry connection.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, if one was provided.
    pub key_schema: Option<String>,
    /// Schema for the value.
    pub value_schema: String,
    /// Schemas referenced by the key schema (empty for inline schemas).
    pub key_reference_schemas: Vec<String>,
    /// Schemas referenced by the value schema (empty for inline schemas).
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection the schemas came from, if any.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether the data is expected to use the Confluent wire format.
    pub confluent_wire_format: bool,
}
2310
2311fn get_encoding_inner(
2312 scx: &StatementContext,
2313 format: &Format<Aug>,
2314) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2315 let value = match format {
2316 Format::Bytes => DataEncoding::Bytes,
2317 Format::Avro(schema) => {
2318 let Schema {
2319 key_schema,
2320 value_schema,
2321 key_reference_schemas,
2322 value_reference_schemas,
2323 csr_connection,
2324 confluent_wire_format,
2325 } = match schema {
2326 AvroSchema::InlineSchema {
2329 schema: ast::Schema { schema },
2330 with_options,
2331 } => {
2332 let AvroSchemaOptionExtracted {
2333 confluent_wire_format,
2334 ..
2335 } = with_options.clone().try_into()?;
2336
2337 Schema {
2338 key_schema: None,
2339 value_schema: schema.clone(),
2340 key_reference_schemas: vec![],
2341 value_reference_schemas: vec![],
2342 csr_connection: None,
2343 confluent_wire_format,
2344 }
2345 }
2346 AvroSchema::Csr {
2347 csr_connection:
2348 CsrConnectionAvro {
2349 connection,
2350 seed,
2351 key_strategy: _,
2352 value_strategy: _,
2353 },
2354 } => {
2355 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2356 let csr_connection = match item.connection()? {
2357 Connection::Csr(_) => item.id(),
2358 _ => {
2359 sql_bail!(
2360 "{} is not a schema registry connection",
2361 scx.catalog
2362 .resolve_full_name(item.name())
2363 .to_string()
2364 .quoted()
2365 )
2366 }
2367 };
2368
2369 if let Some(seed) = seed {
2370 Schema {
2371 key_schema: seed.key_schema.clone(),
2372 value_schema: seed.value_schema.clone(),
2373 key_reference_schemas: seed.key_reference_schemas.clone(),
2374 value_reference_schemas: seed.value_reference_schemas.clone(),
2375 csr_connection: Some(csr_connection),
2376 confluent_wire_format: true,
2377 }
2378 } else {
2379 sql_bail!("Avro CSR seed resolution has not been performed")
2380 }
2381 }
2382 };
2383
2384 let wire_format = match (csr_connection.clone(), confluent_wire_format) {
2390 (None, false) => WireFormat::None,
2391 (None, true) => WireFormat::Confluent { registry: None },
2392 (Some(c), true) => WireFormat::Confluent { registry: Some(c) },
2393 (Some(_), false) => {
2394 sql_bail!(
2395 "internal error: AVRO source has CSR connection but \
2396 CONFLUENT WIRE FORMAT = false"
2397 )
2398 }
2399 };
2400
2401 if let Some(key_schema) = key_schema {
2402 return Ok(SourceDataEncoding {
2403 key: Some(DataEncoding::Avro(AvroEncoding {
2404 schema: key_schema,
2405 reference_schemas: key_reference_schemas,
2406 wire_format: wire_format.clone(),
2407 })),
2408 value: DataEncoding::Avro(AvroEncoding {
2409 schema: value_schema,
2410 reference_schemas: value_reference_schemas,
2411 wire_format,
2412 }),
2413 });
2414 } else {
2415 DataEncoding::Avro(AvroEncoding {
2416 schema: value_schema,
2417 reference_schemas: value_reference_schemas,
2418 wire_format,
2419 })
2420 }
2421 }
2422 Format::Protobuf(schema) => match schema {
2423 ProtobufSchema::Csr {
2424 csr_connection:
2425 CsrConnectionProtobuf {
2426 connection:
2427 CsrConnection {
2428 connection,
2429 options,
2430 },
2431 seed,
2432 },
2433 } => {
2434 if let Some(CsrSeedProtobuf { key, value }) = seed {
2435 let item = scx.get_item_by_resolved_name(connection)?;
2436 let _ = match item.connection()? {
2437 Connection::Csr(connection) => connection,
2438 _ => {
2439 sql_bail!(
2440 "{} is not a schema registry connection",
2441 scx.catalog
2442 .resolve_full_name(item.name())
2443 .to_string()
2444 .quoted()
2445 )
2446 }
2447 };
2448
2449 if !options.is_empty() {
2450 sql_bail!("Protobuf CSR connections do not support any options");
2451 }
2452
2453 let value = DataEncoding::Protobuf(ProtobufEncoding {
2454 descriptors: strconv::parse_bytes(&value.schema)?,
2455 message_name: value.message_name.clone(),
2456 confluent_wire_format: true,
2457 });
2458 if let Some(key) = key {
2459 return Ok(SourceDataEncoding {
2460 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2461 descriptors: strconv::parse_bytes(&key.schema)?,
2462 message_name: key.message_name.clone(),
2463 confluent_wire_format: true,
2464 })),
2465 value,
2466 });
2467 }
2468 value
2469 } else {
2470 sql_bail!("Protobuf CSR seed resolution has not been performed")
2471 }
2472 }
2473 ProtobufSchema::InlineSchema {
2474 message_name,
2475 schema: ast::Schema { schema },
2476 } => {
2477 let descriptors = strconv::parse_bytes(schema)?;
2478
2479 DataEncoding::Protobuf(ProtobufEncoding {
2480 descriptors,
2481 message_name: message_name.to_owned(),
2482 confluent_wire_format: false,
2483 })
2484 }
2485 },
2486 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2487 regex: mz_repr::adt::regex::Regex::new(regex, false)
2488 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2489 }),
2490 Format::Csv { columns, delimiter } => {
2491 let columns = match columns {
2492 CsvColumns::Header { names } => {
2493 if names.is_empty() {
2494 sql_bail!("[internal error] column spec should get names in purify")
2495 }
2496 ColumnSpec::Header {
2497 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2498 }
2499 }
2500 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2501 };
2502 DataEncoding::Csv(CsvEncoding {
2503 columns,
2504 delimiter: u8::try_from(*delimiter)
2505 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2506 })
2507 }
2508 Format::Json { array: false } => DataEncoding::Json,
2509 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2510 Format::Text => DataEncoding::Text,
2511 };
2512 Ok(SourceDataEncoding { key: None, value })
2513}
2514
2515fn get_key_envelope(
2517 included_items: &[SourceIncludeMetadata],
2518 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2519 key_envelope_no_encoding: bool,
2520) -> Result<KeyEnvelope, PlanError> {
2521 let key_definition = included_items
2522 .iter()
2523 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2524 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2525 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2526 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2527 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2528 (Some(name), _) if key_envelope_no_encoding => {
2529 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2530 }
2531 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2532 (_, None) => {
2533 sql_bail!(
2537 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2538 got bare FORMAT"
2539 );
2540 }
2541 }
2542 } else {
2543 Ok(KeyEnvelope::None)
2544 }
2545}
2546
2547fn get_unnamed_key_envelope(
2550 key: Option<&DataEncoding<ReferencedConnection>>,
2551) -> Result<KeyEnvelope, PlanError> {
2552 let is_composite = match key {
2556 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2557 Some(
2558 DataEncoding::Avro(_)
2559 | DataEncoding::Csv(_)
2560 | DataEncoding::Protobuf(_)
2561 | DataEncoding::Regex { .. },
2562 ) => true,
2563 None => false,
2564 };
2565
2566 if is_composite {
2567 Ok(KeyEnvelope::Flattened)
2568 } else {
2569 Ok(KeyEnvelope::Named("key".to_string()))
2570 }
2571}
2572
2573pub fn describe_create_view(
2574 _: &StatementContext,
2575 _: CreateViewStatement<Aug>,
2576) -> Result<StatementDesc, PlanError> {
2577 Ok(StatementDesc::new(None))
2578}
2579
2580pub fn plan_view(
2581 scx: &StatementContext,
2582 def: &mut ViewDefinition<Aug>,
2583 temporary: bool,
2584) -> Result<(QualifiedItemName, View), PlanError> {
2585 let create_sql = normalize::create_statement(
2586 scx,
2587 Statement::CreateView(CreateViewStatement {
2588 if_exists: IfExistsBehavior::Error,
2589 temporary,
2590 definition: def.clone(),
2591 }),
2592 )?;
2593
2594 let ViewDefinition {
2595 name,
2596 columns,
2597 query,
2598 } = def;
2599
2600 let query::PlannedRootQuery {
2601 expr,
2602 mut desc,
2603 finishing,
2604 scope: _,
2605 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2606 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2612 &finishing,
2613 expr.arity()
2614 ));
2615 if expr.contains_parameters()? {
2616 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2617 }
2618
2619 let dependencies = expr
2620 .depends_on()
2621 .into_iter()
2622 .map(|gid| scx.catalog.resolve_item_id(&gid))
2623 .collect();
2624
2625 let name = if temporary {
2626 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2627 } else {
2628 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2629 };
2630
2631 plan_utils::maybe_rename_columns(
2632 format!("view {}", scx.catalog.resolve_full_name(&name)),
2633 &mut desc,
2634 columns,
2635 )?;
2636 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2637
2638 if let Some(dup) = names.iter().duplicates().next() {
2639 sql_bail!("column {} specified more than once", dup.quoted());
2640 }
2641
2642 let view = View {
2643 create_sql,
2644 expr,
2645 dependencies,
2646 column_names: names,
2647 temporary,
2648 };
2649
2650 Ok((name, view))
2651}
2652
2653pub fn plan_create_view(
2654 scx: &StatementContext,
2655 mut stmt: CreateViewStatement<Aug>,
2656) -> Result<Plan, PlanError> {
2657 let CreateViewStatement {
2658 temporary,
2659 if_exists,
2660 definition,
2661 } = &mut stmt;
2662 let (name, view) = plan_view(scx, definition, *temporary)?;
2663
2664 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2667
2668 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2669 let if_exists = true;
2670 let cascade = false;
2671 let maybe_item_to_drop = plan_drop_item(
2672 scx,
2673 ObjectType::View,
2674 if_exists,
2675 definition.name.clone(),
2676 cascade,
2677 )?;
2678
2679 if let Some(id) = maybe_item_to_drop {
2681 let dependencies = view.expr.depends_on();
2682 let invalid_drop = scx
2683 .get_item(&id)
2684 .global_ids()
2685 .any(|gid| dependencies.contains(&gid));
2686 if invalid_drop {
2687 let item = scx.catalog.get_item(&id);
2688 sql_bail!(
2689 "cannot replace view {0}: depended upon by new {0} definition",
2690 scx.catalog.resolve_full_name(item.name())
2691 );
2692 }
2693
2694 Some(id)
2695 } else {
2696 None
2697 }
2698 } else {
2699 None
2700 };
2701 let drop_ids = replace
2702 .map(|id| {
2703 scx.catalog
2704 .item_dependents(id)
2705 .into_iter()
2706 .map(|id| id.unwrap_item_id())
2707 .collect()
2708 })
2709 .unwrap_or_default();
2710
2711 validate_view_dependencies(scx, &view.dependencies.0)?;
2712
2713 let full_name = scx.catalog.resolve_full_name(&name);
2715 let partial_name = PartialItemName::from(full_name.clone());
2716 if let (Ok(item), IfExistsBehavior::Error, false) = (
2719 scx.catalog.resolve_item_or_type(&partial_name),
2720 *if_exists,
2721 ignore_if_exists_errors,
2722 ) {
2723 return Err(PlanError::ItemAlreadyExists {
2724 name: full_name.to_string(),
2725 item_type: item.item_type(),
2726 });
2727 }
2728
2729 Ok(Plan::CreateView(CreateViewPlan {
2730 name,
2731 view,
2732 replace,
2733 drop_ids,
2734 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2735 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2736 }))
2737}
2738
2739fn validate_view_dependencies(
2741 scx: &StatementContext,
2742 dependencies: &BTreeSet<CatalogItemId>,
2743) -> Result<(), PlanError> {
2744 for id in dependencies {
2745 let item = scx.catalog.get_item(id);
2746 if item.replacement_target().is_some() {
2747 let name = scx.catalog.minimal_qualification(item.name());
2748 return Err(PlanError::InvalidDependency {
2749 name: name.to_string(),
2750 item_type: format!("replacement {}", item.item_type()),
2751 });
2752 }
2753 }
2754
2755 Ok(())
2756}
2757
2758pub fn describe_create_materialized_view(
2759 _: &StatementContext,
2760 _: CreateMaterializedViewStatement<Aug>,
2761) -> Result<StatementDesc, PlanError> {
2762 Ok(StatementDesc::new(None))
2763}
2764
2765pub fn describe_create_network_policy(
2766 _: &StatementContext,
2767 _: CreateNetworkPolicyStatement<Aug>,
2768) -> Result<StatementDesc, PlanError> {
2769 Ok(StatementDesc::new(None))
2770}
2771
2772pub fn describe_alter_network_policy(
2773 _: &StatementContext,
2774 _: AlterNetworkPolicyStatement<Aug>,
2775) -> Result<StatementDesc, PlanError> {
2776 Ok(StatementDesc::new(None))
2777}
2778
2779pub fn plan_create_materialized_view(
2780 scx: &StatementContext,
2781 mut stmt: CreateMaterializedViewStatement<Aug>,
2782) -> Result<Plan, PlanError> {
2783 let cluster_id =
2784 crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2785 stmt.in_cluster = Some(ResolvedClusterName {
2786 id: cluster_id,
2787 print_name: None,
2788 });
2789
2790 let target_replica = match &stmt.in_cluster_replica {
2791 Some(replica_name) => {
2792 scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2793
2794 let cluster = scx.catalog.get_cluster(cluster_id);
2795 let replica_id = cluster
2796 .replica_ids()
2797 .get(replica_name.as_str())
2798 .copied()
2799 .ok_or_else(|| {
2800 CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2801 })?;
2802 Some(replica_id)
2803 }
2804 None => None,
2805 };
2806
2807 let create_sql =
2808 normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2809
2810 let partial_name = normalize::unresolved_item_name(stmt.name)?;
2811 let name = scx.allocate_qualified_name(partial_name.clone())?;
2812
2813 let query::PlannedRootQuery {
2814 expr,
2815 mut desc,
2816 finishing,
2817 scope: _,
2818 } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2819 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2821 &finishing,
2822 expr.arity()
2823 ));
2824 if expr.contains_parameters()? {
2825 return Err(PlanError::ParameterNotAllowed(
2826 "materialized views".to_string(),
2827 ));
2828 }
2829
2830 plan_utils::maybe_rename_columns(
2831 format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2832 &mut desc,
2833 &stmt.columns,
2834 )?;
2835 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2836
2837 let MaterializedViewOptionExtracted {
2838 assert_not_null,
2839 partition_by,
2840 retain_history,
2841 refresh,
2842 seen: _,
2843 }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2844
2845 if let Some(partition_by) = partition_by {
2846 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2847 check_partition_by(&desc, partition_by)?;
2848 }
2849
2850 let refresh_schedule = {
2851 let mut refresh_schedule = RefreshSchedule::default();
2852 let mut on_commits_seen = 0;
2853 for refresh_option_value in refresh {
2854 if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2855 scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2856 }
2857 match refresh_option_value {
2858 RefreshOptionValue::OnCommit => {
2859 on_commits_seen += 1;
2860 }
2861 RefreshOptionValue::AtCreation => {
2862 soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2863 bail_internal!("REFRESH AT CREATION should have been purified away")
2864 }
2865 RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2866 transform_ast::transform(scx, &mut time)?; let ecx = &ExprContext {
2868 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2869 name: "REFRESH AT",
2870 scope: &Scope::empty(),
2871 relation_type: &SqlRelationType::empty(),
2872 allow_aggregates: false,
2873 allow_subqueries: false,
2874 allow_parameters: false,
2875 allow_windows: false,
2876 };
2877 let hir = plan_expr(ecx, &time)?.cast_to(
2878 ecx,
2879 CastContext::Assignment,
2880 &SqlScalarType::MzTimestamp,
2881 )?;
2882 let timestamp = hir
2884 .into_literal_mz_timestamp()
2885 .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2886 refresh_schedule.ats.push(timestamp);
2887 }
2888 RefreshOptionValue::Every(RefreshEveryOptionValue {
2889 interval,
2890 aligned_to,
2891 }) => {
2892 let interval = Interval::try_from_value(Value::Interval(interval))?;
2893 if interval.as_microseconds() <= 0 {
2894 sql_bail!("REFRESH interval must be positive; got: {}", interval);
2895 }
2896 if interval.months != 0 {
2897 sql_bail!("REFRESH interval must not involve units larger than days");
2902 }
2903 let interval = interval.duration()?;
2904 if u64::try_from(interval.as_millis()).is_err()
2909 || Interval::from_duration(&interval).is_err()
2910 {
2911 sql_bail!("REFRESH interval too large");
2912 }
2913 if interval.as_micros() < 1000 {
2914 sql_bail!("REFRESH interval must be at least 1 ms")
2915 }
2916
2917 let mut aligned_to = match aligned_to {
2918 Some(aligned_to) => aligned_to,
2919 None => {
2920 soft_panic_or_log!(
2921 "ALIGNED TO should have been filled in by purification"
2922 );
2923 sql_bail!(
2924 "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2925 )
2926 }
2927 };
2928
2929 transform_ast::transform(scx, &mut aligned_to)?;
2931
2932 let ecx = &ExprContext {
2933 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2934 name: "REFRESH EVERY ... ALIGNED TO",
2935 scope: &Scope::empty(),
2936 relation_type: &SqlRelationType::empty(),
2937 allow_aggregates: false,
2938 allow_subqueries: false,
2939 allow_parameters: false,
2940 allow_windows: false,
2941 };
2942 let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2943 ecx,
2944 CastContext::Assignment,
2945 &SqlScalarType::MzTimestamp,
2946 )?;
2947 let aligned_to_const = aligned_to_hir
2949 .into_literal_mz_timestamp()
2950 .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2951
2952 refresh_schedule.everies.push(RefreshEvery {
2953 interval,
2954 aligned_to: aligned_to_const,
2955 });
2956 }
2957 }
2958 }
2959
2960 if on_commits_seen > 1 {
2961 sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2962 }
2963 if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2964 sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2965 }
2966
2967 if refresh_schedule == RefreshSchedule::default() {
2968 None
2969 } else {
2970 Some(refresh_schedule)
2971 }
2972 };
2973
2974 let as_of = stmt.as_of.map(Timestamp::from);
2975 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2976 let mut non_null_assertions = assert_not_null
2977 .into_iter()
2978 .map(normalize::column_name)
2979 .map(|assertion_name| {
2980 column_names
2981 .iter()
2982 .position(|col| col == &assertion_name)
2983 .ok_or_else(|| {
2984 sql_err!(
2985 "column {} in ASSERT NOT NULL option not found",
2986 assertion_name.quoted()
2987 )
2988 })
2989 })
2990 .collect::<Result<Vec<_>, _>>()?;
2991 non_null_assertions.sort();
2992 if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2993 let dup = &column_names[*dup];
2994 sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2995 }
2996
2997 if let Some(dup) = column_names.iter().duplicates().next() {
2998 sql_bail!("column {} specified more than once", dup.quoted());
2999 }
3000
3001 let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
3004 Ok(true) => IfExistsBehavior::Skip,
3005 _ => stmt.if_exists,
3006 };
3007
3008 let mut replace = None;
3009 let mut if_not_exists = false;
3010 match if_exists {
3011 IfExistsBehavior::Replace => {
3012 let if_exists = true;
3013 let cascade = false;
3014 let replace_id = plan_drop_item(
3015 scx,
3016 ObjectType::MaterializedView,
3017 if_exists,
3018 partial_name.clone().into(),
3019 cascade,
3020 )?;
3021
3022 if let Some(id) = replace_id {
3024 let dependencies = expr.depends_on();
3025 let invalid_drop = scx
3026 .get_item(&id)
3027 .global_ids()
3028 .any(|gid| dependencies.contains(&gid));
3029 if invalid_drop {
3030 let item = scx.catalog.get_item(&id);
3031 sql_bail!(
3032 "cannot replace materialized view {0}: depended upon by new {0} definition",
3033 scx.catalog.resolve_full_name(item.name())
3034 );
3035 }
3036 replace = Some(id);
3037 }
3038 }
3039 IfExistsBehavior::Skip => if_not_exists = true,
3040 IfExistsBehavior::Error => (),
3041 }
3042 let drop_ids = replace
3043 .map(|id| {
3044 scx.catalog
3045 .item_dependents(id)
3046 .into_iter()
3047 .map(|id| id.unwrap_item_id())
3048 .collect()
3049 })
3050 .unwrap_or_default();
3051 let mut dependencies: BTreeSet<_> = expr
3052 .depends_on()
3053 .into_iter()
3054 .map(|gid| scx.catalog.resolve_item_id(&gid))
3055 .collect();
3056
3057 let mut replacement_target = None;
3059 if let Some(target_name) = &stmt.replacement_for {
3060 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3061
3062 let target = scx.get_item_by_resolved_name(target_name)?;
3063 if target.item_type() != CatalogItemType::MaterializedView {
3064 return Err(PlanError::InvalidReplacement {
3065 item_type: target.item_type(),
3066 item_name: scx.catalog.minimal_qualification(target.name()),
3067 replacement_type: CatalogItemType::MaterializedView,
3068 replacement_name: partial_name,
3069 });
3070 }
3071 if target.id().is_system() {
3072 sql_bail!(
3073 "cannot replace {} because it is required by the database system",
3074 scx.catalog.minimal_qualification(target.name()),
3075 );
3076 }
3077
3078 for dependent in scx.catalog.item_dependents(target.id()) {
3080 if let ObjectId::Item(id) = dependent
3081 && dependencies.contains(&id)
3082 {
3083 sql_bail!(
3084 "replacement would cause {} to depend on itself",
3085 scx.catalog.minimal_qualification(target.name()),
3086 );
3087 }
3088 }
3089
3090 dependencies.insert(target.id());
3091
3092 for use_id in target.used_by() {
3093 let use_item = scx.get_item(use_id);
3094 if use_item.replacement_target() == Some(target.id()) {
3095 sql_bail!(
3096 "cannot replace {} because it already has a replacement: {}",
3097 scx.catalog.minimal_qualification(target.name()),
3098 scx.catalog.minimal_qualification(use_item.name()),
3099 );
3100 }
3101 }
3102
3103 replacement_target = Some(target.id());
3104 }
3105
3106 validate_view_dependencies(scx, &dependencies)?;
3107
3108 let full_name = scx.catalog.resolve_full_name(&name);
3110 let partial_name = PartialItemName::from(full_name.clone());
3111 if let (IfExistsBehavior::Error, Ok(item)) =
3114 (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3115 {
3116 return Err(PlanError::ItemAlreadyExists {
3117 name: full_name.to_string(),
3118 item_type: item.item_type(),
3119 });
3120 }
3121
3122 Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3123 name,
3124 materialized_view: MaterializedView {
3125 create_sql,
3126 expr,
3127 dependencies: DependencyIds(dependencies),
3128 column_names,
3129 replacement_target,
3130 cluster_id,
3131 target_replica,
3132 non_null_assertions,
3133 compaction_window,
3134 refresh_schedule,
3135 as_of,
3136 },
3137 replace,
3138 drop_ids,
3139 if_not_exists,
3140 ambiguous_columns: *scx.ambiguous_columns.borrow(),
3141 }))
3142}
3143
// Declares the extracted form of the `WITH` options of a materialized view.
// `plan_create_materialized_view` destructures a
// `MaterializedViewOptionExtracted` with the fields `assert_not_null`,
// `partition_by`, `retain_history` and `refresh`, which presumably come from
// this invocation; `AssertNotNull` and `Refresh` may be given multiple times.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3151
3152pub fn describe_create_sink(
3153 _: &StatementContext,
3154 _: CreateSinkStatement<Aug>,
3155) -> Result<StatementDesc, PlanError> {
3156 Ok(StatementDesc::new(None))
3157}
3158
// Declares the extracted form of the `WITH` options of `CREATE SINK`:
// `SNAPSHOT`, `PARTITION STRATEGY`, `VERSION` and `COMMIT INTERVAL`.
// NOTE(review): presumably generates `CreateSinkOptionExtracted`, in line with
// the other invocations of this macro in this file — confirm.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3166
3167pub fn plan_create_sink(
3168 scx: &StatementContext,
3169 stmt: CreateSinkStatement<Aug>,
3170) -> Result<Plan, PlanError> {
3171 let Some(name) = stmt.name.clone() else {
3173 return Err(PlanError::MissingName(CatalogItemType::Sink));
3174 };
3175 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3176 let full_name = scx.catalog.resolve_full_name(&name);
3177 let partial_name = PartialItemName::from(full_name.clone());
3178 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3179 return Err(PlanError::ItemAlreadyExists {
3180 name: full_name.to_string(),
3181 item_type: item.item_type(),
3182 });
3183 }
3184
3185 plan_sink(scx, stmt)
3186}
3187
3188fn plan_sink(
3193 scx: &StatementContext,
3194 mut stmt: CreateSinkStatement<Aug>,
3195) -> Result<Plan, PlanError> {
3196 let CreateSinkStatement {
3197 name,
3198 in_cluster: _,
3199 from,
3200 connection,
3201 format,
3202 envelope,
3203 mode,
3204 if_not_exists,
3205 with_options,
3206 } = stmt.clone();
3207
3208 let Some(name) = name else {
3209 return Err(PlanError::MissingName(CatalogItemType::Sink));
3210 };
3211 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3212
3213 let envelope = match (&connection, envelope, mode) {
3214 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3216 SinkEnvelope::Upsert
3217 }
3218 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3219 SinkEnvelope::Debezium
3220 }
3221 (CreateSinkConnection::Kafka { .. }, None, None) => {
3222 sql_bail!("ENVELOPE clause is required")
3223 }
3224 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3225 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3226 }
3227 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3229 SinkEnvelope::Upsert
3230 }
3231 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3232 SinkEnvelope::Append
3233 }
3234 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3235 sql_bail!("MODE clause is required")
3236 }
3237 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3238 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3239 }
3240 };
3241
3242 let from_name = &from;
3243 let from = scx.get_item_by_resolved_name(&from)?;
3244
3245 {
3246 use CatalogItemType::*;
3247 match from.item_type() {
3248 Table | Source | MaterializedView => {
3249 if from.replacement_target().is_some() {
3250 let name = scx.catalog.minimal_qualification(from.name());
3251 return Err(PlanError::InvalidSinkFrom {
3252 name: name.to_string(),
3253 item_type: format!("replacement {}", from.item_type()),
3254 });
3255 }
3256 }
3257 Sink | View | Index | Type | Func | Secret | Connection => {
3258 let name = scx.catalog.minimal_qualification(from.name());
3259 return Err(PlanError::InvalidSinkFrom {
3260 name: name.to_string(),
3261 item_type: from.item_type().to_string(),
3262 });
3263 }
3264 }
3265 }
3266
3267 if from.id().is_system() {
3268 bail_unsupported!("creating a sink directly on a catalog object");
3269 }
3270
3271 let desc = from
3272 .relation_desc()
3273 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
3274 let key_indices = match &connection {
3275 CreateSinkConnection::Kafka { key: Some(key), .. }
3276 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3277 let key_columns = key
3278 .key_columns
3279 .clone()
3280 .into_iter()
3281 .map(normalize::column_name)
3282 .collect::<Vec<_>>();
3283 let mut uniq = BTreeSet::new();
3284 for col in key_columns.iter() {
3285 if !uniq.insert(col) {
3286 sql_bail!("duplicate column referenced in KEY: {}", col);
3287 }
3288 }
3289 let indices = key_columns
3290 .iter()
3291 .map(|col| {
3292 let name_idx =
3293 desc.get_by_name(col)
3294 .map(|(idx, _type)| idx)
3295 .ok_or_else(|| {
3296 sql_err!("column referenced in KEY does not exist: {}", col)
3297 })?;
3298 if desc.get_unambiguous_name(name_idx).is_none() {
3299 sql_bail!("column referenced in KEY is ambiguous: {}", col);
3300 }
3301 Ok(name_idx)
3302 })
3303 .collect::<Result<Vec<_>, _>>()?;
3304
3305 if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3308 let cols: Vec<_> = desc.iter().collect();
3309 for &idx in &indices {
3310 let (col_name, col_type) = cols[idx];
3311 let scalar = &col_type.scalar_type;
3312 let is_valid = matches!(
3313 scalar,
3314 SqlScalarType::Bool
3316 | SqlScalarType::Int16
3317 | SqlScalarType::Int32
3318 | SqlScalarType::Int64
3319 | SqlScalarType::UInt16
3320 | SqlScalarType::UInt32
3321 | SqlScalarType::UInt64
3322 | SqlScalarType::Numeric { .. }
3324 | SqlScalarType::Date
3326 | SqlScalarType::Time
3327 | SqlScalarType::Timestamp { .. }
3328 | SqlScalarType::TimestampTz { .. }
3329 | SqlScalarType::Interval
3330 | SqlScalarType::MzTimestamp
3331 | SqlScalarType::String
3333 | SqlScalarType::Char { .. }
3334 | SqlScalarType::VarChar { .. }
3335 | SqlScalarType::PgLegacyChar
3336 | SqlScalarType::PgLegacyName
3337 | SqlScalarType::Bytes
3338 | SqlScalarType::Jsonb
3339 | SqlScalarType::Uuid
3341 | SqlScalarType::Oid
3342 | SqlScalarType::RegProc
3343 | SqlScalarType::RegType
3344 | SqlScalarType::RegClass
3345 | SqlScalarType::MzAclItem
3346 | SqlScalarType::AclItem
3347 | SqlScalarType::Int2Vector
3348 );
3349 if !is_valid {
3350 return Err(PlanError::IcebergSinkUnsupportedKeyType {
3351 column: col_name.to_string(),
3352 column_type: format!("{:?}", scalar),
3353 });
3354 }
3355 }
3356 }
3357
3358 let is_valid_key = desc
3359 .typ()
3360 .keys
3361 .iter()
3362 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3363
3364 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3365 if key.not_enforced {
3366 scx.catalog
3367 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3368 key: key_columns.clone(),
3369 name: name.item.clone(),
3370 })
3371 } else {
3372 return Err(PlanError::UpsertSinkWithInvalidKey {
3373 name: from_name.full_name_str(),
3374 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3375 valid_keys: desc
3376 .typ()
3377 .keys
3378 .iter()
3379 .map(|key| {
3380 key.iter()
3381 .map(|col| desc.get_name(*col).as_str().into())
3382 .collect()
3383 })
3384 .collect(),
3385 });
3386 }
3387 }
3388 Some(indices)
3389 }
3390 CreateSinkConnection::Kafka { key: None, .. }
3391 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3392 };
3393
3394 if key_indices.is_some() && envelope == SinkEnvelope::Append {
3395 sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3396 }
3397
3398 if envelope == SinkEnvelope::Append {
3400 if let CreateSinkConnection::Iceberg { .. } = &connection {
3401 use mz_storage_types::sinks::{
3402 ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3403 };
3404 for (col_name, _) in desc.iter() {
3405 if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3406 || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3407 {
3408 sql_bail!(
3409 "column {} conflicts with the system column that MODE APPEND \
3410 adds to the Iceberg table",
3411 col_name.quoted()
3412 );
3413 }
3414 }
3415 }
3416 }
3417
3418 let headers_index = match &connection {
3419 CreateSinkConnection::Kafka {
3420 headers: Some(headers),
3421 ..
3422 } => {
3423 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3424
3425 match envelope {
3426 SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3427 SinkEnvelope::Debezium => {
3428 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3429 }
3430 };
3431
3432 let headers = normalize::column_name(headers.clone());
3433 let (idx, ty) = desc
3434 .get_by_name(&headers)
3435 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3436
3437 if desc.get_unambiguous_name(idx).is_none() {
3438 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3439 }
3440
3441 match &ty.scalar_type {
3442 SqlScalarType::Map { value_type, .. }
3443 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3444 _ => sql_bail!(
3445 "HEADERS column must have type map[text => text] or map[text => bytea]"
3446 ),
3447 }
3448
3449 Some(idx)
3450 }
3451 _ => None,
3452 };
3453
3454 let relation_key_indices = desc.typ().keys.get(0).cloned();
3456
3457 let key_desc_and_indices = key_indices.map(|key_indices| {
3458 let cols = desc
3459 .iter()
3460 .map(|(name, ty)| (name.clone(), ty.clone()))
3461 .collect::<Vec<_>>();
3462 let (names, types): (Vec<_>, Vec<_>) =
3463 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3464 let typ = SqlRelationType::new(types);
3465 (RelationDesc::new(typ, names), key_indices)
3466 });
3467
3468 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3469 return Err(PlanError::UpsertSinkWithoutKey);
3470 }
3471
3472 let CreateSinkOptionExtracted {
3473 snapshot,
3474 version,
3475 partition_strategy: _,
3476 seen: _,
3477 commit_interval,
3478 } = with_options.try_into()?;
3479
3480 let connection_builder = match connection {
3481 CreateSinkConnection::Kafka {
3482 connection,
3483 options,
3484 ..
3485 } => kafka_sink_builder(
3486 scx,
3487 connection,
3488 options,
3489 format,
3490 relation_key_indices,
3491 key_desc_and_indices,
3492 headers_index,
3493 desc.into_owned(),
3494 envelope,
3495 from.id(),
3496 commit_interval,
3497 )?,
3498 CreateSinkConnection::Iceberg {
3499 catalog_connection,
3500 aws_connection,
3501 options,
3502 ..
3503 } => iceberg_sink_builder(
3504 scx,
3505 catalog_connection,
3506 aws_connection,
3507 options,
3508 relation_key_indices,
3509 key_desc_and_indices,
3510 commit_interval,
3511 &desc,
3512 )?,
3513 };
3514
3515 let with_snapshot = snapshot.unwrap_or(true);
3517 let version = version.unwrap_or(0);
3519
3520 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3524 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3525
3526 Ok(Plan::CreateSink(CreateSinkPlan {
3527 name,
3528 sink: Sink {
3529 create_sql,
3530 from: from.global_id(),
3531 connection: connection_builder,
3532 envelope,
3533 version,
3534 commit_interval,
3535 },
3536 with_snapshot,
3537 if_not_exists,
3538 in_cluster: in_cluster.id(),
3539 }))
3540}
3541
3542fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3543 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3544
3545 let existing_keys = desc
3546 .typ()
3547 .keys
3548 .iter()
3549 .map(|key_columns| {
3550 key_columns
3551 .iter()
3552 .map(|col| desc.get_name(*col).as_str())
3553 .join(", ")
3554 })
3555 .join(", ");
3556
3557 sql_err!(
3558 "Key constraint ({}) conflicts with existing key ({})",
3559 user_keys,
3560 existing_keys
3561 )
3562}
3563
3564#[derive(Debug, Default, PartialEq, Clone)]
3567pub struct CsrConfigOptionExtracted {
3568 seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3569 pub(crate) avro_key_fullname: Option<String>,
3570 pub(crate) avro_value_fullname: Option<String>,
3571 pub(crate) null_defaults: bool,
3572 pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3573 pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3574 pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3575 pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3576}
3577
3578impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3579 type Error = crate::plan::PlanError;
3580 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3581 let mut extracted = CsrConfigOptionExtracted::default();
3582 let mut common_doc_comments = BTreeMap::new();
3583 for option in v {
3584 if !extracted.seen.insert(option.name.clone()) {
3585 return Err(PlanError::Unstructured({
3586 format!("{} specified more than once", option.name)
3587 }));
3588 }
3589 let option_name = option.name.clone();
3590 let option_name_str = option_name.to_ast_string_simple();
3591 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3592 option_name: option_name.to_ast_string_simple(),
3593 err: e.into(),
3594 };
3595 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3596 val.map(|s| match s {
3597 WithOptionValue::Value(Value::String(s)) => {
3598 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3599 }
3600 _ => Err("must be a string".to_string()),
3601 })
3602 .transpose()
3603 .map_err(PlanError::Unstructured)
3604 .map_err(better_error)
3605 };
3606 match option.name {
3607 CsrConfigOptionName::AvroKeyFullname => {
3608 extracted.avro_key_fullname =
3609 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3610 }
3611 CsrConfigOptionName::AvroValueFullname => {
3612 extracted.avro_value_fullname =
3613 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3614 }
3615 CsrConfigOptionName::NullDefaults => {
3616 extracted.null_defaults =
3617 <bool>::try_from_value(option.value).map_err(better_error)?;
3618 }
3619 CsrConfigOptionName::AvroDocOn(doc_on) => {
3620 let value = String::try_from_value(option.value.ok_or_else(|| {
3621 PlanError::InvalidOptionValue {
3622 option_name: option_name_str,
3623 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3624 }
3625 })?)
3626 .map_err(better_error)?;
3627 let key = match doc_on.identifier {
3628 DocOnIdentifier::Column(ast::ColumnName {
3629 relation: ResolvedItemName::Item { id, .. },
3630 column: ResolvedColumnReference::Column { name, index: _ },
3631 }) => DocTarget::Field {
3632 object_id: id,
3633 column_name: name,
3634 },
3635 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3636 DocTarget::Type(id)
3637 }
3638 _ => sql_bail!("invalid DOC ON identifier"),
3639 };
3640
3641 match doc_on.for_schema {
3642 DocOnSchema::KeyOnly => {
3643 extracted.key_doc_options.insert(key, value);
3644 }
3645 DocOnSchema::ValueOnly => {
3646 extracted.value_doc_options.insert(key, value);
3647 }
3648 DocOnSchema::All => {
3649 common_doc_comments.insert(key, value);
3650 }
3651 }
3652 }
3653 CsrConfigOptionName::KeyCompatibilityLevel => {
3654 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3655 }
3656 CsrConfigOptionName::ValueCompatibilityLevel => {
3657 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3658 }
3659 }
3660 }
3661
3662 for (key, value) in common_doc_comments {
3663 if !extracted.key_doc_options.contains_key(&key) {
3664 extracted.key_doc_options.insert(key.clone(), value.clone());
3665 }
3666 if !extracted.value_doc_options.contains_key(&key) {
3667 extracted.value_doc_options.insert(key, value);
3668 }
3669 }
3670 Ok(extracted)
3671 }
3672}
3673
3674fn iceberg_sink_builder(
3675 scx: &StatementContext,
3676 catalog_connection: ResolvedItemName,
3677 storage_connection: Option<ResolvedItemName>,
3678 options: Vec<IcebergSinkConfigOption<Aug>>,
3679 relation_key_indices: Option<Vec<usize>>,
3680 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3681 commit_interval: Option<Duration>,
3682 desc: &RelationDesc,
3683) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3684 ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3688 .map_err(|e| sql_err!("{}", e))?;
3689
3690 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3691 let catalog_connection_id = catalog_connection_item.id();
3692 if !matches!(
3693 catalog_connection_item.connection()?,
3694 Connection::IcebergCatalog(_)
3695 ) {
3696 sql_bail!(
3697 "{} is not an iceberg catalog connection",
3698 scx.catalog
3699 .resolve_full_name(catalog_connection_item.name())
3700 .to_string()
3701 .quoted()
3702 );
3703 };
3704
3705 let storage_connection_item = storage_connection
3706 .map(|c| scx.get_item_by_resolved_name(&c))
3707 .transpose()?;
3708 let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id());
3709 if let Some(c) = &storage_connection_item
3710 && !matches!(c.connection()?, Connection::Aws(_))
3711 {
3712 sql_bail!(
3713 "{} is not an AWS connection",
3714 scx.catalog.resolve_full_name(c.name()).to_string().quoted()
3715 );
3716 }
3717
3718 let IcebergSinkConfigOptionExtracted {
3719 table,
3720 namespace,
3721 seen: _,
3722 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3723
3724 let Some(table) = table else {
3725 sql_bail!("Iceberg sink must specify TABLE");
3726 };
3727 let Some(namespace) = namespace else {
3728 sql_bail!("Iceberg sink must specify NAMESPACE");
3729 };
3730 if commit_interval.is_none() {
3731 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3732 }
3733
3734 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3735 catalog_connection_id,
3736 catalog_connection: catalog_connection_id,
3737 storage_connection_id,
3738 storage_connection: storage_connection_id,
3739 table,
3740 namespace,
3741 relation_key_indices,
3742 key_desc_and_indices,
3743 }))
3744}
3745
/// Plans the Kafka-specific part of a sink and returns the resulting
/// `StorageSinkConnection::Kafka`.
///
/// The function validates that `connection` is a Kafka connection, extracts
/// and checks the Kafka sink options, derives the key and value formats from
/// `format`, and plans the optional `PARTITION BY` expression against
/// `value_desc`.
///
/// # Errors
///
/// Returns an error when, among other things:
/// * `connection` is not a Kafka connection,
/// * `commit_interval` is set,
/// * `LEGACY IDS` is combined with an explicit ID prefix,
/// * no topic is given, or the topic options are out of range,
/// * the format is missing or unsupported for sinks,
/// * the `PARTITION BY` expression cannot be planned.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // The referenced item must be a Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // COMMIT INTERVAL is rejected for Kafka sinks.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS and an explicit TRANSACTIONAL ID PREFIX are mutually exclusive.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // The same rule applies to PROGRESS GROUP ID PREFIX.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    // Keep the metadata refresh interval within the supported bounds.
    if topic_metadata_refresh_interval > MAX_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    } else if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL must be at least 1 second");
    }

    // Rejects zero and negative values, then converts the value to `NonNeg`.
    // `None` passes through unchanged.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates a schema registry connection used by an Avro format and returns
    // the connection's id together with its extracted options.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        // These options are rejected for sinks.
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        // A key fullname is meaningless when the sink has no key.
        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key, the two fullnames must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps an AST format to the sink format type for either the key
    // (`is_key == true`) or the value, given the relation `desc` being encoded.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT are only accepted for single-column relations.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // The key and value schemas differ in their doc comments, their
            // default fullname ("row" vs. "envelope"), and two boolean flags.
            // NOTE(review): the meaning of the positional booleans is defined by
            // `AvroSchemaGenerator::new`, which is not visible here; the second
            // argument is `true` for the value only under ENVELOPE DEBEZIUM.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                wire_format: WireFormat::Confluent {
                    registry: Some(csr_connection),
                },
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional PARTITION BY expression over the value relation.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // Under ENVELOPE DEBEZIUM only key columns may be referenced:
                    // every other scope item raises an error when referenced.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            // Aggregates, subqueries, parameters and window functions are all
            // disallowed in the expression.
            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The planned expression is cast to `UInt64` in assignment context.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // A key format is only produced when the sink has a key. With a bare
    // format, the same format is used for both key and value.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4037
4038pub fn describe_create_index(
4039 _: &StatementContext,
4040 _: CreateIndexStatement<Aug>,
4041) -> Result<StatementDesc, PlanError> {
4042 Ok(StatementDesc::new(None))
4043}
4044
4045pub fn plan_create_index(
4046 scx: &StatementContext,
4047 mut stmt: CreateIndexStatement<Aug>,
4048) -> Result<Plan, PlanError> {
4049 let CreateIndexStatement {
4050 name,
4051 on_name,
4052 in_cluster,
4053 key_parts,
4054 with_options,
4055 if_not_exists,
4056 } = &mut stmt;
4057 let on = scx.get_item_by_resolved_name(on_name)?;
4058
4059 {
4060 use CatalogItemType::*;
4061 match on.item_type() {
4062 Table | Source | View | MaterializedView => {
4063 if on.replacement_target().is_some() {
4064 sql_bail!(
4065 "index cannot be created on {} because it is a replacement {}",
4066 on_name.full_name_str(),
4067 on.item_type(),
4068 );
4069 }
4070 }
4071 Sink | Index | Type | Func | Secret | Connection => {
4072 sql_bail!(
4073 "index cannot be created on {} because it is a {}",
4074 on_name.full_name_str(),
4075 on.item_type(),
4076 );
4077 }
4078 }
4079 }
4080
4081 let on_desc = on
4082 .relation_desc()
4083 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
4084
4085 let filled_key_parts = match key_parts {
4086 Some(kp) => kp.to_vec(),
4087 None => {
4088 let mut name_counts = BTreeMap::new();
4093 for name in on_desc.iter_names() {
4094 *name_counts.entry(name).or_insert(0usize) += 1;
4095 }
4096 let key = on_desc.typ().default_key();
4097 key.iter()
4098 .map(|i| {
4099 let name = on_desc.get_name(*i);
4100 if name_counts.get(name).copied() == Some(1) {
4101 Expr::Identifier(vec![name.clone().into()])
4102 } else {
4103 Expr::Value(Value::Number((i + 1).to_string()))
4104 }
4105 })
4106 .collect()
4107 }
4108 };
4109 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4110
4111 let index_name = if let Some(name) = name {
4112 QualifiedItemName {
4113 qualifiers: on.name().qualifiers.clone(),
4114 item: normalize::ident(name.clone()),
4115 }
4116 } else {
4117 let mut idx_name = QualifiedItemName {
4118 qualifiers: on.name().qualifiers.clone(),
4119 item: on.name().item.clone(),
4120 };
4121 if key_parts.is_none() {
4122 idx_name.item += "_primary_idx";
4124 } else {
4125 let index_name_col_suffix = keys
4128 .iter()
4129 .map(|k| match k {
4130 mz_expr::MirScalarExpr::Column(i, name) => {
4131 match (on_desc.get_unambiguous_name(*i), &name.0) {
4132 (Some(col_name), _) => col_name.to_string(),
4133 (None, Some(name)) => name.to_string(),
4134 (None, None) => format!("{}", i + 1),
4135 }
4136 }
4137 _ => "expr".to_string(),
4138 })
4139 .join("_");
4140 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4141 .expect("write on strings cannot fail");
4142 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4143 }
4144
4145 if !*if_not_exists {
4146 scx.catalog.find_available_name(idx_name)
4147 } else {
4148 idx_name
4149 }
4150 };
4151
4152 let full_name = scx.catalog.resolve_full_name(&index_name);
4154 let partial_name = PartialItemName::from(full_name.clone());
4155 if let (Ok(item), false, false) = (
4163 scx.catalog.resolve_item_or_type(&partial_name),
4164 *if_not_exists,
4165 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4166 ) {
4167 return Err(PlanError::ItemAlreadyExists {
4168 name: full_name.to_string(),
4169 item_type: item.item_type(),
4170 });
4171 }
4172
4173 let options = plan_index_options(scx, with_options.clone())?;
4174 let cluster_id = match in_cluster {
4175 None => scx.resolve_cluster(None)?.id(),
4176 Some(in_cluster) => in_cluster.id,
4177 };
4178
4179 *in_cluster = Some(ResolvedClusterName {
4180 id: cluster_id,
4181 print_name: None,
4182 });
4183
4184 *name = Some(Ident::new(index_name.item.clone())?);
4186 *key_parts = Some(filled_key_parts);
4187 let if_not_exists = *if_not_exists;
4188
4189 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4190 let compaction_window = options.iter().find_map(|o| {
4191 #[allow(irrefutable_let_patterns)]
4192 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4193 Some(lcw.clone())
4194 } else {
4195 None
4196 }
4197 });
4198
4199 Ok(Plan::CreateIndex(CreateIndexPlan {
4200 name: index_name,
4201 index: Index {
4202 create_sql,
4203 on: on.global_id(),
4204 keys,
4205 cluster_id,
4206 compaction_window,
4207 },
4208 if_not_exists,
4209 }))
4210}
4211
4212pub fn describe_create_type(
4213 _: &StatementContext,
4214 _: CreateTypeStatement<Aug>,
4215) -> Result<StatementDesc, PlanError> {
4216 Ok(StatementDesc::new(None))
4217}
4218
4219pub fn plan_create_type(
4220 scx: &StatementContext,
4221 stmt: CreateTypeStatement<Aug>,
4222) -> Result<Plan, PlanError> {
4223 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4224 let CreateTypeStatement { name, as_type, .. } = stmt;
4225
4226 fn validate_data_type(
4227 scx: &StatementContext,
4228 data_type: ResolvedDataType,
4229 as_type: &str,
4230 key: &str,
4231 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4232 let (id, modifiers) = match data_type {
4233 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4234 _ => sql_bail!(
4235 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4236 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4237 as_type,
4238 key,
4239 data_type.human_readable_name(),
4240 ),
4241 };
4242
4243 let item = scx.catalog.get_item(&id);
4244 match item.type_details() {
4245 None => sql_bail!(
4246 "{} must be of class type, but received {} which is of class {}",
4247 key,
4248 scx.catalog.resolve_full_name(item.name()),
4249 item.item_type()
4250 ),
4251 Some(CatalogTypeDetails {
4252 typ: CatalogType::Char,
4253 ..
4254 }) => {
4255 bail_unsupported!("embedding char type in a list or map")
4256 }
4257 _ => {
4258 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4260
4261 Ok((id, modifiers))
4262 }
4263 }
4264 }
4265
4266 let inner = match as_type {
4267 CreateTypeAs::List { options } => {
4268 let CreateTypeListOptionExtracted {
4269 element_type,
4270 seen: _,
4271 } = CreateTypeListOptionExtracted::try_from(options)?;
4272 let element_type =
4273 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4274 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4275 CatalogType::List {
4276 element_reference: id,
4277 element_modifiers: modifiers,
4278 }
4279 }
4280 CreateTypeAs::Map { options } => {
4281 let CreateTypeMapOptionExtracted {
4282 key_type,
4283 value_type,
4284 seen: _,
4285 } = CreateTypeMapOptionExtracted::try_from(options)?;
4286 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4287 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4288 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4289 let (value_id, value_modifiers) =
4290 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4291 CatalogType::Map {
4292 key_reference: key_id,
4293 key_modifiers,
4294 value_reference: value_id,
4295 value_modifiers,
4296 }
4297 }
4298 CreateTypeAs::Record { column_defs } => {
4299 let mut fields = vec![];
4300 for column_def in column_defs {
4301 let data_type = column_def.data_type;
4302 let key = ident(column_def.name.clone());
4303 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4304 fields.push(CatalogRecordField {
4305 name: ColumnName::from(key.clone()),
4306 type_reference: id,
4307 type_modifiers: modifiers,
4308 });
4309 }
4310 CatalogType::Record { fields }
4311 }
4312 };
4313
4314 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4315
4316 let full_name = scx.catalog.resolve_full_name(&name);
4318 let partial_name = PartialItemName::from(full_name.clone());
4319 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4322 if item.item_type().conflicts_with_type() {
4323 return Err(PlanError::ItemAlreadyExists {
4324 name: full_name.to_string(),
4325 item_type: item.item_type(),
4326 });
4327 }
4328 }
4329
4330 Ok(Plan::CreateType(CreateTypePlan {
4331 name,
4332 typ: Type { create_sql, inner },
4333 }))
4334}
4335
// Generates `CreateTypeListOptionExtracted`, which `plan_create_type` builds
// from the `CREATE TYPE ... AS LIST` options via `try_from` and reads the
// optional `element_type` from.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4337
// Generates `CreateTypeMapOptionExtracted`, which `plan_create_type` builds
// from the `CREATE TYPE ... AS MAP` options via `try_from` and reads the
// optional `key_type` and `value_type` from.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4343
/// The planned form of an `ALTER ROLE` option: either a set of role
/// attributes or a role variable.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Planned role attributes.
    Attributes(PlannedRoleAttributes),
    /// A planned role variable.
    Variable(PlannedRoleVariable),
}
4349
/// Role attributes as planned by `plan_role_attributes`.
///
/// Every field starts out as `None`, meaning the attribute was not mentioned.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `INHERIT`, `Some(false)` for `NOINHERIT`.
    pub inherit: Option<bool>,
    /// The password given by a `PASSWORD` attribute that carries a value.
    pub password: Option<Password>,
    /// Set together with `password`, from the `scram_iterations` system variable.
    pub scram_iterations: Option<NonZeroU32>,
    /// `Some(true)` when a `PASSWORD` attribute was given without a value.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SUPERUSER`, `Some(false)` for `NOSUPERUSER`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `LOGIN`; presumably `Some(false)` for `NOLOGIN` (that
    /// branch is only partly visible here — TODO confirm).
    pub login: Option<bool>,
}
4362
4363fn plan_role_attributes(
4364 options: Vec<RoleAttribute>,
4365 scx: &StatementContext,
4366) -> Result<PlannedRoleAttributes, PlanError> {
4367 let mut planned_attributes = PlannedRoleAttributes {
4368 inherit: None,
4369 password: None,
4370 scram_iterations: None,
4371 superuser: None,
4372 login: None,
4373 nopassword: None,
4374 };
4375
4376 for option in options {
4377 match option {
4378 RoleAttribute::Inherit | RoleAttribute::NoInherit
4379 if planned_attributes.inherit.is_some() =>
4380 {
4381 sql_bail!("conflicting or redundant options");
4382 }
4383 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4384 bail_never_supported!(
4385 "CREATECLUSTER attribute",
4386 "sql/create-role/#details",
4387 "Use system privileges instead."
4388 );
4389 }
4390 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4391 bail_never_supported!(
4392 "CREATEDB attribute",
4393 "sql/create-role/#details",
4394 "Use system privileges instead."
4395 );
4396 }
4397 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4398 bail_never_supported!(
4399 "CREATEROLE attribute",
4400 "sql/create-role/#details",
4401 "Use system privileges instead."
4402 );
4403 }
4404 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4405 sql_bail!("conflicting or redundant options");
4406 }
4407
4408 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4409 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4410 RoleAttribute::Password(password) => {
4411 if let Some(password) = password {
4412 planned_attributes.password = Some(password.into());
4413 planned_attributes.scram_iterations =
4414 Some(scx.catalog.system_vars().scram_iterations())
4415 } else {
4416 planned_attributes.nopassword = Some(true);
4417 }
4418 }
4419 RoleAttribute::SuperUser => {
4420 if planned_attributes.superuser == Some(false) {
4421 sql_bail!("conflicting or redundant options");
4422 }
4423 planned_attributes.superuser = Some(true);
4424 }
4425 RoleAttribute::NoSuperUser => {
4426 if planned_attributes.superuser == Some(true) {
4427 sql_bail!("conflicting or redundant options");
4428 }
4429 planned_attributes.superuser = Some(false);
4430 }
4431 RoleAttribute::Login => {
4432 if planned_attributes.login == Some(false) {
4433 sql_bail!("conflicting or redundant options");
4434 }
4435 planned_attributes.login = Some(true);
4436 }
4437 RoleAttribute::NoLogin => {
4438 if planned_attributes.login == Some(true) {
4439 sql_bail!("conflicting or redundant options");
4440 }
4441 planned_attributes.login = Some(false);
4442 }
4443 }
4444 }
4445 if planned_attributes.inherit == Some(false) {
4446 bail_unsupported!("non inherit roles");
4447 }
4448
4449 Ok(planned_attributes)
4450}
4451
/// A planned change to a variable attached to a role.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Assign `value` to the variable called `name`.
    Set { name: String, value: VariableValue },
    /// Reset the variable called `name`.
    Reset { name: String },
}
4457
4458impl PlannedRoleVariable {
4459 pub fn name(&self) -> &str {
4460 match self {
4461 PlannedRoleVariable::Set { name, .. } => name,
4462 PlannedRoleVariable::Reset { name } => name,
4463 }
4464 }
4465}
4466
4467fn plan_role_variable(
4468 scx: &StatementContext,
4469 variable: SetRoleVar,
4470) -> Result<PlannedRoleVariable, PlanError> {
4471 let plan = match variable {
4472 SetRoleVar::Set { name, value } => {
4473 let name = name.to_string();
4474 let value = scl::plan_set_variable_to(value)?;
4475 if let VariableValue::Values(values) = &value {
4478 vars::check_transaction_isolation_feature_flag(
4479 &name,
4480 VarInput::SqlSet(values),
4481 scx.catalog.system_vars(),
4482 )?;
4483 }
4484 PlannedRoleVariable::Set { name, value }
4485 }
4486 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4487 name: name.to_string(),
4488 },
4489 };
4490 Ok(plan)
4491}
4492
4493pub fn describe_create_role(
4494 _: &StatementContext,
4495 _: CreateRoleStatement,
4496) -> Result<StatementDesc, PlanError> {
4497 Ok(StatementDesc::new(None))
4498}
4499
4500pub fn plan_create_role(
4501 scx: &StatementContext,
4502 CreateRoleStatement { name, options }: CreateRoleStatement,
4503) -> Result<Plan, PlanError> {
4504 let attributes = plan_role_attributes(options, scx)?;
4505 Ok(Plan::CreateRole(CreateRolePlan {
4506 name: normalize::ident(name),
4507 attributes: attributes.into(),
4508 }))
4509}
4510
4511pub fn plan_create_network_policy(
4512 ctx: &StatementContext,
4513 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4514) -> Result<Plan, PlanError> {
4515 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4516 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4517
4518 let Some(rule_defs) = policy_options.rules else {
4519 sql_bail!("RULES must be specified when creating network policies.");
4520 };
4521
4522 let mut rules = vec![];
4523 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4524 let NetworkPolicyRuleOptionExtracted {
4525 seen: _,
4526 direction,
4527 action,
4528 address,
4529 } = options.try_into()?;
4530 let (direction, action, address) = match (direction, action, address) {
4531 (Some(direction), Some(action), Some(address)) => (
4532 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4533 NetworkPolicyRuleAction::try_from(action.as_str())?,
4534 PolicyAddress::try_from(address.as_str())?,
4535 ),
4536 (_, _, _) => {
4537 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4538 }
4539 };
4540 rules.push(NetworkPolicyRule {
4541 name: normalize::ident(name),
4542 direction,
4543 action,
4544 address,
4545 });
4546 }
4547
4548 if rules.len()
4549 > ctx
4550 .catalog
4551 .system_vars()
4552 .max_rules_per_network_policy()
4553 .try_into()?
4554 {
4555 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4556 }
4557
4558 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4559 name: normalize::ident(name),
4560 rules,
4561 }))
4562}
4563
4564pub fn plan_alter_network_policy(
4565 ctx: &StatementContext,
4566 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4567) -> Result<Plan, PlanError> {
4568 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4569
4570 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4571 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4572
4573 let Some(rule_defs) = policy_options.rules else {
4574 sql_bail!("RULES must be specified when creating network policies.");
4575 };
4576
4577 let mut rules = vec![];
4578 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4579 let NetworkPolicyRuleOptionExtracted {
4580 seen: _,
4581 direction,
4582 action,
4583 address,
4584 } = options.try_into()?;
4585
4586 let (direction, action, address) = match (direction, action, address) {
4587 (Some(direction), Some(action), Some(address)) => (
4588 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4589 NetworkPolicyRuleAction::try_from(action.as_str())?,
4590 PolicyAddress::try_from(address.as_str())?,
4591 ),
4592 (_, _, _) => {
4593 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4594 }
4595 };
4596 rules.push(NetworkPolicyRule {
4597 name: normalize::ident(name),
4598 direction,
4599 action,
4600 address,
4601 });
4602 }
4603 if rules.len()
4604 > ctx
4605 .catalog
4606 .system_vars()
4607 .max_rules_per_network_policy()
4608 .try_into()?
4609 {
4610 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4611 }
4612
4613 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4614 id: policy.id(),
4615 name: normalize::ident(name),
4616 rules,
4617 }))
4618}
4619
4620pub fn describe_create_cluster(
4621 _: &StatementContext,
4622 _: CreateClusterStatement<Aug>,
4623) -> Result<StatementDesc, PlanError> {
4624 Ok(StatementDesc::new(None))
4625}
4626
// Declares `ClusterOptionExtracted`, the typed form of the `CREATE CLUSTER`
// options. `plan_create_cluster_inner` destructures it, and each listed option
// shows up there as an `Option` of the type given here, alongside a `seen`
// field.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);
4645
// Declares `NetworkPolicyOptionExtracted`, whose optional `rules` field is
// read by the network policy planning functions in this file.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);
4650
// Declares `NetworkPolicyRuleOptionExtracted`. All three options are extracted
// as optional strings; the network policy planners require each of them to be
// present and parse them into their typed forms.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);
4657
// Declares the extracted form of `ClusterAlterOption`: a single `Wait` option
// carrying a `ClusterAlterOptionValue<Aug>`.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4659
// Declares the extracted form of `ClusterAlterUntilReadyOption`: a `Timeout`
// duration and an `OnTimeout` string.
// NOTE(review): the accepted `OnTimeout` values are validated elsewhere; they
// are not visible from this declaration.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);
4665
// Declares `ClusterFeatureExtracted`, the typed form of a cluster's `FEATURES`
// list. Every feature is an `Option<bool>` defaulting to `None`.
// `plan_create_cluster_inner` copies each field into the same-named field of
// `OptimizerFeatureOverrides`, and `unplan_create_cluster` copies them back.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4680
4681pub fn plan_create_cluster(
4685 scx: &StatementContext,
4686 stmt: CreateClusterStatement<Aug>,
4687) -> Result<Plan, PlanError> {
4688 let plan = plan_create_cluster_inner(scx, stmt)?;
4689
4690 if let CreateClusterVariant::Managed(_) = &plan.variant {
4692 let stmt = unplan_create_cluster(scx, plan.clone())
4693 .map_err(|e| PlanError::Replan(e.to_string()))?;
4694 let create_sql = stmt.to_ast_string_stable();
4695 let stmt = parse::parse(&create_sql)
4696 .map_err(|e| PlanError::Replan(e.to_string()))?
4697 .into_element()
4698 .ast;
4699 let (stmt, _resolved_ids) =
4700 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4701 let stmt = match stmt {
4702 Statement::CreateCluster(stmt) => stmt,
4703 stmt => {
4704 return Err(PlanError::Replan(format!(
4705 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4706 )));
4707 }
4708 };
4709 let replan =
4710 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4711 if plan != replan {
4712 return Err(PlanError::Replan(format!(
4713 "replan does not match: plan={plan:?}, replan={replan:?}"
4714 )));
4715 }
4716 }
4717
4718 Ok(Plan::CreateCluster(plan))
4719}
4720
/// Plans a `CREATE CLUSTER` statement into a [`CreateClusterPlan`].
///
/// The statement is planned as a managed cluster when `MANAGED` is true, or
/// when `MANAGED` is absent and no `REPLICAS` option was given; otherwise it
/// is planned as an unmanaged cluster with an explicit replica list.
///
/// # Errors
///
/// Returns an error when the options cannot be extracted, when a non-system
/// role uses `FEATURES` or `WORKLOAD CLASS`, when an option is combined with
/// a cluster kind that does not support it, when a required option (`SIZE`
/// for managed, `REPLICAS` for unmanaged) is missing, or when a needed
/// feature flag is disabled.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Without an explicit MANAGED option, the presence of REPLICAS decides:
    // no replica list means managed.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system roles.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    // An absent SCHEDULE is treated as MANUAL.
    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        // DISK is rejected outright for sizes the catalog reports as "cc";
        // for any other size it is accepted but triggers a deprecation notice.
        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        // With a MANUAL schedule the replication factor comes from the option
        // or, failing that, from the `default_cluster_replication_factor`
        // system variable. Any other schedule requires its feature flag,
        // forbids an explicit REPLICATION FACTOR, and is planned with 0.
        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        // A non-empty availability zone list is gated behind a feature flag.
        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // Copy the extracted FEATURES into the matching optimizer overrides;
        // every other override keeps its default.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        // Every cluster-level option below belongs to managed clusters only;
        // the first offending option determines the reported error.
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        // Plan each replica definition independently, keeping statement order.
        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4887
4888pub fn unplan_create_cluster(
4892 scx: &StatementContext,
4893 CreateClusterPlan {
4894 name,
4895 variant,
4896 workload_class,
4897 }: CreateClusterPlan,
4898) -> Result<CreateClusterStatement<Aug>, PlanError> {
4899 match variant {
4900 CreateClusterVariant::Managed(CreateClusterManagedPlan {
4901 replication_factor,
4902 size,
4903 availability_zones,
4904 compute,
4905 optimizer_feature_overrides,
4906 schedule,
4907 }) => {
4908 let schedule = unplan_cluster_schedule(schedule);
4909 let OptimizerFeatureOverrides {
4910 enable_reduce_mfp_fusion: _,
4911 enable_cardinality_estimates: _,
4912 persist_fast_path_limit: _,
4913 reoptimize_imported_views,
4914 enable_eager_delta_joins,
4915 enable_new_outer_join_lowering,
4916 enable_variadic_left_join_lowering,
4917 enable_letrec_fixpoint_analysis,
4918 enable_join_prioritize_arranged,
4919 enable_projection_pushdown_after_relation_cse,
4920 enable_less_reduce_in_eqprop: _,
4921 enable_dequadratic_eqprop_map: _,
4922 enable_eq_classes_withholding_errors: _,
4923 enable_fast_path_plan_insights: _,
4924 enable_cast_elimination: _,
4925 enable_case_literal_transform: _,
4926 enable_simplify_quantified_comparisons: _,
4927 enable_coalesce_case_transform: _,
4928 enable_will_distinct_propagation: _,
4929 } = optimizer_feature_overrides;
4930 let features_extracted = ClusterFeatureExtracted {
4932 seen: Default::default(),
4934 reoptimize_imported_views,
4935 enable_eager_delta_joins,
4936 enable_new_outer_join_lowering,
4937 enable_variadic_left_join_lowering,
4938 enable_letrec_fixpoint_analysis,
4939 enable_join_prioritize_arranged,
4940 enable_projection_pushdown_after_relation_cse,
4941 };
4942 let features = features_extracted.into_values(scx.catalog);
4943 let availability_zones = if availability_zones.is_empty() {
4944 None
4945 } else {
4946 Some(availability_zones)
4947 };
4948 let (introspection_interval, introspection_debugging) =
4949 unplan_compute_replica_config(compute);
4950 let replication_factor = match &schedule {
4953 ClusterScheduleOptionValue::Manual => Some(replication_factor),
4954 ClusterScheduleOptionValue::Refresh { .. } => {
4955 soft_assert_or_log!(
4962 replication_factor <= 1,
4963 "replication factor, {replication_factor:?}, must be <= 1 with a refresh schedule"
4964 );
4965 None
4966 }
4967 };
4968 let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4969 let options_extracted = ClusterOptionExtracted {
4970 seen: Default::default(),
4972 availability_zones,
4973 disk: None,
4974 introspection_debugging: Some(introspection_debugging),
4975 introspection_interval,
4976 managed: Some(true),
4977 replicas: None,
4978 replication_factor,
4979 size: Some(size),
4980 schedule: Some(schedule),
4981 workload_class,
4982 };
4983 let options = options_extracted.into_values(scx.catalog);
4984 let name = Ident::new_unchecked(name);
4985 Ok(CreateClusterStatement {
4986 name,
4987 options,
4988 features,
4989 })
4990 }
4991 CreateClusterVariant::Unmanaged(_) => {
4992 bail_unsupported!("SHOW CREATE for unmanaged clusters")
4993 }
4994 }
4995}
4996
// Declares `ReplicaOptionExtracted`, the typed form of a replica definition's
// options. `Internal` and `IntrospectionDebugging` default to `false` and are
// therefore plain `bool`s in `plan_replica_config`; the options it matches on
// (`Size`, `AvailabilityZone`, `BilledAs`, the `*ctl` address lists) are
// `Option`s there.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5012
5013fn plan_replica_config(
5014 scx: &StatementContext,
5015 options: Vec<ReplicaOption<Aug>>,
5016) -> Result<ReplicaConfig, PlanError> {
5017 let ReplicaOptionExtracted {
5018 availability_zone,
5019 billed_as,
5020 computectl_addresses,
5021 disk,
5022 internal,
5023 introspection_debugging,
5024 introspection_interval,
5025 size,
5026 storagectl_addresses,
5027 ..
5028 }: ReplicaOptionExtracted = options.try_into()?;
5029
5030 let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
5031
5032 match (
5033 size,
5034 availability_zone,
5035 billed_as,
5036 storagectl_addresses,
5037 computectl_addresses,
5038 ) {
5039 (None, _, None, None, None) => {
5041 sql_bail!("SIZE option must be specified");
5044 }
5045 (Some(size), availability_zone, billed_as, None, None) => {
5046 if disk.is_some() {
5047 if scx.catalog.is_cluster_size_cc(&size) {
5051 sql_bail!(
5052 "DISK option not supported for modern cluster sizes because disk is always enabled"
5053 );
5054 }
5055
5056 scx.catalog
5057 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5058 }
5059
5060 Ok(ReplicaConfig::Orchestrated {
5061 size,
5062 availability_zone,
5063 compute,
5064 billed_as,
5065 internal,
5066 })
5067 }
5068
5069 (None, None, None, storagectl_addresses, computectl_addresses) => {
5070 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5071
5072 let Some(storagectl_addrs) = storagectl_addresses else {
5076 sql_bail!("missing STORAGECTL ADDRESSES option");
5077 };
5078 let Some(computectl_addrs) = computectl_addresses else {
5079 sql_bail!("missing COMPUTECTL ADDRESSES option");
5080 };
5081
5082 if storagectl_addrs.len() != computectl_addrs.len() {
5083 sql_bail!(
5084 "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5085 );
5086 }
5087
5088 if disk.is_some() {
5089 sql_bail!("DISK can't be specified for unorchestrated clusters");
5090 }
5091
5092 Ok(ReplicaConfig::Unorchestrated {
5093 storagectl_addrs,
5094 computectl_addrs,
5095 compute,
5096 })
5097 }
5098 _ => {
5099 sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5102 }
5103 }
5104}
5105
5106fn plan_compute_replica_config(
5110 introspection_interval: Option<OptionalDuration>,
5111 introspection_debugging: bool,
5112) -> Result<ComputeReplicaConfig, PlanError> {
5113 let introspection_interval = introspection_interval
5114 .map(|OptionalDuration(i)| i)
5115 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5116 let introspection = match introspection_interval {
5117 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5118 interval,
5119 debugging: introspection_debugging,
5120 }),
5121 None if introspection_debugging => {
5122 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5123 }
5124 None => None,
5125 };
5126 let compute = ComputeReplicaConfig { introspection };
5127 Ok(compute)
5128}
5129
5130fn unplan_compute_replica_config(
5134 compute_replica_config: ComputeReplicaConfig,
5135) -> (Option<OptionalDuration>, bool) {
5136 match compute_replica_config.introspection {
5137 Some(ComputeReplicaIntrospectionConfig {
5138 debugging,
5139 interval,
5140 }) => (Some(OptionalDuration(Some(interval))), debugging),
5141 None => (Some(OptionalDuration(None)), false),
5142 }
5143}
5144
5145fn plan_cluster_schedule(
5149 schedule: ClusterScheduleOptionValue,
5150) -> Result<ClusterSchedule, PlanError> {
5151 Ok(match schedule {
5152 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5153 ClusterScheduleOptionValue::Refresh {
5155 hydration_time_estimate: None,
5156 } => ClusterSchedule::Refresh {
5157 hydration_time_estimate: Duration::from_millis(0),
5158 },
5159 ClusterScheduleOptionValue::Refresh {
5161 hydration_time_estimate: Some(interval_value),
5162 } => {
5163 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5164 if interval.as_microseconds() < 0 {
5165 sql_bail!(
5166 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5167 interval
5168 );
5169 }
5170 if interval.months != 0 {
5171 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5175 }
5176 let duration = interval.duration()?;
5177 if u64::try_from(duration.as_millis()).is_err()
5178 || Interval::from_duration(&duration).is_err()
5179 {
5180 sql_bail!("HYDRATION TIME ESTIMATE too large");
5181 }
5182 ClusterSchedule::Refresh {
5183 hydration_time_estimate: duration,
5184 }
5185 }
5186 })
5187}
5188
5189fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5193 match schedule {
5194 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5195 ClusterSchedule::Refresh {
5196 hydration_time_estimate,
5197 } => {
5198 let interval = Interval::from_duration(&hydration_time_estimate)
5199 .expect("planning ensured that this is convertible back to Interval");
5200 let interval_value = literal::unplan_interval(&interval);
5201 ClusterScheduleOptionValue::Refresh {
5202 hydration_time_estimate: Some(interval_value),
5203 }
5204 }
5205 }
5206}
5207
5208pub fn describe_create_cluster_replica(
5209 _: &StatementContext,
5210 _: CreateClusterReplicaStatement<Aug>,
5211) -> Result<StatementDesc, PlanError> {
5212 Ok(StatementDesc::new(None))
5213}
5214
5215pub fn plan_create_cluster_replica(
5216 scx: &StatementContext,
5217 CreateClusterReplicaStatement {
5218 definition: ReplicaDefinition { name, options },
5219 of_cluster,
5220 }: CreateClusterReplicaStatement<Aug>,
5221) -> Result<Plan, PlanError> {
5222 let cluster = scx
5223 .catalog
5224 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5225
5226 let config = plan_replica_config(scx, options)?;
5227
5228 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5229 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5230 return Err(PlanError::MangedReplicaName(name.into_string()));
5231 }
5232 } else {
5233 ensure_cluster_is_not_managed(scx, cluster.id())?;
5234 }
5235
5236 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5237 name: normalize::ident(name),
5238 cluster_id: cluster.id(),
5239 config,
5240 }))
5241}
5242
5243pub fn describe_create_secret(
5244 _: &StatementContext,
5245 _: CreateSecretStatement<Aug>,
5246) -> Result<StatementDesc, PlanError> {
5247 Ok(StatementDesc::new(None))
5248}
5249
5250pub fn plan_create_secret(
5251 scx: &StatementContext,
5252 stmt: CreateSecretStatement<Aug>,
5253) -> Result<Plan, PlanError> {
5254 let CreateSecretStatement {
5255 name,
5256 if_not_exists,
5257 value,
5258 } = &stmt;
5259
5260 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5261 let mut create_sql_statement = stmt.clone();
5262 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5263 let create_sql =
5264 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5265 let secret_as = query::plan_secret_as(scx, value.clone())?;
5266
5267 let secret = Secret {
5268 create_sql,
5269 secret_as,
5270 };
5271
5272 Ok(Plan::CreateSecret(CreateSecretPlan {
5273 name,
5274 secret,
5275 if_not_exists: *if_not_exists,
5276 }))
5277}
5278
5279pub fn describe_create_connection(
5280 _: &StatementContext,
5281 _: CreateConnectionStatement<Aug>,
5282) -> Result<StatementDesc, PlanError> {
5283 Ok(StatementDesc::new(None))
5284}
5285
// Declares `CreateConnectionOptionExtracted`, whose optional `validate` field
// is consumed by `plan_create_connection`.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5287
5288pub fn plan_create_connection(
5289 scx: &StatementContext,
5290 mut stmt: CreateConnectionStatement<Aug>,
5291) -> Result<Plan, PlanError> {
5292 let CreateConnectionStatement {
5293 name,
5294 connection_type,
5295 values,
5296 if_not_exists,
5297 with_options,
5298 } = stmt.clone();
5299 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5300 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5301 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5302
5303 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5304 if options.validate.is_some() {
5305 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5306 }
5307 let validate = match options.validate {
5308 Some(val) => val,
5309 None => {
5310 scx.catalog
5311 .system_vars()
5312 .enable_default_connection_validation()
5313 && details.to_connection().validate_by_default()
5314 }
5315 };
5316
5317 let full_name = scx.catalog.resolve_full_name(&name);
5319 let partial_name = PartialItemName::from(full_name.clone());
5320 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5321 return Err(PlanError::ItemAlreadyExists {
5322 name: full_name.to_string(),
5323 item_type: item.item_type(),
5324 });
5325 }
5326
5327 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5330 stmt.values.retain(|v| {
5331 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5332 });
5333 stmt.values.push(ConnectionOption {
5334 name: ConnectionOptionName::PublicKey1,
5335 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5336 });
5337 stmt.values.push(ConnectionOption {
5338 name: ConnectionOptionName::PublicKey2,
5339 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5340 });
5341 }
5342 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5343
5344 let plan = CreateConnectionPlan {
5345 name,
5346 if_not_exists,
5347 connection: crate::plan::Connection {
5348 create_sql,
5349 details,
5350 },
5351 validate,
5352 };
5353 Ok(Plan::CreateConnection(plan))
5354}
5355
5356fn plan_drop_database(
5357 scx: &StatementContext,
5358 if_exists: bool,
5359 name: &UnresolvedDatabaseName,
5360 cascade: bool,
5361) -> Result<Option<DatabaseId>, PlanError> {
5362 Ok(match resolve_database(scx, name, if_exists)? {
5363 Some(database) => {
5364 if !cascade && database.has_schemas() {
5365 sql_bail!(
5366 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5367 name,
5368 );
5369 }
5370 Some(database.id())
5371 }
5372 None => None,
5373 })
5374}
5375
5376pub fn describe_drop_objects(
5377 _: &StatementContext,
5378 _: DropObjectsStatement,
5379) -> Result<StatementDesc, PlanError> {
5380 Ok(StatementDesc::new(None))
5381}
5382
5383pub fn plan_drop_objects(
5384 scx: &mut StatementContext,
5385 DropObjectsStatement {
5386 object_type,
5387 if_exists,
5388 names,
5389 cascade,
5390 }: DropObjectsStatement,
5391) -> Result<Plan, PlanError> {
5392 if object_type == mz_sql_parser::ast::ObjectType::Func {
5393 bail_unsupported!("DROP FUNCTION");
5394 }
5395 let object_type = object_type.into();
5396
5397 let mut referenced_ids = Vec::new();
5398 for name in names {
5399 let id = match &name {
5400 UnresolvedObjectName::Cluster(name) => {
5401 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5402 }
5403 UnresolvedObjectName::ClusterReplica(name) => {
5404 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5405 }
5406 UnresolvedObjectName::Database(name) => {
5407 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5408 }
5409 UnresolvedObjectName::Schema(name) => {
5410 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5411 }
5412 UnresolvedObjectName::Role(name) => {
5413 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5414 }
5415 UnresolvedObjectName::Item(name) => {
5416 plan_drop_item_name(scx, object_type, if_exists, name.clone())?.map(ObjectId::Item)
5420 }
5421 UnresolvedObjectName::NetworkPolicy(name) => {
5422 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5423 }
5424 };
5425 match id {
5426 Some(id) => referenced_ids.push(id),
5427 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5428 name: name.to_ast_string_simple(),
5429 object_type,
5430 }),
5431 }
5432 }
5433
5434 if !cascade {
5438 let dropped_items: BTreeSet<CatalogItemId> = referenced_ids
5439 .iter()
5440 .filter_map(|id| match id {
5441 ObjectId::Item(id) => Some(*id),
5442 _ => None,
5443 })
5444 .collect();
5445 for id in &dropped_items {
5446 let catalog_item = scx.catalog.get_item(id);
5447 ensure_no_blocking_dependents(scx, object_type, catalog_item, &dropped_items)?;
5448 }
5449 }
5450
5451 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5452
5453 Ok(Plan::DropObjects(DropObjectsPlan {
5454 referenced_ids,
5455 drop_ids,
5456 object_type,
5457 }))
5458}
5459
5460fn plan_drop_schema(
5461 scx: &StatementContext,
5462 if_exists: bool,
5463 name: &UnresolvedSchemaName,
5464 cascade: bool,
5465) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5466 let normalized = normalize::unresolved_schema_name(name.clone())?;
5470 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5471 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5472 }
5473
5474 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5475 Some((database_spec, schema_spec)) => {
5476 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5477 sql_bail!(
5478 "cannot drop schema {name} because it is required by the database system",
5479 );
5480 }
5481 if let SchemaSpecifier::Temporary = schema_spec {
5482 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5483 }
5484 let schema = scx.get_schema(&database_spec, &schema_spec);
5485 if !cascade && schema.has_items() {
5486 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5487 sql_bail!(
5488 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5489 full_schema_name
5490 );
5491 }
5492 Some((database_spec, schema_spec))
5493 }
5494 None => None,
5495 })
5496}
5497
5498fn plan_drop_role(
5499 scx: &StatementContext,
5500 if_exists: bool,
5501 name: &Ident,
5502) -> Result<Option<RoleId>, PlanError> {
5503 match scx.catalog.resolve_role(name.as_str()) {
5504 Ok(role) => {
5505 let id = role.id();
5506 if &id == scx.catalog.active_role_id() {
5507 sql_bail!("current role cannot be dropped");
5508 }
5509 for role in scx.catalog.get_roles() {
5510 for (member_id, grantor_id) in role.membership() {
5511 if &id == grantor_id {
5512 let member_role = scx.catalog.get_role(member_id);
5513 sql_bail!(
5514 "cannot drop role {}: still depended up by membership of role {} in role {}",
5515 name.as_str(),
5516 role.name(),
5517 member_role.name()
5518 );
5519 }
5520 }
5521 }
5522 Ok(Some(role.id()))
5523 }
5524 Err(_) if if_exists => Ok(None),
5525 Err(e) => Err(e.into()),
5526 }
5527}
5528
5529fn plan_drop_cluster(
5530 scx: &StatementContext,
5531 if_exists: bool,
5532 name: &Ident,
5533 cascade: bool,
5534) -> Result<Option<ClusterId>, PlanError> {
5535 Ok(match resolve_cluster(scx, name, if_exists)? {
5536 Some(cluster) => {
5537 if !cascade && !cluster.bound_objects().is_empty() {
5538 return Err(PlanError::DependentObjectsStillExist {
5539 object_type: "cluster".to_string(),
5540 object_name: cluster.name().to_string(),
5541 dependents: Vec::new(),
5542 });
5543 }
5544 Some(cluster.id())
5545 }
5546 None => None,
5547 })
5548}
5549
5550fn plan_drop_network_policy(
5551 scx: &StatementContext,
5552 if_exists: bool,
5553 name: &Ident,
5554) -> Result<Option<NetworkPolicyId>, PlanError> {
5555 match scx.catalog.resolve_network_policy(name.as_str()) {
5556 Ok(policy) => {
5557 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5560 Err(PlanError::NetworkPolicyInUse)
5561 } else {
5562 Ok(Some(policy.id()))
5563 }
5564 }
5565 Err(_) if if_exists => Ok(None),
5566 Err(e) => Err(e.into()),
5567 }
5568}
5569
5570fn plan_drop_cluster_replica(
5571 scx: &StatementContext,
5572 if_exists: bool,
5573 name: &QualifiedReplica,
5574) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5575 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5576 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5577}
5578
5579fn plan_drop_item(
5581 scx: &StatementContext,
5582 object_type: ObjectType,
5583 if_exists: bool,
5584 name: UnresolvedItemName,
5585 cascade: bool,
5586) -> Result<Option<CatalogItemId>, PlanError> {
5587 let Some(id) = plan_drop_item_name(scx, object_type, if_exists, name)? else {
5588 return Ok(None);
5589 };
5590 if !cascade {
5591 let catalog_item = scx.catalog.get_item(&id);
5592 ensure_no_blocking_dependents(scx, object_type, catalog_item, &BTreeSet::new())?;
5593 }
5594 Ok(Some(id))
5595}
5596
5597fn plan_drop_item_name(
5601 scx: &StatementContext,
5602 object_type: ObjectType,
5603 if_exists: bool,
5604 name: UnresolvedItemName,
5605) -> Result<Option<CatalogItemId>, PlanError> {
5606 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5607 Ok(r) => r,
5608 Err(PlanError::MismatchedObjectType {
5610 name,
5611 is_type: ObjectType::MaterializedView,
5612 expected_type: ObjectType::View,
5613 }) => {
5614 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5615 }
5616 e => e?,
5617 };
5618
5619 Ok(match resolved {
5620 Some(catalog_item) => {
5621 if catalog_item.id().is_system() {
5622 sql_bail!(
5623 "cannot drop {} {} because it is required by the database system",
5624 catalog_item.item_type(),
5625 scx.catalog.minimal_qualification(catalog_item.name()),
5626 );
5627 }
5628 Some(catalog_item.id())
5629 }
5630 None => None,
5631 })
5632}
5633
5634fn ensure_no_blocking_dependents(
5639 scx: &StatementContext,
5640 object_type: ObjectType,
5641 catalog_item: &dyn CatalogItem,
5642 also_dropped: &BTreeSet<CatalogItemId>,
5643) -> Result<(), PlanError> {
5644 for id in catalog_item.used_by() {
5645 if also_dropped.contains(id) {
5646 continue;
5647 }
5648 let dep = scx.catalog.get_item(id);
5649 if dependency_prevents_drop(object_type, dep) {
5650 return Err(PlanError::DependentObjectsStillExist {
5651 object_type: catalog_item.item_type().to_string(),
5652 object_name: scx
5653 .catalog
5654 .minimal_qualification(catalog_item.name())
5655 .to_string(),
5656 dependents: vec![(
5657 dep.item_type().to_string(),
5658 scx.catalog.minimal_qualification(dep.name()).to_string(),
5659 )],
5660 });
5661 }
5662 }
5663 Ok(())
5666}
5667
5668fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5670 match object_type {
5671 ObjectType::Type => true,
5672 ObjectType::Table
5673 | ObjectType::View
5674 | ObjectType::MaterializedView
5675 | ObjectType::Source
5676 | ObjectType::Sink
5677 | ObjectType::Index
5678 | ObjectType::Role
5679 | ObjectType::Cluster
5680 | ObjectType::ClusterReplica
5681 | ObjectType::Secret
5682 | ObjectType::Connection
5683 | ObjectType::Database
5684 | ObjectType::Schema
5685 | ObjectType::Func
5686 | ObjectType::NetworkPolicy => match dep.item_type() {
5687 CatalogItemType::Func
5688 | CatalogItemType::Table
5689 | CatalogItemType::Source
5690 | CatalogItemType::View
5691 | CatalogItemType::MaterializedView
5692 | CatalogItemType::Sink
5693 | CatalogItemType::Type
5694 | CatalogItemType::Secret
5695 | CatalogItemType::Connection => true,
5696 CatalogItemType::Index => false,
5697 },
5698 }
5699}
5700
5701pub fn describe_alter_index_options(
5702 _: &StatementContext,
5703 _: AlterIndexStatement<Aug>,
5704) -> Result<StatementDesc, PlanError> {
5705 Ok(StatementDesc::new(None))
5706}
5707
5708pub fn describe_drop_owned(
5709 _: &StatementContext,
5710 _: DropOwnedStatement<Aug>,
5711) -> Result<StatementDesc, PlanError> {
5712 Ok(StatementDesc::new(None))
5713}
5714
5715pub fn plan_drop_owned(
5716 scx: &StatementContext,
5717 drop: DropOwnedStatement<Aug>,
5718) -> Result<Plan, PlanError> {
5719 let cascade = drop.cascade();
5720 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5721 let mut drop_ids = Vec::new();
5722 let mut privilege_revokes = Vec::new();
5723 let mut default_privilege_revokes = Vec::new();
5724
5725 fn update_privilege_revokes(
5726 object_id: SystemObjectId,
5727 privileges: &PrivilegeMap,
5728 role_ids: &BTreeSet<RoleId>,
5729 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5730 ) {
5731 privilege_revokes.extend(iter::zip(
5732 iter::repeat(object_id),
5733 privileges
5734 .all_values()
5735 .filter(|privilege| role_ids.contains(&privilege.grantee))
5736 .cloned(),
5737 ));
5738 }
5739
5740 for replica in scx.catalog.get_cluster_replicas() {
5742 if role_ids.contains(&replica.owner_id()) {
5743 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5744 }
5745 }
5746
5747 for cluster in scx.catalog.get_clusters() {
5749 if role_ids.contains(&cluster.owner_id()) {
5750 if !cascade {
5752 let non_owned_bound_objects: Vec<_> = cluster
5753 .bound_objects()
5754 .into_iter()
5755 .map(|item_id| scx.catalog.get_item(item_id))
5756 .filter(|item| !role_ids.contains(&item.owner_id()))
5757 .collect();
5758 if !non_owned_bound_objects.is_empty() {
5759 let names: Vec<_> = non_owned_bound_objects
5760 .into_iter()
5761 .map(|item| {
5762 (
5763 item.item_type().to_string(),
5764 scx.catalog.resolve_full_name(item.name()).to_string(),
5765 )
5766 })
5767 .collect();
5768 return Err(PlanError::DependentObjectsStillExist {
5769 object_type: "cluster".to_string(),
5770 object_name: cluster.name().to_string(),
5771 dependents: names,
5772 });
5773 }
5774 }
5775 drop_ids.push(cluster.id().into());
5776 }
5777 update_privilege_revokes(
5778 SystemObjectId::Object(cluster.id().into()),
5779 cluster.privileges(),
5780 &role_ids,
5781 &mut privilege_revokes,
5782 );
5783 }
5784
5785 for item in scx.catalog.get_items() {
5787 if role_ids.contains(&item.owner_id()) {
5788 if !cascade {
5789 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5791 let non_owned_dependencies: Vec<_> = used_by
5792 .into_iter()
5793 .map(|item_id| scx.catalog.get_item(item_id))
5794 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5795 .filter(|item| !role_ids.contains(&item.owner_id()))
5796 .collect();
5797 if !non_owned_dependencies.is_empty() {
5798 let names: Vec<_> = non_owned_dependencies
5799 .into_iter()
5800 .map(|item| {
5801 let item_typ = item.item_type().to_string();
5802 let item_name =
5803 scx.catalog.resolve_full_name(item.name()).to_string();
5804 (item_typ, item_name)
5805 })
5806 .collect();
5807 Err(PlanError::DependentObjectsStillExist {
5808 object_type: item.item_type().to_string(),
5809 object_name: scx
5810 .catalog
5811 .resolve_full_name(item.name())
5812 .to_string()
5813 .to_string(),
5814 dependents: names,
5815 })
5816 } else {
5817 Ok(())
5818 }
5819 };
5820
5821 if let Some(id) = item.progress_id() {
5824 let progress_item = scx.catalog.get_item(&id);
5825 check_if_dependents_exist(progress_item.used_by())?;
5826 }
5827 check_if_dependents_exist(item.used_by())?;
5828 }
5829 drop_ids.push(item.id().into());
5830 }
5831 update_privilege_revokes(
5832 SystemObjectId::Object(item.id().into()),
5833 item.privileges(),
5834 &role_ids,
5835 &mut privilege_revokes,
5836 );
5837 }
5838
5839 for schema in scx.catalog.get_schemas() {
5841 if !schema.id().is_temporary() {
5842 if role_ids.contains(&schema.owner_id()) {
5843 if !cascade {
5844 let non_owned_dependencies: Vec<_> = schema
5845 .item_ids()
5846 .map(|item_id| scx.catalog.get_item(&item_id))
5847 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5848 .filter(|item| !role_ids.contains(&item.owner_id()))
5849 .collect();
5850 if !non_owned_dependencies.is_empty() {
5851 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5852 sql_bail!(
5853 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5854 full_schema_name.to_string().quoted()
5855 );
5856 }
5857 }
5858 drop_ids.push((*schema.database(), *schema.id()).into())
5859 }
5860 update_privilege_revokes(
5861 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5862 schema.privileges(),
5863 &role_ids,
5864 &mut privilege_revokes,
5865 );
5866 }
5867 }
5868
5869 for database in scx.catalog.get_databases() {
5871 if role_ids.contains(&database.owner_id()) {
5872 if !cascade {
5873 let non_owned_schemas: Vec<_> = database
5874 .schemas()
5875 .into_iter()
5876 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5877 .collect();
5878 if !non_owned_schemas.is_empty() {
5879 sql_bail!(
5880 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5881 database.name().quoted(),
5882 );
5883 }
5884 }
5885 drop_ids.push(database.id().into());
5886 }
5887 update_privilege_revokes(
5888 SystemObjectId::Object(database.id().into()),
5889 database.privileges(),
5890 &role_ids,
5891 &mut privilege_revokes,
5892 );
5893 }
5894
5895 for network_policy in scx.catalog.get_network_policies() {
5897 if role_ids.contains(&network_policy.owner_id()) {
5898 drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5899 }
5900 update_privilege_revokes(
5901 SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5902 network_policy.privileges(),
5903 &role_ids,
5904 &mut privilege_revokes,
5905 );
5906 }
5907
5908 update_privilege_revokes(
5910 SystemObjectId::System,
5911 scx.catalog.get_system_privileges(),
5912 &role_ids,
5913 &mut privilege_revokes,
5914 );
5915
5916 for (default_privilege_object, default_privilege_acl_items) in
5917 scx.catalog.get_default_privileges()
5918 {
5919 for default_privilege_acl_item in default_privilege_acl_items {
5920 if role_ids.contains(&default_privilege_object.role_id)
5921 || role_ids.contains(&default_privilege_acl_item.grantee)
5922 {
5923 default_privilege_revokes.push((
5924 default_privilege_object.clone(),
5925 default_privilege_acl_item.clone(),
5926 ));
5927 }
5928 }
5929 }
5930
5931 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5932
5933 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5934 if !system_ids.is_empty() {
5935 let mut owners = system_ids
5936 .into_iter()
5937 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5938 .collect::<BTreeSet<_>>()
5939 .into_iter()
5940 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5941 sql_bail!(
5942 "cannot drop objects owned by role {} because they are required by the database system",
5943 owners.join(", "),
5944 );
5945 }
5946
5947 Ok(Plan::DropOwned(DropOwnedPlan {
5948 role_ids: role_ids.into_iter().collect(),
5949 drop_ids,
5950 privilege_revokes,
5951 default_privilege_revokes,
5952 }))
5953}
5954
5955fn plan_retain_history_option(
5956 scx: &StatementContext,
5957 retain_history: Option<OptionalDuration>,
5958) -> Result<Option<CompactionWindow>, PlanError> {
5959 if let Some(OptionalDuration(lcw)) = retain_history {
5960 Ok(Some(plan_retain_history(scx, lcw)?))
5961 } else {
5962 Ok(None)
5963 }
5964}
5965
5966fn plan_retain_history(
5972 scx: &StatementContext,
5973 lcw: Option<Duration>,
5974) -> Result<CompactionWindow, PlanError> {
5975 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5976 match lcw {
5977 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5982 option_name: "RETAIN HISTORY".to_string(),
5983 err: Box::new(PlanError::Unstructured(
5984 "internal error: unexpectedly zero".to_string(),
5985 )),
5986 }),
5987 Some(duration) => {
5988 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5991 && scx
5992 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5993 .is_err()
5994 {
5995 return Err(PlanError::RetainHistoryLow {
5996 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5997 });
5998 }
5999 Ok(duration.try_into()?)
6000 }
6001 None => {
6004 if scx
6005 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6006 .is_err()
6007 {
6008 Err(PlanError::RetainHistoryRequired)
6009 } else {
6010 Ok(CompactionWindow::DisableCompaction)
6011 }
6012 }
6013 }
6014}
6015
// Declares the extractable options for `IndexOption`: `RetainHistory`, carrying an
// `OptionalDuration`. The `IndexOptionExtracted` type destructured in
// `plan_index_options` presumably comes from this macro invocation.
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6017
6018fn plan_index_options(
6019 scx: &StatementContext,
6020 with_opts: Vec<IndexOption<Aug>>,
6021) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6022 if !with_opts.is_empty() {
6023 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6025 }
6026
6027 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6028
6029 let mut out = Vec::with_capacity(1);
6030 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6031 out.push(crate::plan::IndexOption::RetainHistory(cw));
6032 }
6033 Ok(out)
6034}
6035
// Declares the extractable options for `TableOption`: `PartitionBy` (a list of
// identifiers), `RetainHistory` (an `OptionalDuration`), and `RedactedTest` (a
// string). The `TableOptionExtracted` type destructured in `plan_table_options`
// presumably comes from this macro invocation.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
6042
6043fn plan_table_options(
6044 scx: &StatementContext,
6045 desc: &RelationDesc,
6046 with_opts: Vec<TableOption<Aug>>,
6047) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6048 let TableOptionExtracted {
6049 partition_by,
6050 retain_history,
6051 redacted_test,
6052 ..
6053 }: TableOptionExtracted = with_opts.try_into()?;
6054
6055 if let Some(partition_by) = partition_by {
6056 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6057 check_partition_by(desc, partition_by)?;
6058 }
6059
6060 if redacted_test.is_some() {
6061 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6062 }
6063
6064 let mut out = Vec::with_capacity(1);
6065 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6066 out.push(crate::plan::TableOption::RetainHistory(cw));
6067 }
6068 Ok(out)
6069}
6070
6071pub fn plan_alter_index_options(
6072 scx: &mut StatementContext,
6073 AlterIndexStatement {
6074 index_name,
6075 if_exists,
6076 action,
6077 }: AlterIndexStatement<Aug>,
6078) -> Result<Plan, PlanError> {
6079 let object_type = ObjectType::Index;
6080 match action {
6081 AlterIndexAction::ResetOptions(options) => {
6082 let mut options = options.into_iter();
6083 if let Some(opt) = options.next() {
6084 match opt {
6085 IndexOptionName::RetainHistory => {
6086 if options.next().is_some() {
6087 sql_bail!("RETAIN HISTORY must be only option");
6088 }
6089 return alter_retain_history(
6090 scx,
6091 object_type,
6092 if_exists,
6093 UnresolvedObjectName::Item(index_name),
6094 None,
6095 );
6096 }
6097 }
6098 }
6099 sql_bail!("expected option");
6100 }
6101 AlterIndexAction::SetOptions(options) => {
6102 let mut options = options.into_iter();
6103 if let Some(opt) = options.next() {
6104 match opt.name {
6105 IndexOptionName::RetainHistory => {
6106 if options.next().is_some() {
6107 sql_bail!("RETAIN HISTORY must be only option");
6108 }
6109 return alter_retain_history(
6110 scx,
6111 object_type,
6112 if_exists,
6113 UnresolvedObjectName::Item(index_name),
6114 opt.value,
6115 );
6116 }
6117 }
6118 }
6119 sql_bail!("expected option");
6120 }
6121 }
6122}
6123
6124pub fn describe_alter_cluster_set_options(
6125 _: &StatementContext,
6126 _: AlterClusterStatement<Aug>,
6127) -> Result<StatementDesc, PlanError> {
6128 Ok(StatementDesc::new(None))
6129}
6130
6131pub fn plan_alter_cluster(
6132 scx: &mut StatementContext,
6133 AlterClusterStatement {
6134 name,
6135 action,
6136 if_exists,
6137 }: AlterClusterStatement<Aug>,
6138) -> Result<Plan, PlanError> {
6139 let cluster = match resolve_cluster(scx, &name, if_exists)? {
6140 Some(entry) => entry,
6141 None => {
6142 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6143 name: name.to_ast_string_simple(),
6144 object_type: ObjectType::Cluster,
6145 });
6146
6147 return Ok(Plan::AlterNoop(AlterNoopPlan {
6148 object_type: ObjectType::Cluster,
6149 }));
6150 }
6151 };
6152
6153 let mut options: PlanClusterOption = Default::default();
6154 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6155
6156 match action {
6157 AlterClusterAction::SetOptions {
6158 options: set_options,
6159 with_options,
6160 } => {
6161 let ClusterOptionExtracted {
6162 availability_zones,
6163 introspection_debugging,
6164 introspection_interval,
6165 managed,
6166 replicas: replica_defs,
6167 replication_factor,
6168 seen: _,
6169 size,
6170 disk,
6171 schedule,
6172 workload_class,
6173 }: ClusterOptionExtracted = set_options.try_into()?;
6174
6175 if !scx.catalog.active_role_id().is_system() {
6176 if workload_class.is_some() {
6177 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6178 }
6179 }
6180
6181 match managed.unwrap_or_else(|| cluster.is_managed()) {
6182 true => {
6183 let alter_strategy_extracted =
6184 ClusterAlterOptionExtracted::try_from(with_options)?;
6185 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6186
6187 match alter_strategy {
6188 AlterClusterPlanStrategy::None => {}
6189 _ => {
6190 scx.require_feature_flag(
6191 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6192 )?;
6193 }
6194 }
6195
6196 if replica_defs.is_some() {
6197 sql_bail!("REPLICAS not supported for managed clusters");
6198 }
6199 if schedule.is_some()
6200 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6201 {
6202 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6203
6204 if replication_factor.is_none()
6213 && cluster.replication_factor().is_some_and(|rf| rf > 1)
6214 {
6215 sql_bail!(
6216 "SCHEDULE cannot be set to anything other than MANUAL while the \
6217 cluster's REPLICATION FACTOR is greater than 1; \
6218 set the REPLICATION FACTOR to 1 first"
6219 );
6220 }
6221 }
6222
6223 if replication_factor.is_some() {
6224 if schedule.is_some()
6225 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6226 {
6227 sql_bail!(
6228 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6229 );
6230 }
6231 if let Some(current_schedule) = cluster.schedule() {
6232 if !matches!(current_schedule, ClusterSchedule::Manual) {
6233 sql_bail!(
6234 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6235 );
6236 }
6237 }
6238 }
6239 }
6240 false => {
6241 if !alter_strategy.is_none() {
6242 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6243 }
6244 if availability_zones.is_some() {
6245 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6246 }
6247 if replication_factor.is_some() {
6248 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6249 }
6250 if introspection_debugging.is_some() {
6251 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6252 }
6253 if introspection_interval.is_some() {
6254 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6255 }
6256 if size.is_some() {
6257 sql_bail!("SIZE not supported for unmanaged clusters");
6258 }
6259 if disk.is_some() {
6260 sql_bail!("DISK not supported for unmanaged clusters");
6261 }
6262 if schedule.is_some()
6263 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6264 {
6265 sql_bail!(
6266 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6267 );
6268 }
6269 if let Some(current_schedule) = cluster.schedule() {
6270 if !matches!(current_schedule, ClusterSchedule::Manual)
6271 && schedule.is_none()
6272 {
6273 sql_bail!(
6274 "when switching a cluster to unmanaged, if the managed \
6275 cluster's SCHEDULE is anything other than MANUAL, you have to \
6276 explicitly set the SCHEDULE to MANUAL"
6277 );
6278 }
6279 }
6280 }
6281 }
6282
6283 let mut replicas = vec![];
6284 for ReplicaDefinition { name, options } in
6285 replica_defs.into_iter().flat_map(Vec::into_iter)
6286 {
6287 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6288 }
6289
6290 if let Some(managed) = managed {
6291 options.managed = AlterOptionParameter::Set(managed);
6292 }
6293 if let Some(replication_factor) = replication_factor {
6294 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6295 }
6296 if let Some(size) = &size {
6297 options.size = AlterOptionParameter::Set(size.clone());
6298 }
6299 if let Some(availability_zones) = availability_zones {
6300 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6301 }
6302 if let Some(introspection_debugging) = introspection_debugging {
6303 options.introspection_debugging =
6304 AlterOptionParameter::Set(introspection_debugging);
6305 }
6306 if let Some(introspection_interval) = introspection_interval {
6307 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6308 }
6309 if disk.is_some() {
6310 let size = match size.as_deref() {
6314 Some(s) => s,
6315 None => cluster
6316 .managed_size()
6317 .ok_or_else(|| sql_err!("cluster is not managed"))?,
6318 };
6319 if scx.catalog.is_cluster_size_cc(size) {
6320 sql_bail!(
6321 "DISK option not supported for modern cluster sizes because disk is always enabled"
6322 );
6323 }
6324
6325 scx.catalog
6326 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6327 }
6328 if !replicas.is_empty() {
6329 options.replicas = AlterOptionParameter::Set(replicas);
6330 }
6331 if let Some(schedule) = schedule {
6332 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6333 }
6334 if let Some(workload_class) = workload_class {
6335 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6336 }
6337 }
6338 AlterClusterAction::ResetOptions(reset_options) => {
6339 use AlterOptionParameter::Reset;
6340 use ClusterOptionName::*;
6341
6342 if !scx.catalog.active_role_id().is_system() {
6343 if reset_options.contains(&WorkloadClass) {
6344 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6345 }
6346 }
6347
6348 for option in reset_options {
6349 match option {
6350 AvailabilityZones => options.availability_zones = Reset,
6351 Disk => scx
6352 .catalog
6353 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6354 IntrospectionInterval => options.introspection_interval = Reset,
6355 IntrospectionDebugging => options.introspection_debugging = Reset,
6356 Managed => options.managed = Reset,
6357 Replicas => options.replicas = Reset,
6358 ReplicationFactor => options.replication_factor = Reset,
6359 Size => options.size = Reset,
6360 Schedule => options.schedule = Reset,
6361 WorkloadClass => options.workload_class = Reset,
6362 }
6363 }
6364 }
6365 }
6366 Ok(Plan::AlterCluster(AlterClusterPlan {
6367 id: cluster.id(),
6368 name: cluster.name().to_string(),
6369 options,
6370 strategy: alter_strategy,
6371 }))
6372}
6373
6374pub fn describe_alter_set_cluster(
6375 _: &StatementContext,
6376 _: AlterSetClusterStatement<Aug>,
6377) -> Result<StatementDesc, PlanError> {
6378 Ok(StatementDesc::new(None))
6379}
6380
6381pub fn plan_alter_item_set_cluster(
6382 scx: &StatementContext,
6383 AlterSetClusterStatement {
6384 if_exists,
6385 set_cluster: in_cluster_name,
6386 name,
6387 object_type,
6388 }: AlterSetClusterStatement<Aug>,
6389) -> Result<Plan, PlanError> {
6390 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6391
6392 let object_type = object_type.into();
6393
6394 match object_type {
6396 ObjectType::MaterializedView => {}
6397 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6398 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6399 }
6400 ObjectType::Table
6401 | ObjectType::View
6402 | ObjectType::Type
6403 | ObjectType::Role
6404 | ObjectType::Cluster
6405 | ObjectType::ClusterReplica
6406 | ObjectType::Secret
6407 | ObjectType::Connection
6408 | ObjectType::Database
6409 | ObjectType::Schema
6410 | ObjectType::Func
6411 | ObjectType::NetworkPolicy => {
6412 bail_never_supported!(
6413 format!("ALTER {object_type} SET CLUSTER"),
6414 "sql/alter-set-cluster/",
6415 format!("{object_type} has no associated cluster")
6416 )
6417 }
6418 }
6419
6420 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6421
6422 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6423 Some(entry) => {
6424 let current_cluster = entry.cluster_id();
6425 let Some(current_cluster) = current_cluster else {
6426 sql_bail!("No cluster associated with {name}");
6427 };
6428
6429 if current_cluster == in_cluster.id() {
6430 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6431 } else {
6432 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6433 id: entry.id(),
6434 set_cluster: in_cluster.id(),
6435 }))
6436 }
6437 }
6438 None => {
6439 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6440 name: name.to_ast_string_simple(),
6441 object_type,
6442 });
6443
6444 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6445 }
6446 }
6447}
6448
6449pub fn describe_alter_object_rename(
6450 _: &StatementContext,
6451 _: AlterObjectRenameStatement,
6452) -> Result<StatementDesc, PlanError> {
6453 Ok(StatementDesc::new(None))
6454}
6455
6456pub fn plan_alter_object_rename(
6457 scx: &mut StatementContext,
6458 AlterObjectRenameStatement {
6459 name,
6460 object_type,
6461 to_item_name,
6462 if_exists,
6463 }: AlterObjectRenameStatement,
6464) -> Result<Plan, PlanError> {
6465 let object_type = object_type.into();
6466 match (object_type, name) {
6467 (
6468 ObjectType::View
6469 | ObjectType::MaterializedView
6470 | ObjectType::Table
6471 | ObjectType::Source
6472 | ObjectType::Index
6473 | ObjectType::Sink
6474 | ObjectType::Secret
6475 | ObjectType::Connection,
6476 UnresolvedObjectName::Item(name),
6477 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6478 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6479 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6480 }
6481 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6482 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6483 }
6484 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6485 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6486 }
6487 (object_type, name) => {
6488 bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6491 }
6492 }
6493}
6494
6495pub fn plan_alter_schema_rename(
6496 scx: &mut StatementContext,
6497 name: UnresolvedSchemaName,
6498 to_schema_name: Ident,
6499 if_exists: bool,
6500) -> Result<Plan, PlanError> {
6501 let normalized = normalize::unresolved_schema_name(name.clone())?;
6505 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6506 sql_bail!(
6507 "cannot rename schemas in the ambient database: {:?}",
6508 mz_repr::namespaces::MZ_TEMP_SCHEMA
6509 );
6510 }
6511
6512 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6513 let object_type = ObjectType::Schema;
6514 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6515 name: name.to_ast_string_simple(),
6516 object_type,
6517 });
6518 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6519 };
6520
6521 if scx
6523 .resolve_schema_in_database(&db_spec, &to_schema_name)
6524 .is_ok()
6525 {
6526 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6527 to_schema_name.clone().into_string(),
6528 )));
6529 }
6530
6531 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6533 if schema.id().is_system() {
6534 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6535 }
6536
6537 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6538 cur_schema_spec: (db_spec, schema_spec),
6539 new_schema_name: to_schema_name.into_string(),
6540 }))
6541}
6542
6543pub fn plan_alter_schema_swap<F>(
6544 scx: &mut StatementContext,
6545 name_a: UnresolvedSchemaName,
6546 name_b: Ident,
6547 if_exists: bool,
6548 gen_temp_suffix: F,
6549) -> Result<Plan, PlanError>
6550where
6551 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6552{
6553 let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6557 if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6558 {
6559 sql_bail!("cannot swap schemas that are in the ambient database");
6560 }
6561 let name_b_str = normalize::ident_ref(&name_b);
6563 if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6564 sql_bail!("cannot swap schemas that are in the ambient database");
6565 }
6566
6567 let schema_a = match scx.resolve_schema(name_a.clone()) {
6568 Ok(schema) => schema,
6569 Err(_) if if_exists => {
6570 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6571 name: name_a.to_ast_string_simple(),
6572 object_type: ObjectType::Schema,
6573 });
6574 return Ok(Plan::AlterNoop(AlterNoopPlan {
6575 object_type: ObjectType::Schema,
6576 }));
6577 }
6578 Err(e) => return Err(e),
6579 };
6580
6581 let db_spec = schema_a.database().clone();
6582 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6583 sql_bail!("cannot swap schemas that are in the ambient database");
6584 };
6585 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6586
6587 if schema_a.id().is_system() || schema_b.id().is_system() {
6589 bail_never_supported!("swapping a system schema".to_string())
6590 }
6591
6592 const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
6596 let check = |temp_suffix: &str| {
6597 let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
6598 temp_name.append_lossy(temp_suffix);
6599 scx.resolve_schema_in_database(&db_spec, &temp_name)
6600 .is_err()
6601 };
6602 let temp_suffix = gen_temp_suffix(&check)?;
6603 let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");
6604
6605 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6606 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6607 schema_a_name: schema_a.name().schema.to_string(),
6608 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6609 schema_b_name: schema_b.name().schema.to_string(),
6610 name_temp,
6611 }))
6612}
6613
6614pub fn plan_alter_item_rename(
6615 scx: &mut StatementContext,
6616 object_type: ObjectType,
6617 name: UnresolvedItemName,
6618 to_item_name: Ident,
6619 if_exists: bool,
6620) -> Result<Plan, PlanError> {
6621 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6622 Ok(r) => r,
6623 Err(PlanError::MismatchedObjectType {
6625 name,
6626 is_type: ObjectType::MaterializedView,
6627 expected_type: ObjectType::View,
6628 }) => {
6629 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6630 }
6631 e => e?,
6632 };
6633
6634 match resolved {
6635 Some(entry) => {
6636 let full_name = scx.catalog.resolve_full_name(entry.name());
6637 let item_type = entry.item_type();
6638
6639 let proposed_name = QualifiedItemName {
6640 qualifiers: entry.name().qualifiers.clone(),
6641 item: to_item_name.clone().into_string(),
6642 };
6643
6644 let conflicting_type_exists;
6648 let conflicting_item_exists;
6649 if item_type == CatalogItemType::Type {
6650 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6651 conflicting_item_exists = scx
6652 .catalog
6653 .get_item_by_name(&proposed_name)
6654 .map(|item| item.item_type().conflicts_with_type())
6655 .unwrap_or(false);
6656 } else {
6657 conflicting_type_exists = item_type.conflicts_with_type()
6658 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6659 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6660 };
6661 if conflicting_type_exists || conflicting_item_exists {
6662 sql_bail!("catalog item '{}' already exists", to_item_name);
6663 }
6664
6665 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6666 id: entry.id(),
6667 current_full_name: full_name,
6668 to_name: normalize::ident(to_item_name),
6669 object_type,
6670 }))
6671 }
6672 None => {
6673 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6674 name: name.to_ast_string_simple(),
6675 object_type,
6676 });
6677
6678 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6679 }
6680 }
6681}
6682
6683pub fn plan_alter_cluster_rename(
6684 scx: &mut StatementContext,
6685 object_type: ObjectType,
6686 name: Ident,
6687 to_name: Ident,
6688 if_exists: bool,
6689) -> Result<Plan, PlanError> {
6690 match resolve_cluster(scx, &name, if_exists)? {
6691 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6692 id: entry.id(),
6693 name: entry.name().to_string(),
6694 to_name: ident(to_name),
6695 })),
6696 None => {
6697 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6698 name: name.to_ast_string_simple(),
6699 object_type,
6700 });
6701
6702 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6703 }
6704 }
6705}
6706
6707pub fn plan_alter_cluster_swap<F>(
6708 scx: &mut StatementContext,
6709 name_a: Ident,
6710 name_b: Ident,
6711 if_exists: bool,
6712 gen_temp_suffix: F,
6713) -> Result<Plan, PlanError>
6714where
6715 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6716{
6717 let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6718 Ok(cluster) => cluster,
6719 Err(_) if if_exists => {
6720 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6721 name: name_a.to_ast_string_simple(),
6722 object_type: ObjectType::Cluster,
6723 });
6724 return Ok(Plan::AlterNoop(AlterNoopPlan {
6725 object_type: ObjectType::Cluster,
6726 }));
6727 }
6728 Err(e) => return Err(e),
6729 };
6730 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6731
6732 const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6733 let check = |temp_suffix: &str| {
6734 let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6735 temp_name.append_lossy(temp_suffix);
6736 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6737 Err(CatalogError::UnknownCluster(_)) => true,
6739 Ok(_) | Err(_) => false,
6741 }
6742 };
6743 let temp_suffix = gen_temp_suffix(&check)?;
6744 let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6745
6746 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6747 id_a: cluster_a.id(),
6748 id_b: cluster_b.id(),
6749 name_a: name_a.into_string(),
6750 name_b: name_b.into_string(),
6751 name_temp,
6752 }))
6753}
6754
6755pub fn plan_alter_cluster_replica_rename(
6756 scx: &mut StatementContext,
6757 object_type: ObjectType,
6758 name: QualifiedReplica,
6759 to_item_name: Ident,
6760 if_exists: bool,
6761) -> Result<Plan, PlanError> {
6762 match resolve_cluster_replica(scx, &name, if_exists)? {
6763 Some((cluster, replica)) => {
6764 ensure_cluster_is_not_managed(scx, cluster.id())?;
6765 Ok(Plan::AlterClusterReplicaRename(
6766 AlterClusterReplicaRenamePlan {
6767 cluster_id: cluster.id(),
6768 replica_id: replica,
6769 name: QualifiedReplica {
6770 cluster: Ident::new(cluster.name())?,
6771 replica: name.replica,
6772 },
6773 to_name: normalize::ident(to_item_name),
6774 },
6775 ))
6776 }
6777 None => {
6778 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6779 name: name.to_ast_string_simple(),
6780 object_type,
6781 });
6782
6783 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6784 }
6785 }
6786}
6787
6788pub fn describe_alter_object_swap(
6789 _: &StatementContext,
6790 _: AlterObjectSwapStatement,
6791) -> Result<StatementDesc, PlanError> {
6792 Ok(StatementDesc::new(None))
6793}
6794
6795pub fn plan_alter_object_swap(
6796 scx: &mut StatementContext,
6797 stmt: AlterObjectSwapStatement,
6798) -> Result<Plan, PlanError> {
6799 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6800
6801 let AlterObjectSwapStatement {
6802 object_type,
6803 if_exists,
6804 name_a,
6805 name_b,
6806 } = stmt;
6807 let object_type = object_type.into();
6808
6809 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6811 let mut attempts = 0;
6812 let name_temp = loop {
6813 attempts += 1;
6814 if attempts > 10 {
6815 tracing::warn!("Unable to generate temp id for swapping");
6816 sql_bail!("unable to swap!");
6817 }
6818
6819 let short_id = mz_ore::id_gen::temp_id();
6821 if check_fn(&short_id) {
6822 break short_id;
6823 }
6824 };
6825
6826 Ok(name_temp)
6827 };
6828
6829 match (object_type, name_a, name_b) {
6830 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6831 plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6832 }
6833 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6834 plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6835 }
6836 (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6837 bail_internal!("name type does not match object type for ALTER SWAP")
6838 }
6839 (
6840 ObjectType::Table
6841 | ObjectType::View
6842 | ObjectType::MaterializedView
6843 | ObjectType::Source
6844 | ObjectType::Sink
6845 | ObjectType::Index
6846 | ObjectType::Type
6847 | ObjectType::Role
6848 | ObjectType::ClusterReplica
6849 | ObjectType::Secret
6850 | ObjectType::Connection
6851 | ObjectType::Database
6852 | ObjectType::Func
6853 | ObjectType::NetworkPolicy,
6854 _,
6855 _,
6856 ) => Err(PlanError::Unsupported {
6857 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6858 discussion_no: None,
6859 }),
6860 }
6861}
6862
6863pub fn describe_alter_retain_history(
6864 _: &StatementContext,
6865 _: AlterRetainHistoryStatement<Aug>,
6866) -> Result<StatementDesc, PlanError> {
6867 Ok(StatementDesc::new(None))
6868}
6869
6870pub fn plan_alter_retain_history(
6871 scx: &StatementContext,
6872 AlterRetainHistoryStatement {
6873 object_type,
6874 if_exists,
6875 name,
6876 history,
6877 }: AlterRetainHistoryStatement<Aug>,
6878) -> Result<Plan, PlanError> {
6879 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6880}
6881
6882fn alter_retain_history(
6883 scx: &StatementContext,
6884 object_type: ObjectType,
6885 if_exists: bool,
6886 name: UnresolvedObjectName,
6887 history: Option<WithOptionValue<Aug>>,
6888) -> Result<Plan, PlanError> {
6889 let name = match (object_type, name) {
6890 (
6891 ObjectType::View
6893 | ObjectType::MaterializedView
6894 | ObjectType::Table
6895 | ObjectType::Source
6896 | ObjectType::Index,
6897 UnresolvedObjectName::Item(name),
6898 ) => name,
6899 (object_type, _) => {
6900 bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
6901 }
6902 };
6903 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6904 Some(entry) => {
6905 let full_name = scx.catalog.resolve_full_name(entry.name());
6906 let item_type = entry.item_type();
6907
6908 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6910 return Err(PlanError::AlterViewOnMaterializedView(
6911 full_name.to_string(),
6912 ));
6913 } else if object_type == ObjectType::View {
6914 sql_bail!("{object_type} does not support RETAIN HISTORY")
6915 } else if object_type != item_type {
6916 sql_bail!(
6917 "\"{}\" is a {} not a {}",
6918 full_name,
6919 entry.item_type(),
6920 format!("{object_type}").to_lowercase()
6921 )
6922 }
6923
6924 let (value, lcw) = match &history {
6926 Some(WithOptionValue::RetainHistoryFor(value)) => {
6927 let window = OptionalDuration::try_from_value(value.clone())?;
6928 (Some(value.clone()), window.0)
6929 }
6930 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6932 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6933 };
6934 let window = plan_retain_history(scx, lcw)?;
6935
6936 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6937 id: entry.id(),
6938 value,
6939 window,
6940 object_type,
6941 }))
6942 }
6943 None => {
6944 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6945 name: name.to_ast_string_simple(),
6946 object_type,
6947 });
6948
6949 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6950 }
6951 }
6952}
6953
6954fn alter_source_timestamp_interval(
6955 scx: &StatementContext,
6956 if_exists: bool,
6957 source_name: UnresolvedItemName,
6958 value: Option<WithOptionValue<Aug>>,
6959) -> Result<Plan, PlanError> {
6960 let object_type = ObjectType::Source;
6961 match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6962 Some(entry) => {
6963 let full_name = scx.catalog.resolve_full_name(entry.name());
6964 if entry.item_type() != CatalogItemType::Source {
6965 sql_bail!(
6966 "\"{}\" is a {} not a {}",
6967 full_name,
6968 entry.item_type(),
6969 format!("{object_type}").to_lowercase()
6970 )
6971 }
6972
6973 match value {
6974 Some(val) => {
6975 let val = match val {
6976 WithOptionValue::Value(v) => v,
6977 _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
6978 };
6979 let duration = Duration::try_from_value(val.clone())?;
6980
6981 let min = scx.catalog.system_vars().min_timestamp_interval();
6982 let max = scx.catalog.system_vars().max_timestamp_interval();
6983 if duration < min || duration > max {
6984 return Err(PlanError::InvalidTimestampInterval {
6985 min,
6986 max,
6987 requested: duration,
6988 });
6989 }
6990
6991 Ok(Plan::AlterSourceTimestampInterval(
6992 AlterSourceTimestampIntervalPlan {
6993 id: entry.id(),
6994 value: Some(val),
6995 interval: duration,
6996 },
6997 ))
6998 }
6999 None => {
7000 let interval = scx.catalog.system_vars().default_timestamp_interval();
7001 Ok(Plan::AlterSourceTimestampInterval(
7002 AlterSourceTimestampIntervalPlan {
7003 id: entry.id(),
7004 value: None,
7005 interval,
7006 },
7007 ))
7008 }
7009 }
7010 }
7011 None => {
7012 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7013 name: source_name.to_ast_string_simple(),
7014 object_type,
7015 });
7016
7017 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
7018 }
7019 }
7020}
7021
7022pub fn describe_alter_secret_options(
7023 _: &StatementContext,
7024 _: AlterSecretStatement<Aug>,
7025) -> Result<StatementDesc, PlanError> {
7026 Ok(StatementDesc::new(None))
7027}
7028
7029pub fn plan_alter_secret(
7030 scx: &mut StatementContext,
7031 stmt: AlterSecretStatement<Aug>,
7032) -> Result<Plan, PlanError> {
7033 let AlterSecretStatement {
7034 name,
7035 if_exists,
7036 value,
7037 } = stmt;
7038 let object_type = ObjectType::Secret;
7039 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7040 Some(entry) => entry.id(),
7041 None => {
7042 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7043 name: name.to_string(),
7044 object_type,
7045 });
7046
7047 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7048 }
7049 };
7050
7051 let secret_as = query::plan_secret_as(scx, value)?;
7052
7053 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
7054}
7055
7056pub fn describe_alter_connection(
7057 _: &StatementContext,
7058 _: AlterConnectionStatement<Aug>,
7059) -> Result<StatementDesc, PlanError> {
7060 Ok(StatementDesc::new(None))
7061}
7062
7063generate_extracted_config!(AlterConnectionOption, (Validate, bool));
7064
7065pub fn plan_alter_connection(
7066 scx: &StatementContext,
7067 stmt: AlterConnectionStatement<Aug>,
7068) -> Result<Plan, PlanError> {
7069 let AlterConnectionStatement {
7070 name,
7071 if_exists,
7072 actions,
7073 with_options,
7074 } = stmt;
7075 let conn_name = normalize::unresolved_item_name(name)?;
7076 let entry = match scx.catalog.resolve_item(&conn_name) {
7077 Ok(entry) => entry,
7078 Err(_) if if_exists => {
7079 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7080 name: conn_name.to_string(),
7081 object_type: ObjectType::Connection,
7082 });
7083
7084 return Ok(Plan::AlterNoop(AlterNoopPlan {
7085 object_type: ObjectType::Connection,
7086 }));
7087 }
7088 Err(e) => return Err(e.into()),
7089 };
7090
7091 let connection = entry.connection()?;
7092
7093 if actions
7094 .iter()
7095 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7096 {
7097 if actions.len() > 1 {
7098 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7099 }
7100
7101 if !with_options.is_empty() {
7102 sql_bail!(
7103 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7104 with_options
7105 .iter()
7106 .map(|o| o.to_ast_string_simple())
7107 .join(", ")
7108 );
7109 }
7110
7111 if !matches!(connection, Connection::Ssh(_)) {
7112 sql_bail!(
7113 "{} is not an SSH connection",
7114 scx.catalog.resolve_full_name(entry.name())
7115 )
7116 }
7117
7118 return Ok(Plan::AlterConnection(AlterConnectionPlan {
7119 id: entry.id(),
7120 action: crate::plan::AlterConnectionAction::RotateKeys,
7121 }));
7122 }
7123
7124 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7125 if options.validate.is_some() {
7126 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7127 }
7128
7129 let validate = match options.validate {
7130 Some(val) => val,
7131 None => {
7132 scx.catalog
7133 .system_vars()
7134 .enable_default_connection_validation()
7135 && connection.validate_by_default()
7136 }
7137 };
7138
7139 let connection_type = match connection {
7140 Connection::Aws(_) => CreateConnectionType::Aws,
7141 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7142 Connection::Gcp(_) => CreateConnectionType::Gcp,
7143 Connection::Kafka(_) => CreateConnectionType::Kafka,
7144 Connection::Csr(_) => CreateConnectionType::Csr,
7145 Connection::GlueSchemaRegistry(_) => CreateConnectionType::GlueSchemaRegistry,
7146 Connection::Postgres(_) => CreateConnectionType::Postgres,
7147 Connection::Ssh(_) => CreateConnectionType::Ssh,
7148 Connection::MySql(_) => CreateConnectionType::MySql,
7149 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7150 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7151 };
7152
7153 let specified_options: BTreeSet<_> = actions
7155 .iter()
7156 .map(|action: &AlterConnectionAction<Aug>| match action {
7157 AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
7158 AlterConnectionAction::DropOption(name) => Ok(name.clone()),
7159 AlterConnectionAction::RotateKeys => {
7160 Err(internal_err!("RotateKeys is handled separately above"))
7161 }
7162 })
7163 .collect::<Result<_, PlanError>>()?;
7164
7165 for invalid in INALTERABLE_OPTIONS {
7166 if specified_options.contains(invalid) {
7167 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7168 }
7169 }
7170
7171 connection::validate_options_per_connection_type(connection_type, specified_options)?;
7172
7173 let mut set_options_vec: Vec<_> = Vec::new();
7175 let mut drop_options: BTreeSet<_> = BTreeSet::new();
7176 for action in actions {
7177 match action {
7178 AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
7179 AlterConnectionAction::DropOption(name) => {
7180 drop_options.insert(name);
7181 }
7182 AlterConnectionAction::RotateKeys => {
7183 bail_internal!("RotateKeys is handled separately above")
7184 }
7185 }
7186 }
7187
7188 let set_options: BTreeMap<_, _> = set_options_vec
7189 .clone()
7190 .into_iter()
7191 .map(|option| (option.name, option.value))
7192 .collect();
7193
7194 let connection_options_extracted =
7198 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7199
7200 let duplicates: Vec<_> = connection_options_extracted
7201 .seen
7202 .intersection(&drop_options)
7203 .collect();
7204
7205 if !duplicates.is_empty() {
7206 sql_bail!(
7207 "cannot both SET and DROP/RESET options {}",
7208 duplicates
7209 .iter()
7210 .map(|option| option.to_string())
7211 .join(", ")
7212 )
7213 }
7214
7215 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7216 let set_options_count = mutually_exclusive_options
7217 .iter()
7218 .filter(|o| set_options.contains_key(o))
7219 .count();
7220 let drop_options_count = mutually_exclusive_options
7221 .iter()
7222 .filter(|o| drop_options.contains(o))
7223 .count();
7224
7225 if set_options_count > 0 && drop_options_count > 0 {
7227 sql_bail!(
7228 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7229 connection_type,
7230 mutually_exclusive_options
7231 .iter()
7232 .map(|option| option.to_string())
7233 .join(", ")
7234 )
7235 }
7236
7237 if set_options_count > 0 || drop_options_count > 0 {
7242 drop_options.extend(mutually_exclusive_options.iter().cloned());
7243 }
7244
7245 }
7248
7249 Ok(Plan::AlterConnection(AlterConnectionPlan {
7250 id: entry.id(),
7251 action: crate::plan::AlterConnectionAction::AlterOptions {
7252 set_options,
7253 drop_options,
7254 validate,
7255 },
7256 }))
7257}
7258
7259pub fn describe_alter_sink(
7260 _: &StatementContext,
7261 _: AlterSinkStatement<Aug>,
7262) -> Result<StatementDesc, PlanError> {
7263 Ok(StatementDesc::new(None))
7264}
7265
7266pub fn plan_alter_sink(
7267 scx: &mut StatementContext,
7268 stmt: AlterSinkStatement<Aug>,
7269) -> Result<Plan, PlanError> {
7270 let AlterSinkStatement {
7271 sink_name,
7272 if_exists,
7273 action,
7274 } = stmt;
7275
7276 let object_type = ObjectType::Sink;
7277 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7278
7279 let Some(item) = item else {
7280 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7281 name: sink_name.to_string(),
7282 object_type,
7283 });
7284
7285 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7286 };
7287 let item = item.at_version(RelationVersionSelector::Latest);
7289
7290 match action {
7291 AlterSinkAction::ChangeRelation(new_from) => {
7292 let create_sql = item.create_sql();
7294 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7295 let [stmt]: [StatementParseResult; 1] = stmts
7296 .try_into()
7297 .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
7298 let Statement::CreateSink(stmt) = stmt.ast else {
7299 bail_internal!("create SQL of sink is not a CREATE SINK statement");
7300 };
7301
7302 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7304 stmt.from = new_from;
7305
7306 let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7308 bail_internal!("plan_sink did not produce a CreateSink plan");
7309 };
7310
7311 plan.sink.version += 1;
7312
7313 Ok(Plan::AlterSink(AlterSinkPlan {
7314 item_id: item.id(),
7315 global_id: item.global_id(),
7316 sink: plan.sink,
7317 with_snapshot: plan.with_snapshot,
7318 in_cluster: plan.in_cluster,
7319 }))
7320 }
7321 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7322 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7323 }
7324}
7325
7326pub fn describe_alter_source(
7327 _: &StatementContext,
7328 _: AlterSourceStatement<Aug>,
7329) -> Result<StatementDesc, PlanError> {
7330 Ok(StatementDesc::new(None))
7332}
7333
7334generate_extracted_config!(
7335 AlterSourceAddSubsourceOption,
7336 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7337 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7338 (Details, String)
7339);
7340
7341pub fn plan_alter_source(
7342 scx: &mut StatementContext,
7343 stmt: AlterSourceStatement<Aug>,
7344) -> Result<Plan, PlanError> {
7345 let AlterSourceStatement {
7346 source_name,
7347 if_exists,
7348 action,
7349 } = stmt;
7350 let object_type = ObjectType::Source;
7351
7352 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7353 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7354 name: source_name.to_string(),
7355 object_type,
7356 });
7357
7358 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7359 }
7360
7361 match action {
7362 AlterSourceAction::SetOptions(options) => {
7363 let mut options = options.into_iter();
7364 let option = options
7365 .next()
7366 .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
7367 if option.name == CreateSourceOptionName::RetainHistory {
7368 if options.next().is_some() {
7369 sql_bail!("RETAIN HISTORY must be only option");
7370 }
7371 return alter_retain_history(
7372 scx,
7373 object_type,
7374 if_exists,
7375 UnresolvedObjectName::Item(source_name),
7376 option.value,
7377 );
7378 }
7379 if option.name == CreateSourceOptionName::TimestampInterval {
7380 if options.next().is_some() {
7381 sql_bail!("TIMESTAMP INTERVAL must be only option");
7382 }
7383 return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
7384 }
7385 sql_bail!(
7388 "Cannot modify the {} of a SOURCE.",
7389 option.name.to_ast_string_simple()
7390 );
7391 }
7392 AlterSourceAction::ResetOptions(reset) => {
7393 let mut options = reset.into_iter();
7394 let option = options
7395 .next()
7396 .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
7397 if option == CreateSourceOptionName::RetainHistory {
7398 if options.next().is_some() {
7399 sql_bail!("RETAIN HISTORY must be only option");
7400 }
7401 return alter_retain_history(
7402 scx,
7403 object_type,
7404 if_exists,
7405 UnresolvedObjectName::Item(source_name),
7406 None,
7407 );
7408 }
7409 if option == CreateSourceOptionName::TimestampInterval {
7410 if options.next().is_some() {
7411 sql_bail!("TIMESTAMP INTERVAL must be only option");
7412 }
7413 return alter_source_timestamp_interval(scx, if_exists, source_name, None);
7414 }
7415 sql_bail!(
7416 "Cannot modify the {} of a SOURCE.",
7417 option.to_ast_string_simple()
7418 );
7419 }
7420 AlterSourceAction::DropSubsources { .. } => {
7421 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7422 }
7423 AlterSourceAction::AddSubsources { .. } => {
7424 sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
7425 }
7426 AlterSourceAction::RefreshReferences => {
7427 sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
7428 }
7429 };
7430}
7431
7432pub fn describe_alter_system_set(
7433 _: &StatementContext,
7434 _: AlterSystemSetStatement,
7435) -> Result<StatementDesc, PlanError> {
7436 Ok(StatementDesc::new(None))
7437}
7438
7439pub fn plan_alter_system_set(
7440 _: &StatementContext,
7441 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7442) -> Result<Plan, PlanError> {
7443 let name = name.to_string();
7444 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7445 name,
7446 value: scl::plan_set_variable_to(to)?,
7447 }))
7448}
7449
7450pub fn describe_alter_system_reset(
7451 _: &StatementContext,
7452 _: AlterSystemResetStatement,
7453) -> Result<StatementDesc, PlanError> {
7454 Ok(StatementDesc::new(None))
7455}
7456
7457pub fn plan_alter_system_reset(
7458 _: &StatementContext,
7459 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7460) -> Result<Plan, PlanError> {
7461 let name = name.to_string();
7462 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7463}
7464
7465pub fn describe_alter_system_reset_all(
7466 _: &StatementContext,
7467 _: AlterSystemResetAllStatement,
7468) -> Result<StatementDesc, PlanError> {
7469 Ok(StatementDesc::new(None))
7470}
7471
7472pub fn plan_alter_system_reset_all(
7473 _: &StatementContext,
7474 _: AlterSystemResetAllStatement,
7475) -> Result<Plan, PlanError> {
7476 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7477}
7478
7479pub fn describe_alter_role(
7480 _: &StatementContext,
7481 _: AlterRoleStatement<Aug>,
7482) -> Result<StatementDesc, PlanError> {
7483 Ok(StatementDesc::new(None))
7484}
7485
7486pub fn plan_alter_role(
7487 scx: &StatementContext,
7488 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7489) -> Result<Plan, PlanError> {
7490 let option = match option {
7491 AlterRoleOption::Attributes(attrs) => {
7492 let attrs = plan_role_attributes(attrs, scx)?;
7493 PlannedAlterRoleOption::Attributes(attrs)
7494 }
7495 AlterRoleOption::Variable(variable) => {
7496 let var = plan_role_variable(scx, variable)?;
7497 PlannedAlterRoleOption::Variable(var)
7498 }
7499 };
7500
7501 Ok(Plan::AlterRole(AlterRolePlan {
7502 id: name.id,
7503 name: name.name,
7504 option,
7505 }))
7506}
7507
7508pub fn describe_alter_table_add_column(
7509 _: &StatementContext,
7510 _: AlterTableAddColumnStatement<Aug>,
7511) -> Result<StatementDesc, PlanError> {
7512 Ok(StatementDesc::new(None))
7513}
7514
7515pub fn plan_alter_table_add_column(
7516 scx: &StatementContext,
7517 stmt: AlterTableAddColumnStatement<Aug>,
7518) -> Result<Plan, PlanError> {
7519 let AlterTableAddColumnStatement {
7520 if_exists,
7521 name,
7522 if_col_not_exist,
7523 column_name,
7524 data_type,
7525 } = stmt;
7526 let object_type = ObjectType::Table;
7527
7528 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7529
7530 let (relation_id, item_name, desc) =
7531 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7532 Some(item) => {
7533 let item_name = scx.catalog.resolve_full_name(item.name());
7535 let item = item.at_version(RelationVersionSelector::Latest);
7536 let desc = item
7537 .relation_desc()
7538 .ok_or_else(|| sql_err!("item does not have a relation description"))?
7539 .into_owned();
7540 (item.id(), item_name, desc)
7541 }
7542 None => {
7543 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7544 name: name.to_ast_string_simple(),
7545 object_type,
7546 });
7547 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7548 }
7549 };
7550
7551 let column_name = ColumnName::from(column_name.as_str());
7552 if desc.get_by_name(&column_name).is_some() {
7553 if if_col_not_exist {
7554 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7555 column_name: column_name.to_string(),
7556 object_name: item_name.item,
7557 });
7558 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7559 } else {
7560 return Err(PlanError::ColumnAlreadyExists {
7561 column_name,
7562 object_name: item_name.item,
7563 });
7564 }
7565 }
7566
7567 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7568 let column_type = scalar_type.nullable(true);
7570 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7572
7573 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7574 relation_id,
7575 column_name,
7576 column_type,
7577 raw_sql_type,
7578 }))
7579}
7580
7581pub fn describe_alter_materialized_view_apply_replacement(
7582 _: &StatementContext,
7583 _: AlterMaterializedViewApplyReplacementStatement,
7584) -> Result<StatementDesc, PlanError> {
7585 Ok(StatementDesc::new(None))
7586}
7587
7588pub fn plan_alter_materialized_view_apply_replacement(
7589 scx: &StatementContext,
7590 stmt: AlterMaterializedViewApplyReplacementStatement,
7591) -> Result<Plan, PlanError> {
7592 let AlterMaterializedViewApplyReplacementStatement {
7593 if_exists,
7594 name,
7595 replacement_name,
7596 } = stmt;
7597
7598 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7599
7600 let object_type = ObjectType::MaterializedView;
7601 let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7602 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7603 name: name.to_ast_string_simple(),
7604 object_type,
7605 });
7606 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7607 };
7608
7609 let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7610 .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7611
7612 if replacement.replacement_target() != Some(mv.id()) {
7613 return Err(PlanError::InvalidReplacement {
7614 item_type: mv.item_type(),
7615 item_name: scx.catalog.minimal_qualification(mv.name()),
7616 replacement_type: replacement.item_type(),
7617 replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7618 });
7619 }
7620
7621 Ok(Plan::AlterMaterializedViewApplyReplacement(
7622 AlterMaterializedViewApplyReplacementPlan {
7623 id: mv.id(),
7624 replacement_id: replacement.id(),
7625 },
7626 ))
7627}
7628
7629pub fn describe_comment(
7630 _: &StatementContext,
7631 _: CommentStatement<Aug>,
7632) -> Result<StatementDesc, PlanError> {
7633 Ok(StatementDesc::new(None))
7634}
7635
7636pub fn plan_comment(
7637 scx: &mut StatementContext,
7638 stmt: CommentStatement<Aug>,
7639) -> Result<Plan, PlanError> {
7640 const MAX_COMMENT_LENGTH: usize = 1024;
7641
7642 let CommentStatement { object, comment } = stmt;
7643
7644 if let Some(c) = &comment {
7646 if c.len() > 1024 {
7647 return Err(PlanError::CommentTooLong {
7648 length: c.len(),
7649 max_size: MAX_COMMENT_LENGTH,
7650 });
7651 }
7652 }
7653
7654 let (object_id, column_pos) = match &object {
7655 com_ty @ CommentObjectType::Table { name }
7656 | com_ty @ CommentObjectType::View { name }
7657 | com_ty @ CommentObjectType::MaterializedView { name }
7658 | com_ty @ CommentObjectType::Index { name }
7659 | com_ty @ CommentObjectType::Func { name }
7660 | com_ty @ CommentObjectType::Connection { name }
7661 | com_ty @ CommentObjectType::Source { name }
7662 | com_ty @ CommentObjectType::Sink { name }
7663 | com_ty @ CommentObjectType::Secret { name } => {
7664 let item = scx.get_item_by_resolved_name(name)?;
7665 match (com_ty, item.item_type()) {
7666 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7667 (CommentObjectId::Table(item.id()), None)
7668 }
7669 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7670 (CommentObjectId::View(item.id()), None)
7671 }
7672 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7673 (CommentObjectId::MaterializedView(item.id()), None)
7674 }
7675 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7676 (CommentObjectId::Index(item.id()), None)
7677 }
7678 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7679 (CommentObjectId::Func(item.id()), None)
7680 }
7681 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7682 (CommentObjectId::Connection(item.id()), None)
7683 }
7684 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7685 (CommentObjectId::Source(item.id()), None)
7686 }
7687 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7688 (CommentObjectId::Sink(item.id()), None)
7689 }
7690 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7691 (CommentObjectId::Secret(item.id()), None)
7692 }
7693 (com_ty, cat_ty) => {
7694 let expected_type = match com_ty {
7695 CommentObjectType::Table { .. } => ObjectType::Table,
7696 CommentObjectType::View { .. } => ObjectType::View,
7697 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7698 CommentObjectType::Index { .. } => ObjectType::Index,
7699 CommentObjectType::Func { .. } => ObjectType::Func,
7700 CommentObjectType::Connection { .. } => ObjectType::Connection,
7701 CommentObjectType::Source { .. } => ObjectType::Source,
7702 CommentObjectType::Sink { .. } => ObjectType::Sink,
7703 CommentObjectType::Secret { .. } => ObjectType::Secret,
7704 _ => sql_bail!("cannot comment on this object type"),
7705 };
7706
7707 return Err(PlanError::InvalidObjectType {
7708 expected_type: SystemObjectType::Object(expected_type),
7709 actual_type: SystemObjectType::Object(cat_ty.into()),
7710 object_name: item.name().item.clone(),
7711 });
7712 }
7713 }
7714 }
7715 CommentObjectType::Type { ty } => match ty {
7716 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7717 sql_bail!("cannot comment on anonymous list or map type");
7718 }
7719 ResolvedDataType::Named { id, modifiers, .. } => {
7720 if !modifiers.is_empty() {
7721 sql_bail!("cannot comment on type with modifiers");
7722 }
7723 (CommentObjectId::Type(*id), None)
7724 }
7725 ResolvedDataType::Error => bail_internal!("unresolved data type"),
7726 },
7727 CommentObjectType::Column { name } => {
7728 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7729 match item.item_type() {
7730 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7731 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7732 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7733 CatalogItemType::MaterializedView => {
7734 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7735 }
7736 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7737 r => {
7738 return Err(PlanError::Unsupported {
7739 feature: format!("Specifying comments on a column of {r}"),
7740 discussion_no: None,
7741 });
7742 }
7743 }
7744 }
7745 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7746 CommentObjectType::Database { name } => {
7747 (CommentObjectId::Database(*name.database_id()), None)
7748 }
7749 CommentObjectType::Schema { name } => {
7750 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7754 sql_bail!(
7755 "cannot comment on schema {} because it is a temporary schema",
7756 mz_repr::namespaces::MZ_TEMP_SCHEMA
7757 );
7758 }
7759 (
7760 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7761 None,
7762 )
7763 }
7764 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7765 CommentObjectType::ClusterReplica { name } => {
7766 let replica = scx.catalog.resolve_cluster_replica(name)?;
7767 (
7768 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7769 None,
7770 )
7771 }
7772 CommentObjectType::NetworkPolicy { name } => {
7773 (CommentObjectId::NetworkPolicy(name.id), None)
7774 }
7775 };
7776
7777 if let Some(p) = column_pos {
7783 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7784 max_num_columns: MAX_NUM_COLUMNS,
7785 req_num_columns: p,
7786 })?;
7787 }
7788
7789 Ok(Plan::Comment(CommentPlan {
7790 object_id,
7791 sub_component: column_pos,
7792 comment,
7793 }))
7794}
7795
7796pub(crate) fn resolve_cluster<'a>(
7797 scx: &'a StatementContext,
7798 name: &'a Ident,
7799 if_exists: bool,
7800) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7801 match scx.resolve_cluster(Some(name)) {
7802 Ok(cluster) => Ok(Some(cluster)),
7803 Err(_) if if_exists => Ok(None),
7804 Err(e) => Err(e),
7805 }
7806}
7807
7808pub(crate) fn resolve_cluster_replica<'a>(
7809 scx: &'a StatementContext,
7810 name: &QualifiedReplica,
7811 if_exists: bool,
7812) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7813 match scx.resolve_cluster(Some(&name.cluster)) {
7814 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7815 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7816 None if if_exists => Ok(None),
7817 None => Err(sql_err!(
7818 "CLUSTER {} has no CLUSTER REPLICA named {}",
7819 cluster.name(),
7820 name.replica.as_str().quoted(),
7821 )),
7822 },
7823 Err(_) if if_exists => Ok(None),
7824 Err(e) => Err(e),
7825 }
7826}
7827
7828pub(crate) fn resolve_database<'a>(
7829 scx: &'a StatementContext,
7830 name: &'a UnresolvedDatabaseName,
7831 if_exists: bool,
7832) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7833 match scx.resolve_database(name) {
7834 Ok(database) => Ok(Some(database)),
7835 Err(_) if if_exists => Ok(None),
7836 Err(e) => Err(e),
7837 }
7838}
7839
7840pub(crate) fn resolve_schema<'a>(
7841 scx: &'a StatementContext,
7842 name: UnresolvedSchemaName,
7843 if_exists: bool,
7844) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7845 match scx.resolve_schema(name) {
7846 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7847 Err(_) if if_exists => Ok(None),
7848 Err(e) => Err(e),
7849 }
7850}
7851
7852pub(crate) fn resolve_network_policy<'a>(
7853 scx: &'a StatementContext,
7854 name: Ident,
7855 if_exists: bool,
7856) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7857 match scx.catalog.resolve_network_policy(&name.to_string()) {
7858 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7859 id: policy.id(),
7860 name: policy.name().to_string(),
7861 })),
7862 Err(_) if if_exists => Ok(None),
7863 Err(e) => Err(e.into()),
7864 }
7865}
7866
7867pub(crate) fn resolve_item_or_type<'a>(
7868 scx: &'a StatementContext,
7869 object_type: ObjectType,
7870 name: UnresolvedItemName,
7871 if_exists: bool,
7872) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7873 let name = normalize::unresolved_item_name(name)?;
7874 let catalog_item = match object_type {
7875 ObjectType::Type => scx.catalog.resolve_type(&name),
7876 ObjectType::Table
7877 | ObjectType::View
7878 | ObjectType::MaterializedView
7879 | ObjectType::Source
7880 | ObjectType::Sink
7881 | ObjectType::Index
7882 | ObjectType::Role
7883 | ObjectType::Cluster
7884 | ObjectType::ClusterReplica
7885 | ObjectType::Secret
7886 | ObjectType::Connection
7887 | ObjectType::Database
7888 | ObjectType::Schema
7889 | ObjectType::Func
7890 | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7891 };
7892
7893 match catalog_item {
7894 Ok(item) => {
7895 let is_type = ObjectType::from(item.item_type());
7896 if object_type == is_type {
7897 Ok(Some(item))
7898 } else {
7899 Err(PlanError::MismatchedObjectType {
7900 name: scx.catalog.minimal_qualification(item.name()),
7901 is_type,
7902 expected_type: object_type,
7903 })
7904 }
7905 }
7906 Err(_) if if_exists => Ok(None),
7907 Err(e) => Err(e.into()),
7908 }
7909}
7910
7911fn ensure_cluster_is_not_managed(
7913 scx: &StatementContext,
7914 cluster_id: ClusterId,
7915) -> Result<(), PlanError> {
7916 let cluster = scx.catalog.get_cluster(cluster_id);
7917 if cluster.is_managed() {
7918 Err(PlanError::ManagedCluster {
7919 cluster_name: cluster.name().to_string(),
7920 })
7921 } else {
7922 Ok(())
7923 }
7924}