1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::{Either, Itertools};
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
59 CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
60 CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
61 CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
62 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
63 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
64 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
65 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
66 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
67 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
68 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
69 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
70 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
71 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
72 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
73 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
74 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
75 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
76 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
77 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
78 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
79 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
80 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
81 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
82 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
83 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
84};
85use mz_sql_parser::ident;
86use mz_sql_parser::parser::StatementParseResult;
87use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
88use mz_storage_types::connections::{Connection, KafkaTopicOptions};
89use mz_storage_types::sinks::{
90 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
91 SinkEnvelope, StorageSinkConnection,
92};
93use mz_storage_types::sources::encoding::{
94 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
95 SourceDataEncoding, included_column_desc,
96};
97use mz_storage_types::sources::envelope::{
98 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
99};
100use mz_storage_types::sources::kafka::{
101 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
102};
103use mz_storage_types::sources::load_generator::{
104 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
105 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
106};
107use mz_storage_types::sources::mysql::{
108 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
109};
110use mz_storage_types::sources::postgres::{
111 PostgresSourceConnection, PostgresSourcePublicationDetails,
112 ProtoPostgresSourcePublicationDetails,
113};
114use mz_storage_types::sources::sql_server::{
115 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
116};
117use mz_storage_types::sources::{
118 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
119 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
120 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
121 SqlServerSourceExtras, Timeline,
122};
123use prost::Message;
124
125use crate::ast::display::AstDisplay;
126use crate::catalog::{
127 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
128 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
129};
130use crate::iceberg::IcebergSinkConfigOptionExtracted;
131use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132use crate::names::{
133 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136};
137use crate::normalize::{self, ident};
138use crate::plan::error::PlanError;
139use crate::plan::query::{
140 CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
141 scalar_type_from_sql,
142};
143use crate::plan::scope::Scope;
144use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
145use crate::plan::statement::{StatementContext, StatementDesc, scl};
146use crate::plan::typeconv::CastContext;
147use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
148use crate::plan::{
149 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
150 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
151 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
152 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
153 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
154 AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
155 ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
156 ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
157 CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
158 CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
159 CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
160 CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
161 DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
162 NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
163 PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
164 VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
165 WebhookValidation, literal, plan_utils, query, transform_ast,
166};
167use crate::session::vars::{
168 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
169 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
170};
171use crate::{names, parse};
172
173mod connection;
174
175const MAX_NUM_COLUMNS: usize = 256;
180
181static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
182 std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
183
184fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
188 if partition_by.len() > desc.len() {
189 tracing::error!(
190 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
191 desc.len(),
192 partition_by.len()
193 );
194 partition_by.truncate(desc.len());
195 }
196
197 let desc_prefix = desc.iter().take(partition_by.len());
198 for (idx, ((desc_name, desc_type), partition_name)) in
199 desc_prefix.zip_eq(partition_by).enumerate()
200 {
201 let partition_name = normalize::column_name(partition_name);
202 if *desc_name != partition_name {
203 sql_bail!(
204 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
205 );
206 }
207 if !preserves_order(&desc_type.scalar_type) {
208 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
209 }
210 }
211 Ok(())
212}
213
214pub fn describe_create_database(
215 _: &StatementContext,
216 _: CreateDatabaseStatement,
217) -> Result<StatementDesc, PlanError> {
218 Ok(StatementDesc::new(None))
219}
220
221pub fn plan_create_database(
222 _: &StatementContext,
223 CreateDatabaseStatement {
224 name,
225 if_not_exists,
226 }: CreateDatabaseStatement,
227) -> Result<Plan, PlanError> {
228 Ok(Plan::CreateDatabase(CreateDatabasePlan {
229 name: normalize::ident(name.0),
230 if_not_exists,
231 }))
232}
233
234pub fn describe_create_schema(
235 _: &StatementContext,
236 _: CreateSchemaStatement,
237) -> Result<StatementDesc, PlanError> {
238 Ok(StatementDesc::new(None))
239}
240
241pub fn plan_create_schema(
242 scx: &StatementContext,
243 CreateSchemaStatement {
244 mut name,
245 if_not_exists,
246 }: CreateSchemaStatement,
247) -> Result<Plan, PlanError> {
248 if name.0.len() > 2 {
249 sql_bail!("schema name {} has more than two components", name);
250 }
251 let schema_name = normalize::ident(
252 name.0
253 .pop()
254 .expect("names always have at least one component"),
255 );
256 let database_spec = match name.0.pop() {
257 None => match scx.catalog.active_database() {
258 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
259 None => sql_bail!("no database specified and no active database"),
260 },
261 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
262 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
263 Err(_) => sql_bail!("invalid database {}", n.as_str()),
264 },
265 };
266 Ok(Plan::CreateSchema(CreateSchemaPlan {
267 database_spec,
268 schema_name,
269 if_not_exists,
270 }))
271}
272
273pub fn describe_create_table(
274 _: &StatementContext,
275 _: CreateTableStatement<Aug>,
276) -> Result<StatementDesc, PlanError> {
277 Ok(StatementDesc::new(None))
278}
279
280pub fn plan_create_table(
281 scx: &StatementContext,
282 stmt: CreateTableStatement<Aug>,
283) -> Result<Plan, PlanError> {
284 let CreateTableStatement {
285 name,
286 columns,
287 constraints,
288 if_not_exists,
289 temporary,
290 with_options,
291 } = &stmt;
292
293 let names: Vec<_> = columns
294 .iter()
295 .filter(|c| {
296 let is_versioned = c
300 .options
301 .iter()
302 .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
303 !is_versioned
304 })
305 .map(|c| normalize::column_name(c.name.clone()))
306 .collect();
307
308 if let Some(dup) = names.iter().duplicates().next() {
309 sql_bail!("column {} specified more than once", dup.quoted());
310 }
311
312 let mut column_types = Vec::with_capacity(columns.len());
315 let mut defaults = Vec::with_capacity(columns.len());
316 let mut changes = BTreeMap::new();
317 let mut keys = Vec::new();
318
319 for (i, c) in columns.into_iter().enumerate() {
320 let aug_data_type = &c.data_type;
321 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
322 let mut nullable = true;
323 let mut default = Expr::null();
324 let mut versioned = false;
325 for option in &c.options {
326 match &option.option {
327 ColumnOption::NotNull => nullable = false,
328 ColumnOption::Default(expr) => {
329 let mut expr = expr.clone();
332 transform_ast::transform(scx, &mut expr)?;
333 let _ = query::plan_default_expr(scx, &expr, &ty)?;
334 default = expr.clone();
335 }
336 ColumnOption::Unique { is_primary } => {
337 keys.push(vec![i]);
338 if *is_primary {
339 nullable = false;
340 }
341 }
342 ColumnOption::Versioned { action, version } => {
343 let version = RelationVersion::from(*version);
344 versioned = true;
345
346 let name = normalize::column_name(c.name.clone());
347 let typ = ty.clone().nullable(nullable);
348
349 changes.insert(version, (action.clone(), name, typ));
350 }
351 other => {
352 bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
353 }
354 }
355 }
356 if !versioned {
359 column_types.push(ty.nullable(nullable));
360 }
361 defaults.push(default);
362 }
363
364 let mut seen_primary = false;
365 'c: for constraint in constraints {
366 match constraint {
367 TableConstraint::Unique {
368 name: _,
369 columns,
370 is_primary,
371 nulls_not_distinct,
372 } => {
373 if seen_primary && *is_primary {
374 sql_bail!(
375 "multiple primary keys for table {} are not allowed",
376 name.to_ast_string_stable()
377 );
378 }
379 seen_primary = *is_primary || seen_primary;
380
381 let mut key = vec![];
382 for column in columns {
383 let column = normalize::column_name(column.clone());
384 match names.iter().position(|name| *name == column) {
385 None => sql_bail!("unknown column in constraint: {}", column),
386 Some(i) => {
387 let nullable = &mut column_types[i].nullable;
388 if *is_primary {
389 if *nulls_not_distinct {
390 sql_bail!(
391 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
392 );
393 }
394
395 *nullable = false;
396 } else if !(*nulls_not_distinct || !*nullable) {
397 break 'c;
400 }
401
402 key.push(i);
403 }
404 }
405 }
406
407 if *is_primary {
408 keys.insert(0, key);
409 } else {
410 keys.push(key);
411 }
412 }
413 TableConstraint::ForeignKey { .. } => {
414 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
417 }
418 TableConstraint::Check { .. } => {
419 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
422 }
423 }
424 }
425
426 if !keys.is_empty() {
427 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
430 }
431
432 let typ = SqlRelationType::new(column_types).with_keys(keys);
433
434 let temporary = *temporary;
435 let name = if temporary {
436 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
437 } else {
438 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
439 };
440
441 let full_name = scx.catalog.resolve_full_name(&name);
443 let partial_name = PartialItemName::from(full_name.clone());
444 if let (false, Ok(item)) = (
447 if_not_exists,
448 scx.catalog.resolve_item_or_type(&partial_name),
449 ) {
450 return Err(PlanError::ItemAlreadyExists {
451 name: full_name.to_string(),
452 item_type: item.item_type(),
453 });
454 }
455
456 let desc = RelationDesc::new(typ, names);
457 let mut desc = VersionedRelationDesc::new(desc);
458 for (version, (_action, name, typ)) in changes.into_iter() {
459 let new_version = desc.add_column(name, typ);
460 if version != new_version {
461 return Err(PlanError::InvalidTable {
462 name: full_name.item,
463 });
464 }
465 }
466
467 let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
468
469 let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
475 let options = plan_table_options(scx, &original_desc, with_options.clone())?;
476
477 let compaction_window = options.iter().find_map(|o| {
478 #[allow(irrefutable_let_patterns)]
479 if let crate::plan::TableOption::RetainHistory(lcw) = o {
480 Some(lcw.clone())
481 } else {
482 None
483 }
484 });
485
486 let table = Table {
487 create_sql,
488 desc,
489 temporary,
490 compaction_window,
491 data_source: TableDataSource::TableWrites { defaults },
492 };
493 Ok(Plan::CreateTable(CreateTablePlan {
494 name,
495 table,
496 if_not_exists: *if_not_exists,
497 }))
498}
499
500pub fn describe_create_table_from_source(
501 _: &StatementContext,
502 _: CreateTableFromSourceStatement<Aug>,
503) -> Result<StatementDesc, PlanError> {
504 Ok(StatementDesc::new(None))
505}
506
507pub fn describe_create_webhook_source(
508 _: &StatementContext,
509 _: CreateWebhookSourceStatement<Aug>,
510) -> Result<StatementDesc, PlanError> {
511 Ok(StatementDesc::new(None))
512}
513
514pub fn describe_create_source(
515 _: &StatementContext,
516 _: CreateSourceStatement<Aug>,
517) -> Result<StatementDesc, PlanError> {
518 Ok(StatementDesc::new(None))
519}
520
521pub fn describe_create_subsource(
522 _: &StatementContext,
523 _: CreateSubsourceStatement<Aug>,
524) -> Result<StatementDesc, PlanError> {
525 Ok(StatementDesc::new(None))
526}
527
// Options accepted in the `WITH (...)` clause of `CREATE SOURCE`.
//
// `plan_create_source` builds a `CreateSourceOptionExtracted` from the statement's options
// via `try_from` and reads its `timestamp_interval`, `retain_history` and `seen` fields.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Options of a Postgres source connection.
//
// NOTE(review): by analogy with `CreateSourceOption` this presumably generates a
// `PgConfigOptionExtracted` type, with `Default(vec![])` supplying an empty list when the
// option is absent -- confirm against the macro definition.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options of a MySQL source connection.
//
// NOTE(review): presumably generates `MySqlConfigOptionExtracted` -- confirm against the
// macro definition.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Options of a SQL Server source connection.
//
// NOTE(review): presumably generates `SqlServerConfigOptionExtracted` -- confirm against the
// macro definition.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
555
556pub fn plan_create_webhook_source(
557 scx: &StatementContext,
558 mut stmt: CreateWebhookSourceStatement<Aug>,
559) -> Result<Plan, PlanError> {
560 if stmt.is_table {
561 scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
562 }
563
564 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
567 let enable_multi_replica_sources =
568 ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
569 if !enable_multi_replica_sources {
570 if in_cluster.replica_ids().len() > 1 {
571 sql_bail!("cannot create webhook source in cluster with more than one replica")
572 }
573 }
574 let create_sql =
575 normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
576
577 let CreateWebhookSourceStatement {
578 name,
579 if_not_exists,
580 body_format,
581 include_headers,
582 validate_using,
583 is_table,
584 in_cluster: _,
586 } = stmt;
587
588 let validate_using = validate_using
589 .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
590 .transpose()?;
591 if let Some(WebhookValidation { expression, .. }) = &validate_using {
592 if !expression.contains_column() {
595 return Err(PlanError::WebhookValidationDoesNotUseColumns);
596 }
597 if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
601 return Err(PlanError::WebhookValidationNonDeterministic);
602 }
603 }
604
605 let body_format = match body_format {
606 Format::Bytes => WebhookBodyFormat::Bytes,
607 Format::Json { array } => WebhookBodyFormat::Json { array },
608 Format::Text => WebhookBodyFormat::Text,
609 ty => {
611 return Err(PlanError::Unsupported {
612 feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
613 discussion_no: None,
614 });
615 }
616 };
617
618 let mut column_ty = vec![
619 SqlColumnType {
621 scalar_type: SqlScalarType::from(body_format),
622 nullable: false,
623 },
624 ];
625 let mut column_names = vec!["body".to_string()];
626
627 let mut headers = WebhookHeaders::default();
628
629 if let Some(filters) = include_headers.column {
631 column_ty.push(SqlColumnType {
632 scalar_type: SqlScalarType::Map {
633 value_type: Box::new(SqlScalarType::String),
634 custom_id: None,
635 },
636 nullable: false,
637 });
638 column_names.push("headers".to_string());
639
640 let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
641 filters.into_iter().partition_map(|filter| {
642 if filter.block {
643 itertools::Either::Right(filter.header_name)
644 } else {
645 itertools::Either::Left(filter.header_name)
646 }
647 });
648 headers.header_column = Some(WebhookHeaderFilters { allow, block });
649 }
650
651 for header in include_headers.mappings {
653 let scalar_type = header
654 .use_bytes
655 .then_some(SqlScalarType::Bytes)
656 .unwrap_or(SqlScalarType::String);
657 column_ty.push(SqlColumnType {
658 scalar_type,
659 nullable: true,
660 });
661 column_names.push(header.column_name.into_string());
662
663 let column_idx = column_ty.len() - 1;
664 assert_eq!(
666 column_idx,
667 column_names.len() - 1,
668 "header column names and types don't match"
669 );
670 headers
671 .mapped_headers
672 .insert(column_idx, (header.header_name, header.use_bytes));
673 }
674
675 let mut unique_check = HashSet::with_capacity(column_names.len());
677 for name in &column_names {
678 if !unique_check.insert(name) {
679 return Err(PlanError::AmbiguousColumn(name.clone().into()));
680 }
681 }
682 if column_names.len() > MAX_NUM_COLUMNS {
683 return Err(PlanError::TooManyColumns {
684 max_num_columns: MAX_NUM_COLUMNS,
685 req_num_columns: column_names.len(),
686 });
687 }
688
689 let typ = SqlRelationType::new(column_ty);
690 let desc = RelationDesc::new(typ, column_names);
691
692 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
694 let full_name = scx.catalog.resolve_full_name(&name);
695 let partial_name = PartialItemName::from(full_name.clone());
696 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
697 return Err(PlanError::ItemAlreadyExists {
698 name: full_name.to_string(),
699 item_type: item.item_type(),
700 });
701 }
702
703 let timeline = Timeline::EpochMilliseconds;
706
707 let plan = if is_table {
708 let data_source = DataSourceDesc::Webhook {
709 validate_using,
710 body_format,
711 headers,
712 cluster_id: Some(in_cluster.id()),
713 };
714 let data_source = TableDataSource::DataSource {
715 desc: data_source,
716 timeline,
717 };
718 Plan::CreateTable(CreateTablePlan {
719 name,
720 if_not_exists,
721 table: Table {
722 create_sql,
723 desc: VersionedRelationDesc::new(desc),
724 temporary: false,
725 compaction_window: None,
726 data_source,
727 },
728 })
729 } else {
730 let data_source = DataSourceDesc::Webhook {
731 validate_using,
732 body_format,
733 headers,
734 cluster_id: None,
736 };
737 Plan::CreateSource(CreateSourcePlan {
738 name,
739 source: Source {
740 create_sql,
741 data_source,
742 desc,
743 compaction_window: None,
744 },
745 if_not_exists,
746 timeline,
747 in_cluster: Some(in_cluster.id()),
748 })
749 };
750
751 Ok(plan)
752}
753
754pub fn plan_create_source(
755 scx: &StatementContext,
756 mut stmt: CreateSourceStatement<Aug>,
757) -> Result<Plan, PlanError> {
758 let CreateSourceStatement {
759 name,
760 in_cluster: _,
761 col_names,
762 connection: source_connection,
763 envelope,
764 if_not_exists,
765 format,
766 key_constraint,
767 include_metadata,
768 with_options,
769 external_references: referenced_subsources,
770 progress_subsource,
771 } = &stmt;
772
773 mz_ore::soft_assert_or_log!(
774 referenced_subsources.is_none(),
775 "referenced subsources must be cleared in purification"
776 );
777
778 let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
779 && scx.catalog.system_vars().force_source_table_syntax();
780
781 if force_source_table_syntax {
784 if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
785 Err(PlanError::UseTablesForSources(
786 "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
787 ))?;
788 }
789 }
790
791 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
792
793 if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
794 && include_metadata
795 .iter()
796 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
797 {
798 sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
800 }
801 if !matches!(
802 source_connection,
803 CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
804 ) && !include_metadata.is_empty()
805 {
806 bail_unsupported!("INCLUDE metadata with non-Kafka sources");
807 }
808
809 if !include_metadata.is_empty()
810 && !matches!(
811 envelope,
812 ast::SourceEnvelope::Upsert { .. }
813 | ast::SourceEnvelope::None
814 | ast::SourceEnvelope::Debezium
815 )
816 {
817 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
818 }
819
820 let external_connection =
821 plan_generic_source_connection(scx, source_connection, include_metadata)?;
822
823 let CreateSourceOptionExtracted {
824 timestamp_interval,
825 retain_history,
826 seen: _,
827 } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
828
829 let metadata_columns_desc = match external_connection {
830 GenericSourceConnection::Kafka(KafkaSourceConnection {
831 ref metadata_columns,
832 ..
833 }) => kafka_metadata_columns_desc(metadata_columns),
834 _ => vec![],
835 };
836
837 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
839 scx,
840 &envelope,
841 format,
842 Some(external_connection.default_key_desc()),
843 external_connection.default_value_desc(),
844 include_metadata,
845 metadata_columns_desc,
846 &external_connection,
847 )?;
848 plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
849
850 let names: Vec<_> = desc.iter_names().cloned().collect();
851 if let Some(dup) = names.iter().duplicates().next() {
852 sql_bail!("column {} specified more than once", dup.quoted());
853 }
854
855 if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
857 scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
860
861 let key_columns = columns
862 .into_iter()
863 .map(normalize::column_name)
864 .collect::<Vec<_>>();
865
866 let mut uniq = BTreeSet::new();
867 for col in key_columns.iter() {
868 if !uniq.insert(col) {
869 sql_bail!("Repeated column name in source key constraint: {}", col);
870 }
871 }
872
873 let key_indices = key_columns
874 .iter()
875 .map(|col| {
876 let name_idx = desc
877 .get_by_name(col)
878 .map(|(idx, _type)| idx)
879 .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
880 if desc.get_unambiguous_name(name_idx).is_none() {
881 sql_bail!("Ambiguous column in source key constraint: {}", col);
882 }
883 Ok(name_idx)
884 })
885 .collect::<Result<Vec<_>, _>>()?;
886
887 if !desc.typ().keys.is_empty() {
888 return Err(key_constraint_err(&desc, &key_columns));
889 } else {
890 desc = desc.with_key(key_indices);
891 }
892 }
893
894 let timestamp_interval = match timestamp_interval {
895 Some(duration) => {
896 let min = scx.catalog.system_vars().min_timestamp_interval();
897 let max = scx.catalog.system_vars().max_timestamp_interval();
898 if duration < min || duration > max {
899 return Err(PlanError::InvalidTimestampInterval {
900 min,
901 max,
902 requested: duration,
903 });
904 }
905 duration
906 }
907 None => scx.catalog.config().timestamp_interval,
908 };
909
910 let (desc, data_source) = match progress_subsource {
911 Some(name) => {
912 let DeferredItemName::Named(name) = name else {
913 sql_bail!("[internal error] progress subsource must be named during purification");
914 };
915 let ResolvedItemName::Item { id, .. } = name else {
916 sql_bail!("[internal error] invalid target id");
917 };
918
919 let details = match external_connection {
920 GenericSourceConnection::Kafka(ref c) => {
921 SourceExportDetails::Kafka(KafkaSourceExportDetails {
922 metadata_columns: c.metadata_columns.clone(),
923 })
924 }
925 GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
926 LoadGenerator::Auction
927 | LoadGenerator::Marketing
928 | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
929 LoadGenerator::Counter { .. }
930 | LoadGenerator::Clock
931 | LoadGenerator::Datums
932 | LoadGenerator::KeyValue(_) => {
933 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
934 output: LoadGeneratorOutput::Default,
935 })
936 }
937 },
938 GenericSourceConnection::Postgres(_)
939 | GenericSourceConnection::MySql(_)
940 | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
941 };
942
943 let data_source = DataSourceDesc::OldSyntaxIngestion {
944 desc: SourceDesc {
945 connection: external_connection,
946 timestamp_interval,
947 },
948 progress_subsource: *id,
949 data_config: SourceExportDataConfig {
950 encoding,
951 envelope: envelope.clone(),
952 },
953 details,
954 };
955 (desc, data_source)
956 }
957 None => {
958 let desc = external_connection.timestamp_desc();
959 let data_source = DataSourceDesc::Ingestion(SourceDesc {
960 connection: external_connection,
961 timestamp_interval,
962 });
963 (desc, data_source)
964 }
965 };
966
967 let if_not_exists = *if_not_exists;
968 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
969
970 let full_name = scx.catalog.resolve_full_name(&name);
972 let partial_name = PartialItemName::from(full_name.clone());
973 if let (false, Ok(item)) = (
976 if_not_exists,
977 scx.catalog.resolve_item_or_type(&partial_name),
978 ) {
979 return Err(PlanError::ItemAlreadyExists {
980 name: full_name.to_string(),
981 item_type: item.item_type(),
982 });
983 }
984
985 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
989
990 let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
991
992 let timeline = match envelope {
994 SourceEnvelope::CdcV2 => {
995 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
996 }
997 _ => Timeline::EpochMilliseconds,
998 };
999
1000 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1001
1002 let source = Source {
1003 create_sql,
1004 data_source,
1005 desc,
1006 compaction_window,
1007 };
1008
1009 Ok(Plan::CreateSource(CreateSourcePlan {
1010 name,
1011 source,
1012 if_not_exists,
1013 timeline,
1014 in_cluster: Some(in_cluster.id()),
1015 }))
1016}
1017
1018pub fn plan_generic_source_connection(
1019 scx: &StatementContext<'_>,
1020 source_connection: &CreateSourceConnection<Aug>,
1021 include_metadata: &Vec<SourceIncludeMetadata>,
1022) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1023 Ok(match source_connection {
1024 CreateSourceConnection::Kafka {
1025 connection,
1026 options,
1027 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1028 scx,
1029 connection,
1030 options,
1031 include_metadata,
1032 )?),
1033 CreateSourceConnection::Postgres {
1034 connection,
1035 options,
1036 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1037 scx, connection, options,
1038 )?),
1039 CreateSourceConnection::SqlServer {
1040 connection,
1041 options,
1042 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1043 scx, connection, options,
1044 )?),
1045 CreateSourceConnection::MySql {
1046 connection,
1047 options,
1048 } => {
1049 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1050 }
1051 CreateSourceConnection::LoadGenerator { generator, options } => {
1052 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1053 scx,
1054 generator,
1055 options,
1056 include_metadata,
1057 )?)
1058 }
1059 })
1060}
1061
1062fn plan_load_generator_source_connection(
1063 scx: &StatementContext<'_>,
1064 generator: &ast::LoadGenerator,
1065 options: &Vec<LoadGeneratorOption<Aug>>,
1066 include_metadata: &Vec<SourceIncludeMetadata>,
1067) -> Result<LoadGeneratorSourceConnection, PlanError> {
1068 let load_generator =
1069 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1070 let LoadGeneratorOptionExtracted {
1071 tick_interval,
1072 as_of,
1073 up_to,
1074 ..
1075 } = options.clone().try_into()?;
1076 let tick_micros = match tick_interval {
1077 Some(interval) => Some(interval.as_micros().try_into()?),
1078 None => None,
1079 };
1080 if up_to < as_of {
1081 sql_bail!("UP TO cannot be less than AS OF");
1082 }
1083 Ok(LoadGeneratorSourceConnection {
1084 load_generator,
1085 tick_micros,
1086 as_of,
1087 up_to,
1088 })
1089}
1090
1091fn plan_mysql_source_connection(
1092 scx: &StatementContext<'_>,
1093 connection: &ResolvedItemName,
1094 options: &Vec<MySqlConfigOption<Aug>>,
1095) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1096 let connection_item = scx.get_item_by_resolved_name(connection)?;
1097 match connection_item.connection()? {
1098 Connection::MySql(connection) => connection,
1099 _ => sql_bail!(
1100 "{} is not a MySQL connection",
1101 scx.catalog.resolve_full_name(connection_item.name())
1102 ),
1103 };
1104 let MySqlConfigOptionExtracted {
1105 details,
1106 text_columns: _,
1110 exclude_columns: _,
1111 seen: _,
1112 } = options.clone().try_into()?;
1113 let details = details
1114 .as_ref()
1115 .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1116 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1117 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1118 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1119 Ok(MySqlSourceConnection {
1120 connection: connection_item.id(),
1121 connection_id: connection_item.id(),
1122 details,
1123 })
1124}
1125
1126fn plan_sqlserver_source_connection(
1127 scx: &StatementContext<'_>,
1128 connection: &ResolvedItemName,
1129 options: &Vec<SqlServerConfigOption<Aug>>,
1130) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1131 let connection_item = scx.get_item_by_resolved_name(connection)?;
1132 match connection_item.connection()? {
1133 Connection::SqlServer(connection) => connection,
1134 _ => sql_bail!(
1135 "{} is not a SQL Server connection",
1136 scx.catalog.resolve_full_name(connection_item.name())
1137 ),
1138 };
1139 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1140 let details = details
1141 .as_ref()
1142 .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1143 let extras = hex::decode(details)
1144 .map_err(|e| sql_err!("{e}"))
1145 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1146 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1147 Ok(SqlServerSourceConnection {
1148 connection_id: connection_item.id(),
1149 connection: connection_item.id(),
1150 extras,
1151 })
1152}
1153
1154fn plan_postgres_source_connection(
1155 scx: &StatementContext<'_>,
1156 connection: &ResolvedItemName,
1157 options: &Vec<PgConfigOption<Aug>>,
1158) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1159 let connection_item = scx.get_item_by_resolved_name(connection)?;
1160 let PgConfigOptionExtracted {
1161 details,
1162 publication,
1163 text_columns: _,
1167 exclude_columns: _,
1171 seen: _,
1172 } = options.clone().try_into()?;
1173 let details = details
1174 .as_ref()
1175 .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1176 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1177 let details =
1178 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1179 let publication_details =
1180 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1181 Ok(PostgresSourceConnection {
1182 connection: connection_item.id(),
1183 connection_id: connection_item.id(),
1184 publication: publication.expect("validated exists during purification"),
1185 publication_details,
1186 })
1187}
1188
1189fn plan_kafka_source_connection(
1190 scx: &StatementContext<'_>,
1191 connection_name: &ResolvedItemName,
1192 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1193 include_metadata: &Vec<SourceIncludeMetadata>,
1194) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1195 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1196 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1197 sql_bail!(
1198 "{} is not a kafka connection",
1199 scx.catalog.resolve_full_name(connection_item.name())
1200 )
1201 }
1202 let KafkaSourceConfigOptionExtracted {
1203 group_id_prefix,
1204 topic,
1205 topic_metadata_refresh_interval,
1206 start_timestamp: _, start_offset,
1208 seen: _,
1209 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1210 let topic = topic.expect("validated exists during purification");
1211 let mut start_offsets = BTreeMap::new();
1212 if let Some(offsets) = start_offset {
1213 for (part, offset) in offsets.iter().enumerate() {
1214 if *offset < 0 {
1215 sql_bail!("START OFFSET must be a nonnegative integer");
1216 }
1217 start_offsets.insert(i32::try_from(part)?, *offset);
1218 }
1219 }
1220 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1221 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1224 }
1225 let metadata_columns = include_metadata
1226 .into_iter()
1227 .flat_map(|item| match item {
1228 SourceIncludeMetadata::Timestamp { alias } => {
1229 let name = match alias {
1230 Some(name) => name.to_string(),
1231 None => "timestamp".to_owned(),
1232 };
1233 Some((name, KafkaMetadataKind::Timestamp))
1234 }
1235 SourceIncludeMetadata::Partition { alias } => {
1236 let name = match alias {
1237 Some(name) => name.to_string(),
1238 None => "partition".to_owned(),
1239 };
1240 Some((name, KafkaMetadataKind::Partition))
1241 }
1242 SourceIncludeMetadata::Offset { alias } => {
1243 let name = match alias {
1244 Some(name) => name.to_string(),
1245 None => "offset".to_owned(),
1246 };
1247 Some((name, KafkaMetadataKind::Offset))
1248 }
1249 SourceIncludeMetadata::Headers { alias } => {
1250 let name = match alias {
1251 Some(name) => name.to_string(),
1252 None => "headers".to_owned(),
1253 };
1254 Some((name, KafkaMetadataKind::Headers))
1255 }
1256 SourceIncludeMetadata::Header {
1257 alias,
1258 key,
1259 use_bytes,
1260 } => Some((
1261 alias.to_string(),
1262 KafkaMetadataKind::Header {
1263 key: key.clone(),
1264 use_bytes: *use_bytes,
1265 },
1266 )),
1267 SourceIncludeMetadata::Key { .. } => {
1268 None
1270 }
1271 })
1272 .collect();
1273 Ok(KafkaSourceConnection {
1274 connection: connection_item.id(),
1275 connection_id: connection_item.id(),
1276 topic,
1277 start_offsets,
1278 group_id_prefix,
1279 topic_metadata_refresh_interval,
1280 metadata_columns,
1281 })
1282}
1283
1284fn apply_source_envelope_encoding(
1285 scx: &StatementContext,
1286 envelope: &ast::SourceEnvelope,
1287 format: &Option<FormatSpecifier<Aug>>,
1288 key_desc: Option<RelationDesc>,
1289 value_desc: RelationDesc,
1290 include_metadata: &[SourceIncludeMetadata],
1291 metadata_columns_desc: Vec<(&str, SqlColumnType)>,
1292 source_connection: &GenericSourceConnection<ReferencedConnection>,
1293) -> Result<
1294 (
1295 RelationDesc,
1296 SourceEnvelope,
1297 Option<SourceDataEncoding<ReferencedConnection>>,
1298 ),
1299 PlanError,
1300> {
1301 let encoding = match format {
1302 Some(format) => Some(get_encoding(scx, format, envelope)?),
1303 None => None,
1304 };
1305
1306 let (key_desc, value_desc) = match &encoding {
1307 Some(encoding) => {
1308 match value_desc.typ().columns() {
1311 [typ] => match typ.scalar_type {
1312 SqlScalarType::Bytes => {}
1313 _ => sql_bail!(
1314 "The schema produced by the source is incompatible with format decoding"
1315 ),
1316 },
1317 _ => sql_bail!(
1318 "The schema produced by the source is incompatible with format decoding"
1319 ),
1320 }
1321
1322 let (key_desc, value_desc) = encoding.desc()?;
1323
1324 let key_desc = key_desc.map(|desc| {
1331 let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1332 let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1333 if is_kafka && is_envelope_none {
1334 RelationDesc::from_names_and_types(
1335 desc.into_iter()
1336 .map(|(name, typ)| (name, typ.nullable(true))),
1337 )
1338 } else {
1339 desc
1340 }
1341 });
1342 (key_desc, value_desc)
1343 }
1344 None => (key_desc, value_desc),
1345 };
1346
1347 let key_envelope_no_encoding = matches!(
1360 source_connection,
1361 GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1362 load_generator: LoadGenerator::KeyValue(_),
1363 ..
1364 })
1365 );
1366 let mut key_envelope = get_key_envelope(
1367 include_metadata,
1368 encoding.as_ref(),
1369 key_envelope_no_encoding,
1370 )?;
1371
1372 match (&envelope, &key_envelope) {
1373 (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1374 (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1375 "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1376 ),
1377 _ => {}
1378 };
1379
1380 let envelope = match &envelope {
1389 ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1391 ast::SourceEnvelope::Debezium => {
1392 let after_idx = match typecheck_debezium(&value_desc) {
1394 Ok((_before_idx, after_idx)) => Ok(after_idx),
1395 Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1396 Some(DataEncoding::Avro(_)) => Err(type_err),
1397 _ => Err(sql_err!(
1398 "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1399 )),
1400 },
1401 }?;
1402
1403 UnplannedSourceEnvelope::Upsert {
1404 style: UpsertStyle::Debezium { after_idx },
1405 }
1406 }
1407 ast::SourceEnvelope::Upsert {
1408 value_decode_err_policy,
1409 } => {
1410 let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1411 None => {
1412 if !key_envelope_no_encoding {
1413 bail_unsupported!(format!(
1414 "UPSERT requires a key/value format: {:?}",
1415 format
1416 ))
1417 }
1418 None
1419 }
1420 Some(key_encoding) => Some(key_encoding),
1421 };
1422 if key_envelope == KeyEnvelope::None {
1425 key_envelope = get_unnamed_key_envelope(key_encoding)?;
1426 }
1427 let style = match value_decode_err_policy.as_slice() {
1429 [] => UpsertStyle::Default(key_envelope),
1430 [SourceErrorPolicy::Inline { alias }] => {
1431 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1432 UpsertStyle::ValueErrInline {
1433 key_envelope,
1434 error_column: alias
1435 .as_ref()
1436 .map_or_else(|| "error".to_string(), |a| a.to_string()),
1437 }
1438 }
1439 _ => {
1440 bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1441 }
1442 };
1443
1444 UnplannedSourceEnvelope::Upsert { style }
1445 }
1446 ast::SourceEnvelope::CdcV2 => {
1447 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1448 match format {
1450 Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1451 _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1452 }
1453 UnplannedSourceEnvelope::CdcV2
1454 }
1455 };
1456
1457 let metadata_desc = included_column_desc(metadata_columns_desc);
1458 let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1459
1460 Ok((desc, envelope, encoding))
1461}
1462
1463fn plan_source_export_desc(
1466 scx: &StatementContext,
1467 name: &UnresolvedItemName,
1468 columns: &Vec<ColumnDef<Aug>>,
1469 constraints: &Vec<TableConstraint<Aug>>,
1470) -> Result<RelationDesc, PlanError> {
1471 let names: Vec<_> = columns
1472 .iter()
1473 .map(|c| normalize::column_name(c.name.clone()))
1474 .collect();
1475
1476 if let Some(dup) = names.iter().duplicates().next() {
1477 sql_bail!("column {} specified more than once", dup.quoted());
1478 }
1479
1480 let mut column_types = Vec::with_capacity(columns.len());
1483 let mut keys = Vec::new();
1484
1485 for (i, c) in columns.into_iter().enumerate() {
1486 let aug_data_type = &c.data_type;
1487 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1488 let mut nullable = true;
1489 for option in &c.options {
1490 match &option.option {
1491 ColumnOption::NotNull => nullable = false,
1492 ColumnOption::Default(_) => {
1493 bail_unsupported!("Source export with default value")
1494 }
1495 ColumnOption::Unique { is_primary } => {
1496 keys.push(vec![i]);
1497 if *is_primary {
1498 nullable = false;
1499 }
1500 }
1501 other => {
1502 bail_unsupported!(format!("Source export with column constraint: {}", other))
1503 }
1504 }
1505 }
1506 column_types.push(ty.nullable(nullable));
1507 }
1508
1509 let mut seen_primary = false;
1510 'c: for constraint in constraints {
1511 match constraint {
1512 TableConstraint::Unique {
1513 name: _,
1514 columns,
1515 is_primary,
1516 nulls_not_distinct,
1517 } => {
1518 if seen_primary && *is_primary {
1519 sql_bail!(
1520 "multiple primary keys for source export {} are not allowed",
1521 name.to_ast_string_stable()
1522 );
1523 }
1524 seen_primary = *is_primary || seen_primary;
1525
1526 let mut key = vec![];
1527 for column in columns {
1528 let column = normalize::column_name(column.clone());
1529 match names.iter().position(|name| *name == column) {
1530 None => sql_bail!("unknown column in constraint: {}", column),
1531 Some(i) => {
1532 let nullable = &mut column_types[i].nullable;
1533 if *is_primary {
1534 if *nulls_not_distinct {
1535 sql_bail!(
1536 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1537 );
1538 }
1539 *nullable = false;
1540 } else if !(*nulls_not_distinct || !*nullable) {
1541 break 'c;
1544 }
1545
1546 key.push(i);
1547 }
1548 }
1549 }
1550
1551 if *is_primary {
1552 keys.insert(0, key);
1553 } else {
1554 keys.push(key);
1555 }
1556 }
1557 TableConstraint::ForeignKey { .. } => {
1558 bail_unsupported!("Source export with a foreign key")
1559 }
1560 TableConstraint::Check { .. } => {
1561 bail_unsupported!("Source export with a check constraint")
1562 }
1563 }
1564 }
1565
1566 let typ = SqlRelationType::new(column_types).with_keys(keys);
1567 let desc = RelationDesc::new(typ, names);
1568 Ok(desc)
1569}
1570
// Declares the `WITH` options understood by `CREATE SUBSOURCE`.
//
// `plan_create_subsource` converts the statement's options into the
// `CreateSubsourceOptionExtracted` struct produced here and reads one field per entry
// below, plus a `seen` field. Entries carrying `Default(..)` are used there as plain
// values (`progress` as a `bool`, the column lists as `Vec`s); the remaining entries are
// used as `Option`s.
generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);
1580
1581pub fn plan_create_subsource(
1582 scx: &StatementContext,
1583 stmt: CreateSubsourceStatement<Aug>,
1584) -> Result<Plan, PlanError> {
1585 let CreateSubsourceStatement {
1586 name,
1587 columns,
1588 of_source,
1589 constraints,
1590 if_not_exists,
1591 with_options,
1592 } = &stmt;
1593
1594 let CreateSubsourceOptionExtracted {
1595 progress,
1596 retain_history,
1597 external_reference,
1598 text_columns,
1599 exclude_columns,
1600 details,
1601 seen: _,
1602 } = with_options.clone().try_into()?;
1603
1604 assert!(
1609 progress ^ (external_reference.is_some() && of_source.is_some()),
1610 "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1611 );
1612
1613 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615 let data_source = if let Some(source_reference) = of_source {
1616 if scx.catalog.system_vars().enable_create_table_from_source()
1619 && scx.catalog.system_vars().force_source_table_syntax()
1620 {
1621 Err(PlanError::UseTablesForSources(
1622 "CREATE SUBSOURCE".to_string(),
1623 ))?;
1624 }
1625
1626 let ingestion_id = *source_reference.item_id();
1629 let external_reference = external_reference.unwrap();
1630
1631 let details = details
1634 .as_ref()
1635 .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1636 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1637 let details =
1638 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1639 let details =
1640 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1641 let details = match details {
1642 SourceExportStatementDetails::Postgres { table } => {
1643 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1644 column_casts: crate::pure::postgres::generate_column_casts(
1645 scx,
1646 &table,
1647 &text_columns,
1648 )?,
1649 table,
1650 })
1651 }
1652 SourceExportStatementDetails::MySql {
1653 table,
1654 initial_gtid_set,
1655 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1656 table,
1657 initial_gtid_set,
1658 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659 exclude_columns: exclude_columns
1660 .into_iter()
1661 .map(|c| c.into_string())
1662 .collect(),
1663 }),
1664 SourceExportStatementDetails::SqlServer {
1665 table,
1666 capture_instance,
1667 initial_lsn,
1668 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1669 capture_instance,
1670 table,
1671 initial_lsn,
1672 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1673 exclude_columns: exclude_columns
1674 .into_iter()
1675 .map(|c| c.into_string())
1676 .collect(),
1677 }),
1678 SourceExportStatementDetails::LoadGenerator { output } => {
1679 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1680 }
1681 SourceExportStatementDetails::Kafka {} => {
1682 bail_unsupported!("subsources cannot reference Kafka sources")
1683 }
1684 };
1685 DataSourceDesc::IngestionExport {
1686 ingestion_id,
1687 external_reference,
1688 details,
1689 data_config: SourceExportDataConfig {
1691 envelope: SourceEnvelope::None(NoneEnvelope {
1692 key_envelope: KeyEnvelope::None,
1693 key_arity: 0,
1694 }),
1695 encoding: None,
1696 },
1697 }
1698 } else if progress {
1699 DataSourceDesc::Progress
1700 } else {
1701 panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1702 };
1703
1704 let if_not_exists = *if_not_exists;
1705 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1706
1707 let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1708
1709 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1710 let source = Source {
1711 create_sql,
1712 data_source,
1713 desc,
1714 compaction_window,
1715 };
1716
1717 Ok(Plan::CreateSource(CreateSourcePlan {
1718 name,
1719 source,
1720 if_not_exists,
1721 timeline: Timeline::EpochMilliseconds,
1722 in_cluster: None,
1723 }))
1724}
1725
// Declares the `WITH` options understood by `CREATE TABLE ... FROM SOURCE`.
//
// `plan_create_table_from_source` converts the statement's options into the
// `TableFromSourceOptionExtracted` struct produced here and reads one field per entry
// below, plus a `seen` field. The two column lists default to empty vectors; the
// remaining entries are used there as `Option`s.
generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);
1734
1735pub fn plan_create_table_from_source(
1736 scx: &StatementContext,
1737 stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739 if !scx.catalog.system_vars().enable_create_table_from_source() {
1740 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741 }
1742
1743 let CreateTableFromSourceStatement {
1744 name,
1745 columns,
1746 constraints,
1747 if_not_exists,
1748 source,
1749 external_reference,
1750 envelope,
1751 format,
1752 include_metadata,
1753 with_options,
1754 } = &stmt;
1755
1756 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758 let TableFromSourceOptionExtracted {
1759 text_columns,
1760 exclude_columns,
1761 retain_history,
1762 partition_by,
1763 details,
1764 seen: _,
1765 } = with_options.clone().try_into()?;
1766
1767 let source_item = scx.get_item_by_resolved_name(source)?;
1768 let ingestion_id = source_item.id();
1769
1770 let details = details
1773 .as_ref()
1774 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1775 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1776 let details =
1777 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1778 let details =
1779 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1780
1781 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1782 && include_metadata
1783 .iter()
1784 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1785 {
1786 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1788 }
1789 if !matches!(
1790 details,
1791 SourceExportStatementDetails::Kafka { .. }
1792 | SourceExportStatementDetails::LoadGenerator { .. }
1793 ) && !include_metadata.is_empty()
1794 {
1795 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1796 }
1797
1798 let details = match details {
1799 SourceExportStatementDetails::Postgres { table } => {
1800 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1801 column_casts: crate::pure::postgres::generate_column_casts(
1802 scx,
1803 &table,
1804 &text_columns,
1805 )?,
1806 table,
1807 })
1808 }
1809 SourceExportStatementDetails::MySql {
1810 table,
1811 initial_gtid_set,
1812 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1813 table,
1814 initial_gtid_set,
1815 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816 exclude_columns: exclude_columns
1817 .into_iter()
1818 .map(|c| c.into_string())
1819 .collect(),
1820 }),
1821 SourceExportStatementDetails::SqlServer {
1822 table,
1823 capture_instance,
1824 initial_lsn,
1825 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1826 table,
1827 capture_instance,
1828 initial_lsn,
1829 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1830 exclude_columns: exclude_columns
1831 .into_iter()
1832 .map(|c| c.into_string())
1833 .collect(),
1834 }),
1835 SourceExportStatementDetails::LoadGenerator { output } => {
1836 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1837 }
1838 SourceExportStatementDetails::Kafka {} => {
1839 if !include_metadata.is_empty()
1840 && !matches!(
1841 envelope,
1842 ast::SourceEnvelope::Upsert { .. }
1843 | ast::SourceEnvelope::None
1844 | ast::SourceEnvelope::Debezium
1845 )
1846 {
1847 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1849 }
1850
1851 let metadata_columns = include_metadata
1852 .into_iter()
1853 .flat_map(|item| match item {
1854 SourceIncludeMetadata::Timestamp { alias } => {
1855 let name = match alias {
1856 Some(name) => name.to_string(),
1857 None => "timestamp".to_owned(),
1858 };
1859 Some((name, KafkaMetadataKind::Timestamp))
1860 }
1861 SourceIncludeMetadata::Partition { alias } => {
1862 let name = match alias {
1863 Some(name) => name.to_string(),
1864 None => "partition".to_owned(),
1865 };
1866 Some((name, KafkaMetadataKind::Partition))
1867 }
1868 SourceIncludeMetadata::Offset { alias } => {
1869 let name = match alias {
1870 Some(name) => name.to_string(),
1871 None => "offset".to_owned(),
1872 };
1873 Some((name, KafkaMetadataKind::Offset))
1874 }
1875 SourceIncludeMetadata::Headers { alias } => {
1876 let name = match alias {
1877 Some(name) => name.to_string(),
1878 None => "headers".to_owned(),
1879 };
1880 Some((name, KafkaMetadataKind::Headers))
1881 }
1882 SourceIncludeMetadata::Header {
1883 alias,
1884 key,
1885 use_bytes,
1886 } => Some((
1887 alias.to_string(),
1888 KafkaMetadataKind::Header {
1889 key: key.clone(),
1890 use_bytes: *use_bytes,
1891 },
1892 )),
1893 SourceIncludeMetadata::Key { .. } => {
1894 None
1896 }
1897 })
1898 .collect();
1899
1900 SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1901 }
1902 };
1903
1904 let source_connection = &source_item.source_desc()?.expect("is source").connection;
1905
1906 let (key_desc, value_desc) =
1911 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1912 let columns = match columns {
1913 TableFromSourceColumns::Defined(columns) => columns,
1914 _ => unreachable!(),
1915 };
1916 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1917 (None, desc)
1918 } else {
1919 let key_desc = source_connection.default_key_desc();
1920 let value_desc = source_connection.default_value_desc();
1921 (Some(key_desc), value_desc)
1922 };
1923
1924 let metadata_columns_desc = match &details {
1925 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1926 metadata_columns, ..
1927 }) => kafka_metadata_columns_desc(metadata_columns),
1928 _ => vec![],
1929 };
1930
1931 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1932 scx,
1933 &envelope,
1934 format,
1935 key_desc,
1936 value_desc,
1937 include_metadata,
1938 metadata_columns_desc,
1939 source_connection,
1940 )?;
1941 if let TableFromSourceColumns::Named(col_names) = columns {
1942 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1943 }
1944
1945 let names: Vec<_> = desc.iter_names().cloned().collect();
1946 if let Some(dup) = names.iter().duplicates().next() {
1947 sql_bail!("column {} specified more than once", dup.quoted());
1948 }
1949
1950 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1951
1952 let timeline = match envelope {
1955 SourceEnvelope::CdcV2 => {
1956 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1957 }
1958 _ => Timeline::EpochMilliseconds,
1959 };
1960
1961 if let Some(partition_by) = partition_by {
1962 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1963 check_partition_by(&desc, partition_by)?;
1964 }
1965
1966 let data_source = DataSourceDesc::IngestionExport {
1967 ingestion_id,
1968 external_reference: external_reference
1969 .as_ref()
1970 .expect("populated in purification")
1971 .clone(),
1972 details,
1973 data_config: SourceExportDataConfig { envelope, encoding },
1974 };
1975
1976 let if_not_exists = *if_not_exists;
1977
1978 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1979
1980 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1981 let table = Table {
1982 create_sql,
1983 desc: VersionedRelationDesc::new(desc),
1984 temporary: false,
1985 compaction_window,
1986 data_source: TableDataSource::DataSource {
1987 desc: data_source,
1988 timeline,
1989 },
1990 };
1991
1992 Ok(Plan::CreateTable(CreateTablePlan {
1993 name,
1994 table,
1995 if_not_exists,
1996 }))
1997}
1998
// Declares the options understood by load generator sources.
//
// The macro call produces the `LoadGeneratorOptionExtracted` struct used by the load
// generator planners. `AsOf` defaults to `0` and `UpTo` to `u64::MAX`; the remaining
// entries are read as `Option`s by those planners. Which options a given generator
// accepts is enforced separately by
// `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2014
2015impl LoadGeneratorOptionExtracted {
2016 pub(super) fn ensure_only_valid_options(
2017 &self,
2018 loadgen: &ast::LoadGenerator,
2019 ) -> Result<(), PlanError> {
2020 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2021
2022 let mut options = self.seen.clone();
2023
2024 let permitted_options: &[_] = match loadgen {
2025 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2026 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2027 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2028 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2029 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2030 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2031 ast::LoadGenerator::KeyValue => &[
2032 TickInterval,
2033 Keys,
2034 SnapshotRounds,
2035 TransactionalSnapshot,
2036 ValueSize,
2037 Seed,
2038 Partitions,
2039 BatchSize,
2040 ],
2041 };
2042
2043 for o in permitted_options {
2044 options.remove(o);
2045 }
2046
2047 if !options.is_empty() {
2048 sql_bail!(
2049 "{} load generators do not support {} values",
2050 loadgen,
2051 options.iter().join(", ")
2052 )
2053 }
2054
2055 Ok(())
2056 }
2057}
2058
2059pub(crate) fn load_generator_ast_to_generator(
2060 scx: &StatementContext,
2061 loadgen: &ast::LoadGenerator,
2062 options: &[LoadGeneratorOption<Aug>],
2063 include_metadata: &[SourceIncludeMetadata],
2064) -> Result<LoadGenerator, PlanError> {
2065 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2066 extracted.ensure_only_valid_options(loadgen)?;
2067
2068 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2069 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2070 }
2071
2072 let load_generator = match loadgen {
2073 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2074 ast::LoadGenerator::Clock => {
2075 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2076 LoadGenerator::Clock
2077 }
2078 ast::LoadGenerator::Counter => {
2079 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2080 let LoadGeneratorOptionExtracted {
2081 max_cardinality, ..
2082 } = extracted;
2083 LoadGenerator::Counter { max_cardinality }
2084 }
2085 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2086 ast::LoadGenerator::Datums => {
2087 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2088 LoadGenerator::Datums
2089 }
2090 ast::LoadGenerator::Tpch => {
2091 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2092
2093 let sf: f64 = scale_factor.unwrap_or(0.01);
2095 if !sf.is_finite() || sf < 0.0 {
2096 sql_bail!("unsupported scale factor {sf}");
2097 }
2098
2099 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2100 let total = (sf * multiplier).floor();
2101 let mut i = i64::try_cast_from(total)
2102 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2103 if i < 1 {
2104 i = 1;
2105 }
2106 Ok(i)
2107 };
2108
2109 let count_supplier = f_to_i(10_000f64)?;
2112 let count_part = f_to_i(200_000f64)?;
2113 let count_customer = f_to_i(150_000f64)?;
2114 let count_orders = f_to_i(150_000f64 * 10f64)?;
2115 let count_clerk = f_to_i(1_000f64)?;
2116
2117 LoadGenerator::Tpch {
2118 count_supplier,
2119 count_part,
2120 count_customer,
2121 count_orders,
2122 count_clerk,
2123 }
2124 }
2125 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2126 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2127 let LoadGeneratorOptionExtracted {
2128 keys,
2129 snapshot_rounds,
2130 transactional_snapshot,
2131 value_size,
2132 tick_interval,
2133 seed,
2134 partitions,
2135 batch_size,
2136 ..
2137 } = extracted;
2138
2139 let mut include_offset = None;
2140 for im in include_metadata {
2141 match im {
2142 SourceIncludeMetadata::Offset { alias } => {
2143 include_offset = match alias {
2144 Some(alias) => Some(alias.to_string()),
2145 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2146 }
2147 }
2148 SourceIncludeMetadata::Key { .. } => continue,
2149
2150 _ => {
2151 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2152 }
2153 };
2154 }
2155
2156 let lgkv = KeyValueLoadGenerator {
2157 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2158 snapshot_rounds: snapshot_rounds
2159 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2160 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2162 value_size: value_size
2163 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2164 partitions: partitions
2165 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2166 tick_interval,
2167 batch_size: batch_size
2168 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2169 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2170 include_offset,
2171 };
2172
2173 if lgkv.keys == 0
2174 || lgkv.partitions == 0
2175 || lgkv.value_size == 0
2176 || lgkv.batch_size == 0
2177 {
2178 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2179 }
2180
2181 if lgkv.keys % lgkv.partitions != 0 {
2182 sql_bail!("KEYS must be a multiple of PARTITIONS")
2183 }
2184
2185 if lgkv.batch_size > lgkv.keys {
2186 sql_bail!("KEYS must be larger than BATCH SIZE")
2187 }
2188
2189 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2192 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2193 }
2194
2195 if lgkv.snapshot_rounds == 0 {
2196 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2197 }
2198
2199 LoadGenerator::KeyValue(lgkv)
2200 }
2201 };
2202
2203 Ok(load_generator)
2204}
2205
2206fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2207 let before = value_desc.get_by_name(&"before".into());
2208 let (after_idx, after_ty) = value_desc
2209 .get_by_name(&"after".into())
2210 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2211 let before_idx = if let Some((before_idx, before_ty)) = before {
2212 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2213 sql_bail!("'before' column must be of type record");
2214 }
2215 if before_ty != after_ty {
2216 sql_bail!("'before' type differs from 'after' column");
2217 }
2218 Some(before_idx)
2219 } else {
2220 None
2221 };
2222 Ok((before_idx, after_idx))
2223}
2224
2225fn get_encoding(
2226 scx: &StatementContext,
2227 format: &FormatSpecifier<Aug>,
2228 envelope: &ast::SourceEnvelope,
2229) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2230 let encoding = match format {
2231 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2232 FormatSpecifier::KeyValue { key, value } => {
2233 let key = {
2234 let encoding = get_encoding_inner(scx, key)?;
2235 Some(encoding.key.unwrap_or(encoding.value))
2236 };
2237 let value = get_encoding_inner(scx, value)?.value;
2238 SourceDataEncoding { key, value }
2239 }
2240 };
2241
2242 let requires_keyvalue = matches!(
2243 envelope,
2244 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2245 );
2246 let is_keyvalue = encoding.key.is_some();
2247 if requires_keyvalue && !is_keyvalue {
2248 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2249 };
2250
2251 Ok(encoding)
2252}
2253
2254fn source_sink_cluster_config<'a, 'ctx>(
2260 scx: &'a StatementContext<'ctx>,
2261 in_cluster: &mut Option<ResolvedClusterName>,
2262) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2263 let cluster = match in_cluster {
2264 None => {
2265 let cluster = scx.catalog.resolve_cluster(None)?;
2266 *in_cluster = Some(ResolvedClusterName {
2267 id: cluster.id(),
2268 print_name: None,
2269 });
2270 cluster
2271 }
2272 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2273 };
2274
2275 Ok(cluster)
2276}
2277
// Generates `AvroSchemaOptionExtracted`, which `get_encoding_inner` obtains from an inline Avro
// schema's `with_options` via `try_into` in order to read `confluent_wire_format`.
// NOTE(review): `Default(true)` presumably makes the option `true` when it is not specified --
// confirm against the macro definition.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2279
/// Avro schema details gathered while planning an Avro format, before they are turned into
/// key/value `AvroEncoding`s by `get_encoding_inner`.
#[derive(Debug)]
pub struct Schema {
    /// Schema text for the key, if there is one. Inline schemas leave this `None`; a
    /// schema-registry seed may supply it.
    pub key_schema: Option<String>,
    /// Schema text for the value.
    pub value_schema: String,
    /// The schema-registry connection the schemas came from; `None` for inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Value copied into the resulting `AvroEncoding::confluent_wire_format`.
    /// NOTE(review): presumably whether messages carry the Confluent wire-format header --
    /// confirm.
    pub confluent_wire_format: bool,
}
2287
2288fn get_encoding_inner(
2289 scx: &StatementContext,
2290 format: &Format<Aug>,
2291) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2292 let value = match format {
2293 Format::Bytes => DataEncoding::Bytes,
2294 Format::Avro(schema) => {
2295 let Schema {
2296 key_schema,
2297 value_schema,
2298 csr_connection,
2299 confluent_wire_format,
2300 } = match schema {
2301 AvroSchema::InlineSchema {
2304 schema: ast::Schema { schema },
2305 with_options,
2306 } => {
2307 let AvroSchemaOptionExtracted {
2308 confluent_wire_format,
2309 ..
2310 } = with_options.clone().try_into()?;
2311
2312 Schema {
2313 key_schema: None,
2314 value_schema: schema.clone(),
2315 csr_connection: None,
2316 confluent_wire_format,
2317 }
2318 }
2319 AvroSchema::Csr {
2320 csr_connection:
2321 CsrConnectionAvro {
2322 connection,
2323 seed,
2324 key_strategy: _,
2325 value_strategy: _,
2326 },
2327 } => {
2328 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2329 let csr_connection = match item.connection()? {
2330 Connection::Csr(_) => item.id(),
2331 _ => {
2332 sql_bail!(
2333 "{} is not a schema registry connection",
2334 scx.catalog
2335 .resolve_full_name(item.name())
2336 .to_string()
2337 .quoted()
2338 )
2339 }
2340 };
2341
2342 if let Some(seed) = seed {
2343 Schema {
2344 key_schema: seed.key_schema.clone(),
2345 value_schema: seed.value_schema.clone(),
2346 csr_connection: Some(csr_connection),
2347 confluent_wire_format: true,
2348 }
2349 } else {
2350 unreachable!("CSR seed resolution should already have been called: Avro")
2351 }
2352 }
2353 };
2354
2355 if let Some(key_schema) = key_schema {
2356 return Ok(SourceDataEncoding {
2357 key: Some(DataEncoding::Avro(AvroEncoding {
2358 schema: key_schema,
2359 csr_connection: csr_connection.clone(),
2360 confluent_wire_format,
2361 })),
2362 value: DataEncoding::Avro(AvroEncoding {
2363 schema: value_schema,
2364 csr_connection,
2365 confluent_wire_format,
2366 }),
2367 });
2368 } else {
2369 DataEncoding::Avro(AvroEncoding {
2370 schema: value_schema,
2371 csr_connection,
2372 confluent_wire_format,
2373 })
2374 }
2375 }
2376 Format::Protobuf(schema) => match schema {
2377 ProtobufSchema::Csr {
2378 csr_connection:
2379 CsrConnectionProtobuf {
2380 connection:
2381 CsrConnection {
2382 connection,
2383 options,
2384 },
2385 seed,
2386 },
2387 } => {
2388 if let Some(CsrSeedProtobuf { key, value }) = seed {
2389 let item = scx.get_item_by_resolved_name(connection)?;
2390 let _ = match item.connection()? {
2391 Connection::Csr(connection) => connection,
2392 _ => {
2393 sql_bail!(
2394 "{} is not a schema registry connection",
2395 scx.catalog
2396 .resolve_full_name(item.name())
2397 .to_string()
2398 .quoted()
2399 )
2400 }
2401 };
2402
2403 if !options.is_empty() {
2404 sql_bail!("Protobuf CSR connections do not support any options");
2405 }
2406
2407 let value = DataEncoding::Protobuf(ProtobufEncoding {
2408 descriptors: strconv::parse_bytes(&value.schema)?,
2409 message_name: value.message_name.clone(),
2410 confluent_wire_format: true,
2411 });
2412 if let Some(key) = key {
2413 return Ok(SourceDataEncoding {
2414 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2415 descriptors: strconv::parse_bytes(&key.schema)?,
2416 message_name: key.message_name.clone(),
2417 confluent_wire_format: true,
2418 })),
2419 value,
2420 });
2421 }
2422 value
2423 } else {
2424 unreachable!("CSR seed resolution should already have been called: Proto")
2425 }
2426 }
2427 ProtobufSchema::InlineSchema {
2428 message_name,
2429 schema: ast::Schema { schema },
2430 } => {
2431 let descriptors = strconv::parse_bytes(schema)?;
2432
2433 DataEncoding::Protobuf(ProtobufEncoding {
2434 descriptors,
2435 message_name: message_name.to_owned(),
2436 confluent_wire_format: false,
2437 })
2438 }
2439 },
2440 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2441 regex: mz_repr::adt::regex::Regex::new(regex, false)
2442 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2443 }),
2444 Format::Csv { columns, delimiter } => {
2445 let columns = match columns {
2446 CsvColumns::Header { names } => {
2447 if names.is_empty() {
2448 sql_bail!("[internal error] column spec should get names in purify")
2449 }
2450 ColumnSpec::Header {
2451 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2452 }
2453 }
2454 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2455 };
2456 DataEncoding::Csv(CsvEncoding {
2457 columns,
2458 delimiter: u8::try_from(*delimiter)
2459 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2460 })
2461 }
2462 Format::Json { array: false } => DataEncoding::Json,
2463 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2464 Format::Text => DataEncoding::Text,
2465 };
2466 Ok(SourceDataEncoding { key: None, value })
2467}
2468
2469fn get_key_envelope(
2471 included_items: &[SourceIncludeMetadata],
2472 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2473 key_envelope_no_encoding: bool,
2474) -> Result<KeyEnvelope, PlanError> {
2475 let key_definition = included_items
2476 .iter()
2477 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2478 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2479 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2480 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2481 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2482 (Some(name), _) if key_envelope_no_encoding => {
2483 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2484 }
2485 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2486 (_, None) => {
2487 sql_bail!(
2491 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2492 got bare FORMAT"
2493 );
2494 }
2495 }
2496 } else {
2497 Ok(KeyEnvelope::None)
2498 }
2499}
2500
2501fn get_unnamed_key_envelope(
2504 key: Option<&DataEncoding<ReferencedConnection>>,
2505) -> Result<KeyEnvelope, PlanError> {
2506 let is_composite = match key {
2510 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2511 Some(
2512 DataEncoding::Avro(_)
2513 | DataEncoding::Csv(_)
2514 | DataEncoding::Protobuf(_)
2515 | DataEncoding::Regex { .. },
2516 ) => true,
2517 None => false,
2518 };
2519
2520 if is_composite {
2521 Ok(KeyEnvelope::Flattened)
2522 } else {
2523 Ok(KeyEnvelope::Named("key".to_string()))
2524 }
2525}
2526
2527pub fn describe_create_view(
2528 _: &StatementContext,
2529 _: CreateViewStatement<Aug>,
2530) -> Result<StatementDesc, PlanError> {
2531 Ok(StatementDesc::new(None))
2532}
2533
2534pub fn plan_view(
2535 scx: &StatementContext,
2536 def: &mut ViewDefinition<Aug>,
2537 temporary: bool,
2538) -> Result<(QualifiedItemName, View), PlanError> {
2539 let create_sql = normalize::create_statement(
2540 scx,
2541 Statement::CreateView(CreateViewStatement {
2542 if_exists: IfExistsBehavior::Error,
2543 temporary,
2544 definition: def.clone(),
2545 }),
2546 )?;
2547
2548 let ViewDefinition {
2549 name,
2550 columns,
2551 query,
2552 } = def;
2553
2554 let query::PlannedRootQuery {
2555 expr,
2556 mut desc,
2557 finishing,
2558 scope: _,
2559 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2560 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2566 &finishing,
2567 expr.arity()
2568 ));
2569 if expr.contains_parameters()? {
2570 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2571 }
2572
2573 let dependencies = expr
2574 .depends_on()
2575 .into_iter()
2576 .map(|gid| scx.catalog.resolve_item_id(&gid))
2577 .collect();
2578
2579 let name = if temporary {
2580 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2581 } else {
2582 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2583 };
2584
2585 plan_utils::maybe_rename_columns(
2586 format!("view {}", scx.catalog.resolve_full_name(&name)),
2587 &mut desc,
2588 columns,
2589 )?;
2590 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2591
2592 if let Some(dup) = names.iter().duplicates().next() {
2593 sql_bail!("column {} specified more than once", dup.quoted());
2594 }
2595
2596 let view = View {
2597 create_sql,
2598 expr,
2599 dependencies,
2600 column_names: names,
2601 temporary,
2602 };
2603
2604 Ok((name, view))
2605}
2606
2607pub fn plan_create_view(
2608 scx: &StatementContext,
2609 mut stmt: CreateViewStatement<Aug>,
2610) -> Result<Plan, PlanError> {
2611 let CreateViewStatement {
2612 temporary,
2613 if_exists,
2614 definition,
2615 } = &mut stmt;
2616 let (name, view) = plan_view(scx, definition, *temporary)?;
2617
2618 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2621
2622 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2623 let if_exists = true;
2624 let cascade = false;
2625 let maybe_item_to_drop = plan_drop_item(
2626 scx,
2627 ObjectType::View,
2628 if_exists,
2629 definition.name.clone(),
2630 cascade,
2631 )?;
2632
2633 if let Some(id) = maybe_item_to_drop {
2635 let dependencies = view.expr.depends_on();
2636 let invalid_drop = scx
2637 .get_item(&id)
2638 .global_ids()
2639 .any(|gid| dependencies.contains(&gid));
2640 if invalid_drop {
2641 let item = scx.catalog.get_item(&id);
2642 sql_bail!(
2643 "cannot replace view {0}: depended upon by new {0} definition",
2644 scx.catalog.resolve_full_name(item.name())
2645 );
2646 }
2647
2648 Some(id)
2649 } else {
2650 None
2651 }
2652 } else {
2653 None
2654 };
2655 let drop_ids = replace
2656 .map(|id| {
2657 scx.catalog
2658 .item_dependents(id)
2659 .into_iter()
2660 .map(|id| id.unwrap_item_id())
2661 .collect()
2662 })
2663 .unwrap_or_default();
2664
2665 validate_view_dependencies(scx, &view.dependencies.0)?;
2666
2667 let full_name = scx.catalog.resolve_full_name(&name);
2669 let partial_name = PartialItemName::from(full_name.clone());
2670 if let (Ok(item), IfExistsBehavior::Error, false) = (
2673 scx.catalog.resolve_item_or_type(&partial_name),
2674 *if_exists,
2675 ignore_if_exists_errors,
2676 ) {
2677 return Err(PlanError::ItemAlreadyExists {
2678 name: full_name.to_string(),
2679 item_type: item.item_type(),
2680 });
2681 }
2682
2683 Ok(Plan::CreateView(CreateViewPlan {
2684 name,
2685 view,
2686 replace,
2687 drop_ids,
2688 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2689 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2690 }))
2691}
2692
2693fn validate_view_dependencies(
2695 scx: &StatementContext,
2696 dependencies: &BTreeSet<CatalogItemId>,
2697) -> Result<(), PlanError> {
2698 for id in dependencies {
2699 let item = scx.catalog.get_item(id);
2700 if item.replacement_target().is_some() {
2701 let name = scx.catalog.minimal_qualification(item.name());
2702 return Err(PlanError::InvalidDependency {
2703 name: name.to_string(),
2704 item_type: format!("replacement {}", item.item_type()),
2705 });
2706 }
2707 }
2708
2709 Ok(())
2710}
2711
2712pub fn describe_create_materialized_view(
2713 _: &StatementContext,
2714 _: CreateMaterializedViewStatement<Aug>,
2715) -> Result<StatementDesc, PlanError> {
2716 Ok(StatementDesc::new(None))
2717}
2718
2719pub fn describe_create_continual_task(
2720 _: &StatementContext,
2721 _: CreateContinualTaskStatement<Aug>,
2722) -> Result<StatementDesc, PlanError> {
2723 Ok(StatementDesc::new(None))
2724}
2725
2726pub fn describe_create_network_policy(
2727 _: &StatementContext,
2728 _: CreateNetworkPolicyStatement<Aug>,
2729) -> Result<StatementDesc, PlanError> {
2730 Ok(StatementDesc::new(None))
2731}
2732
2733pub fn describe_alter_network_policy(
2734 _: &StatementContext,
2735 _: AlterNetworkPolicyStatement<Aug>,
2736) -> Result<StatementDesc, PlanError> {
2737 Ok(StatementDesc::new(None))
2738}
2739
/// Plans a `CREATE MATERIALIZED VIEW` statement into `Plan::CreateMaterializedView`.
///
/// In order, this: resolves the target cluster and writes it back into the statement, normalizes
/// the statement into `create_sql`, plans the query, applies the column rename list, extracts the
/// `WITH` options (`ASSERT NOT NULL`, `PARTITION BY`, `RETAIN HISTORY`, `REFRESH`), builds the
/// refresh schedule, handles `OR REPLACE` / `IF NOT EXISTS`, validates an optional replacement
/// target, and finally checks for a name collision.
///
/// # Errors
///
/// Fails if any planning or normalization step fails, if the query contains parameters, if a
/// required feature flag is off, if a `REFRESH` option is invalid or combined incorrectly, if
/// an `ASSERT NOT NULL` column is unknown or repeated, if column names are not unique, if the
/// new definition depends on the view it would replace, if the replacement target is unsuitable,
/// if a dependency is rejected by `validate_view_dependencies`, or if an item or type with the
/// same name exists and the effective behavior is `IfExistsBehavior::Error`.
///
/// # Panics
///
/// Panics if query planning returns a non-trivial row-set finishing.
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    // The resolved cluster is written back into the statement before it is normalized into
    // `create_sql` below.
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    // The finishing returned by planning is expected to be trivial at this point.
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    // `PARTITION BY` is feature-gated and validated against the (renamed) relation description.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    // Fold all `REFRESH` options into a single schedule. `None` means no `AT`/`EVERY` option was
    // given (i.e. nothing beyond an optional single `ON COMMIT`).
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            // Every refresh option other than `ON COMMIT` requires the feature flag.
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    // Reaching this arm is treated as an internal error.
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    // Plan the expression with no scope, aggregates, subqueries, parameters or
                    // window functions allowed.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // The expression must reduce to a literal timestamp.
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    // Intervals with a month component are rejected.
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    // The interval must fit in a `u64` number of milliseconds.
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            // A missing `ALIGNED TO` is treated as an internal error.
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    // Same restricted expression context as for `REFRESH AT`.
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    // The expression must reduce to a literal timestamp.
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        // `ON COMMIT` may not be mixed with any `AT` / `EVERY` option.
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    // Translate each `ASSERT NOT NULL` column name into its column index.
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // When the plan context asks to ignore if-exists errors, behave as `Skip` regardless of what
    // the statement says.
    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            // Refuse to replace a materialized view that the new definition itself reads from.
            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    // Everything that depends on the replaced item is listed in `drop_ids`.
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    // Validate the replacement target, if the statement names one.
    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacement_for {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // The target is added to the dependency set; `insert` returning `false` means the query
        // already depended on it, which is rejected.
        if !dependencies.insert(target.id()) {
            sql_bail!(
                "cannot replace {} because it is also a dependency",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        // A target may have at most one replacement.
        for use_id in target.used_by() {
            let use_item = scx.get_item(use_id);
            if use_item.replacement_target() == Some(target.id()) {
                sql_bail!(
                    "cannot replace {} because it already has a replacement: {}",
                    scx.catalog.minimal_qualification(target.name()),
                    scx.catalog.minimal_qualification(use_item.name()),
                );
            }
        }

        replacement_target = Some(target.id());
    }

    validate_view_dependencies(scx, &dependencies)?;

    // Look for an existing item *or* type with the same name.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}
3070
// Generates `MaterializedViewOptionExtracted`, which `plan_create_materialized_view` builds from
// the statement's `with_options` via `try_into` and destructures into `assert_not_null`,
// `partition_by`, `retain_history`, `refresh` and `seen`.
// NOTE(review): `AllowMultiple` presumably lets the option be given more than once (both such
// fields are iterated by the planner) -- confirm against the macro definition.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3078
3079pub fn plan_create_continual_task(
3080 scx: &StatementContext,
3081 mut stmt: CreateContinualTaskStatement<Aug>,
3082) -> Result<Plan, PlanError> {
3083 match &stmt.sugar {
3084 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3085 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3086 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3087 }
3088 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3089 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3090 }
3091 };
3092 let cluster_id = match &stmt.in_cluster {
3093 None => scx.catalog.resolve_cluster(None)?.id(),
3094 Some(in_cluster) => in_cluster.id,
3095 };
3096 stmt.in_cluster = Some(ResolvedClusterName {
3097 id: cluster_id,
3098 print_name: None,
3099 });
3100
3101 let create_sql =
3102 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3103
3104 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3105
3106 let mut desc = match stmt.columns {
3111 None => None,
3112 Some(columns) => {
3113 let mut desc_columns = Vec::with_capacity(columns.capacity());
3114 for col in columns.iter() {
3115 desc_columns.push((
3116 normalize::column_name(col.name.clone()),
3117 SqlColumnType {
3118 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3119 nullable: false,
3120 },
3121 ));
3122 }
3123 Some(RelationDesc::from_names_and_types(desc_columns))
3124 }
3125 };
3126 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3127 match input.item_type() {
3128 CatalogItemType::ContinualTask
3131 | CatalogItemType::Table
3132 | CatalogItemType::MaterializedView
3133 | CatalogItemType::Source => {}
3134 CatalogItemType::Sink
3135 | CatalogItemType::View
3136 | CatalogItemType::Index
3137 | CatalogItemType::Type
3138 | CatalogItemType::Func
3139 | CatalogItemType::Secret
3140 | CatalogItemType::Connection => {
3141 sql_bail!(
3142 "CONTINUAL TASK cannot use {} as an input",
3143 input.item_type()
3144 );
3145 }
3146 }
3147
3148 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3149 let ct_name = stmt.name;
3150 let placeholder_id = match &ct_name {
3151 ResolvedItemName::ContinualTask { id, name } => {
3152 let desc = match desc.as_ref().cloned() {
3153 Some(x) => x,
3154 None => {
3155 let desc = input.relation_desc().expect("item type checked above");
3160 desc.into_owned()
3161 }
3162 };
3163 qcx.ctes.insert(
3164 *id,
3165 CteDesc {
3166 name: name.item.clone(),
3167 desc,
3168 },
3169 );
3170 Some(*id)
3171 }
3172 _ => None,
3173 };
3174
3175 let mut exprs = Vec::new();
3176 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3177 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3178 let query::PlannedRootQuery {
3179 expr,
3180 desc: desc_query,
3181 finishing,
3182 scope: _,
3183 } = query::plan_ct_query(&mut qcx, query)?;
3184 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3187 &finishing,
3188 expr.arity()
3189 ));
3190 if expr.contains_parameters()? {
3191 if expr.contains_parameters()? {
3192 return Err(PlanError::ParameterNotAllowed(
3193 "continual tasks".to_string(),
3194 ));
3195 }
3196 }
3197 let expr = match desc.as_mut() {
3198 None => {
3199 desc = Some(desc_query);
3200 expr
3201 }
3202 Some(desc) => {
3203 if desc_query.arity() > desc.arity() {
3206 sql_bail!(
3207 "statement {}: INSERT has more expressions than target columns",
3208 idx
3209 );
3210 }
3211 if desc_query.arity() < desc.arity() {
3212 sql_bail!(
3213 "statement {}: INSERT has more target columns than expressions",
3214 idx
3215 );
3216 }
3217 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3220 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3221 let expr = expr.map_err(|e| {
3222 sql_err!(
3223 "statement {}: column {} is of type {} but expression is of type {}",
3224 idx,
3225 desc.get_name(e.column).quoted(),
3226 qcx.humanize_scalar_type(&e.target_type, false),
3227 qcx.humanize_scalar_type(&e.source_type, false),
3228 )
3229 })?;
3230
3231 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3234 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3235 if updated {
3236 let new_types = zip_types().map(|(ct, q)| {
3237 let mut ct = ct.clone();
3238 if q.nullable {
3239 ct.nullable = true;
3240 }
3241 ct
3242 });
3243 *desc = RelationDesc::from_names_and_types(
3244 desc.iter_names().cloned().zip_eq(new_types),
3245 );
3246 }
3247
3248 expr
3249 }
3250 };
3251 match stmt {
3252 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3253 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3254 }
3255 }
3256 let expr = exprs
3259 .into_iter()
3260 .reduce(|acc, expr| acc.union(expr))
3261 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3262 let dependencies = expr
3263 .depends_on()
3264 .into_iter()
3265 .map(|gid| scx.catalog.resolve_item_id(&gid))
3266 .collect();
3267
3268 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3269 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3270 if let Some(dup) = column_names.iter().duplicates().next() {
3271 sql_bail!("column {} specified more than once", dup.quoted());
3272 }
3273
3274 let name = match &ct_name {
3276 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3277 ResolvedItemName::ContinualTask { name, .. } => {
3278 let name = scx.allocate_qualified_name(name.clone())?;
3279 let full_name = scx.catalog.resolve_full_name(&name);
3280 let partial_name = PartialItemName::from(full_name.clone());
3281 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3284 return Err(PlanError::ItemAlreadyExists {
3285 name: full_name.to_string(),
3286 item_type: item.item_type(),
3287 });
3288 }
3289 name
3290 }
3291 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3292 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3293 };
3294
3295 let as_of = stmt.as_of.map(Timestamp::from);
3296 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3297 name,
3298 placeholder_id,
3299 desc,
3300 input_id: input.global_id(),
3301 with_snapshot: snapshot.unwrap_or(true),
3302 continual_task: MaterializedView {
3303 create_sql,
3304 expr,
3305 dependencies,
3306 column_names,
3307 replacement_target: None,
3308 cluster_id,
3309 non_null_assertions: Vec::new(),
3310 compaction_window: None,
3311 refresh_schedule: None,
3312 as_of,
3313 },
3314 }))
3315}
3316
3317fn continual_task_query<'a>(
3318 ct_name: &ResolvedItemName,
3319 stmt: &'a ast::ContinualTaskStmt<Aug>,
3320) -> Option<ast::Query<Aug>> {
3321 match stmt {
3322 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3323 table_name: _,
3324 columns,
3325 source,
3326 returning,
3327 }) => {
3328 if !columns.is_empty() || !returning.is_empty() {
3329 return None;
3330 }
3331 match source {
3332 ast::InsertSource::Query(query) => Some(query.clone()),
3333 ast::InsertSource::DefaultValues => None,
3334 }
3335 }
3336 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3337 table_name: _,
3338 alias,
3339 using,
3340 selection,
3341 }) => {
3342 if !using.is_empty() {
3343 return None;
3344 }
3345 let from = ast::TableWithJoins {
3347 relation: ast::TableFactor::Table {
3348 name: ct_name.clone(),
3349 alias: alias.clone(),
3350 },
3351 joins: Vec::new(),
3352 };
3353 let select = ast::Select {
3354 from: vec![from],
3355 selection: selection.clone(),
3356 distinct: None,
3357 projection: vec![ast::SelectItem::Wildcard],
3358 group_by: Vec::new(),
3359 having: None,
3360 qualify: None,
3361 options: Vec::new(),
3362 };
3363 let query = ast::Query {
3364 ctes: ast::CteBlock::Simple(Vec::new()),
3365 body: ast::SetExpr::Select(Box::new(select)),
3366 order_by: Vec::new(),
3367 limit: None,
3368 offset: None,
3369 };
3370 Some(query)
3372 }
3373 }
3374}
3375
// Declares the extracted form of `ContinualTaskOption`: one `Snapshot` option of type `bool`.
// NOTE(review): by analogy with `CreateSinkOptionExtracted`, this presumably generates a
// `ContinualTaskOptionExtracted` struct — confirm against the macro definition.
generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3377
3378pub fn describe_create_sink(
3379 _: &StatementContext,
3380 _: CreateSinkStatement<Aug>,
3381) -> Result<StatementDesc, PlanError> {
3382 Ok(StatementDesc::new(None))
3383}
3384
// Declares the typed, extracted form of the `CREATE SINK ... WITH (...)` options. The
// resulting `CreateSinkOptionExtracted` is obtained in `plan_sink` via `try_into()` on the
// statement's `with_options`.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3392
3393pub fn plan_create_sink(
3394 scx: &StatementContext,
3395 stmt: CreateSinkStatement<Aug>,
3396) -> Result<Plan, PlanError> {
3397 let Some(name) = stmt.name.clone() else {
3399 return Err(PlanError::MissingName(CatalogItemType::Sink));
3400 };
3401 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3402 let full_name = scx.catalog.resolve_full_name(&name);
3403 let partial_name = PartialItemName::from(full_name.clone());
3404 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3405 return Err(PlanError::ItemAlreadyExists {
3406 name: full_name.to_string(),
3407 item_type: item.item_type(),
3408 });
3409 }
3410
3411 plan_sink(scx, stmt)
3412}
3413
3414fn plan_sink(
3419 scx: &StatementContext,
3420 mut stmt: CreateSinkStatement<Aug>,
3421) -> Result<Plan, PlanError> {
3422 let CreateSinkStatement {
3423 name,
3424 in_cluster: _,
3425 from,
3426 connection,
3427 format,
3428 envelope,
3429 if_not_exists,
3430 with_options,
3431 } = stmt.clone();
3432
3433 let Some(name) = name else {
3434 return Err(PlanError::MissingName(CatalogItemType::Sink));
3435 };
3436 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3437
3438 let envelope = match envelope {
3439 Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3440 Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3441 None => sql_bail!("ENVELOPE clause is required"),
3442 };
3443
3444 let from_name = &from;
3445 let from = scx.get_item_by_resolved_name(&from)?;
3446
3447 {
3448 use CatalogItemType::*;
3449 match from.item_type() {
3450 Table | Source | MaterializedView | ContinualTask => {
3451 if from.replacement_target().is_some() {
3452 let name = scx.catalog.minimal_qualification(from.name());
3453 return Err(PlanError::InvalidSinkFrom {
3454 name: name.to_string(),
3455 item_type: format!("replacement {}", from.item_type()),
3456 });
3457 }
3458 }
3459 Sink | View | Index | Type | Func | Secret | Connection => {
3460 let name = scx.catalog.minimal_qualification(from.name());
3461 return Err(PlanError::InvalidSinkFrom {
3462 name: name.to_string(),
3463 item_type: from.item_type().to_string(),
3464 });
3465 }
3466 }
3467 }
3468
3469 if from.id().is_system() {
3470 bail_unsupported!("creating a sink directly on a catalog object");
3471 }
3472
3473 let desc = from.relation_desc().expect("item type checked above");
3474 let key_indices = match &connection {
3475 CreateSinkConnection::Kafka { key: Some(key), .. }
3476 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3477 let key_columns = key
3478 .key_columns
3479 .clone()
3480 .into_iter()
3481 .map(normalize::column_name)
3482 .collect::<Vec<_>>();
3483 let mut uniq = BTreeSet::new();
3484 for col in key_columns.iter() {
3485 if !uniq.insert(col) {
3486 sql_bail!("duplicate column referenced in KEY: {}", col);
3487 }
3488 }
3489 let indices = key_columns
3490 .iter()
3491 .map(|col| -> anyhow::Result<usize> {
3492 let name_idx =
3493 desc.get_by_name(col)
3494 .map(|(idx, _type)| idx)
3495 .ok_or_else(|| {
3496 sql_err!("column referenced in KEY does not exist: {}", col)
3497 })?;
3498 if desc.get_unambiguous_name(name_idx).is_none() {
3499 sql_err!("column referenced in KEY is ambiguous: {}", col);
3500 }
3501 Ok(name_idx)
3502 })
3503 .collect::<Result<Vec<_>, _>>()?;
3504 let is_valid_key = desc
3505 .typ()
3506 .keys
3507 .iter()
3508 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3509
3510 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3511 if key.not_enforced {
3512 scx.catalog
3513 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3514 key: key_columns.clone(),
3515 name: name.item.clone(),
3516 })
3517 } else {
3518 return Err(PlanError::UpsertSinkWithInvalidKey {
3519 name: from_name.full_name_str(),
3520 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3521 valid_keys: desc
3522 .typ()
3523 .keys
3524 .iter()
3525 .map(|key| {
3526 key.iter()
3527 .map(|col| desc.get_name(*col).as_str().into())
3528 .collect()
3529 })
3530 .collect(),
3531 });
3532 }
3533 }
3534 Some(indices)
3535 }
3536 CreateSinkConnection::Kafka { key: None, .. }
3537 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3538 };
3539
3540 let headers_index = match &connection {
3541 CreateSinkConnection::Kafka {
3542 headers: Some(headers),
3543 ..
3544 } => {
3545 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3546
3547 match envelope {
3548 SinkEnvelope::Upsert => (),
3549 SinkEnvelope::Debezium => {
3550 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3551 }
3552 };
3553
3554 let headers = normalize::column_name(headers.clone());
3555 let (idx, ty) = desc
3556 .get_by_name(&headers)
3557 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3558
3559 if desc.get_unambiguous_name(idx).is_none() {
3560 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3561 }
3562
3563 match &ty.scalar_type {
3564 SqlScalarType::Map { value_type, .. }
3565 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3566 _ => sql_bail!(
3567 "HEADERS column must have type map[text => text] or map[text => bytea]"
3568 ),
3569 }
3570
3571 Some(idx)
3572 }
3573 _ => None,
3574 };
3575
3576 let relation_key_indices = desc.typ().keys.get(0).cloned();
3578
3579 let key_desc_and_indices = key_indices.map(|key_indices| {
3580 let cols = desc
3581 .iter()
3582 .map(|(name, ty)| (name.clone(), ty.clone()))
3583 .collect::<Vec<_>>();
3584 let (names, types): (Vec<_>, Vec<_>) =
3585 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3586 let typ = SqlRelationType::new(types);
3587 (RelationDesc::new(typ, names), key_indices)
3588 });
3589
3590 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3591 return Err(PlanError::UpsertSinkWithoutKey);
3592 }
3593
3594 let CreateSinkOptionExtracted {
3595 snapshot,
3596 version,
3597 partition_strategy: _,
3598 seen: _,
3599 commit_interval,
3600 } = with_options.try_into()?;
3601
3602 let connection_builder = match connection {
3603 CreateSinkConnection::Kafka {
3604 connection,
3605 options,
3606 ..
3607 } => kafka_sink_builder(
3608 scx,
3609 connection,
3610 options,
3611 format,
3612 relation_key_indices,
3613 key_desc_and_indices,
3614 headers_index,
3615 desc.into_owned(),
3616 envelope,
3617 from.id(),
3618 commit_interval,
3619 )?,
3620 CreateSinkConnection::Iceberg {
3621 connection,
3622 aws_connection,
3623 options,
3624 ..
3625 } => iceberg_sink_builder(
3626 scx,
3627 connection,
3628 aws_connection,
3629 options,
3630 relation_key_indices,
3631 key_desc_and_indices,
3632 commit_interval,
3633 )?,
3634 };
3635
3636 let with_snapshot = snapshot.unwrap_or(true);
3638 let version = version.unwrap_or(0);
3640
3641 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3645 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3646
3647 Ok(Plan::CreateSink(CreateSinkPlan {
3648 name,
3649 sink: Sink {
3650 create_sql,
3651 from: from.global_id(),
3652 connection: connection_builder,
3653 envelope,
3654 version,
3655 commit_interval,
3656 },
3657 with_snapshot,
3658 if_not_exists,
3659 in_cluster: in_cluster.id(),
3660 }))
3661}
3662
3663fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3664 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3665
3666 let existing_keys = desc
3667 .typ()
3668 .keys
3669 .iter()
3670 .map(|key_columns| {
3671 key_columns
3672 .iter()
3673 .map(|col| desc.get_name(*col).as_str())
3674 .join(", ")
3675 })
3676 .join(", ");
3677
3678 sql_err!(
3679 "Key constraint ({}) conflicts with existing key ({})",
3680 user_keys,
3681 existing_keys
3682 )
3683}
3684
/// Typed, validated form of a list of `CsrConfigOption`s.
///
/// Built through the `TryFrom<Vec<CsrConfigOption<Aug>>>` implementation for this type.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    // Names of the options encountered so far; used to reject duplicates.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    // Value of the `AvroKeyFullname` option, if given.
    pub(crate) avro_key_fullname: Option<String>,
    // Value of the `AvroValueFullname` option, if given.
    pub(crate) avro_value_fullname: Option<String>,
    // Value of the `NullDefaults` option; `false` (the `Default`) when the option is absent.
    pub(crate) null_defaults: bool,
    // Doc comments from `AvroDocOn` options that apply to the value schema.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    // Doc comments from `AvroDocOn` options that apply to the key schema.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    // Value of the `KeyCompatibilityLevel` option, if given.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    // Value of the `ValueCompatibilityLevel` option, if given.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3698
