use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

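// The maximum number of columns a webhook source may declare (the body column
// plus any columns requested via `INCLUDE HEADER(S)`).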
const MAX_NUM_COLUMNS: usize = 256;

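// Recognizes replica names of the form `r<digits>`, the naming scheme used for
// replicas of managed clusters.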
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

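/// Checks that the `PARTITION BY` columns are a prefix of the relation's
/// columns and that each named column has an order-preserving type.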
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

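    // Walk the relation's leading columns in lockstep with the PARTITION BY
    // list: the names must match positionally, and each column's type must
    // preserve ordering.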
    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

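/// Plans a `CREATE TABLE` statement, including any versioned column additions
/// recorded by `ALTER TABLE ... ADD COLUMN`.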
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

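    // Collect the names of the table's original columns. Versioned columns
    // (those added in a later relation version) are tracked separately via
    // `changes` below.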
    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

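    // Plan each column: resolve its SQL type and apply any column options
    // (NOT NULL, DEFAULT, UNIQUE, or a versioned addition).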
    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

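    // Plan table-level constraints. Only UNIQUE and PRIMARY KEY constraints
    // can yield keys; a unique constraint over nullable columns (without
    // NULLS NOT DISTINCT) provides no key, so key extraction stops there.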
    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        *if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

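    // Build the versioned relation description, replaying each recorded column
    // addition and checking that the versions line up.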
    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

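    // Plan the WITH options against the relation as it existed at its root
    // version, i.e. before any versioned column additions.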
    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

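/// Plans a `CREATE SOURCE ... FROM WEBHOOK` statement, for example (an
/// illustrative sketch, not exhaustive syntax):
///
/// ```sql
/// CREATE SOURCE my_webhook FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;
/// ```
///
/// The resulting relation always contains a `body` column, plus any columns
/// requested via `INCLUDE HEADER(S)`.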
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
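    // A validation (`CHECK`) expression must reference the request in some way
    // (otherwise it is almost certainly a mistake) and must be deterministic;
    // `now()` is the one unmaterializable function allowed.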
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

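    // The first column is always the request body, typed according to the
    // declared BODY FORMAT.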
    let mut column_ty = vec![
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

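    // Reject duplicate column names and enforce the overall column limit.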
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

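    // Webhook sources always live on the epoch-milliseconds timeline.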
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

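/// Plans a `CREATE SOURCE` statement, producing either a legacy-syntax
/// ingestion (with a progress subsource) or a plain ingestion plan.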
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

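/// Plans the connection-specific portion of a `CREATE SOURCE` statement,
/// dispatching on the connection type (Kafka, Postgres, SQL Server, MySQL, or
/// load generator).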
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        start_timestamp: _,
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
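    // START OFFSET is given as a list of offsets, one per partition, indexed
    // by position.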
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
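    // Map each INCLUDE <metadata> clause to a Kafka metadata column, honoring
    // any alias. INCLUDE KEY is handled by the key envelope rather than as a
    // metadata column.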
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => None,
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

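/// Determines the relation description, planned envelope, and data encoding
/// for a source or source export, given its declared FORMAT, ENVELOPE, and
/// INCLUDE metadata.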
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

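/// Plans the relation description for a source export (a subsource or a table
/// created from a source) from its column definitions and table constraints.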
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => None,
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

1952 let timeline = match envelope {
1955 SourceEnvelope::CdcV2 => {
1956 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957 }
1958 _ => Timeline::EpochMilliseconds,
1959 };
1960
1961 if let Some(partition_by) = partition_by {
1962 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963 check_partition_by(&desc, partition_by)?;
1964 }
1965
1966 let data_source = DataSourceDesc::IngestionExport {
1967 ingestion_id,
1968 external_reference: external_reference
1969 .as_ref()
1970 .expect("populated in purification")
1971 .clone(),
1972 details,
1973 data_config: SourceExportDataConfig { envelope, encoding },
1974 };
1975
1976 let if_not_exists = *if_not_exists;
1977
1978 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981 let table = Table {
1982 create_sql,
1983 desc: VersionedRelationDesc::new(desc),
1984 temporary: false,
1985 compaction_window,
1986 data_source: TableDataSource::DataSource {
1987 desc: data_source,
1988 timeline,
1989 },
1990 };
1991
1992 Ok(Plan::CreateTable(CreateTablePlan {
1993 name,
1994 table,
1995 if_not_exists,
1996 }))
1997}
1998
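// `generate_extracted_config!` expands to a `LoadGeneratorOptionExtracted` struct with one
// typed field per option (plus a `seen` set recording which options appeared) and a
// `TryFrom<Vec<LoadGeneratorOption<Aug>>>` impl that enforces types, defaults, and
// duplicate detection. It is consumed below like so:
//
//     let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
//     extracted.ensure_only_valid_options(loadgen)?;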
1999generate_extracted_config!(
2000 LoadGeneratorOption,
2001 (TickInterval, Duration),
2002 (AsOf, u64, Default(0_u64)),
2003 (UpTo, u64, Default(u64::MAX)),
2004 (ScaleFactor, f64),
2005 (MaxCardinality, u64),
2006 (Keys, u64),
2007 (SnapshotRounds, u64),
2008 (TransactionalSnapshot, bool),
2009 (ValueSize, u64),
2010 (Seed, u64),
2011 (Partitions, u64),
2012 (BatchSize, u64)
2013);
2014
2015impl LoadGeneratorOptionExtracted {
2016 pub(super) fn ensure_only_valid_options(
2017 &self,
2018 loadgen: &ast::LoadGenerator,
2019 ) -> Result<(), PlanError> {
2020 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022 let mut options = self.seen.clone();
2023
2024 let permitted_options: &[_] = match loadgen {
2025 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031 ast::LoadGenerator::KeyValue => &[
2032 TickInterval,
2033 Keys,
2034 SnapshotRounds,
2035 TransactionalSnapshot,
2036 ValueSize,
2037 Seed,
2038 Partitions,
2039 BatchSize,
2040 ],
2041 };
2042
2043 for o in permitted_options {
2044 options.remove(o);
2045 }
2046
2047 if !options.is_empty() {
2048 sql_bail!(
2049 "{} load generators do not support {} values",
2050 loadgen,
2051 options.iter().join(", ")
2052 )
2053 }
2054
2055 Ok(())
2056 }
2057}
2058
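/// Converts a `LOAD GENERATOR` source AST, its extracted options, and any
/// `INCLUDE` metadata items into a [`LoadGenerator`], checking feature flags
/// and per-generator option validity along the way.
///
/// For TPCH, table cardinalities are scaled from `SCALE FACTOR` (default
/// 0.01); each derived count is clamped to at least 1, so e.g. a scale factor
/// of 1.0 yields 10,000 suppliers and 1,500,000 orders.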
2059pub(crate) fn load_generator_ast_to_generator(
2060 scx: &StatementContext,
2061 loadgen: &ast::LoadGenerator,
2062 options: &[LoadGeneratorOption<Aug>],
2063 include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066 extracted.ensure_only_valid_options(loadgen)?;
2067
2068 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
2070 }
2071
2072 let load_generator = match loadgen {
2073 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074 ast::LoadGenerator::Clock => {
2075 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076 LoadGenerator::Clock
2077 }
2078 ast::LoadGenerator::Counter => {
2079 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080 let LoadGeneratorOptionExtracted {
2081 max_cardinality, ..
2082 } = extracted;
2083 LoadGenerator::Counter { max_cardinality }
2084 }
2085 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086 ast::LoadGenerator::Datums => {
2087 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088 LoadGenerator::Datums
2089 }
2090 ast::LoadGenerator::Tpch => {
2091 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093 let sf: f64 = scale_factor.unwrap_or(0.01);
2095 if !sf.is_finite() || sf < 0.0 {
2096 sql_bail!("unsupported scale factor {sf}");
2097 }
2098
2099 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100 let total = (sf * multiplier).floor();
2101 let mut i = i64::try_cast_from(total)
2102 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103 if i < 1 {
2104 i = 1;
2105 }
2106 Ok(i)
2107 };
2108
2109 let count_supplier = f_to_i(10_000f64)?;
2112 let count_part = f_to_i(200_000f64)?;
2113 let count_customer = f_to_i(150_000f64)?;
2114 let count_orders = f_to_i(150_000f64 * 10f64)?;
2115 let count_clerk = f_to_i(1_000f64)?;
2116
2117 LoadGenerator::Tpch {
2118 count_supplier,
2119 count_part,
2120 count_customer,
2121 count_orders,
2122 count_clerk,
2123 }
2124 }
2125 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127 let LoadGeneratorOptionExtracted {
2128 keys,
2129 snapshot_rounds,
2130 transactional_snapshot,
2131 value_size,
2132 tick_interval,
2133 seed,
2134 partitions,
2135 batch_size,
2136 ..
2137 } = extracted;
2138
2139 let mut include_offset = None;
2140 for im in include_metadata {
2141 match im {
2142 SourceIncludeMetadata::Offset { alias } => {
2143 include_offset = match alias {
2144 Some(alias) => Some(alias.to_string()),
2145 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146 }
2147 }
2148 SourceIncludeMetadata::Key { .. } => continue,
2149
2150 _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2152 }
2153 };
2154 }
2155
2156 let lgkv = KeyValueLoadGenerator {
2157 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158 snapshot_rounds: snapshot_rounds
2159 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162 value_size: value_size
2163 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164 partitions: partitions
2165 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166 tick_interval,
2167 batch_size: batch_size
2168 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170 include_offset,
2171 };
2172
2173 if lgkv.keys == 0
2174 || lgkv.partitions == 0
2175 || lgkv.value_size == 0
2176 || lgkv.batch_size == 0
2177 {
                sql_bail!("KEYS, PARTITIONS, VALUE SIZE, and BATCH SIZE must all be non-zero")
2179 }
2180
2181 if lgkv.keys % lgkv.partitions != 0 {
2182 sql_bail!("KEYS must be a multiple of PARTITIONS")
2183 }
2184
2185 if lgkv.batch_size > lgkv.keys {
                sql_bail!("BATCH SIZE cannot be larger than KEYS")
2187 }
2188
2189 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193 }
2194
2195 if lgkv.snapshot_rounds == 0 {
2196 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197 }
2198
2199 LoadGenerator::KeyValue(lgkv)
2200 }
2201 };
2202
2203 Ok(load_generator)
2204}
2205
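/// Checks that a Debezium-encoded value relation has the expected shape,
/// returning the column indices as `(before_idx, after_idx)`. The `after`
/// column is required; `before` is optional but, when present, must be a
/// record whose type exactly matches `after`.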
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207 let before = value_desc.get_by_name(&"before".into());
2208 let (after_idx, after_ty) = value_desc
2209 .get_by_name(&"after".into())
2210 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211 let before_idx = if let Some((before_idx, before_ty)) = before {
2212 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213 sql_bail!("'before' column must be of type record");
2214 }
2215 if before_ty != after_ty {
2216 sql_bail!("'before' type differs from 'after' column");
2217 }
2218 Some(before_idx)
2219 } else {
2220 None
2221 };
2222 Ok((before_idx, after_idx))
2223}
2224
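/// Resolves a `FORMAT ...` or `KEY FORMAT ... VALUE FORMAT ...` specifier
/// into a [`SourceDataEncoding`], then rejects key-requiring envelopes
/// (`UPSERT`, `DEBEZIUM`) when no key encoding was specified.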
2225fn get_encoding(
2226 scx: &StatementContext,
2227 format: &FormatSpecifier<Aug>,
2228 envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230 let encoding = match format {
2231 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232 FormatSpecifier::KeyValue { key, value } => {
2233 let key = {
2234 let encoding = get_encoding_inner(scx, key)?;
2235 Some(encoding.key.unwrap_or(encoding.value))
2236 };
2237 let value = get_encoding_inner(scx, value)?.value;
2238 SourceDataEncoding { key, value }
2239 }
2240 };
2241
2242 let requires_keyvalue = matches!(
2243 envelope,
2244 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245 );
2246 let is_keyvalue = encoding.key.is_some();
2247 if requires_keyvalue && !is_keyvalue {
2248 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249 };
2250
2251 Ok(encoding)
2252}
2253
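/// Resolves the cluster that will host a source or sink. If the statement
/// omitted `IN CLUSTER`, the default cluster is resolved and written back
/// into `in_cluster` so that the normalized `CREATE` SQL records an explicit
/// cluster choice.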
2254fn source_sink_cluster_config<'a, 'ctx>(
2260 scx: &'a StatementContext<'ctx>,
2261 in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263 let cluster = match in_cluster {
2264 None => {
2265 let cluster = scx.catalog.resolve_cluster(None)?;
2266 *in_cluster = Some(ResolvedClusterName {
2267 id: cluster.id(),
2268 print_name: None,
2269 });
2270 cluster
2271 }
2272 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273 };
2274
2275 Ok(cluster)
2276}
2277
2278generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
2280#[derive(Debug)]
2281pub struct Schema {
2282 pub key_schema: Option<String>,
2283 pub value_schema: String,
2284 pub key_reference_schemas: Vec<String>,
2286 pub value_reference_schemas: Vec<String>,
2288 pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2289 pub confluent_wire_format: bool,
2290}
2291
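/// Builds the [`DataEncoding`] for a single bare `FORMAT` clause; Avro and
/// Protobuf schemas sourced from a schema registry additionally yield a key
/// encoding when the purified seed carried a key schema. CSR-backed formats
/// assume seed resolution already happened during purification and treat a
/// missing seed as unreachable.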
2292fn get_encoding_inner(
2293 scx: &StatementContext,
2294 format: &Format<Aug>,
2295) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2296 let value = match format {
2297 Format::Bytes => DataEncoding::Bytes,
2298 Format::Avro(schema) => {
2299 let Schema {
2300 key_schema,
2301 value_schema,
2302 key_reference_schemas,
2303 value_reference_schemas,
2304 csr_connection,
2305 confluent_wire_format,
2306 } = match schema {
2307 AvroSchema::InlineSchema {
2310 schema: ast::Schema { schema },
2311 with_options,
2312 } => {
2313 let AvroSchemaOptionExtracted {
2314 confluent_wire_format,
2315 ..
2316 } = with_options.clone().try_into()?;
2317
2318 Schema {
2319 key_schema: None,
2320 value_schema: schema.clone(),
2321 key_reference_schemas: vec![],
2322 value_reference_schemas: vec![],
2323 csr_connection: None,
2324 confluent_wire_format,
2325 }
2326 }
2327 AvroSchema::Csr {
2328 csr_connection:
2329 CsrConnectionAvro {
2330 connection,
2331 seed,
2332 key_strategy: _,
2333 value_strategy: _,
2334 },
2335 } => {
2336 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2337 let csr_connection = match item.connection()? {
2338 Connection::Csr(_) => item.id(),
2339 _ => {
2340 sql_bail!(
2341 "{} is not a schema registry connection",
2342 scx.catalog
2343 .resolve_full_name(item.name())
2344 .to_string()
2345 .quoted()
2346 )
2347 }
2348 };
2349
2350 if let Some(seed) = seed {
2351 Schema {
2352 key_schema: seed.key_schema.clone(),
2353 value_schema: seed.value_schema.clone(),
2354 key_reference_schemas: seed.key_reference_schemas.clone(),
2355 value_reference_schemas: seed.value_reference_schemas.clone(),
2356 csr_connection: Some(csr_connection),
2357 confluent_wire_format: true,
2358 }
2359 } else {
2360 unreachable!("CSR seed resolution should already have been called: Avro")
2361 }
2362 }
2363 };
2364
2365 if let Some(key_schema) = key_schema {
2366 return Ok(SourceDataEncoding {
2367 key: Some(DataEncoding::Avro(AvroEncoding {
2368 schema: key_schema,
2369 reference_schemas: key_reference_schemas,
2370 csr_connection: csr_connection.clone(),
2371 confluent_wire_format,
2372 })),
2373 value: DataEncoding::Avro(AvroEncoding {
2374 schema: value_schema,
2375 reference_schemas: value_reference_schemas,
2376 csr_connection,
2377 confluent_wire_format,
2378 }),
2379 });
2380 } else {
2381 DataEncoding::Avro(AvroEncoding {
2382 schema: value_schema,
2383 reference_schemas: value_reference_schemas,
2384 csr_connection,
2385 confluent_wire_format,
2386 })
2387 }
2388 }
2389 Format::Protobuf(schema) => match schema {
2390 ProtobufSchema::Csr {
2391 csr_connection:
2392 CsrConnectionProtobuf {
2393 connection:
2394 CsrConnection {
2395 connection,
2396 options,
2397 },
2398 seed,
2399 },
2400 } => {
2401 if let Some(CsrSeedProtobuf { key, value }) = seed {
2402 let item = scx.get_item_by_resolved_name(connection)?;
2403 let _ = match item.connection()? {
2404 Connection::Csr(connection) => connection,
2405 _ => {
2406 sql_bail!(
2407 "{} is not a schema registry connection",
2408 scx.catalog
2409 .resolve_full_name(item.name())
2410 .to_string()
2411 .quoted()
2412 )
2413 }
2414 };
2415
2416 if !options.is_empty() {
2417 sql_bail!("Protobuf CSR connections do not support any options");
2418 }
2419
2420 let value = DataEncoding::Protobuf(ProtobufEncoding {
2421 descriptors: strconv::parse_bytes(&value.schema)?,
2422 message_name: value.message_name.clone(),
2423 confluent_wire_format: true,
2424 });
2425 if let Some(key) = key {
2426 return Ok(SourceDataEncoding {
2427 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2428 descriptors: strconv::parse_bytes(&key.schema)?,
2429 message_name: key.message_name.clone(),
2430 confluent_wire_format: true,
2431 })),
2432 value,
2433 });
2434 }
2435 value
2436 } else {
2437 unreachable!("CSR seed resolution should already have been called: Proto")
2438 }
2439 }
2440 ProtobufSchema::InlineSchema {
2441 message_name,
2442 schema: ast::Schema { schema },
2443 } => {
2444 let descriptors = strconv::parse_bytes(schema)?;
2445
2446 DataEncoding::Protobuf(ProtobufEncoding {
2447 descriptors,
2448 message_name: message_name.to_owned(),
2449 confluent_wire_format: false,
2450 })
2451 }
2452 },
2453 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2454 regex: mz_repr::adt::regex::Regex::new(regex, false)
2455 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2456 }),
2457 Format::Csv { columns, delimiter } => {
2458 let columns = match columns {
2459 CsvColumns::Header { names } => {
2460 if names.is_empty() {
2461 sql_bail!("[internal error] column spec should get names in purify")
2462 }
2463 ColumnSpec::Header {
2464 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2465 }
2466 }
2467 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2468 };
2469 DataEncoding::Csv(CsvEncoding {
2470 columns,
2471 delimiter: u8::try_from(*delimiter)
2472 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2473 })
2474 }
2475 Format::Json { array: false } => DataEncoding::Json,
2476 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2477 Format::Text => DataEncoding::Text,
2478 };
2479 Ok(SourceDataEncoding { key: None, value })
2480}
2481
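/// Determines how `INCLUDE KEY [AS alias]` maps the key into output columns.
/// `key_envelope_no_encoding` allows sources without a declared key encoding
/// (where the connection itself supplies the key) to still include and name a
/// key column.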
2482fn get_key_envelope(
2484 included_items: &[SourceIncludeMetadata],
2485 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2486 key_envelope_no_encoding: bool,
2487) -> Result<KeyEnvelope, PlanError> {
2488 let key_definition = included_items
2489 .iter()
2490 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2491 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2492 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2493 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2494 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2495 (Some(name), _) if key_envelope_no_encoding => {
2496 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2497 }
2498 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2499 (_, None) => {
2500 sql_bail!(
2504 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2505 got bare FORMAT"
2506 );
2507 }
2508 }
2509 } else {
2510 Ok(KeyEnvelope::None)
2511 }
2512}
2513
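/// Picks the envelope for an `INCLUDE KEY` with no alias: composite
/// encodings (Avro, CSV, Protobuf, regex) are flattened into their
/// constituent columns, while single-column encodings (bytes, JSON, text, or
/// no encoding at all) surface as one column named `key`.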
2514fn get_unnamed_key_envelope(
2517 key: Option<&DataEncoding<ReferencedConnection>>,
2518) -> Result<KeyEnvelope, PlanError> {
2519 let is_composite = match key {
2523 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2524 Some(
2525 DataEncoding::Avro(_)
2526 | DataEncoding::Csv(_)
2527 | DataEncoding::Protobuf(_)
2528 | DataEncoding::Regex { .. },
2529 ) => true,
2530 None => false,
2531 };
2532
2533 if is_composite {
2534 Ok(KeyEnvelope::Flattened)
2535 } else {
2536 Ok(KeyEnvelope::Named("key".to_string()))
2537 }
2538}
2539
2540pub fn describe_create_view(
2541 _: &StatementContext,
2542 _: CreateViewStatement<Aug>,
2543) -> Result<StatementDesc, PlanError> {
2544 Ok(StatementDesc::new(None))
2545}
2546
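/// Plans the parts shared by `CREATE VIEW` and `CREATE TEMPORARY VIEW`:
/// normalizes the `CREATE` SQL, plans the definition's query (rejecting
/// parameters), applies any column renames, and checks for duplicate column
/// names.
///
/// A sketch of the call made by `plan_create_view` below:
///
/// ```ignore
/// let (name, view) = plan_view(scx, definition, *temporary)?;
/// ```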
2547pub fn plan_view(
2548 scx: &StatementContext,
2549 def: &mut ViewDefinition<Aug>,
2550 temporary: bool,
2551) -> Result<(QualifiedItemName, View), PlanError> {
2552 let create_sql = normalize::create_statement(
2553 scx,
2554 Statement::CreateView(CreateViewStatement {
2555 if_exists: IfExistsBehavior::Error,
2556 temporary,
2557 definition: def.clone(),
2558 }),
2559 )?;
2560
2561 let ViewDefinition {
2562 name,
2563 columns,
2564 query,
2565 } = def;
2566
2567 let query::PlannedRootQuery {
2568 expr,
2569 mut desc,
2570 finishing,
2571 scope: _,
2572 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2573 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2579 &finishing,
2580 expr.arity()
2581 ));
2582 if expr.contains_parameters()? {
2583 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2584 }
2585
2586 let dependencies = expr
2587 .depends_on()
2588 .into_iter()
2589 .map(|gid| scx.catalog.resolve_item_id(&gid))
2590 .collect();
2591
2592 let name = if temporary {
2593 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2594 } else {
2595 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2596 };
2597
2598 plan_utils::maybe_rename_columns(
2599 format!("view {}", scx.catalog.resolve_full_name(&name)),
2600 &mut desc,
2601 columns,
2602 )?;
2603 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2604
2605 if let Some(dup) = names.iter().duplicates().next() {
2606 sql_bail!("column {} specified more than once", dup.quoted());
2607 }
2608
2609 let view = View {
2610 create_sql,
2611 expr,
2612 dependencies,
2613 column_names: names,
2614 temporary,
2615 };
2616
2617 Ok((name, view))
2618}
2619
2620pub fn plan_create_view(
2621 scx: &StatementContext,
2622 mut stmt: CreateViewStatement<Aug>,
2623) -> Result<Plan, PlanError> {
2624 let CreateViewStatement {
2625 temporary,
2626 if_exists,
2627 definition,
2628 } = &mut stmt;
2629 let (name, view) = plan_view(scx, definition, *temporary)?;
2630
2631 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2634
2635 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2636 let if_exists = true;
2637 let cascade = false;
2638 let maybe_item_to_drop = plan_drop_item(
2639 scx,
2640 ObjectType::View,
2641 if_exists,
2642 definition.name.clone(),
2643 cascade,
2644 )?;
2645
2646 if let Some(id) = maybe_item_to_drop {
2648 let dependencies = view.expr.depends_on();
2649 let invalid_drop = scx
2650 .get_item(&id)
2651 .global_ids()
2652 .any(|gid| dependencies.contains(&gid));
2653 if invalid_drop {
2654 let item = scx.catalog.get_item(&id);
2655 sql_bail!(
2656 "cannot replace view {0}: depended upon by new {0} definition",
2657 scx.catalog.resolve_full_name(item.name())
2658 );
2659 }
2660
2661 Some(id)
2662 } else {
2663 None
2664 }
2665 } else {
2666 None
2667 };
2668 let drop_ids = replace
2669 .map(|id| {
2670 scx.catalog
2671 .item_dependents(id)
2672 .into_iter()
2673 .map(|id| id.unwrap_item_id())
2674 .collect()
2675 })
2676 .unwrap_or_default();
2677
2678 validate_view_dependencies(scx, &view.dependencies.0)?;
2679
2680 let full_name = scx.catalog.resolve_full_name(&name);
2682 let partial_name = PartialItemName::from(full_name.clone());
2683 if let (Ok(item), IfExistsBehavior::Error, false) = (
2686 scx.catalog.resolve_item_or_type(&partial_name),
2687 *if_exists,
2688 ignore_if_exists_errors,
2689 ) {
2690 return Err(PlanError::ItemAlreadyExists {
2691 name: full_name.to_string(),
2692 item_type: item.item_type(),
2693 });
2694 }
2695
2696 Ok(Plan::CreateView(CreateViewPlan {
2697 name,
2698 view,
2699 replace,
2700 drop_ids,
2701 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2702 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2703 }))
2704}
2705
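/// Rejects definitions that depend on an item which is itself a replacement
/// for another object: replacements cannot be built on top of other pending
/// replacements.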
2706fn validate_view_dependencies(
2708 scx: &StatementContext,
2709 dependencies: &BTreeSet<CatalogItemId>,
2710) -> Result<(), PlanError> {
2711 for id in dependencies {
2712 let item = scx.catalog.get_item(id);
2713 if item.replacement_target().is_some() {
2714 let name = scx.catalog.minimal_qualification(item.name());
2715 return Err(PlanError::InvalidDependency {
2716 name: name.to_string(),
2717 item_type: format!("replacement {}", item.item_type()),
2718 });
2719 }
2720 }
2721
2722 Ok(())
2723}
2724
2725pub fn describe_create_materialized_view(
2726 _: &StatementContext,
2727 _: CreateMaterializedViewStatement<Aug>,
2728) -> Result<StatementDesc, PlanError> {
2729 Ok(StatementDesc::new(None))
2730}
2731
2732pub fn describe_create_continual_task(
2733 _: &StatementContext,
2734 _: CreateContinualTaskStatement<Aug>,
2735) -> Result<StatementDesc, PlanError> {
2736 Ok(StatementDesc::new(None))
2737}
2738
2739pub fn describe_create_network_policy(
2740 _: &StatementContext,
2741 _: CreateNetworkPolicyStatement<Aug>,
2742) -> Result<StatementDesc, PlanError> {
2743 Ok(StatementDesc::new(None))
2744}
2745
2746pub fn describe_alter_network_policy(
2747 _: &StatementContext,
2748 _: AlterNetworkPolicyStatement<Aug>,
2749) -> Result<StatementDesc, PlanError> {
2750 Ok(StatementDesc::new(None))
2751}
2752
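/// Plans `CREATE MATERIALIZED VIEW`, covering cluster resolution, the
/// `ASSERT NOT NULL`, `PARTITION BY`, `RETAIN HISTORY`, and `REFRESH`
/// options, `OR REPLACE` handling, and (behind a feature flag) planning the
/// view as a replacement for an existing materialized view.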
2753pub fn plan_create_materialized_view(
2754 scx: &StatementContext,
2755 mut stmt: CreateMaterializedViewStatement<Aug>,
2756) -> Result<Plan, PlanError> {
2757 let cluster_id =
2758 crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2759 stmt.in_cluster = Some(ResolvedClusterName {
2760 id: cluster_id,
2761 print_name: None,
2762 });
2763
2764 let create_sql =
2765 normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2766
2767 let partial_name = normalize::unresolved_item_name(stmt.name)?;
2768 let name = scx.allocate_qualified_name(partial_name.clone())?;
2769
2770 let query::PlannedRootQuery {
2771 expr,
2772 mut desc,
2773 finishing,
2774 scope: _,
2775 } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2776 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2778 &finishing,
2779 expr.arity()
2780 ));
2781 if expr.contains_parameters()? {
2782 return Err(PlanError::ParameterNotAllowed(
2783 "materialized views".to_string(),
2784 ));
2785 }
2786
2787 plan_utils::maybe_rename_columns(
2788 format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2789 &mut desc,
2790 &stmt.columns,
2791 )?;
2792 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2793
2794 let MaterializedViewOptionExtracted {
2795 assert_not_null,
2796 partition_by,
2797 retain_history,
2798 refresh,
2799 seen: _,
2800 }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2801
2802 if let Some(partition_by) = partition_by {
2803 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2804 check_partition_by(&desc, partition_by)?;
2805 }
2806
2807 let refresh_schedule = {
2808 let mut refresh_schedule = RefreshSchedule::default();
2809 let mut on_commits_seen = 0;
2810 for refresh_option_value in refresh {
2811 if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2812 scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2813 }
2814 match refresh_option_value {
2815 RefreshOptionValue::OnCommit => {
2816 on_commits_seen += 1;
2817 }
2818 RefreshOptionValue::AtCreation => {
2819 soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2820 sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2821 }
2822 RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
2825 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2826 name: "REFRESH AT",
2827 scope: &Scope::empty(),
2828 relation_type: &SqlRelationType::empty(),
2829 allow_aggregates: false,
2830 allow_subqueries: false,
2831 allow_parameters: false,
2832 allow_windows: false,
2833 };
2834 let hir = plan_expr(ecx, &time)?.cast_to(
2835 ecx,
2836 CastContext::Assignment,
2837 &SqlScalarType::MzTimestamp,
2838 )?;
2839 let timestamp = hir
2841 .into_literal_mz_timestamp()
2842 .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2843 refresh_schedule.ats.push(timestamp);
2844 }
2845 RefreshOptionValue::Every(RefreshEveryOptionValue {
2846 interval,
2847 aligned_to,
2848 }) => {
2849 let interval = Interval::try_from_value(Value::Interval(interval))?;
2850 if interval.as_microseconds() <= 0 {
2851 sql_bail!("REFRESH interval must be positive; got: {}", interval);
2852 }
2853 if interval.months != 0 {
2854 sql_bail!("REFRESH interval must not involve units larger than days");
2859 }
2860 let interval = interval.duration()?;
2861 if u64::try_from(interval.as_millis()).is_err() {
2862 sql_bail!("REFRESH interval too large");
2863 }
2864
2865 let mut aligned_to = match aligned_to {
2866 Some(aligned_to) => aligned_to,
2867 None => {
2868 soft_panic_or_log!(
2869 "ALIGNED TO should have been filled in by purification"
2870 );
2871 sql_bail!(
2872 "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2873 )
2874 }
2875 };
2876
2877 transform_ast::transform(scx, &mut aligned_to)?;
2879
2880 let ecx = &ExprContext {
2881 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2882 name: "REFRESH EVERY ... ALIGNED TO",
2883 scope: &Scope::empty(),
2884 relation_type: &SqlRelationType::empty(),
2885 allow_aggregates: false,
2886 allow_subqueries: false,
2887 allow_parameters: false,
2888 allow_windows: false,
2889 };
2890 let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2891 ecx,
2892 CastContext::Assignment,
2893 &SqlScalarType::MzTimestamp,
2894 )?;
2895 let aligned_to_const = aligned_to_hir
2897 .into_literal_mz_timestamp()
2898 .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2899
2900 refresh_schedule.everies.push(RefreshEvery {
2901 interval,
2902 aligned_to: aligned_to_const,
2903 });
2904 }
2905 }
2906 }
2907
2908 if on_commits_seen > 1 {
2909 sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2910 }
2911 if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2912 sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2913 }
2914
2915 if refresh_schedule == RefreshSchedule::default() {
2916 None
2917 } else {
2918 Some(refresh_schedule)
2919 }
2920 };
2921
2922 let as_of = stmt.as_of.map(Timestamp::from);
2923 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2924 let mut non_null_assertions = assert_not_null
2925 .into_iter()
2926 .map(normalize::column_name)
2927 .map(|assertion_name| {
2928 column_names
2929 .iter()
2930 .position(|col| col == &assertion_name)
2931 .ok_or_else(|| {
2932 sql_err!(
2933 "column {} in ASSERT NOT NULL option not found",
2934 assertion_name.quoted()
2935 )
2936 })
2937 })
2938 .collect::<Result<Vec<_>, _>>()?;
2939 non_null_assertions.sort();
2940 if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2941 let dup = &column_names[*dup];
2942 sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2943 }
2944
2945 if let Some(dup) = column_names.iter().duplicates().next() {
2946 sql_bail!("column {} specified more than once", dup.quoted());
2947 }
2948
2949 let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2952 Ok(true) => IfExistsBehavior::Skip,
2953 _ => stmt.if_exists,
2954 };
2955
2956 let mut replace = None;
2957 let mut if_not_exists = false;
2958 match if_exists {
2959 IfExistsBehavior::Replace => {
2960 let if_exists = true;
2961 let cascade = false;
2962 let replace_id = plan_drop_item(
2963 scx,
2964 ObjectType::MaterializedView,
2965 if_exists,
2966 partial_name.clone().into(),
2967 cascade,
2968 )?;
2969
2970 if let Some(id) = replace_id {
2972 let dependencies = expr.depends_on();
2973 let invalid_drop = scx
2974 .get_item(&id)
2975 .global_ids()
2976 .any(|gid| dependencies.contains(&gid));
2977 if invalid_drop {
2978 let item = scx.catalog.get_item(&id);
2979 sql_bail!(
2980 "cannot replace materialized view {0}: depended upon by new {0} definition",
2981 scx.catalog.resolve_full_name(item.name())
2982 );
2983 }
2984 replace = Some(id);
2985 }
2986 }
2987 IfExistsBehavior::Skip => if_not_exists = true,
2988 IfExistsBehavior::Error => (),
2989 }
2990 let drop_ids = replace
2991 .map(|id| {
2992 scx.catalog
2993 .item_dependents(id)
2994 .into_iter()
2995 .map(|id| id.unwrap_item_id())
2996 .collect()
2997 })
2998 .unwrap_or_default();
2999 let mut dependencies: BTreeSet<_> = expr
3000 .depends_on()
3001 .into_iter()
3002 .map(|gid| scx.catalog.resolve_item_id(&gid))
3003 .collect();
3004
3005 let mut replacement_target = None;
3007 if let Some(target_name) = &stmt.replacement_for {
3008 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3009
3010 let target = scx.get_item_by_resolved_name(target_name)?;
3011 if target.item_type() != CatalogItemType::MaterializedView {
3012 return Err(PlanError::InvalidReplacement {
3013 item_type: target.item_type(),
3014 item_name: scx.catalog.minimal_qualification(target.name()),
3015 replacement_type: CatalogItemType::MaterializedView,
3016 replacement_name: partial_name,
3017 });
3018 }
3019 if target.id().is_system() {
3020 sql_bail!(
3021 "cannot replace {} because it is required by the database system",
3022 scx.catalog.minimal_qualification(target.name()),
3023 );
3024 }
3025
3026 for dependent in scx.catalog.item_dependents(target.id()) {
3028 if let ObjectId::Item(id) = dependent
3029 && dependencies.contains(&id)
3030 {
3031 sql_bail!(
3032 "replacement would cause {} to depend on itself",
3033 scx.catalog.minimal_qualification(target.name()),
3034 );
3035 }
3036 }
3037
3038 dependencies.insert(target.id());
3039
3040 for use_id in target.used_by() {
3041 let use_item = scx.get_item(use_id);
3042 if use_item.replacement_target() == Some(target.id()) {
3043 sql_bail!(
3044 "cannot replace {} because it already has a replacement: {}",
3045 scx.catalog.minimal_qualification(target.name()),
3046 scx.catalog.minimal_qualification(use_item.name()),
3047 );
3048 }
3049 }
3050
3051 replacement_target = Some(target.id());
3052 }
3053
3054 validate_view_dependencies(scx, &dependencies)?;
3055
3056 let full_name = scx.catalog.resolve_full_name(&name);
3058 let partial_name = PartialItemName::from(full_name.clone());
3059 if let (IfExistsBehavior::Error, Ok(item)) =
3062 (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3063 {
3064 return Err(PlanError::ItemAlreadyExists {
3065 name: full_name.to_string(),
3066 item_type: item.item_type(),
3067 });
3068 }
3069
3070 Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3071 name,
3072 materialized_view: MaterializedView {
3073 create_sql,
3074 expr,
3075 dependencies: DependencyIds(dependencies),
3076 column_names,
3077 replacement_target,
3078 cluster_id,
3079 non_null_assertions,
3080 compaction_window,
3081 refresh_schedule,
3082 as_of,
3083 },
3084 replace,
3085 drop_ids,
3086 if_not_exists,
3087 ambiguous_columns: *scx.ambiguous_columns.borrow(),
3088 }))
3089}
3090
3091generate_extracted_config!(
3092 MaterializedViewOption,
3093 (AssertNotNull, Ident, AllowMultiple),
3094 (PartitionBy, Vec<Ident>),
3095 (RetainHistory, OptionalDuration),
3096 (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
3097);
3098
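/// Plans `CREATE CONTINUAL TASK`. The task's body is a list of `INSERT` and
/// `DELETE` statements; each is desugared into a query (see
/// `continual_task_query`), planned against a CTE bound to the task's own
/// name, and the results are unioned, with `DELETE` contributions negated:
///
/// ```ignore
/// // Conceptually: output = Σ inserts − Σ deletes, as one differential expr.
/// let expr = exprs.into_iter().reduce(|acc, expr| acc.union(expr));
/// ```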
3099pub fn plan_create_continual_task(
3100 scx: &StatementContext,
3101 mut stmt: CreateContinualTaskStatement<Aug>,
3102) -> Result<Plan, PlanError> {
3103 match &stmt.sugar {
3104 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3105 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3106 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3107 }
3108 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3109 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3110 }
3111 };
3112 let cluster_id = match &stmt.in_cluster {
3113 None => scx.catalog.resolve_cluster(None)?.id(),
3114 Some(in_cluster) => in_cluster.id,
3115 };
3116 stmt.in_cluster = Some(ResolvedClusterName {
3117 id: cluster_id,
3118 print_name: None,
3119 });
3120
3121 let create_sql =
3122 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3123
3124 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3125
3126 let mut desc = match stmt.columns {
3131 None => None,
3132 Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
3134 for col in columns.iter() {
3135 desc_columns.push((
3136 normalize::column_name(col.name.clone()),
3137 SqlColumnType {
3138 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3139 nullable: false,
3140 },
3141 ));
3142 }
3143 Some(RelationDesc::from_names_and_types(desc_columns))
3144 }
3145 };
3146 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3147 match input.item_type() {
3148 CatalogItemType::ContinualTask
3151 | CatalogItemType::Table
3152 | CatalogItemType::MaterializedView
3153 | CatalogItemType::Source => {}
3154 CatalogItemType::Sink
3155 | CatalogItemType::View
3156 | CatalogItemType::Index
3157 | CatalogItemType::Type
3158 | CatalogItemType::Func
3159 | CatalogItemType::Secret
3160 | CatalogItemType::Connection => {
3161 sql_bail!(
3162 "CONTINUAL TASK cannot use {} as an input",
3163 input.item_type()
3164 );
3165 }
3166 }
3167
3168 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3169 let ct_name = stmt.name;
3170 let placeholder_id = match &ct_name {
3171 ResolvedItemName::ContinualTask { id, name } => {
3172 let desc = match desc.as_ref().cloned() {
3173 Some(x) => x,
3174 None => {
3175 let desc = input.relation_desc().expect("item type checked above");
3180 desc.into_owned()
3181 }
3182 };
3183 qcx.ctes.insert(
3184 *id,
3185 CteDesc {
3186 name: name.item.clone(),
3187 desc,
3188 },
3189 );
3190 Some(*id)
3191 }
3192 _ => None,
3193 };
3194
3195 let mut exprs = Vec::new();
3196 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3197 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3198 let query::PlannedRootQuery {
3199 expr,
3200 desc: desc_query,
3201 finishing,
3202 scope: _,
3203 } = query::plan_ct_query(&mut qcx, query)?;
3204 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3207 &finishing,
3208 expr.arity()
3209 ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
3217 let expr = match desc.as_mut() {
3218 None => {
3219 desc = Some(desc_query);
3220 expr
3221 }
3222 Some(desc) => {
3223 if desc_query.arity() > desc.arity() {
3226 sql_bail!(
3227 "statement {}: INSERT has more expressions than target columns",
3228 idx
3229 );
3230 }
3231 if desc_query.arity() < desc.arity() {
3232 sql_bail!(
3233 "statement {}: INSERT has more target columns than expressions",
3234 idx
3235 );
3236 }
3237 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3240 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3241 let expr = expr.map_err(|e| {
3242 sql_err!(
3243 "statement {}: column {} is of type {} but expression is of type {}",
3244 idx,
3245 desc.get_name(e.column).quoted(),
3246 qcx.humanize_scalar_type(&e.target_type, false),
3247 qcx.humanize_scalar_type(&e.source_type, false),
3248 )
3249 })?;
3250
3251 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3254 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3255 if updated {
3256 let new_types = zip_types().map(|(ct, q)| {
3257 let mut ct = ct.clone();
3258 if q.nullable {
3259 ct.nullable = true;
3260 }
3261 ct
3262 });
3263 *desc = RelationDesc::from_names_and_types(
3264 desc.iter_names().cloned().zip_eq(new_types),
3265 );
3266 }
3267
3268 expr
3269 }
3270 };
3271 match stmt {
3272 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3273 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3274 }
3275 }
3276 let expr = exprs
3279 .into_iter()
3280 .reduce(|acc, expr| acc.union(expr))
3281 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3282 let dependencies = expr
3283 .depends_on()
3284 .into_iter()
3285 .map(|gid| scx.catalog.resolve_item_id(&gid))
3286 .collect();
3287
3288 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3289 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3290 if let Some(dup) = column_names.iter().duplicates().next() {
3291 sql_bail!("column {} specified more than once", dup.quoted());
3292 }
3293
3294 let name = match &ct_name {
3296 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3297 ResolvedItemName::ContinualTask { name, .. } => {
3298 let name = scx.allocate_qualified_name(name.clone())?;
3299 let full_name = scx.catalog.resolve_full_name(&name);
3300 let partial_name = PartialItemName::from(full_name.clone());
3301 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3304 return Err(PlanError::ItemAlreadyExists {
3305 name: full_name.to_string(),
3306 item_type: item.item_type(),
3307 });
3308 }
3309 name
3310 }
3311 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3312 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3313 };
3314
3315 let as_of = stmt.as_of.map(Timestamp::from);
3316 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3317 name,
3318 placeholder_id,
3319 desc,
3320 input_id: input.global_id(),
3321 with_snapshot: snapshot.unwrap_or(true),
3322 continual_task: MaterializedView {
3323 create_sql,
3324 expr,
3325 dependencies,
3326 column_names,
3327 replacement_target: None,
3328 cluster_id,
3329 non_null_assertions: Vec::new(),
3330 compaction_window: None,
3331 refresh_schedule: None,
3332 as_of,
3333 },
3334 }))
3335}
3336
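/// Desugars one continual-task `INSERT` or `DELETE` statement into a plain
/// query, returning `None` for unsupported shapes (column lists, `RETURNING`,
/// `USING`, or `DEFAULT VALUES`). A `DELETE ... WHERE <pred>` becomes,
/// conceptually:
///
/// ```ignore
/// SELECT * FROM <task> WHERE <pred>
/// ```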
3337fn continual_task_query<'a>(
3338 ct_name: &ResolvedItemName,
3339 stmt: &'a ast::ContinualTaskStmt<Aug>,
3340) -> Option<ast::Query<Aug>> {
3341 match stmt {
3342 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3343 table_name: _,
3344 columns,
3345 source,
3346 returning,
3347 }) => {
3348 if !columns.is_empty() || !returning.is_empty() {
3349 return None;
3350 }
3351 match source {
3352 ast::InsertSource::Query(query) => Some(query.clone()),
3353 ast::InsertSource::DefaultValues => None,
3354 }
3355 }
3356 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3357 table_name: _,
3358 alias,
3359 using,
3360 selection,
3361 }) => {
3362 if !using.is_empty() {
3363 return None;
3364 }
3365 let from = ast::TableWithJoins {
3367 relation: ast::TableFactor::Table {
3368 name: ct_name.clone(),
3369 alias: alias.clone(),
3370 },
3371 joins: Vec::new(),
3372 };
3373 let select = ast::Select {
3374 from: vec![from],
3375 selection: selection.clone(),
3376 distinct: None,
3377 projection: vec![ast::SelectItem::Wildcard],
3378 group_by: Vec::new(),
3379 having: None,
3380 qualify: None,
3381 options: Vec::new(),
3382 };
3383 let query = ast::Query {
3384 ctes: ast::CteBlock::Simple(Vec::new()),
3385 body: ast::SetExpr::Select(Box::new(select)),
3386 order_by: Vec::new(),
3387 limit: None,
3388 offset: None,
3389 };
3390 Some(query)
3392 }
3393 }
3394}
3395
3396generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3397
3398pub fn describe_create_sink(
3399 _: &StatementContext,
3400 _: CreateSinkStatement<Aug>,
3401) -> Result<StatementDesc, PlanError> {
3402 Ok(StatementDesc::new(None))
3403}
3404
3405generate_extracted_config!(
3406 CreateSinkOption,
3407 (Snapshot, bool),
3408 (PartitionStrategy, String),
3409 (Version, u64),
3410 (CommitInterval, Duration)
3411);
3412
3413pub fn plan_create_sink(
3414 scx: &StatementContext,
3415 stmt: CreateSinkStatement<Aug>,
3416) -> Result<Plan, PlanError> {
3417 let Some(name) = stmt.name.clone() else {
3419 return Err(PlanError::MissingName(CatalogItemType::Sink));
3420 };
3421 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3422 let full_name = scx.catalog.resolve_full_name(&name);
3423 let partial_name = PartialItemName::from(full_name.clone());
3424 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3425 return Err(PlanError::ItemAlreadyExists {
3426 name: full_name.to_string(),
3427 item_type: item.item_type(),
3428 });
3429 }
3430
3431 plan_sink(scx, stmt)
3432}
3433
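/// Plans the body of a `CREATE SINK` statement once `plan_create_sink` has
/// performed the name and existence checks: validates the envelope or mode
/// against the connection kind, checks the `FROM` item and any `KEY` and
/// `HEADERS` clauses against its relation description, resolves the cluster,
/// and dispatches to the Kafka or Iceberg connection builder.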
3434fn plan_sink(
3439 scx: &StatementContext,
3440 mut stmt: CreateSinkStatement<Aug>,
3441) -> Result<Plan, PlanError> {
3442 let CreateSinkStatement {
3443 name,
3444 in_cluster: _,
3445 from,
3446 connection,
3447 format,
3448 envelope,
3449 mode,
3450 if_not_exists,
3451 with_options,
3452 } = stmt.clone();
3453
3454 let Some(name) = name else {
3455 return Err(PlanError::MissingName(CatalogItemType::Sink));
3456 };
3457 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3458
3459 let envelope = match (&connection, envelope, mode) {
3460 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3462 SinkEnvelope::Upsert
3463 }
3464 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3465 SinkEnvelope::Debezium
3466 }
3467 (CreateSinkConnection::Kafka { .. }, None, None) => {
3468 sql_bail!("ENVELOPE clause is required")
3469 }
3470 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3471 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3472 }
3473 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3475 SinkEnvelope::Upsert
3476 }
3477 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3478 sql_bail!("MODE clause is required")
3479 }
3480 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3481 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3482 }
3483 };
3484
3485 let from_name = &from;
3486 let from = scx.get_item_by_resolved_name(&from)?;
3487
3488 {
3489 use CatalogItemType::*;
3490 match from.item_type() {
3491 Table | Source | MaterializedView | ContinualTask => {
3492 if from.replacement_target().is_some() {
3493 let name = scx.catalog.minimal_qualification(from.name());
3494 return Err(PlanError::InvalidSinkFrom {
3495 name: name.to_string(),
3496 item_type: format!("replacement {}", from.item_type()),
3497 });
3498 }
3499 }
3500 Sink | View | Index | Type | Func | Secret | Connection => {
3501 let name = scx.catalog.minimal_qualification(from.name());
3502 return Err(PlanError::InvalidSinkFrom {
3503 name: name.to_string(),
3504 item_type: from.item_type().to_string(),
3505 });
3506 }
3507 }
3508 }
3509
3510 if from.id().is_system() {
3511 bail_unsupported!("creating a sink directly on a catalog object");
3512 }
3513
3514 let desc = from.relation_desc().expect("item type checked above");
3515 let key_indices = match &connection {
3516 CreateSinkConnection::Kafka { key: Some(key), .. }
3517 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3518 let key_columns = key
3519 .key_columns
3520 .clone()
3521 .into_iter()
3522 .map(normalize::column_name)
3523 .collect::<Vec<_>>();
3524 let mut uniq = BTreeSet::new();
3525 for col in key_columns.iter() {
3526 if !uniq.insert(col) {
3527 sql_bail!("duplicate column referenced in KEY: {}", col);
3528 }
3529 }
3530 let indices = key_columns
3531 .iter()
3532 .map(|col| -> anyhow::Result<usize> {
3533 let name_idx =
3534 desc.get_by_name(col)
3535 .map(|(idx, _type)| idx)
3536 .ok_or_else(|| {
3537 sql_err!("column referenced in KEY does not exist: {}", col)
3538 })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        sql_bail!("column referenced in KEY is ambiguous: {}", col);
                    }
3542 Ok(name_idx)
3543 })
3544 .collect::<Result<Vec<_>, _>>()?;
3545 let is_valid_key = desc
3546 .typ()
3547 .keys
3548 .iter()
3549 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3550
3551 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3552 if key.not_enforced {
3553 scx.catalog
3554 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3555 key: key_columns.clone(),
3556 name: name.item.clone(),
3557 })
3558 } else {
3559 return Err(PlanError::UpsertSinkWithInvalidKey {
3560 name: from_name.full_name_str(),
3561 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3562 valid_keys: desc
3563 .typ()
3564 .keys
3565 .iter()
3566 .map(|key| {
3567 key.iter()
3568 .map(|col| desc.get_name(*col).as_str().into())
3569 .collect()
3570 })
3571 .collect(),
3572 });
3573 }
3574 }
3575 Some(indices)
3576 }
3577 CreateSinkConnection::Kafka { key: None, .. }
3578 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3579 };
3580
3581 let headers_index = match &connection {
3582 CreateSinkConnection::Kafka {
3583 headers: Some(headers),
3584 ..
3585 } => {
3586 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3587
3588 match envelope {
3589 SinkEnvelope::Upsert => (),
3590 SinkEnvelope::Debezium => {
3591 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3592 }
3593 };
3594
3595 let headers = normalize::column_name(headers.clone());
3596 let (idx, ty) = desc
3597 .get_by_name(&headers)
3598 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3599
3600 if desc.get_unambiguous_name(idx).is_none() {
3601 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3602 }
3603
3604 match &ty.scalar_type {
3605 SqlScalarType::Map { value_type, .. }
3606 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3607 _ => sql_bail!(
3608 "HEADERS column must have type map[text => text] or map[text => bytea]"
3609 ),
3610 }
3611
3612 Some(idx)
3613 }
3614 _ => None,
3615 };
3616
3617 let relation_key_indices = desc.typ().keys.get(0).cloned();
3619
3620 let key_desc_and_indices = key_indices.map(|key_indices| {
3621 let cols = desc
3622 .iter()
3623 .map(|(name, ty)| (name.clone(), ty.clone()))
3624 .collect::<Vec<_>>();
3625 let (names, types): (Vec<_>, Vec<_>) =
3626 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3627 let typ = SqlRelationType::new(types);
3628 (RelationDesc::new(typ, names), key_indices)
3629 });
3630
3631 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3632 return Err(PlanError::UpsertSinkWithoutKey);
3633 }
3634
3635 let CreateSinkOptionExtracted {
3636 snapshot,
3637 version,
3638 partition_strategy: _,
3639 seen: _,
3640 commit_interval,
3641 } = with_options.try_into()?;
3642
3643 let connection_builder = match connection {
3644 CreateSinkConnection::Kafka {
3645 connection,
3646 options,
3647 ..
3648 } => kafka_sink_builder(
3649 scx,
3650 connection,
3651 options,
3652 format,
3653 relation_key_indices,
3654 key_desc_and_indices,
3655 headers_index,
3656 desc.into_owned(),
3657 envelope,
3658 from.id(),
3659 commit_interval,
3660 )?,
3661 CreateSinkConnection::Iceberg {
3662 connection,
3663 aws_connection,
3664 options,
3665 ..
3666 } => iceberg_sink_builder(
3667 scx,
3668 connection,
3669 aws_connection,
3670 options,
3671 relation_key_indices,
3672 key_desc_and_indices,
3673 commit_interval,
3674 )?,
3675 };
3676
3677 let with_snapshot = snapshot.unwrap_or(true);
3679 let version = version.unwrap_or(0);
3681
3682 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3686 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3687
3688 Ok(Plan::CreateSink(CreateSinkPlan {
3689 name,
3690 sink: Sink {
3691 create_sql,
3692 from: from.global_id(),
3693 connection: connection_builder,
3694 envelope,
3695 version,
3696 commit_interval,
3697 },
3698 with_snapshot,
3699 if_not_exists,
3700 in_cluster: in_cluster.id(),
3701 }))
3702}
3703
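/// Builds the error reported when a user-declared `KEY` constraint conflicts
/// with the keys already known for the relation.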
3704fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3705 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3706
3707 let existing_keys = desc
3708 .typ()
3709 .keys
3710 .iter()
3711 .map(|key_columns| {
3712 key_columns
3713 .iter()
3714 .map(|col| desc.get_name(*col).as_str())
3715 .join(", ")
3716 })
3717 .join(", ");
3718
3719 sql_err!(
3720 "Key constraint ({}) conflicts with existing key ({})",
3721 user_keys,
3722 existing_keys
3723 )
3724}
3725
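/// Extracted `CSR CONFIG` options. Unlike the macro-generated option structs
/// in this file, this one is maintained by hand (see the `TryFrom` impl
/// below), since `AVRO DOC ON` entries fan out into separate key and value
/// doc maps rather than the macro's one-field-per-option shape.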
3726#[derive(Debug, Default, PartialEq, Clone)]
3729pub struct CsrConfigOptionExtracted {
3730 seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3731 pub(crate) avro_key_fullname: Option<String>,
3732 pub(crate) avro_value_fullname: Option<String>,
3733 pub(crate) null_defaults: bool,
3734 pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3735 pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3736 pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3737 pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3738}
3739
3740impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3741 type Error = crate::plan::PlanError;
3742 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3743 let mut extracted = CsrConfigOptionExtracted::default();
3744 let mut common_doc_comments = BTreeMap::new();
3745 for option in v {
3746 if !extracted.seen.insert(option.name.clone()) {
3747 return Err(PlanError::Unstructured({
3748 format!("{} specified more than once", option.name)
3749 }));
3750 }
3751 let option_name = option.name.clone();
3752 let option_name_str = option_name.to_ast_string_simple();
3753 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3754 option_name: option_name.to_ast_string_simple(),
3755 err: e.into(),
3756 };
3757 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3758 val.map(|s| match s {
3759 WithOptionValue::Value(Value::String(s)) => {
3760 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3761 }
3762 _ => Err("must be a string".to_string()),
3763 })
3764 .transpose()
3765 .map_err(PlanError::Unstructured)
3766 .map_err(better_error)
3767 };
3768 match option.name {
3769 CsrConfigOptionName::AvroKeyFullname => {
3770 extracted.avro_key_fullname =
3771 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3772 }
3773 CsrConfigOptionName::AvroValueFullname => {
3774 extracted.avro_value_fullname =
3775 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3776 }
3777 CsrConfigOptionName::NullDefaults => {
3778 extracted.null_defaults =
3779 <bool>::try_from_value(option.value).map_err(better_error)?;
3780 }
3781 CsrConfigOptionName::AvroDocOn(doc_on) => {
3782 let value = String::try_from_value(option.value.ok_or_else(|| {
3783 PlanError::InvalidOptionValue {
3784 option_name: option_name_str,
3785 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3786 }
3787 })?)
3788 .map_err(better_error)?;
3789 let key = match doc_on.identifier {
3790 DocOnIdentifier::Column(ast::ColumnName {
3791 relation: ResolvedItemName::Item { id, .. },
3792 column: ResolvedColumnReference::Column { name, index: _ },
3793 }) => DocTarget::Field {
3794 object_id: id,
3795 column_name: name,
3796 },
3797 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3798 DocTarget::Type(id)
3799 }
3800 _ => unreachable!(),
3801 };
3802
3803 match doc_on.for_schema {
3804 DocOnSchema::KeyOnly => {
3805 extracted.key_doc_options.insert(key, value);
3806 }
3807 DocOnSchema::ValueOnly => {
3808 extracted.value_doc_options.insert(key, value);
3809 }
3810 DocOnSchema::All => {
3811 common_doc_comments.insert(key, value);
3812 }
3813 }
3814 }
3815 CsrConfigOptionName::KeyCompatibilityLevel => {
3816 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3817 }
3818 CsrConfigOptionName::ValueCompatibilityLevel => {
3819 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3820 }
3821 }
3822 }
3823
3824 for (key, value) in common_doc_comments {
3825 if !extracted.key_doc_options.contains_key(&key) {
3826 extracted.key_doc_options.insert(key.clone(), value.clone());
3827 }
3828 if !extracted.value_doc_options.contains_key(&key) {
3829 extracted.value_doc_options.insert(key, value);
3830 }
3831 }
3832 Ok(extracted)
3833 }
3834}
3835
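/// Builds an Iceberg [`StorageSinkConnection`] (gated by `ENABLE_ICEBERG_SINK`),
/// validating that the referenced connections are an Iceberg catalog and an
/// AWS connection respectively, and that `TABLE`, `NAMESPACE`, and
/// `COMMIT INTERVAL` are all provided.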
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    };

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };
    if commit_interval.is_none() {
        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
    }

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}

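/// Plans the connection portion of a Kafka sink: validates the Kafka
/// connection and sink options, plans the key and value formats (including
/// Avro schema generation against the CSR connection), and plans the optional
/// `PARTITION BY` expression against the sink's value relation.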
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced restriction that, if violated,
        // would result in a runtime error.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Helper closure for planning the CSR connection and config options
    // shared by the Avro key and value schemas.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}

pub fn describe_create_index(
    _: &StatementContext,
    _: CreateIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

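/// Plans `CREATE INDEX`, filling in defaulted key parts and the index name
/// before serializing the normalized statement as the index's `create_sql`.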
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView | ContinualTask => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on.relation_desc().expect("item type checked above");

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // Use the relation's default key as the index key.
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            // We're creating the "default" index.
            idx_name.item += "_primary_idx";
        } else {
            // Use the PostgreSQL convention for automatically naming indexes:
            // `<relation>_<_-separated indexed expressions>_idx`.
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    // Check for an object in the catalog with this same name.
    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    // Normalize the statement before serializing it as `create_sql`.
    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

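/// Plans `CREATE TYPE ... AS LIST | MAP | RECORD`, validating that all
/// referenced element, key, value, and field types are named catalog types.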
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                // Validate that the type modifiers are valid.
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    // Check for an object in the catalog with this same name.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    pub scram_iterations: Option<NonZeroU32>,
    /// Set when `PASSWORD NULL` is specified explicitly; this is distinct
    /// from omitting the `PASSWORD` option entirely, and allows unsetting a
    /// password.
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}

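/// Validates role attributes, rejecting conflicting or redundant options and
/// the PostgreSQL attributes (e.g. `CREATEDB`) that Materialize replaces with
/// system privileges.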
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    scx: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options, scx)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}

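/// Plans `CREATE NETWORK POLICY`, validating each rule's direction, action,
/// and address and enforcing `max_rules_per_network_policy`.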
pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}

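/// Plans `ALTER NETWORK POLICY`. The full rule set is re-specified and
/// re-validated, just as in `plan_create_network_policy`.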
pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);

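/// Convert a [`CreateClusterStatement`] into a [`CreateClusterPlan`].
///
/// Planning a managed cluster round-trips through [`unplan_create_cluster`]
/// to verify that the plan is stable.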
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    if let CreateClusterVariant::Managed(_) = &plan.variant {
        // Round-trip the plan through unplan and replan to verify that
        // planning is stable.
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}

pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            // Disk is always enabled for modern cluster sizes, so the DISK
            // option is only meaningful (and deprecated) for legacy sizes.
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            // Scheduled clusters start with no replicas; the scheduler
            // adjusts the replica count as needed.
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}

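/// Convert a [`CreateClusterPlan`] back into a [`CreateClusterStatement`].
///
/// The reverse of [`plan_create_cluster_inner`].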
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                // `seen` is not relevant here, so the default is fine.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // The replication factor appears in the statement only for MANUAL
            // schedules; for REFRESH schedules it is system-controlled.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                // `seen` is not relevant here, so the default is fine.
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);

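/// Plans the configuration of a single cluster replica, in either its
/// orchestrated (`SIZE ...`) or unorchestrated (`STORAGECTL ADDRESSES ...`)
/// form.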
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            // We don't mention the unorchestrated options in the error
            // message, because they are only available in unsafe mode.
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                // Disk is always enabled for modern cluster sizes, so the
                // DISK option is only meaningful (and deprecated) for legacy
                // sizes.
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // When manually testing Materialize in unsafe mode, it's easy to
            // accidentally omit one of these options, so we try to produce
            // helpful error messages.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}

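/// Plans the introspection configuration of a replica, defaulting the
/// introspection interval when it is not specified.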
fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}

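/// The reverse of [`plan_compute_replica_config`].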
fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}

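/// Convert a [`ClusterScheduleOptionValue`] into a [`ClusterSchedule`],
/// validating the `HYDRATION TIME ESTIMATE`.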
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        // If `HYDRATION TIME ESTIMATE` is not given, default it to 0.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        // Otherwise, convert the interval to a `Duration`.
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                // Months have a variable number of days, so a duration
                // expressed in months would be ambiguous.
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}

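/// The reverse of [`plan_cluster_schedule`].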
fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    // Never store the actual secret value in the catalog's `create_sql`.
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));

pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    // Check for an object in the catalog with this same name.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // For SSH connections, overwrite the public key options in the statement
    // with the generated key pair before serializing the `create_sql`.
    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    // Check for the temporary schema by name first, because it cannot be
    // resolved like an ordinary schema.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            // The default network policy cannot be dropped while it is in
            // effect; it must be replaced first.
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

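/// Returns whether the given cluster contains objects that require it to stay
/// single-replica (sources or sinks, unless multi-replica sources are
/// enabled).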
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    // If this dyncfg is enabled, sources and sinks no longer force a cluster
    // to be single-replica.
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        // Otherwise check for bound sources and sinks.
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}

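/// Plans dropping a single item, enforcing the system-item and RESTRICT
/// dependency checks.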
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `DROP VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

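/// Returns whether the dependent item `dep` prevents a drop of the given
/// `object_type` without CASCADE. Types can never be dropped out from under
/// dependents; for all other object types, indexes are the only dependents
/// that do not block a drop.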
5760fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5762 match object_type {
5763 ObjectType::Type => true,
5764 _ => match dep.item_type() {
5765 CatalogItemType::Func
5766 | CatalogItemType::Table
5767 | CatalogItemType::Source
5768 | CatalogItemType::View
5769 | CatalogItemType::MaterializedView
5770 | CatalogItemType::Sink
5771 | CatalogItemType::Type
5772 | CatalogItemType::Secret
5773 | CatalogItemType::Connection
5774 | CatalogItemType::ContinualTask => true,
5775 CatalogItemType::Index => false,
5776 },
5777 }
5778}
5779
5780pub fn describe_alter_index_options(
5781 _: &StatementContext,
5782 _: AlterIndexStatement<Aug>,
5783) -> Result<StatementDesc, PlanError> {
5784 Ok(StatementDesc::new(None))
5785}
5786
5787pub fn describe_drop_owned(
5788 _: &StatementContext,
5789 _: DropOwnedStatement<Aug>,
5790) -> Result<StatementDesc, PlanError> {
5791 Ok(StatementDesc::new(None))
5792}
5793
pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    // Cluster replicas.
    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    // Clusters.
    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            // Without CASCADE, a cluster may only be dropped if it contains no
            // bound objects owned by some other role.
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Items.
    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                // Checks whether any dependents of this item survive the drop
                // and are owned by some other role.
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
                            dependents: names,
                        })
                    } else {
                        Ok(())
                    }
                };

                // Also check any progress collection associated with the item.
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // Schemas.
    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into());
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    // Databases.
    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    // System-level privileges.
    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

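/// Validates a `RETAIN HISTORY` duration and converts it into a
/// [`CompactionWindow`]. A zero duration is rejected as an internal error,
/// durations below the default logical compaction window require the
/// unlimited-retain-history feature flag, and an absent duration disables
/// compaction entirely (gated behind the same flag).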
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        // Zero durations should have been normalized away before planning, so
        // reject one defensively as an internal error.
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            // Durations below the default compaction window require an
            // explicit feature flag.
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        // An absent duration disables compaction entirely, which is gated
        // behind the same feature flag.
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

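/// Plans `ALTER INDEX ... SET/RESET (...)`. `RETAIN HISTORY` is currently the
/// only supported option, and it must appear alone; both forms delegate to the
/// shared `alter_retain_history` path.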
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

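/// Plans `ALTER CLUSTER`, e.g. `ALTER CLUSTER c SET (REPLICATION FACTOR 2)`.
/// Managed and unmanaged clusters accept different option sets, so the bulk
/// of this function validates which options are legal for the cluster's
/// (possibly changing) management mode before recording the requested
/// changes.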
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // Clusters containing single-replica storage objects
                        // cannot be scaled beyond one replica.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // A zero-downtime reconfiguration temporarily doubles
                        // the replica set, which single-replica storage
                        // objects cannot tolerate.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // If no new size was given, validate DISK against the
                // cluster's current managed size.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

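/// Plans `ALTER <object> SET CLUSTER <cluster>`, e.g.
/// `ALTER MATERIALIZED VIEW mv SET CLUSTER quickstart`. Only materialized
/// views are currently supported; moving an object to the cluster it already
/// lives on plans as a no-op.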
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Check that the object type can, at least in principle, be associated
    // with a cluster.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

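/// Plans `ALTER <object type> <name> RENAME TO <new name>` by dispatching on
/// the object type: items, clusters, cluster replicas, and schemas each have
/// their own renaming rules.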
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

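/// Plans `ALTER SCHEMA ... RENAME TO ...`. Temporary and system schemas
/// cannot be renamed, and the new name must not collide with an existing
/// schema in the same database.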
pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    // The temporary schema lives in the ambient database and cannot be
    // renamed.
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the new name is not already in use.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // System schemas cannot be renamed.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

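/// Plans `ALTER SCHEMA ... SWAP WITH ...`. The swap goes through a temporary
/// `mz_schema_swap_<suffix>` name, so `gen_temp_suffix` must produce a suffix
/// for which no schema of that name already exists in the database.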
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    // The temporary schema lives in the ambient database and cannot be
    // swapped.
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // System schemas cannot be swapped.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name for the swap that is not already in use in
    // this database.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

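/// Plans a rename of a catalog item, e.g. `ALTER TABLE t RENAME TO t2`. The
/// new name must not conflict with an existing item or type, and
/// `ALTER VIEW` on a materialized view is rejected with a dedicated error.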
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `ALTER VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Items and types may not have conflicting names; see
            // `CatalogItemType::conflicts_with_type` for the exact rules.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

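/// Plans `ALTER CLUSTER ... SWAP WITH ...`. Like schema swaps, the operation
/// goes through a temporary `mz_cluster_swap_<suffix>` name, so the suffix
/// check must confirm that no cluster with that name exists yet.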
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        // Probe the same name that is used to build `name_temp` below.
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // The temporary name is unused, so we can use it.
            Err(CatalogError::UnknownCluster(_)) => true,
            // Some cluster with this name already exists, or resolution
            // failed for another reason; try a different suffix.
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

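/// Plans `ALTER ... SWAP WITH ...` for schemas and clusters. The shared
/// `gen_temp_suffix` closure retries a random suffix up to ten times until
/// the caller-provided availability check accepts it.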
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // Try up to 10 times to generate a temporary suffix.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            // Generate a short random suffix and keep it if it is unused.
            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

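/// Shared planning for `ALTER ... SET/RESET (RETAIN HISTORY ...)`, used by
/// the dedicated `ALTER ... RETAIN HISTORY` statement as well as the index
/// and source option paths. A `history` of `None` resets the window to the
/// default logical compaction window.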
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // Views get a special error below.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Keep the raw option value (it is recorded in the plan) and
            // extract the requested compaction window.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                // None means RESET, so use the default compaction window.
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

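/// Plans `ALTER CONNECTION`. `ROTATE KEYS` is handled as a standalone action
/// for SSH connections; otherwise the statement is a set of `SET`/`DROP`
/// option actions that are validated against the connection's type before
/// being recorded in the plan.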
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect the names of every option being modified so they can be
    // validated against the connection type.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition the actions into the options being set and those being
    // dropped.
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Run the set options through the extractor to validate their values and
    // to learn which option names were specified, so overlap with the dropped
    // options can be detected.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow simultaneously setting and dropping options from the same
        // mutually exclusive set.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option in a mutually exclusive set is modified, implicitly
        // drop the entire set; any newly set value is applied on top.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

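/// Plans `ALTER SINK`. Changing the sink's upstream relation re-plans the
/// sink from its original `CREATE SINK` statement with the new `FROM` clause
/// substituted in, then bumps the sink version.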
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always alter objects at their latest version.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Reparse the sink's original CREATE SINK statement...
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // ...swap in the new upstream relation, and re-plan it.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

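/// Plans `ALTER SOURCE`. Only `RETAIN HISTORY` may be set or reset here;
/// subsource management either happens during purification (`ADD SUBSOURCE`,
/// `REFRESH REFERENCES`) or is no longer supported (`DROP SUBSOURCE`).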
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            // The parser does not accept an empty option list.
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            // The parser does not accept an empty option list.
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    }
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

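/// Plans `ALTER TABLE ... ADD COLUMN ...`. New columns are always added as
/// nullable, and the column's SQL type is re-parsed so its raw form can be
/// recorded in the plan alongside the planned type.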
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                // Always add columns to the latest version of the item.
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.relation_desc().expect("table has desc").into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // New columns are always nullable.
    let column_type = scalar_type.nullable(true);
    // Re-parse the SQL type so its raw form can be recorded in the plan.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_alter_materialized_view_apply_replacement(
    _: &StatementContext,
    _: AlterMaterializedViewApplyReplacementStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

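/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT ...`. The named
/// replacement must itself be a materialized view whose replacement target is
/// the materialized view being altered.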
pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    // The replacement must have been created as a replacement for this
    // specific materialized view.
    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

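/// Plans `COMMENT ON <object> IS <comment>`. The statement resolves to a
/// `CommentObjectId` plus an optional 1-based column position for
/// `COMMENT ON COLUMN`; comments longer than `MAX_COMMENT_LENGTH` bytes are
/// rejected.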
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // Enforce the maximum comment length.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => {
            // Temporary schemas cannot carry comments.
            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
                sql_bail!(
                    "cannot comment on schema {} because it is a temporary schema",
                    mz_repr::namespaces::MZ_TEMP_SCHEMA
                );
            }
            (
                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
                None,
            )
        }
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // Column positions are stored as `i32`s downstream, so reject positions
    // that do not fit.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

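/// Resolves a cluster by name, returning `Ok(None)` instead of an error when
/// `if_exists` is set and the cluster does not exist. The `resolve_*` helpers
/// below follow the same convention for their respective object kinds.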
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

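/// Returns an error if the identified cluster is managed. Some operations,
/// such as renaming individual replicas, are only permitted on unmanaged
/// clusters.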
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}