3699impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3700 type Error = crate::plan::PlanError;
3701 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3702 let mut extracted = CsrConfigOptionExtracted::default();
3703 let mut common_doc_comments = BTreeMap::new();
3704 for option in v {
3705 if !extracted.seen.insert(option.name.clone()) {
3706 return Err(PlanError::Unstructured({
3707 format!("{} specified more than once", option.name)
3708 }));
3709 }
3710 let option_name = option.name.clone();
3711 let option_name_str = option_name.to_ast_string_simple();
3712 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3713 option_name: option_name.to_ast_string_simple(),
3714 err: e.into(),
3715 };
3716 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3717 val.map(|s| match s {
3718 WithOptionValue::Value(Value::String(s)) => {
3719 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3720 }
3721 _ => Err("must be a string".to_string()),
3722 })
3723 .transpose()
3724 .map_err(PlanError::Unstructured)
3725 .map_err(better_error)
3726 };
3727 match option.name {
3728 CsrConfigOptionName::AvroKeyFullname => {
3729 extracted.avro_key_fullname =
3730 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3731 }
3732 CsrConfigOptionName::AvroValueFullname => {
3733 extracted.avro_value_fullname =
3734 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3735 }
3736 CsrConfigOptionName::NullDefaults => {
3737 extracted.null_defaults =
3738 <bool>::try_from_value(option.value).map_err(better_error)?;
3739 }
3740 CsrConfigOptionName::AvroDocOn(doc_on) => {
3741 let value = String::try_from_value(option.value.ok_or_else(|| {
3742 PlanError::InvalidOptionValue {
3743 option_name: option_name_str,
3744 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3745 }
3746 })?)
3747 .map_err(better_error)?;
3748 let key = match doc_on.identifier {
3749 DocOnIdentifier::Column(ast::ColumnName {
3750 relation: ResolvedItemName::Item { id, .. },
3751 column: ResolvedColumnReference::Column { name, index: _ },
3752 }) => DocTarget::Field {
3753 object_id: id,
3754 column_name: name,
3755 },
3756 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3757 DocTarget::Type(id)
3758 }
3759 _ => unreachable!(),
3760 };
3761
3762 match doc_on.for_schema {
3763 DocOnSchema::KeyOnly => {
3764 extracted.key_doc_options.insert(key, value);
3765 }
3766 DocOnSchema::ValueOnly => {
3767 extracted.value_doc_options.insert(key, value);
3768 }
3769 DocOnSchema::All => {
3770 common_doc_comments.insert(key, value);
3771 }
3772 }
3773 }
3774 CsrConfigOptionName::KeyCompatibilityLevel => {
3775 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3776 }
3777 CsrConfigOptionName::ValueCompatibilityLevel => {
3778 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3779 }
3780 }
3781 }
3782
3783 for (key, value) in common_doc_comments {
3784 if !extracted.key_doc_options.contains_key(&key) {
3785 extracted.key_doc_options.insert(key.clone(), value.clone());
3786 }
3787 if !extracted.value_doc_options.contains_key(&key) {
3788 extracted.value_doc_options.insert(key, value);
3789 }
3790 }
3791 Ok(extracted)
3792 }
3793}
3794
3795fn iceberg_sink_builder(
3796 scx: &StatementContext,
3797 catalog_connection: ResolvedItemName,
3798 aws_connection: ResolvedItemName,
3799 options: Vec<IcebergSinkConfigOption<Aug>>,
3800 relation_key_indices: Option<Vec<usize>>,
3801 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3802 commit_interval: Option<Duration>,
3803) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3804 scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
3805 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3806 let catalog_connection_id = catalog_connection_item.id();
3807 let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3808 let aws_connection_id = aws_connection_item.id();
3809 if !matches!(
3810 catalog_connection_item.connection()?,
3811 Connection::IcebergCatalog(_)
3812 ) {
3813 sql_bail!(
3814 "{} is not an iceberg catalog connection",
3815 scx.catalog
3816 .resolve_full_name(catalog_connection_item.name())
3817 .to_string()
3818 .quoted()
3819 );
3820 };
3821
3822 if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3823 sql_bail!(
3824 "{} is not an AWS connection",
3825 scx.catalog
3826 .resolve_full_name(aws_connection_item.name())
3827 .to_string()
3828 .quoted()
3829 );
3830 }
3831
3832 let IcebergSinkConfigOptionExtracted {
3833 table,
3834 namespace,
3835 seen: _,
3836 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3837
3838 let Some(table) = table else {
3839 sql_bail!("Iceberg sink must specify TABLE");
3840 };
3841 let Some(namespace) = namespace else {
3842 sql_bail!("Iceberg sink must specify NAMESPACE");
3843 };
3844 if commit_interval.is_none() {
3845 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3846 }
3847
3848 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3849 catalog_connection_id,
3850 catalog_connection: catalog_connection_id,
3851 aws_connection_id,
3852 aws_connection: aws_connection_id,
3853 table,
3854 namespace,
3855 relation_key_indices,
3856 key_desc_and_indices,
3857 }))
3858}
3859
/// Builds the Kafka-specific part of a sink plan.
///
/// Validates the referenced connection and the Kafka sink options, maps the requested
/// key/value formats to `KafkaSinkFormatType`s (generating Avro schemas where needed), plans
/// the optional `PARTITION BY` expression, and assembles a `KafkaSinkConnection`.
///
/// # Errors
///
/// Returns a [`PlanError`] if, among other things: `connection` is not a Kafka connection,
/// `COMMIT INTERVAL` is specified, `LEGACY IDS` is combined with an explicit ID prefix,
/// `TOPIC` is missing, a numeric topic option is not positive, the format is missing or
/// unsupported, or the `PARTITION BY` expression cannot be planned.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    // COMMIT INTERVAL is rejected for Kafka sinks.
    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // LEGACY IDS and an explicit prefix are mutually exclusive; otherwise the (possibly
    // absent) prefix is used.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same rules as for the transactional ID above.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Rejects values <= 0 and converts the remaining ones to `NonNeg`; `None` passes through.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates a schema registry connection specification for use in a sink and returns the
    // ID of the connection together with its extracted options.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key present, the two fullname options must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Maps an AST format to the sink format type for either the key (`is_key == true`) or
    // the value, given the description of the columns to be encoded.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT are only supported for single-column relations.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // The key schema defaults to the name "row", the value schema to "envelope".
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    // With ENVELOPE DEBEZIUM, referencing any column that is not part of the
                    // sink key in PARTITION BY produces an error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression is cast to `UInt64` using an assignment cast.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // A key format is only produced when the sink has a key; a bare format applies to both
    // the key and the value.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
4145
4146pub fn describe_create_index(
4147 _: &StatementContext,
4148 _: CreateIndexStatement<Aug>,
4149) -> Result<StatementDesc, PlanError> {
4150 Ok(StatementDesc::new(None))
4151}
4152
/// Plans a `CREATE INDEX` statement.
///
/// Missing pieces of the statement (key parts, index name, cluster) are filled in and written
/// back into `stmt` before it is normalized into the `create_sql` stored in the plan.
///
/// # Errors
///
/// Returns a [`PlanError`] if the indexed item cannot be resolved or is not indexable, if the
/// key expressions or options fail to plan, or if an item with the index's name already
/// exists (and neither `IF NOT EXISTS` nor `ignore_if_exists_errors` applies).
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Borrow the fields mutably: several of them are overwritten further down.
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;

    {
        use CatalogItemType::*;
        match on.item_type() {
            Table | Source | View | MaterializedView | ContinualTask => {
                if on.replacement_target().is_some() {
                    sql_bail!(
                        "index cannot be created on {} because it is a replacement {}",
                        on_name.full_name_str(),
                        on.item_type(),
                    );
                }
            }
            Sink | Index | Type | Func | Secret | Connection => {
                sql_bail!(
                    "index cannot be created on {} because it is a {}",
                    on_name.full_name_str(),
                    on.item_type(),
                );
            }
        }
    }

    let on_desc = on.relation_desc().expect("item type checked above");

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            // No key given: use the relation's default key, referring to each column by name
            // when the name is unambiguous and by its 1-based position otherwise.
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        // An explicitly named index lives in the same database/schema as the indexed item.
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        // Derive a name from the indexed item's name plus a suffix.
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            idx_name.item += "_primary_idx";
        } else {
            // Join one component per key: the column name where available, the 1-based
            // position for unnamed columns, and "expr" for any non-column expression.
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        // Without IF NOT EXISTS, ask the catalog for an available variant of the derived
        // name; with IF NOT EXISTS the derived name is used unchanged.
        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    // Report a name conflict only if the name resolves, IF NOT EXISTS was not given, and the
    // plan context (if any) does not ask to ignore such errors.
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    // Write the resolved cluster, name and key parts back into the statement so that the
    // normalized `create_sql` below contains them explicitly.
    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    // Copy the flag out so the mutable borrows of `stmt` end before it is moved.
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    // The compaction window is taken from the first RETAIN HISTORY option, if any.
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}
4306
4307pub fn describe_create_type(
4308 _: &StatementContext,
4309 _: CreateTypeStatement<Aug>,
4310) -> Result<StatementDesc, PlanError> {
4311 Ok(StatementDesc::new(None))
4312}
4313
4314pub fn plan_create_type(
4315 scx: &StatementContext,
4316 stmt: CreateTypeStatement<Aug>,
4317) -> Result<Plan, PlanError> {
4318 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4319 let CreateTypeStatement { name, as_type, .. } = stmt;
4320
4321 fn validate_data_type(
4322 scx: &StatementContext,
4323 data_type: ResolvedDataType,
4324 as_type: &str,
4325 key: &str,
4326 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4327 let (id, modifiers) = match data_type {
4328 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4329 _ => sql_bail!(
4330 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4331 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4332 as_type,
4333 key,
4334 data_type.human_readable_name(),
4335 ),
4336 };
4337
4338 let item = scx.catalog.get_item(&id);
4339 match item.type_details() {
4340 None => sql_bail!(
4341 "{} must be of class type, but received {} which is of class {}",
4342 key,
4343 scx.catalog.resolve_full_name(item.name()),
4344 item.item_type()
4345 ),
4346 Some(CatalogTypeDetails {
4347 typ: CatalogType::Char,
4348 ..
4349 }) => {
4350 bail_unsupported!("embedding char type in a list or map")
4351 }
4352 _ => {
4353 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4355
4356 Ok((id, modifiers))
4357 }
4358 }
4359 }
4360
4361 let inner = match as_type {
4362 CreateTypeAs::List { options } => {
4363 let CreateTypeListOptionExtracted {
4364 element_type,
4365 seen: _,
4366 } = CreateTypeListOptionExtracted::try_from(options)?;
4367 let element_type =
4368 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4369 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4370 CatalogType::List {
4371 element_reference: id,
4372 element_modifiers: modifiers,
4373 }
4374 }
4375 CreateTypeAs::Map { options } => {
4376 let CreateTypeMapOptionExtracted {
4377 key_type,
4378 value_type,
4379 seen: _,
4380 } = CreateTypeMapOptionExtracted::try_from(options)?;
4381 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4382 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4383 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4384 let (value_id, value_modifiers) =
4385 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4386 CatalogType::Map {
4387 key_reference: key_id,
4388 key_modifiers,
4389 value_reference: value_id,
4390 value_modifiers,
4391 }
4392 }
4393 CreateTypeAs::Record { column_defs } => {
4394 let mut fields = vec![];
4395 for column_def in column_defs {
4396 let data_type = column_def.data_type;
4397 let key = ident(column_def.name.clone());
4398 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4399 fields.push(CatalogRecordField {
4400 name: ColumnName::from(key.clone()),
4401 type_reference: id,
4402 type_modifiers: modifiers,
4403 });
4404 }
4405 CatalogType::Record { fields }
4406 }
4407 };
4408
4409 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4410
4411 let full_name = scx.catalog.resolve_full_name(&name);
4413 let partial_name = PartialItemName::from(full_name.clone());
4414 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4417 if item.item_type().conflicts_with_type() {
4418 return Err(PlanError::ItemAlreadyExists {
4419 name: full_name.to_string(),
4420 item_type: item.item_type(),
4421 });
4422 }
4423 }
4424
4425 Ok(Plan::CreateType(CreateTypePlan {
4426 name,
4427 typ: Type { create_sql, inner },
4428 }))
4429}
4430
// Generates `CreateTypeListOptionExtracted`, whose optional `element_type` field is read when
// planning `CREATE TYPE ... AS LIST`.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

// Generates `CreateTypeMapOptionExtracted`, whose optional `key_type` and `value_type` fields are
// read when planning `CREATE TYPE ... AS MAP`.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4438
/// A planned role option: either a set of role attributes or a single role variable change.
///
/// NOTE(review): going by the name this is the payload of `ALTER ROLE`; the consumer is not
/// visible in this part of the file.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Planned role attributes (see [`PlannedRoleAttributes`]).
    Attributes(PlannedRoleAttributes),
    /// A planned `SET`/`RESET` of a role variable (see [`PlannedRoleVariable`]).
    Variable(PlannedRoleVariable),
}
4444
/// The role attributes produced by `plan_role_attributes`.
///
/// Every field is `None` unless the corresponding attribute appeared in the statement.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `INHERIT`, `Some(false)` for `NOINHERIT`.
    pub inherit: Option<bool>,
    /// The password supplied with the `PASSWORD` attribute, if any.
    pub password: Option<Password>,
    /// Set together with `password`, from the `scram_iterations` system variable.
    pub scram_iterations: Option<NonZeroU32>,
    /// `Some(true)` when the `PASSWORD` attribute was given without a password value. This is
    /// distinct from the `PASSWORD` attribute not being given at all.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SUPERUSER`, `Some(false)` for `NOSUPERUSER`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `LOGIN`, `Some(false)` for `NOLOGIN`.
    pub login: Option<bool>,
}
4457
4458fn plan_role_attributes(
4459 options: Vec<RoleAttribute>,
4460 scx: &StatementContext,
4461) -> Result<PlannedRoleAttributes, PlanError> {
4462 let mut planned_attributes = PlannedRoleAttributes {
4463 inherit: None,
4464 password: None,
4465 scram_iterations: None,
4466 superuser: None,
4467 login: None,
4468 nopassword: None,
4469 };
4470
4471 for option in options {
4472 match option {
4473 RoleAttribute::Inherit | RoleAttribute::NoInherit
4474 if planned_attributes.inherit.is_some() =>
4475 {
4476 sql_bail!("conflicting or redundant options");
4477 }
4478 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4479 bail_never_supported!(
4480 "CREATECLUSTER attribute",
4481 "sql/create-role/#details",
4482 "Use system privileges instead."
4483 );
4484 }
4485 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4486 bail_never_supported!(
4487 "CREATEDB attribute",
4488 "sql/create-role/#details",
4489 "Use system privileges instead."
4490 );
4491 }
4492 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4493 bail_never_supported!(
4494 "CREATEROLE attribute",
4495 "sql/create-role/#details",
4496 "Use system privileges instead."
4497 );
4498 }
4499 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4500 sql_bail!("conflicting or redundant options");
4501 }
4502
4503 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4504 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4505 RoleAttribute::Password(password) => {
4506 if let Some(password) = password {
4507 planned_attributes.password = Some(password.into());
4508 planned_attributes.scram_iterations =
4509 Some(scx.catalog.system_vars().scram_iterations())
4510 } else {
4511 planned_attributes.nopassword = Some(true);
4512 }
4513 }
4514 RoleAttribute::SuperUser => {
4515 if planned_attributes.superuser == Some(false) {
4516 sql_bail!("conflicting or redundant options");
4517 }
4518 planned_attributes.superuser = Some(true);
4519 }
4520 RoleAttribute::NoSuperUser => {
4521 if planned_attributes.superuser == Some(true) {
4522 sql_bail!("conflicting or redundant options");
4523 }
4524 planned_attributes.superuser = Some(false);
4525 }
4526 RoleAttribute::Login => {
4527 if planned_attributes.login == Some(false) {
4528 sql_bail!("conflicting or redundant options");
4529 }
4530 planned_attributes.login = Some(true);
4531 }
4532 RoleAttribute::NoLogin => {
4533 if planned_attributes.login == Some(true) {
4534 sql_bail!("conflicting or redundant options");
4535 }
4536 planned_attributes.login = Some(false);
4537 }
4538 }
4539 }
4540 if planned_attributes.inherit == Some(false) {
4541 bail_unsupported!("non inherit roles");
4542 }
4543
4544 Ok(planned_attributes)
4545}
4546
/// A planned change to a role variable, as produced by `plan_role_variable`.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Set the variable `name` to `value`.
    Set { name: String, value: VariableValue },
    /// Reset the variable `name`.
    Reset { name: String },
}
4552
4553impl PlannedRoleVariable {
4554 pub fn name(&self) -> &str {
4555 match self {
4556 PlannedRoleVariable::Set { name, .. } => name,
4557 PlannedRoleVariable::Reset { name } => name,
4558 }
4559 }
4560}
4561
4562fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4563 let plan = match variable {
4564 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4565 name: name.to_string(),
4566 value: scl::plan_set_variable_to(value)?,
4567 },
4568 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4569 name: name.to_string(),
4570 },
4571 };
4572 Ok(plan)
4573}
4574
4575pub fn describe_create_role(
4576 _: &StatementContext,
4577 _: CreateRoleStatement,
4578) -> Result<StatementDesc, PlanError> {
4579 Ok(StatementDesc::new(None))
4580}
4581
4582pub fn plan_create_role(
4583 scx: &StatementContext,
4584 CreateRoleStatement { name, options }: CreateRoleStatement,
4585) -> Result<Plan, PlanError> {
4586 let attributes = plan_role_attributes(options, scx)?;
4587 Ok(Plan::CreateRole(CreateRolePlan {
4588 name: normalize::ident(name),
4589 attributes: attributes.into(),
4590 }))
4591}
4592
4593pub fn plan_create_network_policy(
4594 ctx: &StatementContext,
4595 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4596) -> Result<Plan, PlanError> {
4597 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4598 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4599
4600 let Some(rule_defs) = policy_options.rules else {
4601 sql_bail!("RULES must be specified when creating network policies.");
4602 };
4603
4604 let mut rules = vec![];
4605 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4606 let NetworkPolicyRuleOptionExtracted {
4607 seen: _,
4608 direction,
4609 action,
4610 address,
4611 } = options.try_into()?;
4612 let (direction, action, address) = match (direction, action, address) {
4613 (Some(direction), Some(action), Some(address)) => (
4614 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4615 NetworkPolicyRuleAction::try_from(action.as_str())?,
4616 PolicyAddress::try_from(address.as_str())?,
4617 ),
4618 (_, _, _) => {
4619 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4620 }
4621 };
4622 rules.push(NetworkPolicyRule {
4623 name: normalize::ident(name),
4624 direction,
4625 action,
4626 address,
4627 });
4628 }
4629
4630 if rules.len()
4631 > ctx
4632 .catalog
4633 .system_vars()
4634 .max_rules_per_network_policy()
4635 .try_into()?
4636 {
4637 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4638 }
4639
4640 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4641 name: normalize::ident(name),
4642 rules,
4643 }))
4644}
4645
4646pub fn plan_alter_network_policy(
4647 ctx: &StatementContext,
4648 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4649) -> Result<Plan, PlanError> {
4650 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4651
4652 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4653 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4654
4655 let Some(rule_defs) = policy_options.rules else {
4656 sql_bail!("RULES must be specified when creating network policies.");
4657 };
4658
4659 let mut rules = vec![];
4660 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4661 let NetworkPolicyRuleOptionExtracted {
4662 seen: _,
4663 direction,
4664 action,
4665 address,
4666 } = options.try_into()?;
4667
4668 let (direction, action, address) = match (direction, action, address) {
4669 (Some(direction), Some(action), Some(address)) => (
4670 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4671 NetworkPolicyRuleAction::try_from(action.as_str())?,
4672 PolicyAddress::try_from(address.as_str())?,
4673 ),
4674 (_, _, _) => {
4675 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4676 }
4677 };
4678 rules.push(NetworkPolicyRule {
4679 name: normalize::ident(name),
4680 direction,
4681 action,
4682 address,
4683 });
4684 }
4685 if rules.len()
4686 > ctx
4687 .catalog
4688 .system_vars()
4689 .max_rules_per_network_policy()
4690 .try_into()?
4691 {
4692 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4693 }
4694
4695 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4696 id: policy.id(),
4697 name: normalize::ident(name),
4698 rules,
4699 }))
4700}
4701
4702pub fn describe_create_cluster(
4703 _: &StatementContext,
4704 _: CreateClusterStatement<Aug>,
4705) -> Result<StatementDesc, PlanError> {
4706 Ok(StatementDesc::new(None))
4707}
4708
// Generates `ClusterOptionExtracted`, the typed form of the options of `CREATE CLUSTER`.
// `plan_create_cluster_inner` reads it and `unplan_create_cluster` rebuilds it. Every field is
// optional because no default is declared here.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

// Generates `NetworkPolicyOptionExtracted`, whose optional `rules` field is read when planning
// `CREATE`/`ALTER NETWORK POLICY`.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

// Generates `NetworkPolicyRuleOptionExtracted`. All three options arrive as strings and are
// parsed into their typed forms by the network policy planning functions.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

// NOTE(review): by the macro's naming convention this presumably generates
// `ClusterAlterOptionExtracted`; its consumer is not in this part of the file.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

// NOTE(review): presumably generates `ClusterAlterUntilReadyOptionExtracted`; its consumer is
// not in this part of the file.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

// Generates `ClusterFeatureExtracted`, the typed form of the `FEATURES` of `CREATE CLUSTER`.
// Each field is copied into the same-named field of `OptimizerFeatureOverrides` during planning
// and copied back when unplanning.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4762
4763pub fn plan_create_cluster(
4767 scx: &StatementContext,
4768 stmt: CreateClusterStatement<Aug>,
4769) -> Result<Plan, PlanError> {
4770 let plan = plan_create_cluster_inner(scx, stmt)?;
4771
4772 if let CreateClusterVariant::Managed(_) = &plan.variant {
4774 let stmt = unplan_create_cluster(scx, plan.clone())
4775 .map_err(|e| PlanError::Replan(e.to_string()))?;
4776 let create_sql = stmt.to_ast_string_stable();
4777 let stmt = parse::parse(&create_sql)
4778 .map_err(|e| PlanError::Replan(e.to_string()))?
4779 .into_element()
4780 .ast;
4781 let (stmt, _resolved_ids) =
4782 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4783 let stmt = match stmt {
4784 Statement::CreateCluster(stmt) => stmt,
4785 stmt => {
4786 return Err(PlanError::Replan(format!(
4787 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4788 )));
4789 }
4790 };
4791 let replan =
4792 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4793 if plan != replan {
4794 return Err(PlanError::Replan(format!(
4795 "replan does not match: plan={plan:?}, replan={replan:?}"
4796 )));
4797 }
4798 }
4799
4800 Ok(Plan::CreateCluster(plan))
4801}
4802
/// Plans a `CREATE CLUSTER` statement into a [`CreateClusterPlan`].
///
/// This does the actual planning; `plan_create_cluster` wraps it and re-invokes it to verify
/// that managed cluster plans survive a round trip through SQL.
///
/// A cluster is planned as managed when `MANAGED` is true, or when `MANAGED` is absent and no
/// `REPLICAS` were given. Managed clusters require `SIZE` and reject `REPLICAS`; unmanaged
/// clusters require `REPLICAS` and reject every option that only applies to managed clusters.
///
/// # Errors
///
/// Returns an error for any invalid option combination (see the individual checks below), for
/// `FEATURES` or `WORKLOAD CLASS` used by a non-system role, and for options guarded by a
/// feature flag that is not enabled.
pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    // Without an explicit MANAGED option, the cluster is managed unless REPLICAS was given.
    let managed = managed.unwrap_or_else(|| replicas.is_none());

    // FEATURES and WORKLOAD CLASS are reserved for system roles.
    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    // An absent SCHEDULE means MANUAL.
    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        // DISK is rejected outright for sizes the catalog reports as "cc" sizes; for other
        // sizes it is accepted but only produces a deprecation notice.
        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            // MANUAL schedule: use the given factor or fall back to the system default.
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            // Any other schedule is feature-flagged, may not be combined with an explicit
            // REPLICATION FACTOR, and is planned with a replication factor of 0.
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        // Only a non-empty AVAILABILITY ZONES list requires the feature flag.
        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        // The destructuring lists every field, so adding a cluster feature forces an update
        // here.
        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        // Overrides that cannot be set through FEATURES keep their default value.
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        // Every remaining option only makes sense for managed clusters. The order of these
        // checks determines which error is reported when several are violated.
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        // Each replica definition is planned individually from its own options.
        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}
4969
/// Converts a [`CreateClusterPlan`] back into a `CREATE CLUSTER` statement.
///
/// Only managed clusters are supported; an unmanaged plan yields an "unsupported" error.
///
/// # Panics
///
/// Panics if the plan has a `Refresh` schedule together with a replication factor greater
/// than 1, or if the planned hydration time estimate cannot be converted back to an interval
/// (see `unplan_cluster_schedule`).
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            // Every field is named explicitly (no `..`), so adding a field to
            // `OptimizerFeatureOverrides` fails to compile here until someone decides whether
            // it should be round-tripped. Fields bound to `_` are not expressible as cluster
            // features and are dropped.
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
                enable_cast_elimination: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                // `seen` is left at its default value.
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            // Planning turns an absent AVAILABILITY ZONES option into an empty list, so an
            // empty list is unplanned as "option absent".
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            // REPLICATION FACTOR may only accompany a MANUAL schedule (planning rejects it
            // otherwise), so it is omitted for a refresh schedule.
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                // The managed plan does not carry a disk setting, so DISK is never emitted.
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}
5071
// Generates `ReplicaOptionExtracted`, the typed form of a replica's options, consumed by
// `plan_replica_config`. `Internal` and `IntrospectionDebugging` declare a default of `false`
// and are used as plain `bool`s there; the other options are optional.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
5087
/// Plans the options of a single replica into a [`ReplicaConfig`].
///
/// The combination of `SIZE`, `AVAILABILITY ZONE`, `BILLED AS`, `STORAGECTL ADDRESSES` and
/// `COMPUTECTL ADDRESSES` decides whether the replica is orchestrated (sized) or unorchestrated
/// (explicit addresses). Mixing the two kinds of options is an error.
///
/// # Errors
///
/// Returns an error if the options cannot be extracted, if the introspection options are
/// inconsistent, or if the option combination is invalid (see the match arms below).
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    // The options not named here (hidden behind `..`) are not consulted by this function.
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    // NOTE: the arms are order-dependent; earlier arms shadow later, more general ones.
    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        // No SIZE and no unorchestrated addresses (and no BILLED AS): report the missing SIZE.
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        // SIZE without any addresses: an orchestrated replica.
        (Some(size), availability_zone, billed_as, None, None) => {
            // DISK is rejected for sizes the catalog reports as "cc" sizes; otherwise it only
            // produces a deprecation notice.
            if disk.is_some() {
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        // None of the orchestrated-only options, and (because the first arm did not match) at
        // least one of the address lists: an unorchestrated replica.
        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            // Both address lists are required and must have the same length.
            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        // Anything else combines options from both kinds of replica.
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}
5180
5181fn plan_compute_replica_config(
5185 introspection_interval: Option<OptionalDuration>,
5186 introspection_debugging: bool,
5187) -> Result<ComputeReplicaConfig, PlanError> {
5188 let introspection_interval = introspection_interval
5189 .map(|OptionalDuration(i)| i)
5190 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5191 let introspection = match introspection_interval {
5192 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5193 interval,
5194 debugging: introspection_debugging,
5195 }),
5196 None if introspection_debugging => {
5197 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5198 }
5199 None => None,
5200 };
5201 let compute = ComputeReplicaConfig { introspection };
5202 Ok(compute)
5203}
5204
5205fn unplan_compute_replica_config(
5209 compute_replica_config: ComputeReplicaConfig,
5210) -> (Option<OptionalDuration>, bool) {
5211 match compute_replica_config.introspection {
5212 Some(ComputeReplicaIntrospectionConfig {
5213 debugging,
5214 interval,
5215 }) => (Some(OptionalDuration(Some(interval))), debugging),
5216 None => (Some(OptionalDuration(None)), false),
5217 }
5218}
5219
5220fn plan_cluster_schedule(
5224 schedule: ClusterScheduleOptionValue,
5225) -> Result<ClusterSchedule, PlanError> {
5226 Ok(match schedule {
5227 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5228 ClusterScheduleOptionValue::Refresh {
5230 hydration_time_estimate: None,
5231 } => ClusterSchedule::Refresh {
5232 hydration_time_estimate: Duration::from_millis(0),
5233 },
5234 ClusterScheduleOptionValue::Refresh {
5236 hydration_time_estimate: Some(interval_value),
5237 } => {
5238 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5239 if interval.as_microseconds() < 0 {
5240 sql_bail!(
5241 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5242 interval
5243 );
5244 }
5245 if interval.months != 0 {
5246 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5250 }
5251 let duration = interval.duration()?;
5252 if u64::try_from(duration.as_millis()).is_err()
5253 || Interval::from_duration(&duration).is_err()
5254 {
5255 sql_bail!("HYDRATION TIME ESTIMATE too large");
5256 }
5257 ClusterSchedule::Refresh {
5258 hydration_time_estimate: duration,
5259 }
5260 }
5261 })
5262}
5263
5264fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5268 match schedule {
5269 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5270 ClusterSchedule::Refresh {
5271 hydration_time_estimate,
5272 } => {
5273 let interval = Interval::from_duration(&hydration_time_estimate)
5274 .expect("planning ensured that this is convertible back to Interval");
5275 let interval_value = literal::unplan_interval(&interval);
5276 ClusterScheduleOptionValue::Refresh {
5277 hydration_time_estimate: Some(interval_value),
5278 }
5279 }
5280 }
5281}
5282
5283pub fn describe_create_cluster_replica(
5284 _: &StatementContext,
5285 _: CreateClusterReplicaStatement<Aug>,
5286) -> Result<StatementDesc, PlanError> {
5287 Ok(StatementDesc::new(None))
5288}
5289
5290pub fn plan_create_cluster_replica(
5291 scx: &StatementContext,
5292 CreateClusterReplicaStatement {
5293 definition: ReplicaDefinition { name, options },
5294 of_cluster,
5295 }: CreateClusterReplicaStatement<Aug>,
5296) -> Result<Plan, PlanError> {
5297 let cluster = scx
5298 .catalog
5299 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5300 let current_replica_count = cluster.replica_ids().iter().count();
5301 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5302 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5303 return Err(PlanError::CreateReplicaFailStorageObjects {
5304 current_replica_count,
5305 internal_replica_count,
5306 hypothetical_replica_count: current_replica_count + 1,
5307 });
5308 }
5309
5310 let config = plan_replica_config(scx, options)?;
5311
5312 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5313 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5314 return Err(PlanError::MangedReplicaName(name.into_string()));
5315 }
5316 } else {
5317 ensure_cluster_is_not_managed(scx, cluster.id())?;
5318 }
5319
5320 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5321 name: normalize::ident(name),
5322 cluster_id: cluster.id(),
5323 config,
5324 }))
5325}
5326
5327pub fn describe_create_secret(
5328 _: &StatementContext,
5329 _: CreateSecretStatement<Aug>,
5330) -> Result<StatementDesc, PlanError> {
5331 Ok(StatementDesc::new(None))
5332}
5333
5334pub fn plan_create_secret(
5335 scx: &StatementContext,
5336 stmt: CreateSecretStatement<Aug>,
5337) -> Result<Plan, PlanError> {
5338 let CreateSecretStatement {
5339 name,
5340 if_not_exists,
5341 value,
5342 } = &stmt;
5343
5344 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5345 let mut create_sql_statement = stmt.clone();
5346 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5347 let create_sql =
5348 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5349 let secret_as = query::plan_secret_as(scx, value.clone())?;
5350
5351 let secret = Secret {
5352 create_sql,
5353 secret_as,
5354 };
5355
5356 Ok(Plan::CreateSecret(CreateSecretPlan {
5357 name,
5358 secret,
5359 if_not_exists: *if_not_exists,
5360 }))
5361}
5362
5363pub fn describe_create_connection(
5364 _: &StatementContext,
5365 _: CreateConnectionStatement<Aug>,
5366) -> Result<StatementDesc, PlanError> {
5367 Ok(StatementDesc::new(None))
5368}
5369
// Generates `CreateConnectionOptionExtracted`, whose optional `validate` field is read by
// `plan_create_connection`.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5371
5372pub fn plan_create_connection(
5373 scx: &StatementContext,
5374 mut stmt: CreateConnectionStatement<Aug>,
5375) -> Result<Plan, PlanError> {
5376 let CreateConnectionStatement {
5377 name,
5378 connection_type,
5379 values,
5380 if_not_exists,
5381 with_options,
5382 } = stmt.clone();
5383 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5384 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5385 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5386
5387 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5388 if options.validate.is_some() {
5389 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5390 }
5391 let validate = match options.validate {
5392 Some(val) => val,
5393 None => {
5394 scx.catalog
5395 .system_vars()
5396 .enable_default_connection_validation()
5397 && details.to_connection().validate_by_default()
5398 }
5399 };
5400
5401 let full_name = scx.catalog.resolve_full_name(&name);
5403 let partial_name = PartialItemName::from(full_name.clone());
5404 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5405 return Err(PlanError::ItemAlreadyExists {
5406 name: full_name.to_string(),
5407 item_type: item.item_type(),
5408 });
5409 }
5410
5411 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5414 stmt.values.retain(|v| {
5415 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5416 });
5417 stmt.values.push(ConnectionOption {
5418 name: ConnectionOptionName::PublicKey1,
5419 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5420 });
5421 stmt.values.push(ConnectionOption {
5422 name: ConnectionOptionName::PublicKey2,
5423 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5424 });
5425 }
5426 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5427
5428 let plan = CreateConnectionPlan {
5429 name,
5430 if_not_exists,
5431 connection: crate::plan::Connection {
5432 create_sql,
5433 details,
5434 },
5435 validate,
5436 };
5437 Ok(Plan::CreateConnection(plan))
5438}
5439
5440fn plan_drop_database(
5441 scx: &StatementContext,
5442 if_exists: bool,
5443 name: &UnresolvedDatabaseName,
5444 cascade: bool,
5445) -> Result<Option<DatabaseId>, PlanError> {
5446 Ok(match resolve_database(scx, name, if_exists)? {
5447 Some(database) => {
5448 if !cascade && database.has_schemas() {
5449 sql_bail!(
5450 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5451 name,
5452 );
5453 }
5454 Some(database.id())
5455 }
5456 None => None,
5457 })
5458}
5459
5460pub fn describe_drop_objects(
5461 _: &StatementContext,
5462 _: DropObjectsStatement,
5463) -> Result<StatementDesc, PlanError> {
5464 Ok(StatementDesc::new(None))
5465}
5466
5467pub fn plan_drop_objects(
5468 scx: &mut StatementContext,
5469 DropObjectsStatement {
5470 object_type,
5471 if_exists,
5472 names,
5473 cascade,
5474 }: DropObjectsStatement,
5475) -> Result<Plan, PlanError> {
5476 assert_ne!(
5477 object_type,
5478 mz_sql_parser::ast::ObjectType::Func,
5479 "rejected in parser"
5480 );
5481 let object_type = object_type.into();
5482
5483 let mut referenced_ids = Vec::new();
5484 for name in names {
5485 let id = match &name {
5486 UnresolvedObjectName::Cluster(name) => {
5487 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5488 }
5489 UnresolvedObjectName::ClusterReplica(name) => {
5490 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5491 }
5492 UnresolvedObjectName::Database(name) => {
5493 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5494 }
5495 UnresolvedObjectName::Schema(name) => {
5496 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5497 }
5498 UnresolvedObjectName::Role(name) => {
5499 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5500 }
5501 UnresolvedObjectName::Item(name) => {
5502 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5503 .map(ObjectId::Item)
5504 }
5505 UnresolvedObjectName::NetworkPolicy(name) => {
5506 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5507 }
5508 };
5509 match id {
5510 Some(id) => referenced_ids.push(id),
5511 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5512 name: name.to_ast_string_simple(),
5513 object_type,
5514 }),
5515 }
5516 }
5517 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5518
5519 Ok(Plan::DropObjects(DropObjectsPlan {
5520 referenced_ids,
5521 drop_ids,
5522 object_type,
5523 }))
5524}
5525
5526fn plan_drop_schema(
5527 scx: &StatementContext,
5528 if_exists: bool,
5529 name: &UnresolvedSchemaName,
5530 cascade: bool,
5531) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5532 let normalized = normalize::unresolved_schema_name(name.clone())?;
5536 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5537 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5538 }
5539
5540 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5541 Some((database_spec, schema_spec)) => {
5542 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5543 sql_bail!(
5544 "cannot drop schema {name} because it is required by the database system",
5545 );
5546 }
5547 if let SchemaSpecifier::Temporary = schema_spec {
5548 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5549 }
5550 let schema = scx.get_schema(&database_spec, &schema_spec);
5551 if !cascade && schema.has_items() {
5552 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5553 sql_bail!(
5554 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5555 full_schema_name
5556 );
5557 }
5558 Some((database_spec, schema_spec))
5559 }
5560 None => None,
5561 })
5562}
5563
5564fn plan_drop_role(
5565 scx: &StatementContext,
5566 if_exists: bool,
5567 name: &Ident,
5568) -> Result<Option<RoleId>, PlanError> {
5569 match scx.catalog.resolve_role(name.as_str()) {
5570 Ok(role) => {
5571 let id = role.id();
5572 if &id == scx.catalog.active_role_id() {
5573 sql_bail!("current role cannot be dropped");
5574 }
5575 for role in scx.catalog.get_roles() {
5576 for (member_id, grantor_id) in role.membership() {
5577 if &id == grantor_id {
5578 let member_role = scx.catalog.get_role(member_id);
5579 sql_bail!(
5580 "cannot drop role {}: still depended up by membership of role {} in role {}",
5581 name.as_str(),
5582 role.name(),
5583 member_role.name()
5584 );
5585 }
5586 }
5587 }
5588 Ok(Some(role.id()))
5589 }
5590 Err(_) if if_exists => Ok(None),
5591 Err(e) => Err(e.into()),
5592 }
5593}
5594
5595fn plan_drop_cluster(
5596 scx: &StatementContext,
5597 if_exists: bool,
5598 name: &Ident,
5599 cascade: bool,
5600) -> Result<Option<ClusterId>, PlanError> {
5601 Ok(match resolve_cluster(scx, name, if_exists)? {
5602 Some(cluster) => {
5603 if !cascade && !cluster.bound_objects().is_empty() {
5604 return Err(PlanError::DependentObjectsStillExist {
5605 object_type: "cluster".to_string(),
5606 object_name: cluster.name().to_string(),
5607 dependents: Vec::new(),
5608 });
5609 }
5610 Some(cluster.id())
5611 }
5612 None => None,
5613 })
5614}
5615
5616fn plan_drop_network_policy(
5617 scx: &StatementContext,
5618 if_exists: bool,
5619 name: &Ident,
5620) -> Result<Option<NetworkPolicyId>, PlanError> {
5621 match scx.catalog.resolve_network_policy(name.as_str()) {
5622 Ok(policy) => {
5623 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5626 Err(PlanError::NetworkPolicyInUse)
5627 } else {
5628 Ok(Some(policy.id()))
5629 }
5630 }
5631 Err(_) if if_exists => Ok(None),
5632 Err(e) => Err(e.into()),
5633 }
5634}
5635
5636fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5639 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5641 false
5642 } else {
5643 cluster.bound_objects().iter().any(|id| {
5645 let item = scx.catalog.get_item(id);
5646 matches!(
5647 item.item_type(),
5648 CatalogItemType::Sink | CatalogItemType::Source
5649 )
5650 })
5651 }
5652}
5653
5654fn plan_drop_cluster_replica(
5655 scx: &StatementContext,
5656 if_exists: bool,
5657 name: &QualifiedReplica,
5658) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5659 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5660 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5661}
5662
5663fn plan_drop_item(
5665 scx: &StatementContext,
5666 object_type: ObjectType,
5667 if_exists: bool,
5668 name: UnresolvedItemName,
5669 cascade: bool,
5670) -> Result<Option<CatalogItemId>, PlanError> {
5671 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5672 Ok(r) => r,
5673 Err(PlanError::MismatchedObjectType {
5675 name,
5676 is_type: ObjectType::MaterializedView,
5677 expected_type: ObjectType::View,
5678 }) => {
5679 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5680 }
5681 e => e?,
5682 };
5683
5684 Ok(match resolved {
5685 Some(catalog_item) => {
5686 if catalog_item.id().is_system() {
5687 sql_bail!(
5688 "cannot drop {} {} because it is required by the database system",
5689 catalog_item.item_type(),
5690 scx.catalog.minimal_qualification(catalog_item.name()),
5691 );
5692 }
5693
5694 if !cascade {
5695 for id in catalog_item.used_by() {
5696 let dep = scx.catalog.get_item(id);
5697 if dependency_prevents_drop(object_type, dep) {
5698 return Err(PlanError::DependentObjectsStillExist {
5699 object_type: catalog_item.item_type().to_string(),
5700 object_name: scx
5701 .catalog
5702 .minimal_qualification(catalog_item.name())
5703 .to_string(),
5704 dependents: vec![(
5705 dep.item_type().to_string(),
5706 scx.catalog.minimal_qualification(dep.name()).to_string(),
5707 )],
5708 });
5709 }
5710 }
5711 }
5714 Some(catalog_item.id())
5715 }
5716 None => None,
5717 })
5718}
5719
5720fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5722 match object_type {
5723 ObjectType::Type => true,
5724 _ => match dep.item_type() {
5725 CatalogItemType::Func
5726 | CatalogItemType::Table
5727 | CatalogItemType::Source
5728 | CatalogItemType::View
5729 | CatalogItemType::MaterializedView
5730 | CatalogItemType::Sink
5731 | CatalogItemType::Type
5732 | CatalogItemType::Secret
5733 | CatalogItemType::Connection
5734 | CatalogItemType::ContinualTask => true,
5735 CatalogItemType::Index => false,
5736 },
5737 }
5738}
5739
5740pub fn describe_alter_index_options(
5741 _: &StatementContext,
5742 _: AlterIndexStatement<Aug>,
5743) -> Result<StatementDesc, PlanError> {
5744 Ok(StatementDesc::new(None))
5745}
5746
5747pub fn describe_drop_owned(
5748 _: &StatementContext,
5749 _: DropOwnedStatement<Aug>,
5750) -> Result<StatementDesc, PlanError> {
5751 Ok(StatementDesc::new(None))
5752}
5753
5754pub fn plan_drop_owned(
5755 scx: &StatementContext,
5756 drop: DropOwnedStatement<Aug>,
5757) -> Result<Plan, PlanError> {
5758 let cascade = drop.cascade();
5759 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5760 let mut drop_ids = Vec::new();
5761 let mut privilege_revokes = Vec::new();
5762 let mut default_privilege_revokes = Vec::new();
5763
5764 fn update_privilege_revokes(
5765 object_id: SystemObjectId,
5766 privileges: &PrivilegeMap,
5767 role_ids: &BTreeSet<RoleId>,
5768 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5769 ) {
5770 privilege_revokes.extend(iter::zip(
5771 iter::repeat(object_id),
5772 privileges
5773 .all_values()
5774 .filter(|privilege| role_ids.contains(&privilege.grantee))
5775 .cloned(),
5776 ));
5777 }
5778
5779 for replica in scx.catalog.get_cluster_replicas() {
5781 if role_ids.contains(&replica.owner_id()) {
5782 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5783 }
5784 }
5785
5786 for cluster in scx.catalog.get_clusters() {
5788 if role_ids.contains(&cluster.owner_id()) {
5789 if !cascade {
5791 let non_owned_bound_objects: Vec<_> = cluster
5792 .bound_objects()
5793 .into_iter()
5794 .map(|item_id| scx.catalog.get_item(item_id))
5795 .filter(|item| !role_ids.contains(&item.owner_id()))
5796 .collect();
5797 if !non_owned_bound_objects.is_empty() {
5798 let names: Vec<_> = non_owned_bound_objects
5799 .into_iter()
5800 .map(|item| {
5801 (
5802 item.item_type().to_string(),
5803 scx.catalog.resolve_full_name(item.name()).to_string(),
5804 )
5805 })
5806 .collect();
5807 return Err(PlanError::DependentObjectsStillExist {
5808 object_type: "cluster".to_string(),
5809 object_name: cluster.name().to_string(),
5810 dependents: names,
5811 });
5812 }
5813 }
5814 drop_ids.push(cluster.id().into());
5815 }
5816 update_privilege_revokes(
5817 SystemObjectId::Object(cluster.id().into()),
5818 cluster.privileges(),
5819 &role_ids,
5820 &mut privilege_revokes,
5821 );
5822 }
5823
5824 for item in scx.catalog.get_items() {
5826 if role_ids.contains(&item.owner_id()) {
5827 if !cascade {
5828 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5830 let non_owned_dependencies: Vec<_> = used_by
5831 .into_iter()
5832 .map(|item_id| scx.catalog.get_item(item_id))
5833 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5834 .filter(|item| !role_ids.contains(&item.owner_id()))
5835 .collect();
5836 if !non_owned_dependencies.is_empty() {
5837 let names: Vec<_> = non_owned_dependencies
5838 .into_iter()
5839 .map(|item| {
5840 let item_typ = item.item_type().to_string();
5841 let item_name =
5842 scx.catalog.resolve_full_name(item.name()).to_string();
5843 (item_typ, item_name)
5844 })
5845 .collect();
5846 Err(PlanError::DependentObjectsStillExist {
5847 object_type: item.item_type().to_string(),
5848 object_name: scx
5849 .catalog
5850 .resolve_full_name(item.name())
5851 .to_string()
5852 .to_string(),
5853 dependents: names,
5854 })
5855 } else {
5856 Ok(())
5857 }
5858 };
5859
5860 if let Some(id) = item.progress_id() {
5863 let progress_item = scx.catalog.get_item(&id);
5864 check_if_dependents_exist(progress_item.used_by())?;
5865 }
5866 check_if_dependents_exist(item.used_by())?;
5867 }
5868 drop_ids.push(item.id().into());
5869 }
5870 update_privilege_revokes(
5871 SystemObjectId::Object(item.id().into()),
5872 item.privileges(),
5873 &role_ids,
5874 &mut privilege_revokes,
5875 );
5876 }
5877
5878 for schema in scx.catalog.get_schemas() {
5880 if !schema.id().is_temporary() {
5881 if role_ids.contains(&schema.owner_id()) {
5882 if !cascade {
5883 let non_owned_dependencies: Vec<_> = schema
5884 .item_ids()
5885 .map(|item_id| scx.catalog.get_item(&item_id))
5886 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5887 .filter(|item| !role_ids.contains(&item.owner_id()))
5888 .collect();
5889 if !non_owned_dependencies.is_empty() {
5890 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5891 sql_bail!(
5892 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5893 full_schema_name.to_string().quoted()
5894 );
5895 }
5896 }
5897 drop_ids.push((*schema.database(), *schema.id()).into())
5898 }
5899 update_privilege_revokes(
5900 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5901 schema.privileges(),
5902 &role_ids,
5903 &mut privilege_revokes,
5904 );
5905 }
5906 }
5907
5908 for database in scx.catalog.get_databases() {
5910 if role_ids.contains(&database.owner_id()) {
5911 if !cascade {
5912 let non_owned_schemas: Vec<_> = database
5913 .schemas()
5914 .into_iter()
5915 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5916 .collect();
5917 if !non_owned_schemas.is_empty() {
5918 sql_bail!(
5919 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5920 database.name().quoted(),
5921 );
5922 }
5923 }
5924 drop_ids.push(database.id().into());
5925 }
5926 update_privilege_revokes(
5927 SystemObjectId::Object(database.id().into()),
5928 database.privileges(),
5929 &role_ids,
5930 &mut privilege_revokes,
5931 );
5932 }
5933
5934 update_privilege_revokes(
5936 SystemObjectId::System,
5937 scx.catalog.get_system_privileges(),
5938 &role_ids,
5939 &mut privilege_revokes,
5940 );
5941
5942 for (default_privilege_object, default_privilege_acl_items) in
5943 scx.catalog.get_default_privileges()
5944 {
5945 for default_privilege_acl_item in default_privilege_acl_items {
5946 if role_ids.contains(&default_privilege_object.role_id)
5947 || role_ids.contains(&default_privilege_acl_item.grantee)
5948 {
5949 default_privilege_revokes.push((
5950 default_privilege_object.clone(),
5951 default_privilege_acl_item.clone(),
5952 ));
5953 }
5954 }
5955 }
5956
5957 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5958
5959 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5960 if !system_ids.is_empty() {
5961 let mut owners = system_ids
5962 .into_iter()
5963 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5964 .collect::<BTreeSet<_>>()
5965 .into_iter()
5966 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5967 sql_bail!(
5968 "cannot drop objects owned by role {} because they are required by the database system",
5969 owners.join(", "),
5970 );
5971 }
5972
5973 Ok(Plan::DropOwned(DropOwnedPlan {
5974 role_ids: role_ids.into_iter().collect(),
5975 drop_ids,
5976 privilege_revokes,
5977 default_privilege_revokes,
5978 }))
5979}
5980
5981fn plan_retain_history_option(
5982 scx: &StatementContext,
5983 retain_history: Option<OptionalDuration>,
5984) -> Result<Option<CompactionWindow>, PlanError> {
5985 if let Some(OptionalDuration(lcw)) = retain_history {
5986 Ok(Some(plan_retain_history(scx, lcw)?))
5987 } else {
5988 Ok(None)
5989 }
5990}
5991
5992fn plan_retain_history(
5998 scx: &StatementContext,
5999 lcw: Option<Duration>,
6000) -> Result<CompactionWindow, PlanError> {
6001 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
6002 match lcw {
6003 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
6008 option_name: "RETAIN HISTORY".to_string(),
6009 err: Box::new(PlanError::Unstructured(
6010 "internal error: unexpectedly zero".to_string(),
6011 )),
6012 }),
6013 Some(duration) => {
6014 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
6017 && scx
6018 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6019 .is_err()
6020 {
6021 return Err(PlanError::RetainHistoryLow {
6022 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
6023 });
6024 }
6025 Ok(duration.try_into()?)
6026 }
6027 None => {
6030 if scx
6031 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
6032 .is_err()
6033 {
6034 Err(PlanError::RetainHistoryRequired)
6035 } else {
6036 Ok(CompactionWindow::DisableCompaction)
6037 }
6038 }
6039 }
6040}
6041
6042generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
6043
6044fn plan_index_options(
6045 scx: &StatementContext,
6046 with_opts: Vec<IndexOption<Aug>>,
6047) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
6048 if !with_opts.is_empty() {
6049 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
6051 }
6052
6053 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
6054
6055 let mut out = Vec::with_capacity(1);
6056 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6057 out.push(crate::plan::IndexOption::RetainHistory(cw));
6058 }
6059 Ok(out)
6060}
6061
6062generate_extracted_config!(
6063 TableOption,
6064 (PartitionBy, Vec<Ident>),
6065 (RetainHistory, OptionalDuration),
6066 (RedactedTest, String)
6067);
6068
6069fn plan_table_options(
6070 scx: &StatementContext,
6071 desc: &RelationDesc,
6072 with_opts: Vec<TableOption<Aug>>,
6073) -> Result<Vec<crate::plan::TableOption>, PlanError> {
6074 let TableOptionExtracted {
6075 partition_by,
6076 retain_history,
6077 redacted_test,
6078 ..
6079 }: TableOptionExtracted = with_opts.try_into()?;
6080
6081 if let Some(partition_by) = partition_by {
6082 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
6083 check_partition_by(desc, partition_by)?;
6084 }
6085
6086 if redacted_test.is_some() {
6087 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
6088 }
6089
6090 let mut out = Vec::with_capacity(1);
6091 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
6092 out.push(crate::plan::TableOption::RetainHistory(cw));
6093 }
6094 Ok(out)
6095}
6096
6097pub fn plan_alter_index_options(
6098 scx: &mut StatementContext,
6099 AlterIndexStatement {
6100 index_name,
6101 if_exists,
6102 action,
6103 }: AlterIndexStatement<Aug>,
6104) -> Result<Plan, PlanError> {
6105 let object_type = ObjectType::Index;
6106 match action {
6107 AlterIndexAction::ResetOptions(options) => {
6108 let mut options = options.into_iter();
6109 if let Some(opt) = options.next() {
6110 match opt {
6111 IndexOptionName::RetainHistory => {
6112 if options.next().is_some() {
6113 sql_bail!("RETAIN HISTORY must be only option");
6114 }
6115 return alter_retain_history(
6116 scx,
6117 object_type,
6118 if_exists,
6119 UnresolvedObjectName::Item(index_name),
6120 None,
6121 );
6122 }
6123 }
6124 }
6125 sql_bail!("expected option");
6126 }
6127 AlterIndexAction::SetOptions(options) => {
6128 let mut options = options.into_iter();
6129 if let Some(opt) = options.next() {
6130 match opt.name {
6131 IndexOptionName::RetainHistory => {
6132 if options.next().is_some() {
6133 sql_bail!("RETAIN HISTORY must be only option");
6134 }
6135 return alter_retain_history(
6136 scx,
6137 object_type,
6138 if_exists,
6139 UnresolvedObjectName::Item(index_name),
6140 opt.value,
6141 );
6142 }
6143 }
6144 }
6145 sql_bail!("expected option");
6146 }
6147 }
6148}
6149
6150pub fn describe_alter_cluster_set_options(
6151 _: &StatementContext,
6152 _: AlterClusterStatement<Aug>,
6153) -> Result<StatementDesc, PlanError> {
6154 Ok(StatementDesc::new(None))
6155}
6156
6157pub fn plan_alter_cluster(
6158 scx: &mut StatementContext,
6159 AlterClusterStatement {
6160 name,
6161 action,
6162 if_exists,
6163 }: AlterClusterStatement<Aug>,
6164) -> Result<Plan, PlanError> {
6165 let cluster = match resolve_cluster(scx, &name, if_exists)? {
6166 Some(entry) => entry,
6167 None => {
6168 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6169 name: name.to_ast_string_simple(),
6170 object_type: ObjectType::Cluster,
6171 });
6172
6173 return Ok(Plan::AlterNoop(AlterNoopPlan {
6174 object_type: ObjectType::Cluster,
6175 }));
6176 }
6177 };
6178
6179 let mut options: PlanClusterOption = Default::default();
6180 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6181
6182 match action {
6183 AlterClusterAction::SetOptions {
6184 options: set_options,
6185 with_options,
6186 } => {
6187 let ClusterOptionExtracted {
6188 availability_zones,
6189 introspection_debugging,
6190 introspection_interval,
6191 managed,
6192 replicas: replica_defs,
6193 replication_factor,
6194 seen: _,
6195 size,
6196 disk,
6197 schedule,
6198 workload_class,
6199 }: ClusterOptionExtracted = set_options.try_into()?;
6200
6201 if !scx.catalog.active_role_id().is_system() {
6202 if workload_class.is_some() {
6203 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6204 }
6205 }
6206
6207 match managed.unwrap_or_else(|| cluster.is_managed()) {
6208 true => {
6209 let alter_strategy_extracted =
6210 ClusterAlterOptionExtracted::try_from(with_options)?;
6211 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6212
6213 match alter_strategy {
6214 AlterClusterPlanStrategy::None => {}
6215 _ => {
6216 scx.require_feature_flag(
6217 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6218 )?;
6219 }
6220 }
6221
6222 if replica_defs.is_some() {
6223 sql_bail!("REPLICAS not supported for managed clusters");
6224 }
6225 if schedule.is_some()
6226 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6227 {
6228 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6229 }
6230
6231 if let Some(replication_factor) = replication_factor {
6232 if schedule.is_some()
6233 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6234 {
6235 sql_bail!(
6236 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6237 );
6238 }
6239 if let Some(current_schedule) = cluster.schedule() {
6240 if !matches!(current_schedule, ClusterSchedule::Manual) {
6241 sql_bail!(
6242 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6243 );
6244 }
6245 }
6246
6247 let internal_replica_count =
6248 cluster.replicas().iter().filter(|r| r.internal()).count();
6249 let hypothetical_replica_count =
6250 internal_replica_count + usize::cast_from(replication_factor);
6251
6252 if contains_single_replica_objects(scx, cluster)
6255 && hypothetical_replica_count > 1
6256 {
6257 return Err(PlanError::CreateReplicaFailStorageObjects {
6258 current_replica_count: cluster.replica_ids().iter().count(),
6259 internal_replica_count,
6260 hypothetical_replica_count,
6261 });
6262 }
6263 } else if alter_strategy.is_some() {
6264 let internal_replica_count =
6268 cluster.replicas().iter().filter(|r| r.internal()).count();
6269 let hypothetical_replica_count = internal_replica_count * 2;
6270 if contains_single_replica_objects(scx, cluster) {
6271 return Err(PlanError::CreateReplicaFailStorageObjects {
6272 current_replica_count: cluster.replica_ids().iter().count(),
6273 internal_replica_count,
6274 hypothetical_replica_count,
6275 });
6276 }
6277 }
6278 }
6279 false => {
6280 if !alter_strategy.is_none() {
6281 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6282 }
6283 if availability_zones.is_some() {
6284 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6285 }
6286 if replication_factor.is_some() {
6287 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6288 }
6289 if introspection_debugging.is_some() {
6290 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6291 }
6292 if introspection_interval.is_some() {
6293 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6294 }
6295 if size.is_some() {
6296 sql_bail!("SIZE not supported for unmanaged clusters");
6297 }
6298 if disk.is_some() {
6299 sql_bail!("DISK not supported for unmanaged clusters");
6300 }
6301 if schedule.is_some()
6302 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6303 {
6304 sql_bail!(
6305 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6306 );
6307 }
6308 if let Some(current_schedule) = cluster.schedule() {
6309 if !matches!(current_schedule, ClusterSchedule::Manual)
6310 && schedule.is_none()
6311 {
6312 sql_bail!(
6313 "when switching a cluster to unmanaged, if the managed \
6314 cluster's SCHEDULE is anything other than MANUAL, you have to \
6315 explicitly set the SCHEDULE to MANUAL"
6316 );
6317 }
6318 }
6319 }
6320 }
6321
6322 let mut replicas = vec![];
6323 for ReplicaDefinition { name, options } in
6324 replica_defs.into_iter().flat_map(Vec::into_iter)
6325 {
6326 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6327 }
6328
6329 if let Some(managed) = managed {
6330 options.managed = AlterOptionParameter::Set(managed);
6331 }
6332 if let Some(replication_factor) = replication_factor {
6333 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6334 }
6335 if let Some(size) = &size {
6336 options.size = AlterOptionParameter::Set(size.clone());
6337 }
6338 if let Some(availability_zones) = availability_zones {
6339 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6340 }
6341 if let Some(introspection_debugging) = introspection_debugging {
6342 options.introspection_debugging =
6343 AlterOptionParameter::Set(introspection_debugging);
6344 }
6345 if let Some(introspection_interval) = introspection_interval {
6346 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6347 }
6348 if disk.is_some() {
6349 let size = size.as_deref().unwrap_or_else(|| {
6353 cluster.managed_size().expect("cluster known to be managed")
6354 });
6355 if scx.catalog.is_cluster_size_cc(size) {
6356 sql_bail!(
6357 "DISK option not supported for modern cluster sizes because disk is always enabled"
6358 );
6359 }
6360
6361 scx.catalog
6362 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6363 }
6364 if !replicas.is_empty() {
6365 options.replicas = AlterOptionParameter::Set(replicas);
6366 }
6367 if let Some(schedule) = schedule {
6368 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6369 }
6370 if let Some(workload_class) = workload_class {
6371 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6372 }
6373 }
6374 AlterClusterAction::ResetOptions(reset_options) => {
6375 use AlterOptionParameter::Reset;
6376 use ClusterOptionName::*;
6377
6378 if !scx.catalog.active_role_id().is_system() {
6379 if reset_options.contains(&WorkloadClass) {
6380 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6381 }
6382 }
6383
6384 for option in reset_options {
6385 match option {
6386 AvailabilityZones => options.availability_zones = Reset,
6387 Disk => scx
6388 .catalog
6389 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6390 IntrospectionInterval => options.introspection_interval = Reset,
6391 IntrospectionDebugging => options.introspection_debugging = Reset,
6392 Managed => options.managed = Reset,
6393 Replicas => options.replicas = Reset,
6394 ReplicationFactor => options.replication_factor = Reset,
6395 Size => options.size = Reset,
6396 Schedule => options.schedule = Reset,
6397 WorkloadClass => options.workload_class = Reset,
6398 }
6399 }
6400 }
6401 }
6402 Ok(Plan::AlterCluster(AlterClusterPlan {
6403 id: cluster.id(),
6404 name: cluster.name().to_string(),
6405 options,
6406 strategy: alter_strategy,
6407 }))
6408}
6409
6410pub fn describe_alter_set_cluster(
6411 _: &StatementContext,
6412 _: AlterSetClusterStatement<Aug>,
6413) -> Result<StatementDesc, PlanError> {
6414 Ok(StatementDesc::new(None))
6415}
6416
6417pub fn plan_alter_item_set_cluster(
6418 scx: &StatementContext,
6419 AlterSetClusterStatement {
6420 if_exists,
6421 set_cluster: in_cluster_name,
6422 name,
6423 object_type,
6424 }: AlterSetClusterStatement<Aug>,
6425) -> Result<Plan, PlanError> {
6426 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6427
6428 let object_type = object_type.into();
6429
6430 match object_type {
6432 ObjectType::MaterializedView => {}
6433 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6434 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6435 }
6436 _ => {
6437 bail_never_supported!(
6438 format!("ALTER {object_type} SET CLUSTER"),
6439 "sql/alter-set-cluster/",
6440 format!("{object_type} has no associated cluster")
6441 )
6442 }
6443 }
6444
6445 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6446
6447 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6448 Some(entry) => {
6449 let current_cluster = entry.cluster_id();
6450 let Some(current_cluster) = current_cluster else {
6451 sql_bail!("No cluster associated with {name}");
6452 };
6453
6454 if current_cluster == in_cluster.id() {
6455 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6456 } else {
6457 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6458 id: entry.id(),
6459 set_cluster: in_cluster.id(),
6460 }))
6461 }
6462 }
6463 None => {
6464 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6465 name: name.to_ast_string_simple(),
6466 object_type,
6467 });
6468
6469 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6470 }
6471 }
6472}
6473
6474pub fn describe_alter_object_rename(
6475 _: &StatementContext,
6476 _: AlterObjectRenameStatement,
6477) -> Result<StatementDesc, PlanError> {
6478 Ok(StatementDesc::new(None))
6479}
6480
6481pub fn plan_alter_object_rename(
6482 scx: &mut StatementContext,
6483 AlterObjectRenameStatement {
6484 name,
6485 object_type,
6486 to_item_name,
6487 if_exists,
6488 }: AlterObjectRenameStatement,
6489) -> Result<Plan, PlanError> {
6490 let object_type = object_type.into();
6491 match (object_type, name) {
6492 (
6493 ObjectType::View
6494 | ObjectType::MaterializedView
6495 | ObjectType::Table
6496 | ObjectType::Source
6497 | ObjectType::Index
6498 | ObjectType::Sink
6499 | ObjectType::Secret
6500 | ObjectType::Connection,
6501 UnresolvedObjectName::Item(name),
6502 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6503 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6504 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6505 }
6506 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6507 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6508 }
6509 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6510 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6511 }
6512 (object_type, name) => {
6513 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6514 }
6515 }
6516}
6517
6518pub fn plan_alter_schema_rename(
6519 scx: &mut StatementContext,
6520 name: UnresolvedSchemaName,
6521 to_schema_name: Ident,
6522 if_exists: bool,
6523) -> Result<Plan, PlanError> {
6524 let normalized = normalize::unresolved_schema_name(name.clone())?;
6528 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6529 sql_bail!(
6530 "cannot rename schemas in the ambient database: {:?}",
6531 mz_repr::namespaces::MZ_TEMP_SCHEMA
6532 );
6533 }
6534
6535 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6536 let object_type = ObjectType::Schema;
6537 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6538 name: name.to_ast_string_simple(),
6539 object_type,
6540 });
6541 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6542 };
6543
6544 if scx
6546 .resolve_schema_in_database(&db_spec, &to_schema_name)
6547 .is_ok()
6548 {
6549 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6550 to_schema_name.clone().into_string(),
6551 )));
6552 }
6553
6554 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6556 if schema.id().is_system() {
6557 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6558 }
6559
6560 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6561 cur_schema_spec: (db_spec, schema_spec),
6562 new_schema_name: to_schema_name.into_string(),
6563 }))
6564}
6565
6566pub fn plan_alter_schema_swap<F>(
6567 scx: &mut StatementContext,
6568 name_a: UnresolvedSchemaName,
6569 name_b: Ident,
6570 gen_temp_suffix: F,
6571) -> Result<Plan, PlanError>
6572where
6573 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6574{
6575 let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6579 if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6580 {
6581 sql_bail!("cannot swap schemas that are in the ambient database");
6582 }
6583 let name_b_str = normalize::ident_ref(&name_b);
6585 if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6586 sql_bail!("cannot swap schemas that are in the ambient database");
6587 }
6588
6589 let schema_a = scx.resolve_schema(name_a.clone())?;
6590
6591 let db_spec = schema_a.database().clone();
6592 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6593 sql_bail!("cannot swap schemas that are in the ambient database");
6594 };
6595 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6596
6597 if schema_a.id().is_system() || schema_b.id().is_system() {
6599 bail_never_supported!("swapping a system schema".to_string())
6600 }
6601
6602 let check = |temp_suffix: &str| {
6606 let mut temp_name = ident!("mz_schema_swap_");
6607 temp_name.append_lossy(temp_suffix);
6608 scx.resolve_schema_in_database(&db_spec, &temp_name)
6609 .is_err()
6610 };
6611 let temp_suffix = gen_temp_suffix(&check)?;
6612 let name_temp = format!("mz_schema_swap_{temp_suffix}");
6613
6614 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6615 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6616 schema_a_name: schema_a.name().schema.to_string(),
6617 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6618 schema_b_name: schema_b.name().schema.to_string(),
6619 name_temp,
6620 }))
6621}
6622
6623pub fn plan_alter_item_rename(
6624 scx: &mut StatementContext,
6625 object_type: ObjectType,
6626 name: UnresolvedItemName,
6627 to_item_name: Ident,
6628 if_exists: bool,
6629) -> Result<Plan, PlanError> {
6630 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6631 Ok(r) => r,
6632 Err(PlanError::MismatchedObjectType {
6634 name,
6635 is_type: ObjectType::MaterializedView,
6636 expected_type: ObjectType::View,
6637 }) => {
6638 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6639 }
6640 e => e?,
6641 };
6642
6643 match resolved {
6644 Some(entry) => {
6645 let full_name = scx.catalog.resolve_full_name(entry.name());
6646 let item_type = entry.item_type();
6647
6648 let proposed_name = QualifiedItemName {
6649 qualifiers: entry.name().qualifiers.clone(),
6650 item: to_item_name.clone().into_string(),
6651 };
6652
6653 let conflicting_type_exists;
6657 let conflicting_item_exists;
6658 if item_type == CatalogItemType::Type {
6659 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6660 conflicting_item_exists = scx
6661 .catalog
6662 .get_item_by_name(&proposed_name)
6663 .map(|item| item.item_type().conflicts_with_type())
6664 .unwrap_or(false);
6665 } else {
6666 conflicting_type_exists = item_type.conflicts_with_type()
6667 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6668 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6669 };
6670 if conflicting_type_exists || conflicting_item_exists {
6671 sql_bail!("catalog item '{}' already exists", to_item_name);
6672 }
6673
6674 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6675 id: entry.id(),
6676 current_full_name: full_name,
6677 to_name: normalize::ident(to_item_name),
6678 object_type,
6679 }))
6680 }
6681 None => {
6682 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6683 name: name.to_ast_string_simple(),
6684 object_type,
6685 });
6686
6687 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6688 }
6689 }
6690}
6691
6692pub fn plan_alter_cluster_rename(
6693 scx: &mut StatementContext,
6694 object_type: ObjectType,
6695 name: Ident,
6696 to_name: Ident,
6697 if_exists: bool,
6698) -> Result<Plan, PlanError> {
6699 match resolve_cluster(scx, &name, if_exists)? {
6700 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6701 id: entry.id(),
6702 name: entry.name().to_string(),
6703 to_name: ident(to_name),
6704 })),
6705 None => {
6706 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6707 name: name.to_ast_string_simple(),
6708 object_type,
6709 });
6710
6711 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6712 }
6713 }
6714}
6715
6716pub fn plan_alter_cluster_swap<F>(
6717 scx: &mut StatementContext,
6718 name_a: Ident,
6719 name_b: Ident,
6720 gen_temp_suffix: F,
6721) -> Result<Plan, PlanError>
6722where
6723 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6724{
6725 let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6726 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6727
6728 let check = |temp_suffix: &str| {
6729 let mut temp_name = ident!("mz_schema_swap_");
6730 temp_name.append_lossy(temp_suffix);
6731 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6732 Err(CatalogError::UnknownCluster(_)) => true,
6734 Ok(_) | Err(_) => false,
6736 }
6737 };
6738 let temp_suffix = gen_temp_suffix(&check)?;
6739 let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6740
6741 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6742 id_a: cluster_a.id(),
6743 id_b: cluster_b.id(),
6744 name_a: name_a.into_string(),
6745 name_b: name_b.into_string(),
6746 name_temp,
6747 }))
6748}
6749
6750pub fn plan_alter_cluster_replica_rename(
6751 scx: &mut StatementContext,
6752 object_type: ObjectType,
6753 name: QualifiedReplica,
6754 to_item_name: Ident,
6755 if_exists: bool,
6756) -> Result<Plan, PlanError> {
6757 match resolve_cluster_replica(scx, &name, if_exists)? {
6758 Some((cluster, replica)) => {
6759 ensure_cluster_is_not_managed(scx, cluster.id())?;
6760 Ok(Plan::AlterClusterReplicaRename(
6761 AlterClusterReplicaRenamePlan {
6762 cluster_id: cluster.id(),
6763 replica_id: replica,
6764 name: QualifiedReplica {
6765 cluster: Ident::new(cluster.name())?,
6766 replica: name.replica,
6767 },
6768 to_name: normalize::ident(to_item_name),
6769 },
6770 ))
6771 }
6772 None => {
6773 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6774 name: name.to_ast_string_simple(),
6775 object_type,
6776 });
6777
6778 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6779 }
6780 }
6781}
6782
6783pub fn describe_alter_object_swap(
6784 _: &StatementContext,
6785 _: AlterObjectSwapStatement,
6786) -> Result<StatementDesc, PlanError> {
6787 Ok(StatementDesc::new(None))
6788}
6789
6790pub fn plan_alter_object_swap(
6791 scx: &mut StatementContext,
6792 stmt: AlterObjectSwapStatement,
6793) -> Result<Plan, PlanError> {
6794 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6795
6796 let AlterObjectSwapStatement {
6797 object_type,
6798 name_a,
6799 name_b,
6800 } = stmt;
6801 let object_type = object_type.into();
6802
6803 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6805 let mut attempts = 0;
6806 let name_temp = loop {
6807 attempts += 1;
6808 if attempts > 10 {
6809 tracing::warn!("Unable to generate temp id for swapping");
6810 sql_bail!("unable to swap!");
6811 }
6812
6813 let short_id = mz_ore::id_gen::temp_id();
6815 if check_fn(&short_id) {
6816 break short_id;
6817 }
6818 };
6819
6820 Ok(name_temp)
6821 };
6822
6823 match (object_type, name_a, name_b) {
6824 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6825 plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6826 }
6827 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6828 plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6829 }
6830 (object_type, _, _) => Err(PlanError::Unsupported {
6831 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6832 discussion_no: None,
6833 }),
6834 }
6835}
6836
6837pub fn describe_alter_retain_history(
6838 _: &StatementContext,
6839 _: AlterRetainHistoryStatement<Aug>,
6840) -> Result<StatementDesc, PlanError> {
6841 Ok(StatementDesc::new(None))
6842}
6843
6844pub fn plan_alter_retain_history(
6845 scx: &StatementContext,
6846 AlterRetainHistoryStatement {
6847 object_type,
6848 if_exists,
6849 name,
6850 history,
6851 }: AlterRetainHistoryStatement<Aug>,
6852) -> Result<Plan, PlanError> {
6853 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6854}
6855
6856fn alter_retain_history(
6857 scx: &StatementContext,
6858 object_type: ObjectType,
6859 if_exists: bool,
6860 name: UnresolvedObjectName,
6861 history: Option<WithOptionValue<Aug>>,
6862) -> Result<Plan, PlanError> {
6863 let name = match (object_type, name) {
6864 (
6865 ObjectType::View
6867 | ObjectType::MaterializedView
6868 | ObjectType::Table
6869 | ObjectType::Source
6870 | ObjectType::Index,
6871 UnresolvedObjectName::Item(name),
6872 ) => name,
6873 (object_type, _) => {
6874 sql_bail!("{object_type} does not support RETAIN HISTORY")
6875 }
6876 };
6877 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6878 Some(entry) => {
6879 let full_name = scx.catalog.resolve_full_name(entry.name());
6880 let item_type = entry.item_type();
6881
6882 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6884 return Err(PlanError::AlterViewOnMaterializedView(
6885 full_name.to_string(),
6886 ));
6887 } else if object_type == ObjectType::View {
6888 sql_bail!("{object_type} does not support RETAIN HISTORY")
6889 } else if object_type != item_type {
6890 sql_bail!(
6891 "\"{}\" is a {} not a {}",
6892 full_name,
6893 entry.item_type(),
6894 format!("{object_type}").to_lowercase()
6895 )
6896 }
6897
6898 let (value, lcw) = match &history {
6900 Some(WithOptionValue::RetainHistoryFor(value)) => {
6901 let window = OptionalDuration::try_from_value(value.clone())?;
6902 (Some(value.clone()), window.0)
6903 }
6904 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6906 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6907 };
6908 let window = plan_retain_history(scx, lcw)?;
6909
6910 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6911 id: entry.id(),
6912 value,
6913 window,
6914 object_type,
6915 }))
6916 }
6917 None => {
6918 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6919 name: name.to_ast_string_simple(),
6920 object_type,
6921 });
6922
6923 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6924 }
6925 }
6926}
6927
6928pub fn describe_alter_secret_options(
6929 _: &StatementContext,
6930 _: AlterSecretStatement<Aug>,
6931) -> Result<StatementDesc, PlanError> {
6932 Ok(StatementDesc::new(None))
6933}
6934
6935pub fn plan_alter_secret(
6936 scx: &mut StatementContext,
6937 stmt: AlterSecretStatement<Aug>,
6938) -> Result<Plan, PlanError> {
6939 let AlterSecretStatement {
6940 name,
6941 if_exists,
6942 value,
6943 } = stmt;
6944 let object_type = ObjectType::Secret;
6945 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6946 Some(entry) => entry.id(),
6947 None => {
6948 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6949 name: name.to_string(),
6950 object_type,
6951 });
6952
6953 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6954 }
6955 };
6956
6957 let secret_as = query::plan_secret_as(scx, value)?;
6958
6959 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6960}
6961
6962pub fn describe_alter_connection(
6963 _: &StatementContext,
6964 _: AlterConnectionStatement<Aug>,
6965) -> Result<StatementDesc, PlanError> {
6966 Ok(StatementDesc::new(None))
6967}
6968
// Declares the extracted form of the `WITH (...)` options accepted by
// `ALTER CONNECTION`; `plan_alter_connection` reads it as
// `AlterConnectionOptionExtracted` and consults its optional boolean
// `validate` field.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6970
6971pub fn plan_alter_connection(
6972 scx: &StatementContext,
6973 stmt: AlterConnectionStatement<Aug>,
6974) -> Result<Plan, PlanError> {
6975 let AlterConnectionStatement {
6976 name,
6977 if_exists,
6978 actions,
6979 with_options,
6980 } = stmt;
6981 let conn_name = normalize::unresolved_item_name(name)?;
6982 let entry = match scx.catalog.resolve_item(&conn_name) {
6983 Ok(entry) => entry,
6984 Err(_) if if_exists => {
6985 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6986 name: conn_name.to_string(),
6987 object_type: ObjectType::Sink,
6988 });
6989
6990 return Ok(Plan::AlterNoop(AlterNoopPlan {
6991 object_type: ObjectType::Connection,
6992 }));
6993 }
6994 Err(e) => return Err(e.into()),
6995 };
6996
6997 let connection = entry.connection()?;
6998
6999 if actions
7000 .iter()
7001 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
7002 {
7003 if actions.len() > 1 {
7004 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
7005 }
7006
7007 if !with_options.is_empty() {
7008 sql_bail!(
7009 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
7010 with_options
7011 .iter()
7012 .map(|o| o.to_ast_string_simple())
7013 .join(", ")
7014 );
7015 }
7016
7017 if !matches!(connection, Connection::Ssh(_)) {
7018 sql_bail!(
7019 "{} is not an SSH connection",
7020 scx.catalog.resolve_full_name(entry.name())
7021 )
7022 }
7023
7024 return Ok(Plan::AlterConnection(AlterConnectionPlan {
7025 id: entry.id(),
7026 action: crate::plan::AlterConnectionAction::RotateKeys,
7027 }));
7028 }
7029
7030 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
7031 if options.validate.is_some() {
7032 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
7033 }
7034
7035 let validate = match options.validate {
7036 Some(val) => val,
7037 None => {
7038 scx.catalog
7039 .system_vars()
7040 .enable_default_connection_validation()
7041 && connection.validate_by_default()
7042 }
7043 };
7044
7045 let connection_type = match connection {
7046 Connection::Aws(_) => CreateConnectionType::Aws,
7047 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
7048 Connection::Kafka(_) => CreateConnectionType::Kafka,
7049 Connection::Csr(_) => CreateConnectionType::Csr,
7050 Connection::Postgres(_) => CreateConnectionType::Postgres,
7051 Connection::Ssh(_) => CreateConnectionType::Ssh,
7052 Connection::MySql(_) => CreateConnectionType::MySql,
7053 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
7054 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
7055 };
7056
7057 let specified_options: BTreeSet<_> = actions
7059 .iter()
7060 .map(|action: &AlterConnectionAction<Aug>| match action {
7061 AlterConnectionAction::SetOption(option) => option.name.clone(),
7062 AlterConnectionAction::DropOption(name) => name.clone(),
7063 AlterConnectionAction::RotateKeys => unreachable!(),
7064 })
7065 .collect();
7066
7067 for invalid in INALTERABLE_OPTIONS {
7068 if specified_options.contains(invalid) {
7069 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
7070 }
7071 }
7072
7073 connection::validate_options_per_connection_type(connection_type, specified_options)?;
7074
7075 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
7077 actions.into_iter().partition_map(|action| match action {
7078 AlterConnectionAction::SetOption(option) => Either::Left(option),
7079 AlterConnectionAction::DropOption(name) => Either::Right(name),
7080 AlterConnectionAction::RotateKeys => unreachable!(),
7081 });
7082
7083 let set_options: BTreeMap<_, _> = set_options_vec
7084 .clone()
7085 .into_iter()
7086 .map(|option| (option.name, option.value))
7087 .collect();
7088
7089 let connection_options_extracted =
7093 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
7094
7095 let duplicates: Vec<_> = connection_options_extracted
7096 .seen
7097 .intersection(&drop_options)
7098 .collect();
7099
7100 if !duplicates.is_empty() {
7101 sql_bail!(
7102 "cannot both SET and DROP/RESET options {}",
7103 duplicates
7104 .iter()
7105 .map(|option| option.to_string())
7106 .join(", ")
7107 )
7108 }
7109
7110 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
7111 let set_options_count = mutually_exclusive_options
7112 .iter()
7113 .filter(|o| set_options.contains_key(o))
7114 .count();
7115 let drop_options_count = mutually_exclusive_options
7116 .iter()
7117 .filter(|o| drop_options.contains(o))
7118 .count();
7119
7120 if set_options_count > 0 && drop_options_count > 0 {
7122 sql_bail!(
7123 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
7124 connection_type,
7125 mutually_exclusive_options
7126 .iter()
7127 .map(|option| option.to_string())
7128 .join(", ")
7129 )
7130 }
7131
7132 if set_options_count > 0 || drop_options_count > 0 {
7137 drop_options.extend(mutually_exclusive_options.iter().cloned());
7138 }
7139
7140 }
7143
7144 Ok(Plan::AlterConnection(AlterConnectionPlan {
7145 id: entry.id(),
7146 action: crate::plan::AlterConnectionAction::AlterOptions {
7147 set_options,
7148 drop_options,
7149 validate,
7150 },
7151 }))
7152}
7153
7154pub fn describe_alter_sink(
7155 _: &StatementContext,
7156 _: AlterSinkStatement<Aug>,
7157) -> Result<StatementDesc, PlanError> {
7158 Ok(StatementDesc::new(None))
7159}
7160
7161pub fn plan_alter_sink(
7162 scx: &mut StatementContext,
7163 stmt: AlterSinkStatement<Aug>,
7164) -> Result<Plan, PlanError> {
7165 let AlterSinkStatement {
7166 sink_name,
7167 if_exists,
7168 action,
7169 } = stmt;
7170
7171 let object_type = ObjectType::Sink;
7172 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7173
7174 let Some(item) = item else {
7175 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7176 name: sink_name.to_string(),
7177 object_type,
7178 });
7179
7180 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7181 };
7182 let item = item.at_version(RelationVersionSelector::Latest);
7184
7185 match action {
7186 AlterSinkAction::ChangeRelation(new_from) => {
7187 let create_sql = item.create_sql();
7189 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7190 let [stmt]: [StatementParseResult; 1] = stmts
7191 .try_into()
7192 .expect("create sql of sink was not exactly one statement");
7193 let Statement::CreateSink(stmt) = stmt.ast else {
7194 unreachable!("invalid create SQL for sink item");
7195 };
7196
7197 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7199 stmt.from = new_from;
7200
7201 let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7203 unreachable!("invalid plan for CREATE SINK statement");
7204 };
7205
7206 plan.sink.version += 1;
7207
7208 Ok(Plan::AlterSink(AlterSinkPlan {
7209 item_id: item.id(),
7210 global_id: item.global_id(),
7211 sink: plan.sink,
7212 with_snapshot: plan.with_snapshot,
7213 in_cluster: plan.in_cluster,
7214 }))
7215 }
7216 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7217 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7218 }
7219}
7220
7221pub fn describe_alter_source(
7222 _: &StatementContext,
7223 _: AlterSourceStatement<Aug>,
7224) -> Result<StatementDesc, PlanError> {
7225 Ok(StatementDesc::new(None))
7227}
7228
// Declares the extracted form of `AlterSourceAddSubsourceOption`:
// `TextColumns` and `ExcludeColumns` are lists of unresolved item names that
// default to empty, and `Details` is a string with no default given here.
// NOTE(review): nothing in the surrounding planner functions reads the
// generated type; it is presumably consumed elsewhere — confirm.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7235
7236pub fn plan_alter_source(
7237 scx: &mut StatementContext,
7238 stmt: AlterSourceStatement<Aug>,
7239) -> Result<Plan, PlanError> {
7240 let AlterSourceStatement {
7241 source_name,
7242 if_exists,
7243 action,
7244 } = stmt;
7245 let object_type = ObjectType::Source;
7246
7247 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7248 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7249 name: source_name.to_string(),
7250 object_type,
7251 });
7252
7253 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7254 }
7255
7256 match action {
7257 AlterSourceAction::SetOptions(options) => {
7258 let mut options = options.into_iter();
7259 let option = options.next().unwrap();
7260 if option.name == CreateSourceOptionName::RetainHistory {
7261 if options.next().is_some() {
7262 sql_bail!("RETAIN HISTORY must be only option");
7263 }
7264 return alter_retain_history(
7265 scx,
7266 object_type,
7267 if_exists,
7268 UnresolvedObjectName::Item(source_name),
7269 option.value,
7270 );
7271 }
7272 sql_bail!(
7275 "Cannot modify the {} of a SOURCE.",
7276 option.name.to_ast_string_simple()
7277 );
7278 }
7279 AlterSourceAction::ResetOptions(reset) => {
7280 let mut options = reset.into_iter();
7281 let option = options.next().unwrap();
7282 if option == CreateSourceOptionName::RetainHistory {
7283 if options.next().is_some() {
7284 sql_bail!("RETAIN HISTORY must be only option");
7285 }
7286 return alter_retain_history(
7287 scx,
7288 object_type,
7289 if_exists,
7290 UnresolvedObjectName::Item(source_name),
7291 None,
7292 );
7293 }
7294 sql_bail!(
7295 "Cannot modify the {} of a SOURCE.",
7296 option.to_ast_string_simple()
7297 );
7298 }
7299 AlterSourceAction::DropSubsources { .. } => {
7300 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7301 }
7302 AlterSourceAction::AddSubsources { .. } => {
7303 unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7304 }
7305 AlterSourceAction::RefreshReferences => {
7306 unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7307 }
7308 };
7309}
7310
7311pub fn describe_alter_system_set(
7312 _: &StatementContext,
7313 _: AlterSystemSetStatement,
7314) -> Result<StatementDesc, PlanError> {
7315 Ok(StatementDesc::new(None))
7316}
7317
7318pub fn plan_alter_system_set(
7319 _: &StatementContext,
7320 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7321) -> Result<Plan, PlanError> {
7322 let name = name.to_string();
7323 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7324 name,
7325 value: scl::plan_set_variable_to(to)?,
7326 }))
7327}
7328
7329pub fn describe_alter_system_reset(
7330 _: &StatementContext,
7331 _: AlterSystemResetStatement,
7332) -> Result<StatementDesc, PlanError> {
7333 Ok(StatementDesc::new(None))
7334}
7335
7336pub fn plan_alter_system_reset(
7337 _: &StatementContext,
7338 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7339) -> Result<Plan, PlanError> {
7340 let name = name.to_string();
7341 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7342}
7343
7344pub fn describe_alter_system_reset_all(
7345 _: &StatementContext,
7346 _: AlterSystemResetAllStatement,
7347) -> Result<StatementDesc, PlanError> {
7348 Ok(StatementDesc::new(None))
7349}
7350
7351pub fn plan_alter_system_reset_all(
7352 _: &StatementContext,
7353 _: AlterSystemResetAllStatement,
7354) -> Result<Plan, PlanError> {
7355 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7356}
7357
7358pub fn describe_alter_role(
7359 _: &StatementContext,
7360 _: AlterRoleStatement<Aug>,
7361) -> Result<StatementDesc, PlanError> {
7362 Ok(StatementDesc::new(None))
7363}
7364
7365pub fn plan_alter_role(
7366 scx: &StatementContext,
7367 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7368) -> Result<Plan, PlanError> {
7369 let option = match option {
7370 AlterRoleOption::Attributes(attrs) => {
7371 let attrs = plan_role_attributes(attrs, scx)?;
7372 PlannedAlterRoleOption::Attributes(attrs)
7373 }
7374 AlterRoleOption::Variable(variable) => {
7375 let var = plan_role_variable(variable)?;
7376 PlannedAlterRoleOption::Variable(var)
7377 }
7378 };
7379
7380 Ok(Plan::AlterRole(AlterRolePlan {
7381 id: name.id,
7382 name: name.name,
7383 option,
7384 }))
7385}
7386
7387pub fn describe_alter_table_add_column(
7388 _: &StatementContext,
7389 _: AlterTableAddColumnStatement<Aug>,
7390) -> Result<StatementDesc, PlanError> {
7391 Ok(StatementDesc::new(None))
7392}
7393
7394pub fn plan_alter_table_add_column(
7395 scx: &StatementContext,
7396 stmt: AlterTableAddColumnStatement<Aug>,
7397) -> Result<Plan, PlanError> {
7398 let AlterTableAddColumnStatement {
7399 if_exists,
7400 name,
7401 if_col_not_exist,
7402 column_name,
7403 data_type,
7404 } = stmt;
7405 let object_type = ObjectType::Table;
7406
7407 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7408
7409 let (relation_id, item_name, desc) =
7410 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7411 Some(item) => {
7412 let item_name = scx.catalog.resolve_full_name(item.name());
7414 let item = item.at_version(RelationVersionSelector::Latest);
7415 let desc = item.relation_desc().expect("table has desc").into_owned();
7416 (item.id(), item_name, desc)
7417 }
7418 None => {
7419 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7420 name: name.to_ast_string_simple(),
7421 object_type,
7422 });
7423 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7424 }
7425 };
7426
7427 let column_name = ColumnName::from(column_name.as_str());
7428 if desc.get_by_name(&column_name).is_some() {
7429 if if_col_not_exist {
7430 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7431 column_name: column_name.to_string(),
7432 object_name: item_name.item,
7433 });
7434 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7435 } else {
7436 return Err(PlanError::ColumnAlreadyExists {
7437 column_name,
7438 object_name: item_name.item,
7439 });
7440 }
7441 }
7442
7443 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7444 let column_type = scalar_type.nullable(true);
7446 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7448
7449 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7450 relation_id,
7451 column_name,
7452 column_type,
7453 raw_sql_type,
7454 }))
7455}
7456
7457pub fn describe_alter_materialized_view_apply_replacement(
7458 _: &StatementContext,
7459 _: AlterMaterializedViewApplyReplacementStatement,
7460) -> Result<StatementDesc, PlanError> {
7461 Ok(StatementDesc::new(None))
7462}
7463
7464pub fn plan_alter_materialized_view_apply_replacement(
7465 scx: &StatementContext,
7466 stmt: AlterMaterializedViewApplyReplacementStatement,
7467) -> Result<Plan, PlanError> {
7468 let AlterMaterializedViewApplyReplacementStatement {
7469 if_exists,
7470 name,
7471 replacement_name,
7472 } = stmt;
7473
7474 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7475
7476 let object_type = ObjectType::MaterializedView;
7477 let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7478 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7479 name: name.to_ast_string_simple(),
7480 object_type,
7481 });
7482 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7483 };
7484
7485 let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7486 .expect("if_exists not set");
7487
7488 if replacement.replacement_target() != Some(mv.id()) {
7489 return Err(PlanError::InvalidReplacement {
7490 item_type: mv.item_type(),
7491 item_name: scx.catalog.minimal_qualification(mv.name()),
7492 replacement_type: replacement.item_type(),
7493 replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7494 });
7495 }
7496
7497 Ok(Plan::AlterMaterializedViewApplyReplacement(
7498 AlterMaterializedViewApplyReplacementPlan {
7499 id: mv.id(),
7500 replacement_id: replacement.id(),
7501 },
7502 ))
7503}
7504
7505pub fn describe_comment(
7506 _: &StatementContext,
7507 _: CommentStatement<Aug>,
7508) -> Result<StatementDesc, PlanError> {
7509 Ok(StatementDesc::new(None))
7510}
7511
7512pub fn plan_comment(
7513 scx: &mut StatementContext,
7514 stmt: CommentStatement<Aug>,
7515) -> Result<Plan, PlanError> {
7516 const MAX_COMMENT_LENGTH: usize = 1024;
7517
7518 let CommentStatement { object, comment } = stmt;
7519
7520 if let Some(c) = &comment {
7522 if c.len() > 1024 {
7523 return Err(PlanError::CommentTooLong {
7524 length: c.len(),
7525 max_size: MAX_COMMENT_LENGTH,
7526 });
7527 }
7528 }
7529
7530 let (object_id, column_pos) = match &object {
7531 com_ty @ CommentObjectType::Table { name }
7532 | com_ty @ CommentObjectType::View { name }
7533 | com_ty @ CommentObjectType::MaterializedView { name }
7534 | com_ty @ CommentObjectType::Index { name }
7535 | com_ty @ CommentObjectType::Func { name }
7536 | com_ty @ CommentObjectType::Connection { name }
7537 | com_ty @ CommentObjectType::Source { name }
7538 | com_ty @ CommentObjectType::Sink { name }
7539 | com_ty @ CommentObjectType::Secret { name }
7540 | com_ty @ CommentObjectType::ContinualTask { name } => {
7541 let item = scx.get_item_by_resolved_name(name)?;
7542 match (com_ty, item.item_type()) {
7543 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7544 (CommentObjectId::Table(item.id()), None)
7545 }
7546 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7547 (CommentObjectId::View(item.id()), None)
7548 }
7549 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7550 (CommentObjectId::MaterializedView(item.id()), None)
7551 }
7552 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7553 (CommentObjectId::Index(item.id()), None)
7554 }
7555 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7556 (CommentObjectId::Func(item.id()), None)
7557 }
7558 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7559 (CommentObjectId::Connection(item.id()), None)
7560 }
7561 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7562 (CommentObjectId::Source(item.id()), None)
7563 }
7564 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7565 (CommentObjectId::Sink(item.id()), None)
7566 }
7567 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7568 (CommentObjectId::Secret(item.id()), None)
7569 }
7570 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7571 (CommentObjectId::ContinualTask(item.id()), None)
7572 }
7573 (com_ty, cat_ty) => {
7574 let expected_type = match com_ty {
7575 CommentObjectType::Table { .. } => ObjectType::Table,
7576 CommentObjectType::View { .. } => ObjectType::View,
7577 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7578 CommentObjectType::Index { .. } => ObjectType::Index,
7579 CommentObjectType::Func { .. } => ObjectType::Func,
7580 CommentObjectType::Connection { .. } => ObjectType::Connection,
7581 CommentObjectType::Source { .. } => ObjectType::Source,
7582 CommentObjectType::Sink { .. } => ObjectType::Sink,
7583 CommentObjectType::Secret { .. } => ObjectType::Secret,
7584 _ => unreachable!("these are the only types we match on"),
7585 };
7586
7587 return Err(PlanError::InvalidObjectType {
7588 expected_type: SystemObjectType::Object(expected_type),
7589 actual_type: SystemObjectType::Object(cat_ty.into()),
7590 object_name: item.name().item.clone(),
7591 });
7592 }
7593 }
7594 }
7595 CommentObjectType::Type { ty } => match ty {
7596 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7597 sql_bail!("cannot comment on anonymous list or map type");
7598 }
7599 ResolvedDataType::Named { id, modifiers, .. } => {
7600 if !modifiers.is_empty() {
7601 sql_bail!("cannot comment on type with modifiers");
7602 }
7603 (CommentObjectId::Type(*id), None)
7604 }
7605 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7606 },
7607 CommentObjectType::Column { name } => {
7608 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7609 match item.item_type() {
7610 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7611 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7612 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7613 CatalogItemType::MaterializedView => {
7614 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7615 }
7616 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7617 r => {
7618 return Err(PlanError::Unsupported {
7619 feature: format!("Specifying comments on a column of {r}"),
7620 discussion_no: None,
7621 });
7622 }
7623 }
7624 }
7625 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7626 CommentObjectType::Database { name } => {
7627 (CommentObjectId::Database(*name.database_id()), None)
7628 }
7629 CommentObjectType::Schema { name } => {
7630 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7634 sql_bail!(
7635 "cannot comment on schema {} because it is a temporary schema",
7636 mz_repr::namespaces::MZ_TEMP_SCHEMA
7637 );
7638 }
7639 (
7640 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7641 None,
7642 )
7643 }
7644 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7645 CommentObjectType::ClusterReplica { name } => {
7646 let replica = scx.catalog.resolve_cluster_replica(name)?;
7647 (
7648 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7649 None,
7650 )
7651 }
7652 CommentObjectType::NetworkPolicy { name } => {
7653 (CommentObjectId::NetworkPolicy(name.id), None)
7654 }
7655 };
7656
7657 if let Some(p) = column_pos {
7663 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7664 max_num_columns: MAX_NUM_COLUMNS,
7665 req_num_columns: p,
7666 })?;
7667 }
7668
7669 Ok(Plan::Comment(CommentPlan {
7670 object_id,
7671 sub_component: column_pos,
7672 comment,
7673 }))
7674}
7675
7676pub(crate) fn resolve_cluster<'a>(
7677 scx: &'a StatementContext,
7678 name: &'a Ident,
7679 if_exists: bool,
7680) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7681 match scx.resolve_cluster(Some(name)) {
7682 Ok(cluster) => Ok(Some(cluster)),
7683 Err(_) if if_exists => Ok(None),
7684 Err(e) => Err(e),
7685 }
7686}
7687
7688pub(crate) fn resolve_cluster_replica<'a>(
7689 scx: &'a StatementContext,
7690 name: &QualifiedReplica,
7691 if_exists: bool,
7692) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7693 match scx.resolve_cluster(Some(&name.cluster)) {
7694 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7695 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7696 None if if_exists => Ok(None),
7697 None => Err(sql_err!(
7698 "CLUSTER {} has no CLUSTER REPLICA named {}",
7699 cluster.name(),
7700 name.replica.as_str().quoted(),
7701 )),
7702 },
7703 Err(_) if if_exists => Ok(None),
7704 Err(e) => Err(e),
7705 }
7706}
7707
7708pub(crate) fn resolve_database<'a>(
7709 scx: &'a StatementContext,
7710 name: &'a UnresolvedDatabaseName,
7711 if_exists: bool,
7712) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7713 match scx.resolve_database(name) {
7714 Ok(database) => Ok(Some(database)),
7715 Err(_) if if_exists => Ok(None),
7716 Err(e) => Err(e),
7717 }
7718}
7719
7720pub(crate) fn resolve_schema<'a>(
7721 scx: &'a StatementContext,
7722 name: UnresolvedSchemaName,
7723 if_exists: bool,
7724) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7725 match scx.resolve_schema(name) {
7726 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7727 Err(_) if if_exists => Ok(None),
7728 Err(e) => Err(e),
7729 }
7730}
7731
7732pub(crate) fn resolve_network_policy<'a>(
7733 scx: &'a StatementContext,
7734 name: Ident,
7735 if_exists: bool,
7736) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7737 match scx.catalog.resolve_network_policy(&name.to_string()) {
7738 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7739 id: policy.id(),
7740 name: policy.name().to_string(),
7741 })),
7742 Err(_) if if_exists => Ok(None),
7743 Err(e) => Err(e.into()),
7744 }
7745}
7746
7747pub(crate) fn resolve_item_or_type<'a>(
7748 scx: &'a StatementContext,
7749 object_type: ObjectType,
7750 name: UnresolvedItemName,
7751 if_exists: bool,
7752) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7753 let name = normalize::unresolved_item_name(name)?;
7754 let catalog_item = match object_type {
7755 ObjectType::Type => scx.catalog.resolve_type(&name),
7756 _ => scx.catalog.resolve_item(&name),
7757 };
7758
7759 match catalog_item {
7760 Ok(item) => {
7761 let is_type = ObjectType::from(item.item_type());
7762 if object_type == is_type {
7763 Ok(Some(item))
7764 } else {
7765 Err(PlanError::MismatchedObjectType {
7766 name: scx.catalog.minimal_qualification(item.name()),
7767 is_type,
7768 expected_type: object_type,
7769 })
7770 }
7771 }
7772 Err(_) if if_exists => Ok(None),
7773 Err(e) => Err(e.into()),
7774 }
7775}
7776
7777fn ensure_cluster_is_not_managed(
7779 scx: &StatementContext,
7780 cluster_id: ClusterId,
7781) -> Result<(), PlanError> {
7782 let cluster = scx.catalog.get_cluster(cluster_id);
7783 if cluster.is_managed() {
7784 Err(PlanError::ManagedCluster {
7785 cluster_name: cluster.name().to_string(),
7786 })
7787 } else {
7788 Ok(())
7789 }
7790}