use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::num::NonZeroU32;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
    preserves_order, strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
    AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
    AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
    AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
    AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
    AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
    AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
    ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
    ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
    ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
    ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName,
    CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption,
    CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType,
    CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement,
    CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
    CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
    CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
    CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
    CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
    CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
    CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
    CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
    DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
    FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
    SinkEnvelope, StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
    SqlServerSourceExtras, Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::iceberg::IcebergSinkConfigOptionExtracted;
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
    AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
    AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
    AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan,
    ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig,
    ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan,
    CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan,
    CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
    CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
    CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
    DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
    NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
    PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
    VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
    WebhookValidation, literal, plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

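/// Upper bound on the number of columns in a planned relation; the webhook
/// source planner below enforces this limit.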
const MAX_NUM_COLUMNS: usize = 256;

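/// Matches the names of replicas in managed clusters, which look like `r1`,
/// `r2`, and so on.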
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

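/// Checks that the `PARTITION BY` columns form a prefix of the relation's
/// columns and that each column's type has an order-preserving encoding. If
/// more columns are given than the relation has, the excess is logged as an
/// error and ignored.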
fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
    if partition_by.len() > desc.len() {
        tracing::error!(
            "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
            desc.len(),
            partition_by.len()
        );
        partition_by.truncate(desc.len());
    }

    let desc_prefix = desc.iter().take(partition_by.len());
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc_prefix.zip_eq(partition_by).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
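            // Versioned columns were added by a later `ALTER TABLE ... ADD
            // COLUMN`; they are collected into `changes` below instead of the
            // table's original column list.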
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // A non-primary unique constraint over a nullable
                                // column is not a key unless NULLS NOT DISTINCT is
                                // specified, so skip this constraint. (`break`
                                // here would wrongly abandon later constraints.)
                                continue 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    // Reject the statement if an item or type with the same name already
    // exists, unless `IF NOT EXISTS` was specified.
    if let (false, Ok(item)) = (
        *if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

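/// Plans a `CREATE SOURCE ... FROM WEBHOOK` statement (or, when `is_table` is
/// set, the equivalent `CREATE TABLE ... FROM WEBHOOK`).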
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        // `in_cluster` was already handled above by `source_sink_cluster_config`.
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        // A validation expression that references no columns is a constant and
        // cannot actually validate a request, so it is rejected outright.
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        // Aside from now(), unmaterializable functions would make validation
        // non-deterministic, so they are rejected as well.
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        // The request body is always included as the first column.
        SqlColumnType {
            scalar_type: SqlScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    // Optionally include a single `headers` map column, possibly filtered.
    if let Some(filters) = include_headers.column {
        column_ty.push(SqlColumnType {
            scalar_type: SqlScalarType::Map {
                value_type: Box::new(SqlScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    // Include one column per individually mapped header.
    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(SqlScalarType::Bytes)
            .unwrap_or(SqlScalarType::String);
        column_ty.push(SqlColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    // Validate the columns: no duplicate names, and within the column limit.
    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = SqlRelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Webhook sources always live on the epoch-milliseconds timeline.
    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            // For non-table webhook sources, the cluster is recorded on the
            // plan's `in_cluster` instead of on the data source.
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

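/// Plans a `CREATE SOURCE` statement for an external source (e.g. Kafka,
/// PostgreSQL, MySQL, SQL Server, or a load generator).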
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    // When the `force_source_table_syntax` flag is set, the ENVELOPE, FORMAT,
    // and INCLUDE options must instead be specified on an accompanying
    // `CREATE TABLE ... FROM SOURCE` statement.
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Apply a user-specified key constraint, if any.
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (desc, data_source) = match progress_subsource {
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Reject the statement if an item or type with the same name already
    // exists, unless `IF NOT EXISTS` was specified.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // Determine the cluster for this source; `source_sink_cluster_config` may
    // update `stmt.in_cluster`, so this must happen before the statement is
    // normalized below.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // CDC sources have their own external timeline; all other sources live on
    // the epoch-milliseconds timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

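/// Plans the connection-specific portion of a `CREATE SOURCE` statement by
/// dispatching on the source connection type.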
pub fn plan_generic_source_connection(
    scx: &StatementContext<'_>,
    source_connection: &CreateSourceConnection<Aug>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
    Ok(match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options,
        } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
            scx,
            connection,
            options,
            include_metadata,
        )?),
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
            scx, connection, options,
        )?),
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
                scx,
                generator,
                options,
                include_metadata,
            )?)
        }
    })
}

fn plan_load_generator_source_connection(
    scx: &StatementContext<'_>,
    generator: &ast::LoadGenerator,
    options: &Vec<LoadGeneratorOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<LoadGeneratorSourceConnection, PlanError> {
    let load_generator =
        load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
    let LoadGeneratorOptionExtracted {
        tick_interval,
        as_of,
        up_to,
        ..
    } = options.clone().try_into()?;
    let tick_micros = match tick_interval {
        Some(interval) => Some(interval.as_micros().try_into()?),
        None => None,
    };
    if up_to < as_of {
        sql_bail!("UP TO cannot be less than AS OF");
    }
    Ok(LoadGeneratorSourceConnection {
        load_generator,
        tick_micros,
        as_of,
        up_to,
    })
}

fn plan_mysql_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<MySqlConfigOption<Aug>>,
) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    // Validate that the item is in fact a MySQL connection; only its id is
    // stored on the source connection below.
    match connection_item.connection()? {
        Connection::MySql(connection) => connection,
        _ => sql_bail!(
            "{} is not a MySQL connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let MySqlConfigOptionExtracted {
        details,
        // Handled during purification; not needed to plan the connection.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(MySqlSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        details,
    })
}

fn plan_sqlserver_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<SqlServerConfigOption<Aug>>,
) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    // Validate that the item is in fact a SQL Server connection.
    match connection_item.connection()? {
        Connection::SqlServer(connection) => connection,
        _ => sql_bail!(
            "{} is not a SQL Server connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };
    let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
    let extras = hex::decode(details)
        .map_err(|e| sql_err!("{e}"))
        .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
        .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
    Ok(SqlServerSourceConnection {
        connection_id: connection_item.id(),
        connection: connection_item.id(),
        extras,
    })
}

fn plan_postgres_source_connection(
    scx: &StatementContext<'_>,
    connection: &ResolvedItemName,
    options: &Vec<PgConfigOption<Aug>>,
) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection)?;
    let PgConfigOptionExtracted {
        details,
        publication,
        // Handled during purification; not needed to plan the connection.
        text_columns: _,
        exclude_columns: _,
        seen: _,
    } = options.clone().try_into()?;
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let publication_details =
        PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
    Ok(PostgresSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        publication: publication.expect("validated exists during purification"),
        publication_details,
    })
}

fn plan_kafka_source_connection(
    scx: &StatementContext<'_>,
    connection_name: &ResolvedItemName,
    options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
    include_metadata: &Vec<SourceIncludeMetadata>,
) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(connection_name)?;
    if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
        sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        )
    }
    let KafkaSourceConfigOptionExtracted {
        group_id_prefix,
        topic,
        topic_metadata_refresh_interval,
        // `START TIMESTAMP` is resolved to start offsets during purification.
        start_timestamp: _,
        start_offset,
        seen: _,
    }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
    let topic = topic.expect("validated exists during purification");
    let mut start_offsets = BTreeMap::new();
    if let Some(offsets) = start_offset {
        for (part, offset) in offsets.iter().enumerate() {
            if *offset < 0 {
                sql_bail!("START OFFSET must be a nonnegative integer");
            }
            start_offsets.insert(i32::try_from(part)?, *offset);
        }
    }
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        // This is a librdkafka-enforced limit.
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }
    let metadata_columns = include_metadata
        .into_iter()
        .flat_map(|item| match item {
            SourceIncludeMetadata::Timestamp { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "timestamp".to_owned(),
                };
                Some((name, KafkaMetadataKind::Timestamp))
            }
            SourceIncludeMetadata::Partition { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "partition".to_owned(),
                };
                Some((name, KafkaMetadataKind::Partition))
            }
            SourceIncludeMetadata::Offset { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "offset".to_owned(),
                };
                Some((name, KafkaMetadataKind::Offset))
            }
            SourceIncludeMetadata::Headers { alias } => {
                let name = match alias {
                    Some(name) => name.to_string(),
                    None => "headers".to_owned(),
                };
                Some((name, KafkaMetadataKind::Headers))
            }
            SourceIncludeMetadata::Header {
                alias,
                key,
                use_bytes,
            } => Some((
                alias.to_string(),
                KafkaMetadataKind::Header {
                    key: key.clone(),
                    use_bytes: *use_bytes,
                },
            )),
            SourceIncludeMetadata::Key { .. } => {
                // The key is handled by the key envelope, not as a metadata column.
                None
            }
        })
        .collect();
    Ok(KafkaSourceConnection {
        connection: connection_item.id(),
        connection_id: connection_item.id(),
        topic,
        start_offsets,
        group_id_prefix,
        topic_metadata_refresh_interval,
        metadata_columns,
    })
}

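/// Resolves a source's declared envelope and format into the planned relation
/// description, `SourceEnvelope`, and optional `SourceDataEncoding`.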
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // If a format is specified, the source must produce a single raw
            // bytes column for the decoder to operate on.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            // A Kafka record with ENVELOPE NONE may carry no key at all, so any
            // included key columns must be marked nullable.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    // The KeyValue load generator emits keys without a key format, so it is
    // exempt from the usual requirement that INCLUDE KEY name a key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // `ENVELOPE UPSERT` implies `INCLUDE KEY`, so synthesize an unnamed
            // key envelope if the user did not request one explicitly.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

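/// Plans the `RelationDesc` for a source export (a subsource or a table from
/// source) from user-declared columns and constraints.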
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                // As in `plan_create_table`: a non-primary unique
                                // constraint over a nullable column is not a key
                                // unless NULLS NOT DISTINCT is specified.
                                continue 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = SqlRelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

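/// Plans a `CREATE SUBSOURCE` statement; these statements are typically
/// generated during source purification rather than written by users.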
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    // A subsource is either a progress collection (PROGRESS) or an ingestion
    // export referencing an upstream object (EXTERNAL REFERENCE + OF SOURCE),
    // but never both.
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        // When the `force_source_table_syntax` flag is set, non-progress
        // subsources are disallowed in favor of CREATE TABLE ... FROM SOURCE.
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        // The assert above guarantees that `external_reference` is present.
        let external_reference = external_reference.unwrap();

        // Decode the per-export details serialized during purification.
        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            // Subsources do not support non-default envelopes or encodings.
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

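/// Plans a `CREATE TABLE ... FROM SOURCE` statement.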
1735pub fn plan_create_table_from_source(
1736 scx: &StatementContext,
1737 stmt: CreateTableFromSourceStatement<Aug>,
1738) -> Result<Plan, PlanError> {
1739 if !scx.catalog.system_vars().enable_create_table_from_source() {
1740 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1741 }
1742
1743 let CreateTableFromSourceStatement {
1744 name,
1745 columns,
1746 constraints,
1747 if_not_exists,
1748 source,
1749 external_reference,
1750 envelope,
1751 format,
1752 include_metadata,
1753 with_options,
1754 } = &stmt;
1755
1756 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1757
1758 let TableFromSourceOptionExtracted {
1759 text_columns,
1760 exclude_columns,
1761 retain_history,
1762 partition_by,
1763 details,
1764 seen: _,
1765 } = with_options.clone().try_into()?;
1766
1767 let source_item = scx.get_item_by_resolved_name(source)?;
1768 let ingestion_id = source_item.id();
1769
    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                _ => unreachable!(),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .expect("populated in purification")
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}

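// Generates `LoadGeneratorOptionExtracted`, which collects each WITH option
// into a typed field (applying the listed defaults) and records which options
// were explicitly provided in its `seen` set.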
generate_extracted_config!(
    LoadGeneratorOption,
    (TickInterval, Duration),
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    (ScaleFactor, f64),
    (MaxCardinality, u64),
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);

impl LoadGeneratorOptionExtracted {
    pub(super) fn ensure_only_valid_options(
        &self,
        loadgen: &ast::LoadGenerator,
    ) -> Result<(), PlanError> {
        use mz_sql_parser::ast::LoadGeneratorOptionName::*;

        let mut options = self.seen.clone();

        let permitted_options: &[_] = match loadgen {
            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
            ast::LoadGenerator::KeyValue => &[
                TickInterval,
                Keys,
                SnapshotRounds,
                TransactionalSnapshot,
                ValueSize,
                Seed,
                Partitions,
                BatchSize,
            ],
        };

        for o in permitted_options {
            options.remove(o);
        }

        if !options.is_empty() {
            sql_bail!(
                "{} load generators do not support {} values",
                loadgen,
                options.iter().join(", ")
            )
        }

        Ok(())
    }
}

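/// Converts a load generator AST (plus its WITH options and INCLUDE metadata
/// items) into a `LoadGenerator`, enforcing per-generator feature flags and
/// option validity.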
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
            LoadGenerator::Datums
        }
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

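            // Base table cardinalities from the TPC-H specification, scaled by
            // SF (e.g. SUPPLIER = SF * 10,000). Out-of-range products overflow
            // to infinity, which `f_to_i` rejects via `try_cast_from`.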
            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("BATCH SIZE cannot be larger than KEYS")
            }

            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}

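/// Validates the shape of a Debezium-encoded relation: the value must contain
/// an `after` column and, if present, a `before` column of the same record
/// type. Returns the column indexes of `before` (if any) and `after`.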
fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
    let before = value_desc.get_by_name(&"before".into());
    let (after_idx, after_ty) = value_desc
        .get_by_name(&"after".into())
        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
    let before_idx = if let Some((before_idx, before_ty)) = before {
        if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
            sql_bail!("'before' column must be of type record");
        }
        if before_ty != after_ty {
            sql_bail!("'before' type differs from 'after' column");
        }
        Some(before_idx)
    } else {
        None
    };
    Ok((before_idx, after_idx))
}

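/// Resolves the key and value encodings for a source from its FORMAT
/// specifier, enforcing that envelopes which require a key (DEBEZIUM, UPSERT)
/// were given a `KEY FORMAT ... VALUE FORMAT` pair.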
fn get_encoding(
    scx: &StatementContext,
    format: &FormatSpecifier<Aug>,
    envelope: &ast::SourceEnvelope,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let encoding = match format {
        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
        FormatSpecifier::KeyValue { key, value } => {
            let key = {
                let encoding = get_encoding_inner(scx, key)?;
                Some(encoding.key.unwrap_or(encoding.value))
            };
            let value = get_encoding_inner(scx, value)?.value;
            SourceDataEncoding { key, value }
        }
    };

    let requires_keyvalue = matches!(
        envelope,
        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
    );
    let is_keyvalue = encoding.key.is_some();
    if requires_keyvalue && !is_keyvalue {
        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
    }

    Ok(encoding)
}

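/// Determines the cluster to use for a source or sink. If `in_cluster` is
/// unset, resolves the active cluster and records the choice back into the
/// statement so it survives in the catalog's `create_sql`.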
fn source_sink_cluster_config<'a, 'ctx>(
    scx: &'a StatementContext<'ctx>,
    in_cluster: &mut Option<ResolvedClusterName>,
) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
    let cluster = match in_cluster {
        None => {
            let cluster = scx.catalog.resolve_cluster(None)?;
            *in_cluster = Some(ResolvedClusterName {
                id: cluster.id(),
                print_name: None,
            });
            cluster
        }
        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
    };

    Ok(cluster)
}

generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

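/// An Avro schema pair (optional key schema plus value schema), together with
/// the schema registry connection it was fetched from, if any.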
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    pub confluent_wire_format: bool,
}

fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}

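/// Determines how the key of a source is surfaced based on the `INCLUDE KEY`
/// clause: named, flattened, or omitted entirely.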
fn get_key_envelope(
    included_items: &[SourceIncludeMetadata],
    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
    key_envelope_no_encoding: bool,
) -> Result<KeyEnvelope, PlanError> {
    let key_definition = included_items
        .iter()
        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
        match (alias, encoding.and_then(|e| e.key.as_ref())) {
            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
            (Some(name), _) if key_envelope_no_encoding => {
                Ok(KeyEnvelope::Named(name.as_str().to_string()))
            }
            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
            (_, None) => {
                sql_bail!(
                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
                    got bare FORMAT"
                );
            }
        }
    } else {
        Ok(KeyEnvelope::None)
    }
}

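/// Gets the key envelope for a given key encoding when no explicit name was
/// given: composite encodings are flattened into their constituent columns,
/// while everything else lands in a single column named "key".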
fn get_unnamed_key_envelope(
    key: Option<&DataEncoding<ReferencedConnection>>,
) -> Result<KeyEnvelope, PlanError> {
    let is_composite = match key {
        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
        Some(
            DataEncoding::Avro(_)
            | DataEncoding::Csv(_)
            | DataEncoding::Protobuf(_)
            | DataEncoding::Regex { .. },
        ) => true,
        None => false,
    };

    if is_composite {
        Ok(KeyEnvelope::Flattened)
    } else {
        Ok(KeyEnvelope::Named("key".to_string()))
    }
}

pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

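/// Plans a view definition: plans the underlying query, rejects parameters,
/// applies any column renamings, and checks for duplicate column names.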
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}

pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

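/// Plans a `CREATE MATERIALIZED VIEW` statement, including its cluster
/// placement, ASSERT NOT NULL and PARTITION BY options, REFRESH schedule, and
/// any `OR REPLACE` drop bookkeeping.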
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

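    // Fold all REFRESH options into a single `RefreshSchedule`. `ON COMMIT` is
    // the default behavior and must not be combined with any other REFRESH
    // option; `None` is produced when only the default was requested.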
    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &SqlRelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &SqlScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.clone().into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let mut dependencies: BTreeSet<_> = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let mut replacement_target = None;
    if let Some(target_name) = &stmt.replacing {
        scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

        let target = scx.get_item_by_resolved_name(target_name)?;
        if target.item_type() != CatalogItemType::MaterializedView {
            return Err(PlanError::InvalidReplacement {
                item_type: target.item_type(),
                item_name: scx.catalog.minimal_qualification(target.name()),
                replacement_type: CatalogItemType::MaterializedView,
                replacement_name: partial_name,
            });
        }
        if target.id().is_system() {
            sql_bail!(
                "cannot replace {} because it is required by the database system",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        if !dependencies.insert(target.id()) {
            sql_bail!(
                "cannot replace {} because it is also a dependency",
                scx.catalog.minimal_qualification(target.name()),
            );
        }

        replacement_target = Some(target.id());
    }

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies: DependencyIds(dependencies),
            column_names,
            replacement_target,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);

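/// Plans a `CREATE CONTINUAL TASK` statement: plans each INSERT/DELETE body
/// against the task's own (possibly self-referential) relation and unions the
/// results into a single dataflow expression.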
pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.capacity());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    SqlColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    let desc = input.relation_desc().expect("item type checked above");
                    desc.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip_eq(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            replacement_target: None,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

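/// Desugars a continual task INSERT or DELETE statement into the SELECT query
/// it represents, or returns `None` for unsupported shapes (explicit insert
/// columns, RETURNING, DEFAULT VALUES, or DELETE ... USING).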
fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            Some(query)
        }
    }
}

generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);

pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

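/// Plans the body of a sink statement. Note that it does not check for name
/// collisions itself; callers such as `plan_create_sink` are expected to do so
/// first, which allows this logic to be shared with paths that replan an
/// existing sink.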
fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match envelope {
        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
        None => sql_bail!("ENVELOPE clause is required"),
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;

    {
        use CatalogItemType::*;
        match from.item_type() {
            Table | Source | MaterializedView | ContinualTask => {}
            Sink | View | Index | Type | Func | Secret | Connection => {
                let name = scx.catalog.minimal_qualification(from.name());
                return Err(PlanError::InvalidSinkFrom {
                    name: name.to_string(),
                    item_type: from.item_type(),
                });
            }
        }
    }

    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }

    let desc = from.relation_desc().expect("item type checked above");
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key: Some(key), .. }
        | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
            let key_columns = key
                .key_columns
                .clone()
                .into_iter()
                .map(normalize::column_name)
                .collect::<Vec<_>>();
            let mut uniq = BTreeSet::new();
            for col in key_columns.iter() {
                if !uniq.insert(col) {
                    sql_bail!("duplicate column referenced in KEY: {}", col);
                }
            }
            let indices = key_columns
                .iter()
                .map(|col| -> anyhow::Result<usize> {
                    let name_idx =
                        desc.get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                    if desc.get_unambiguous_name(name_idx).is_none() {
                        return Err(
                            sql_err!("column referenced in KEY is ambiguous: {}", col).into(),
                        );
                    }
                    Ok(name_idx)
                })
                .collect::<Result<Vec<_>, _>>()?;
            let is_valid_key = desc
                .typ()
                .keys
                .iter()
                .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));

            if !is_valid_key && envelope == SinkEnvelope::Upsert {
                if key.not_enforced {
                    scx.catalog
                        .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                            key: key_columns.clone(),
                            name: name.item.clone(),
                        })
                } else {
                    return Err(PlanError::UpsertSinkWithInvalidKey {
                        name: from_name.full_name_str(),
                        desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                        valid_keys: desc
                            .typ()
                            .keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    .map(|col| desc.get_name(*col).as_str().into())
                                    .collect()
                            })
                            .collect(),
                    });
                }
            }
            Some(indices)
        }
        CreateSinkConnection::Kafka { key: None, .. }
        | CreateSinkConnection::Iceberg { key: None, .. } => None,
    };

    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                SqlScalarType::Map { value_type, .. }
                    if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

    let relation_key_indices = desc.typ().keys.get(0).cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = SqlRelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
        commit_interval,
    } = with_options.try_into()?;

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
            commit_interval,
        )?,
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            options,
            ..
        } => iceberg_sink_builder(
            scx,
            connection,
            aws_connection,
            options,
            relation_key_indices,
            key_desc_and_indices,
            commit_interval,
        )?,
    };

    let with_snapshot = snapshot.unwrap_or(true);
    let version = version.unwrap_or(0);

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
            commit_interval,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}

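/// Builds the error reported when a user-specified KEY constraint conflicts
/// with the keys already known for the relation.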
fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

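// Note: `CsrConfigOptionExtracted` appears to be written by hand rather than
// via `generate_extracted_config!` because options like `AvroDocOn` need
// bespoke extraction; see the `TryFrom` impl below.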
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}

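/// Builds the connection description for an Iceberg sink, validating that the
/// referenced catalog and AWS connections have the right types and that the
/// required TABLE, NAMESPACE, and COMMIT INTERVAL options are present.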
fn iceberg_sink_builder(
    scx: &StatementContext,
    catalog_connection: ResolvedItemName,
    aws_connection: ResolvedItemName,
    options: Vec<IcebergSinkConfigOption<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ICEBERG_SINK)?;
    let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
    let catalog_connection_id = catalog_connection_item.id();
    let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
    let aws_connection_id = aws_connection_item.id();
    if !matches!(
        catalog_connection_item.connection()?,
        Connection::IcebergCatalog(_)
    ) {
        sql_bail!(
            "{} is not an iceberg catalog connection",
            scx.catalog
                .resolve_full_name(catalog_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
        sql_bail!(
            "{} is not an AWS connection",
            scx.catalog
                .resolve_full_name(aws_connection_item.name())
                .to_string()
                .quoted()
        );
    }

    let IcebergSinkConfigOptionExtracted {
        table,
        namespace,
        seen: _,
    }: IcebergSinkConfigOptionExtracted = options.try_into()?;

    let Some(table) = table else {
        sql_bail!("Iceberg sink must specify TABLE");
    };
    let Some(namespace) = namespace else {
        sql_bail!("Iceberg sink must specify NAMESPACE");
    };
    if commit_interval.is_none() {
        sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
    }

    Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
        catalog_connection_id,
        catalog_connection: catalog_connection_id,
        aws_connection_id,
        aws_connection: aws_connection_id,
        table,
        namespace,
        relation_key_indices,
        key_desc_and_indices,
    }))
}

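/// Builds the connection description for a Kafka sink: validates the Kafka
/// connection, resolves the topic and id-style options, and maps the requested
/// key and value formats onto concrete encodings.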
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

3898 let gen_avro_schema_options = |conn| {
3901 let CsrConnectionAvro {
3902 connection:
3903 CsrConnection {
3904 connection,
3905 options,
3906 },
3907 seed,
3908 key_strategy,
3909 value_strategy,
3910 } = conn;
3911 if seed.is_some() {
3912 sql_bail!("SEED option does not make sense with sinks");
3913 }
3914 if key_strategy.is_some() {
3915 sql_bail!("KEY STRATEGY option does not make sense with sinks");
3916 }
3917 if value_strategy.is_some() {
3918 sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3919 }
3920
3921 let item = scx.get_item_by_resolved_name(&connection)?;
3922 let csr_connection = match item.connection()? {
3923 Connection::Csr(_) => item.id(),
3924 _ => {
3925 sql_bail!(
3926 "{} is not a schema registry connection",
3927 scx.catalog
3928 .resolve_full_name(item.name())
3929 .to_string()
3930 .quoted()
3931 )
3932 }
3933 };
3934 let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3935
3936 if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3937 sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3938 }
3939
3940 if key_desc_and_indices.is_some()
3941 && (extracted_options.avro_key_fullname.is_some()
3942 ^ extracted_options.avro_value_fullname.is_some())
3943 {
3944 sql_bail!(
3945 "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3946 );
3947 }
3948
3949 Ok((csr_connection, extracted_options))
3950 };
3951
3952 let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3953 Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3954 Format::Bytes if desc.arity() == 1 => {
3955 let col_type = &desc.typ().column_types[0].scalar_type;
3956 if !mz_pgrepr::Value::can_encode_binary(col_type) {
3957 bail_unsupported!(format!(
3958 "BYTES format with non-encodable type: {:?}",
3959 col_type
3960 ));
3961 }
3962
3963 Ok(KafkaSinkFormatType::Bytes)
3964 }
3965 Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3966 Format::Bytes | Format::Text => {
3967 bail_unsupported!("BYTES or TEXT format with multiple columns")
3968 }
3969 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3970 Format::Avro(AvroSchema::Csr { csr_connection }) => {
3971 let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3972 let schema = if is_key {
3973 AvroSchemaGenerator::new(
3974 desc.clone(),
3975 false,
3976 options.key_doc_options,
3977 options.avro_key_fullname.as_deref().unwrap_or("row"),
3978 options.null_defaults,
3979 Some(sink_from),
3980 false,
3981 )?
3982 .schema()
3983 .to_string()
3984 } else {
3985 AvroSchemaGenerator::new(
3986 desc.clone(),
3987 matches!(envelope, SinkEnvelope::Debezium),
3988 options.value_doc_options,
3989 options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3990 options.null_defaults,
3991 Some(sink_from),
3992 true,
3993 )?
3994 .schema()
3995 .to_string()
3996 };
3997 Ok(KafkaSinkFormatType::Avro {
3998 schema,
3999 compatibility_level: if is_key {
4000 options.key_compatibility_level
4001 } else {
4002 options.value_compatibility_level
4003 },
4004 csr_connection,
4005 })
4006 }
4007 format => bail_unsupported!(format!("sink format {:?}", format)),
4008 };
4009
4010 let partition_by = match &partition_by {
4011 Some(partition_by) => {
4012 let mut scope = Scope::from_source(None, value_desc.iter_names());
4013
4014 match envelope {
4015 SinkEnvelope::Upsert => (),
4016 SinkEnvelope::Debezium => {
4017 let key_indices: HashSet<_> = key_desc_and_indices
4018 .as_ref()
4019 .map(|(_desc, indices)| indices.as_slice())
4020 .unwrap_or_default()
4021 .into_iter()
4022 .collect();
4023 for (i, item) in scope.items.iter_mut().enumerate() {
4024 if !key_indices.contains(&i) {
4025 item.error_if_referenced = Some(|_table, column| {
4026 PlanError::InvalidPartitionByEnvelopeDebezium {
4027 column_name: column.to_string(),
4028 }
4029 });
4030 }
4031 }
4032 }
4033 };
4034
4035 let ecx = &ExprContext {
4036 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
4037 name: "PARTITION BY",
4038 scope: &scope,
4039 relation_type: value_desc.typ(),
4040 allow_aggregates: false,
4041 allow_subqueries: false,
4042 allow_parameters: false,
4043 allow_windows: false,
4044 };
4045 let expr = plan_expr(ecx, partition_by)?.cast_to(
4046 ecx,
4047 CastContext::Assignment,
4048 &SqlScalarType::UInt64,
4049 )?;
4050 let expr = expr.lower_uncorrelated()?;
4051
4052 Some(expr)
4053 }
4054 _ => None,
4055 };
4056
4057 let format = match format {
4059 Some(FormatSpecifier::KeyValue { key, value }) => {
4060 let key_format = match key_desc_and_indices.as_ref() {
4061 Some((desc, _indices)) => Some(map_format(key, desc, true)?),
4062 None => None,
4063 };
4064 KafkaSinkFormat {
4065 value_format: map_format(value, &value_desc, false)?,
4066 key_format,
4067 }
4068 }
4069 Some(FormatSpecifier::Bare(format)) => {
4070 let key_format = match key_desc_and_indices.as_ref() {
4071 Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
4072 None => None,
4073 };
4074 KafkaSinkFormat {
4075 value_format: map_format(format, &value_desc, false)?,
4076 key_format,
4077 }
4078 }
4079 None => bail_unsupported!("sink without format"),
4080 };
4081
4082 Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
4083 connection_id,
4084 connection: connection_id,
4085 format,
4086 topic: topic_name,
4087 relation_key_indices,
4088 key_desc_and_indices,
4089 headers_index,
4090 value_desc,
4091 partition_by,
4092 compression_type,
4093 progress_group_id,
4094 transactional_id,
4095 topic_options: KafkaTopicOptions {
4096 partition_count: topic_partition_count,
4097 replication_factor: topic_replication_factor,
4098 topic_config: topic_config.unwrap_or_default(),
4099 },
4100 topic_metadata_refresh_interval,
4101 }))
4102}
4103
4104pub fn describe_create_index(
4105 _: &StatementContext,
4106 _: CreateIndexStatement<Aug>,
4107) -> Result<StatementDesc, PlanError> {
4108 Ok(StatementDesc::new(None))
4109}
4110
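/// Plans a `CREATE INDEX` statement.
///
/// If no key is given, the index is created on the relation's default key and
/// named `<relation>_primary_idx`; otherwise an unnamed index gets a name
/// derived from the indexed columns (for example, `CREATE INDEX ON t (a)`
/// yields `t_a_idx`).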
pub fn plan_create_index(
    scx: &StatementContext,
    mut stmt: CreateIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateIndexStatement {
        name,
        on_name,
        in_cluster,
        key_parts,
        with_options,
        if_not_exists,
    } = &mut stmt;
    let on = scx.get_item_by_resolved_name(on_name)?;
    let on_item_type = on.item_type();

    if !matches!(
        on_item_type,
        CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Source
            | CatalogItemType::Table
    ) {
        sql_bail!(
            "index cannot be created on {} because it is a {}",
            on_name.full_name_str(),
            on.item_type()
        )
    }

    let on_desc = on.relation_desc().expect("item type checked above");

    let filled_key_parts = match key_parts {
        Some(kp) => kp.to_vec(),
        None => {
            let key = on_desc.typ().default_key();
            key.iter()
                .map(|i| match on_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![n.clone().into()]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect()
        }
    };
    let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;

    let index_name = if let Some(name) = name {
        QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: normalize::ident(name.clone()),
        }
    } else {
        let mut idx_name = QualifiedItemName {
            qualifiers: on.name().qualifiers.clone(),
            item: on.name().item.clone(),
        };
        if key_parts.is_none() {
            idx_name.item += "_primary_idx";
        } else {
            let index_name_col_suffix = keys
                .iter()
                .map(|k| match k {
                    mz_expr::MirScalarExpr::Column(i, name) => {
                        match (on_desc.get_unambiguous_name(*i), &name.0) {
                            (Some(col_name), _) => col_name.to_string(),
                            (None, Some(name)) => name.to_string(),
                            (None, None) => format!("{}", i + 1),
                        }
                    }
                    _ => "expr".to_string(),
                })
                .join("_");
            write!(idx_name.item, "_{index_name_col_suffix}_idx")
                .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
        }

        if !*if_not_exists {
            scx.catalog.find_available_name(idx_name)
        } else {
            idx_name
        }
    };

    let full_name = scx.catalog.resolve_full_name(&index_name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), false, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_not_exists,
        scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let options = plan_index_options(scx, with_options.clone())?;
    let cluster_id = match in_cluster {
        None => scx.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };

    *in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    *name = Some(Ident::new(index_name.item.clone())?);
    *key_parts = Some(filled_key_parts);
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::IndexOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    Ok(Plan::CreateIndex(CreateIndexPlan {
        name: index_name,
        index: Index {
            create_sql,
            on: on.global_id(),
            keys,
            cluster_id,
            compaction_window,
        },
        if_not_exists,
    }))
}

pub fn describe_create_type(
    _: &StatementContext,
    _: CreateTypeStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

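/// Plans a `CREATE TYPE` statement, covering the `AS LIST`, `AS MAP`, and
/// record variants. Element, key, value, and field types must refer to named
/// (catalog) types; anonymous data types are rejected.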
pub fn plan_create_type(
    scx: &StatementContext,
    stmt: CreateTypeStatement<Aug>,
) -> Result<Plan, PlanError> {
    let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
    let CreateTypeStatement { name, as_type, .. } = stmt;

    fn validate_data_type(
        scx: &StatementContext,
        data_type: ResolvedDataType,
        as_type: &str,
        key: &str,
    ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
        let (id, modifiers) = match data_type {
            ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
            _ => sql_bail!(
                "CREATE TYPE ... AS {}option {} can only use named data types, but \
                found unnamed data type {}. Use CREATE TYPE to create a named type first",
                as_type,
                key,
                data_type.human_readable_name(),
            ),
        };

        let item = scx.catalog.get_item(&id);
        match item.type_details() {
            None => sql_bail!(
                "{} must be of class type, but received {} which is of class {}",
                key,
                scx.catalog.resolve_full_name(item.name()),
                item.item_type()
            ),
            Some(CatalogTypeDetails {
                typ: CatalogType::Char,
                ..
            }) => {
                bail_unsupported!("embedding char type in a list or map")
            }
            _ => {
                scalar_type_from_catalog(scx.catalog, id, &modifiers)?;

                Ok((id, modifiers))
            }
        }
    }

    let inner = match as_type {
        CreateTypeAs::List { options } => {
            let CreateTypeListOptionExtracted {
                element_type,
                seen: _,
            } = CreateTypeListOptionExtracted::try_from(options)?;
            let element_type =
                element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
            let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
            CatalogType::List {
                element_reference: id,
                element_modifiers: modifiers,
            }
        }
        CreateTypeAs::Map { options } => {
            let CreateTypeMapOptionExtracted {
                key_type,
                value_type,
                seen: _,
            } = CreateTypeMapOptionExtracted::try_from(options)?;
            let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
            let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
            let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
            let (value_id, value_modifiers) =
                validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
            CatalogType::Map {
                key_reference: key_id,
                key_modifiers,
                value_reference: value_id,
                value_modifiers,
            }
        }
        CreateTypeAs::Record { column_defs } => {
            let mut fields = vec![];
            for column_def in column_defs {
                let data_type = column_def.data_type;
                let key = ident(column_def.name.clone());
                let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
                fields.push(CatalogRecordField {
                    name: ColumnName::from(key.clone()),
                    type_reference: id,
                    type_modifiers: modifiers,
                });
            }
            CatalogType::Record { fields }
        }
    };

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
        if item.item_type().conflicts_with_type() {
            return Err(PlanError::ItemAlreadyExists {
                name: full_name.to_string(),
                item_type: item.item_type(),
            });
        }
    }

    Ok(Plan::CreateType(CreateTypePlan {
        name,
        typ: Type { create_sql, inner },
    }))
}

generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));

generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);

#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    Attributes(PlannedRoleAttributes),
    Variable(PlannedRoleVariable),
}

#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    pub inherit: Option<bool>,
    pub password: Option<Password>,
    pub scram_iterations: Option<NonZeroU32>,
    pub nopassword: Option<bool>,
    pub superuser: Option<bool>,
    pub login: Option<bool>,
}

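/// Validates a list of role attributes, rejecting unsupported attributes
/// (e.g. `CREATEDB`) and conflicting or redundant combinations (e.g.
/// `SUPERUSER` together with `NOSUPERUSER`), and folds the rest into a
/// [`PlannedRoleAttributes`].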
fn plan_role_attributes(
    options: Vec<RoleAttribute>,
    scx: &StatementContext,
) -> Result<PlannedRoleAttributes, PlanError> {
    let mut planned_attributes = PlannedRoleAttributes {
        inherit: None,
        password: None,
        scram_iterations: None,
        superuser: None,
        login: None,
        nopassword: None,
    };

    for option in options {
        match option {
            RoleAttribute::Inherit | RoleAttribute::NoInherit
                if planned_attributes.inherit.is_some() =>
            {
                sql_bail!("conflicting or redundant options");
            }
            RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
                bail_never_supported!(
                    "CREATECLUSTER attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
                bail_never_supported!(
                    "CREATEDB attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
                bail_never_supported!(
                    "CREATEROLE attribute",
                    "sql/create-role/#details",
                    "Use system privileges instead."
                );
            }
            RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
                sql_bail!("conflicting or redundant options");
            }

            RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
            RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
            RoleAttribute::Password(password) => {
                if let Some(password) = password {
                    planned_attributes.password = Some(password.into());
                    planned_attributes.scram_iterations =
                        Some(scx.catalog.system_vars().scram_iterations())
                } else {
                    planned_attributes.nopassword = Some(true);
                }
            }
            RoleAttribute::SuperUser => {
                if planned_attributes.superuser == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(true);
            }
            RoleAttribute::NoSuperUser => {
                if planned_attributes.superuser == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.superuser = Some(false);
            }
            RoleAttribute::Login => {
                if planned_attributes.login == Some(false) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(true);
            }
            RoleAttribute::NoLogin => {
                if planned_attributes.login == Some(true) {
                    sql_bail!("conflicting or redundant options");
                }
                planned_attributes.login = Some(false);
            }
        }
    }
    if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non inherit roles");
    }

    Ok(planned_attributes)
}

#[derive(Debug)]
pub enum PlannedRoleVariable {
    Set { name: String, value: VariableValue },
    Reset { name: String },
}

impl PlannedRoleVariable {
    pub fn name(&self) -> &str {
        match self {
            PlannedRoleVariable::Set { name, .. } => name,
            PlannedRoleVariable::Reset { name } => name,
        }
    }
}

fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
    let plan = match variable {
        SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
            name: name.to_string(),
            value: scl::plan_set_variable_to(value)?,
        },
        SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
            name: name.to_string(),
        },
    };
    Ok(plan)
}

pub fn describe_create_role(
    _: &StatementContext,
    _: CreateRoleStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_role(
    scx: &StatementContext,
    CreateRoleStatement { name, options }: CreateRoleStatement,
) -> Result<Plan, PlanError> {
    let attributes = plan_role_attributes(options, scx)?;
    Ok(Plan::CreateRole(CreateRolePlan {
        name: normalize::ident(name),
        attributes: attributes.into(),
    }))
}

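/// Plans a `CREATE NETWORK POLICY` statement (feature gated). Each rule in
/// `RULES` must specify all of `DIRECTION`, `ACTION`, and `ADDRESS`, and the
/// number of rules is capped by the `max_rules_per_network_policy` system
/// variable.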
pub fn plan_create_network_policy(
    ctx: &StatementContext,
    CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when creating network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;
        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }

    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
        name: normalize::ident(name),
        rules,
    }))
}

pub fn plan_alter_network_policy(
    ctx: &StatementContext,
    AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
) -> Result<Plan, PlanError> {
    ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;

    let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
    let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;

    let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
    };

    let mut rules = vec![];
    for NetworkPolicyRuleDefinition { name, options } in rule_defs {
        let NetworkPolicyRuleOptionExtracted {
            seen: _,
            direction,
            action,
            address,
        } = options.try_into()?;

        let (direction, action, address) = match (direction, action, address) {
            (Some(direction), Some(action), Some(address)) => (
                NetworkPolicyRuleDirection::try_from(direction.as_str())?,
                NetworkPolicyRuleAction::try_from(action.as_str())?,
                PolicyAddress::try_from(address.as_str())?,
            ),
            (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
            }
        };
        rules.push(NetworkPolicyRule {
            name: normalize::ident(name),
            direction,
            action,
            address,
        });
    }
    if rules.len()
        > ctx
            .catalog
            .system_vars()
            .max_rules_per_network_policy()
            .try_into()?
    {
        sql_bail!("RULES count exceeds max_rules_per_network_policy.")
    }

    Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
        id: policy.id(),
        name: normalize::ident(name),
        rules,
    }))
}

pub fn describe_create_cluster(
    _: &StatementContext,
    _: CreateClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);

generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);

generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);

generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));

generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);

generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);

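/// Plans a `CREATE CLUSTER` statement. For managed clusters the resulting plan
/// is round-tripped through [`unplan_create_cluster`] and re-planned as a
/// sanity check that planning and unplanning remain inverses of each other.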
pub fn plan_create_cluster(
    scx: &StatementContext,
    stmt: CreateClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let plan = plan_create_cluster_inner(scx, stmt)?;

    if let CreateClusterVariant::Managed(_) = &plan.variant {
        let stmt = unplan_create_cluster(scx, plan.clone())
            .map_err(|e| PlanError::Replan(e.to_string()))?;
        let create_sql = stmt.to_ast_string_stable();
        let stmt = parse::parse(&create_sql)
            .map_err(|e| PlanError::Replan(e.to_string()))?
            .into_element()
            .ast;
        let (stmt, _resolved_ids) =
            names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        let stmt = match stmt {
            Statement::CreateCluster(stmt) => stmt,
            stmt => {
                return Err(PlanError::Replan(format!(
                    "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
                )));
            }
        };
        let replan =
            plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
        if plan != replan {
            return Err(PlanError::Replan(format!(
                "replan does not match: plan={plan:?}, replan={replan:?}"
            )));
        }
    }

    Ok(Plan::CreateCluster(plan))
}

pub fn plan_create_cluster_inner(
    scx: &StatementContext,
    CreateClusterStatement {
        name,
        options,
        features,
    }: CreateClusterStatement<Aug>,
) -> Result<CreateClusterPlan, PlanError> {
    let ClusterOptionExtracted {
        availability_zones,
        introspection_debugging,
        introspection_interval,
        managed,
        replicas,
        replication_factor,
        seen: _,
        size,
        disk,
        schedule,
        workload_class,
    }: ClusterOptionExtracted = options.try_into()?;

    let managed = managed.unwrap_or_else(|| replicas.is_none());

    if !scx.catalog.active_role_id().is_system() {
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for non-system users");
        }
        if workload_class.is_some() {
            sql_bail!("WORKLOAD CLASS not supported for non-system users");
        }
    }

    let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
    let workload_class = workload_class.and_then(|v| v.0);

    if managed {
        if replicas.is_some() {
            sql_bail!("REPLICAS not supported for managed clusters");
        }
        let Some(size) = size else {
            sql_bail!("SIZE must be specified for managed clusters");
        };

        if disk.is_some() {
            if scx.catalog.is_cluster_size_cc(&size) {
                sql_bail!(
                    "DISK option not supported for modern cluster sizes because disk is always enabled"
                );
            }

            scx.catalog
                .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
        }

        let compute = plan_compute_replica_config(
            introspection_interval,
            introspection_debugging.unwrap_or(false),
        )?;

        let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
            replication_factor.unwrap_or_else(|| {
                scx.catalog
                    .system_vars()
                    .default_cluster_replication_factor()
            })
        } else {
            scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
            if replication_factor.is_some() {
                sql_bail!(
                    "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                );
            }
            0
        };
        let availability_zones = availability_zones.unwrap_or_default();

        if !availability_zones.is_empty() {
            scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
        }

        let ClusterFeatureExtracted {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            seen: _,
        } = ClusterFeatureExtracted::try_from(features)?;
        let optimizer_feature_overrides = OptimizerFeatureOverrides {
            reoptimize_imported_views,
            enable_eager_delta_joins,
            enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering,
            enable_letrec_fixpoint_analysis,
            enable_join_prioritize_arranged,
            enable_projection_pushdown_after_relation_cse,
            ..Default::default()
        };

        let schedule = plan_cluster_schedule(schedule)?;

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
                replication_factor,
                size,
                availability_zones,
                compute,
                optimizer_feature_overrides,
                schedule,
            }),
            workload_class,
        })
    } else {
        let Some(replica_defs) = replicas else {
            sql_bail!("REPLICAS must be specified for unmanaged clusters");
        };
        if availability_zones.is_some() {
            sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
        }
        if replication_factor.is_some() {
            sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
        }
        if introspection_debugging.is_some() {
            sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
        }
        if introspection_interval.is_some() {
            sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
        }
        if size.is_some() {
            sql_bail!("SIZE not supported for unmanaged clusters");
        }
        if disk.is_some() {
            sql_bail!("DISK not supported for unmanaged clusters");
        }
        if !features.is_empty() {
            sql_bail!("FEATURES not supported for unmanaged clusters");
        }
        if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
            sql_bail!(
                "cluster schedules other than MANUAL are not supported for unmanaged clusters"
            );
        }

        let mut replicas = vec![];
        for ReplicaDefinition { name, options } in replica_defs {
            replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
        }

        Ok(CreateClusterPlan {
            name: normalize::ident(name),
            variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
            workload_class,
        })
    }
}

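/// Converts a [`CreateClusterPlan`] back into a `CREATE CLUSTER` statement.
/// Only managed clusters are supported; this is the inverse direction used by
/// the replan check in [`plan_create_cluster`].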
pub fn unplan_create_cluster(
    scx: &StatementContext,
    CreateClusterPlan {
        name,
        variant,
        workload_class,
    }: CreateClusterPlan,
) -> Result<CreateClusterStatement<Aug>, PlanError> {
    match variant {
        CreateClusterVariant::Managed(CreateClusterManagedPlan {
            replication_factor,
            size,
            availability_zones,
            compute,
            optimizer_feature_overrides,
            schedule,
        }) => {
            let schedule = unplan_cluster_schedule(schedule);
            let OptimizerFeatureOverrides {
                enable_guard_subquery_tablefunc: _,
                enable_consolidate_after_union_negate: _,
                enable_reduce_mfp_fusion: _,
                enable_cardinality_estimates: _,
                persist_fast_path_limit: _,
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_reduce_reduction: _,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
                enable_less_reduce_in_eqprop: _,
                enable_dequadratic_eqprop_map: _,
                enable_eq_classes_withholding_errors: _,
                enable_fast_path_plan_insights: _,
            } = optimizer_feature_overrides;
            let features_extracted = ClusterFeatureExtracted {
                seen: Default::default(),
                reoptimize_imported_views,
                enable_eager_delta_joins,
                enable_new_outer_join_lowering,
                enable_variadic_left_join_lowering,
                enable_letrec_fixpoint_analysis,
                enable_join_prioritize_arranged,
                enable_projection_pushdown_after_relation_cse,
            };
            let features = features_extracted.into_values(scx.catalog);
            let availability_zones = if availability_zones.is_empty() {
                None
            } else {
                Some(availability_zones)
            };
            let (introspection_interval, introspection_debugging) =
                unplan_compute_replica_config(compute);
            let replication_factor = match &schedule {
                ClusterScheduleOptionValue::Manual => Some(replication_factor),
                ClusterScheduleOptionValue::Refresh { .. } => {
                    assert!(
                        replication_factor <= 1,
                        "replication factor, {replication_factor:?}, must be <= 1"
                    );
                    None
                }
            };
            let workload_class = workload_class.map(|s| OptionalString(Some(s)));
            let options_extracted = ClusterOptionExtracted {
                seen: Default::default(),
                availability_zones,
                disk: None,
                introspection_debugging: Some(introspection_debugging),
                introspection_interval,
                managed: Some(true),
                replicas: None,
                replication_factor,
                size: Some(size),
                schedule: Some(schedule),
                workload_class,
            };
            let options = options_extracted.into_values(scx.catalog);
            let name = Ident::new_unchecked(name);
            Ok(CreateClusterStatement {
                name,
                options,
                features,
            })
        }
        CreateClusterVariant::Unmanaged(_) => {
            bail_unsupported!("SHOW CREATE for unmanaged clusters")
        }
    }
}

generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);

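/// Plans the options of a single cluster replica, distinguishing orchestrated
/// replicas (`SIZE`-based) from unorchestrated ones (explicit `*CTL ADDRESSES`
/// options, gated behind a feature flag). Mixing options from the two modes is
/// rejected.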
fn plan_replica_config(
    scx: &StatementContext,
    options: Vec<ReplicaOption<Aug>>,
) -> Result<ReplicaConfig, PlanError> {
    let ReplicaOptionExtracted {
        availability_zone,
        billed_as,
        computectl_addresses,
        disk,
        internal,
        introspection_debugging,
        introspection_interval,
        size,
        storagectl_addresses,
        ..
    }: ReplicaOptionExtracted = options.try_into()?;

    let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;

    match (
        size,
        availability_zone,
        billed_as,
        storagectl_addresses,
        computectl_addresses,
    ) {
        (None, _, None, None, None) => {
            sql_bail!("SIZE option must be specified");
        }
        (Some(size), availability_zone, billed_as, None, None) => {
            if disk.is_some() {
                if scx.catalog.is_cluster_size_cc(&size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }

            Ok(ReplicaConfig::Orchestrated {
                size,
                availability_zone,
                compute,
                billed_as,
                internal,
            })
        }

        (None, None, None, storagectl_addresses, computectl_addresses) => {
            scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;

            let Some(storagectl_addrs) = storagectl_addresses else {
                sql_bail!("missing STORAGECTL ADDRESSES option");
            };
            let Some(computectl_addrs) = computectl_addresses else {
                sql_bail!("missing COMPUTECTL ADDRESSES option");
            };

            if storagectl_addrs.len() != computectl_addrs.len() {
                sql_bail!(
                    "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
                );
            }

            if disk.is_some() {
                sql_bail!("DISK can't be specified for unorchestrated clusters");
            }

            Ok(ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            })
        }
        _ => {
            sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
        }
    }
}

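/// Plans the introspection configuration of a replica. An omitted
/// `INTROSPECTION INTERVAL` defaults to [`DEFAULT_REPLICA_LOGGING_INTERVAL`],
/// while an interval that parses to `None` disables introspection (in which
/// case `INTROSPECTION DEBUGGING` must not be set).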
fn plan_compute_replica_config(
    introspection_interval: Option<OptionalDuration>,
    introspection_debugging: bool,
) -> Result<ComputeReplicaConfig, PlanError> {
    let introspection_interval = introspection_interval
        .map(|OptionalDuration(i)| i)
        .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
    let introspection = match introspection_interval {
        Some(interval) => Some(ComputeReplicaIntrospectionConfig {
            interval,
            debugging: introspection_debugging,
        }),
        None if introspection_debugging => {
            sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
        }
        None => None,
    };
    let compute = ComputeReplicaConfig { introspection };
    Ok(compute)
}

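/// The inverse of [`plan_compute_replica_config`]: recovers the
/// `INTROSPECTION INTERVAL` and `INTROSPECTION DEBUGGING` option values from a
/// [`ComputeReplicaConfig`].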
fn unplan_compute_replica_config(
    compute_replica_config: ComputeReplicaConfig,
) -> (Option<OptionalDuration>, bool) {
    match compute_replica_config.introspection {
        Some(ComputeReplicaIntrospectionConfig {
            debugging,
            interval,
        }) => (Some(OptionalDuration(Some(interval))), debugging),
        None => (Some(OptionalDuration(None)), false),
    }
}

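/// Plans a cluster `SCHEDULE` option. A `REFRESH` schedule may carry a
/// `HYDRATION TIME ESTIMATE`, which must be a non-negative interval that
/// involves no units larger than days and fits back into an [`Interval`] when
/// round-tripped through a [`Duration`].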
fn plan_cluster_schedule(
    schedule: ClusterScheduleOptionValue,
) -> Result<ClusterSchedule, PlanError> {
    Ok(match schedule {
        ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: None,
        } => ClusterSchedule::Refresh {
            hydration_time_estimate: Duration::from_millis(0),
        },
        ClusterScheduleOptionValue::Refresh {
            hydration_time_estimate: Some(interval_value),
        } => {
            let interval = Interval::try_from_value(Value::Interval(interval_value))?;
            if interval.as_microseconds() < 0 {
                sql_bail!(
                    "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
                    interval
                );
            }
            if interval.months != 0 {
                sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
            }
            let duration = interval.duration()?;
            if u64::try_from(duration.as_millis()).is_err()
                || Interval::from_duration(&duration).is_err()
            {
                sql_bail!("HYDRATION TIME ESTIMATE too large");
            }
            ClusterSchedule::Refresh {
                hydration_time_estimate: duration,
            }
        }
    })
}

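/// The inverse of [`plan_cluster_schedule`]: converts a [`ClusterSchedule`]
/// back into its `SCHEDULE` option syntax.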
fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
    match schedule {
        ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
        ClusterSchedule::Refresh {
            hydration_time_estimate,
        } => {
            let interval = Interval::from_duration(&hydration_time_estimate)
                .expect("planning ensured that this is convertible back to Interval");
            let interval_value = literal::unplan_interval(&interval);
            ClusterScheduleOptionValue::Refresh {
                hydration_time_estimate: Some(interval_value),
            }
        }
    }
}

pub fn describe_create_cluster_replica(
    _: &StatementContext,
    _: CreateClusterReplicaStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_cluster_replica(
    scx: &StatementContext,
    CreateClusterReplicaStatement {
        definition: ReplicaDefinition { name, options },
        of_cluster,
    }: CreateClusterReplicaStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = scx
        .catalog
        .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
    let current_replica_count = cluster.replica_ids().iter().count();
    if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
        let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
        return Err(PlanError::CreateReplicaFailStorageObjects {
            current_replica_count,
            internal_replica_count,
            hypothetical_replica_count: current_replica_count + 1,
        });
    }

    let config = plan_replica_config(scx, options)?;

    if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
        if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
            return Err(PlanError::MangedReplicaName(name.into_string()));
        }
    } else {
        ensure_cluster_is_not_managed(scx, cluster.id())?;
    }

    Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
        name: normalize::ident(name),
        cluster_id: cluster.id(),
        config,
    }))
}

pub fn describe_create_secret(
    _: &StatementContext,
    _: CreateSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_secret(
    scx: &StatementContext,
    stmt: CreateSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSecretStatement {
        name,
        if_not_exists,
        value,
    } = &stmt;

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
    let mut create_sql_statement = stmt.clone();
    create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
    let create_sql =
        normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
    let secret_as = query::plan_secret_as(scx, value.clone())?;

    let secret = Secret {
        create_sql,
        secret_as,
    };

    Ok(Plan::CreateSecret(CreateSecretPlan {
        name,
        secret,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_connection(
    _: &StatementContext,
    _: CreateConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(CreateConnectionOption, (Validate, bool));

pub fn plan_create_connection(
    scx: &StatementContext,
    mut stmt: CreateConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateConnectionStatement {
        name,
        connection_type,
        values,
        if_not_exists,
        with_options,
    } = stmt.clone();
    let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
    let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let options = CreateConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && details.to_connection().validate_by_default()
        }
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
        stmt.values.retain(|v| {
            v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey1,
            value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
        });
        stmt.values.push(ConnectionOption {
            name: ConnectionOptionName::PublicKey2,
            value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
        });
    }
    let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;

    let plan = CreateConnectionPlan {
        name,
        if_not_exists,
        connection: crate::plan::Connection {
            create_sql,
            details,
        },
        validate,
    };
    Ok(Plan::CreateConnection(plan))
}

fn plan_drop_database(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedDatabaseName,
    cascade: bool,
) -> Result<Option<DatabaseId>, PlanError> {
    Ok(match resolve_database(scx, name, if_exists)? {
        Some(database) => {
            if !cascade && database.has_schemas() {
                sql_bail!(
                    "database '{}' cannot be dropped with RESTRICT while it contains schemas",
                    name,
                );
            }
            Some(database.id())
        }
        None => None,
    })
}

pub fn describe_drop_objects(
    _: &StatementContext,
    _: DropObjectsStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_objects(
    scx: &mut StatementContext,
    DropObjectsStatement {
        object_type,
        if_exists,
        names,
        cascade,
    }: DropObjectsStatement,
) -> Result<Plan, PlanError> {
    assert_ne!(
        object_type,
        mz_sql_parser::ast::ObjectType::Func,
        "rejected in parser"
    );
    let object_type = object_type.into();

    let mut referenced_ids = Vec::new();
    for name in names {
        let id = match &name {
            UnresolvedObjectName::Cluster(name) => {
                plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
            }
            UnresolvedObjectName::ClusterReplica(name) => {
                plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
            }
            UnresolvedObjectName::Database(name) => {
                plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
            }
            UnresolvedObjectName::Schema(name) => {
                plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
            }
            UnresolvedObjectName::Role(name) => {
                plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
            }
            UnresolvedObjectName::Item(name) => {
                plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
                    .map(ObjectId::Item)
            }
            UnresolvedObjectName::NetworkPolicy(name) => {
                plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
            }
        };
        match id {
            Some(id) => referenced_ids.push(id),
            None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            }),
        }
    }
    let drop_ids = scx.catalog.object_dependents(&referenced_ids);

    Ok(Plan::DropObjects(DropObjectsPlan {
        referenced_ids,
        drop_ids,
        object_type,
    }))
}

fn plan_drop_schema(
    scx: &StatementContext,
    if_exists: bool,
    name: &UnresolvedSchemaName,
    cascade: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot drop schema {name} because it is a temporary schema",)
    }

    Ok(match resolve_schema(scx, name.clone(), if_exists)? {
        Some((database_spec, schema_spec)) => {
            if let ResolvedDatabaseSpecifier::Ambient = database_spec {
                sql_bail!(
                    "cannot drop schema {name} because it is required by the database system",
                );
            }
            if let SchemaSpecifier::Temporary = schema_spec {
                sql_bail!("cannot drop schema {name} because it is a temporary schema",)
            }
            let schema = scx.get_schema(&database_spec, &schema_spec);
            if !cascade && schema.has_items() {
                let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                sql_bail!(
                    "schema '{}' cannot be dropped without CASCADE while it contains objects",
                    full_schema_name
                );
            }
            Some((database_spec, schema_spec))
        }
        None => None,
    })
}

fn plan_drop_role(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<RoleId>, PlanError> {
    match scx.catalog.resolve_role(name.as_str()) {
        Ok(role) => {
            let id = role.id();
            if &id == scx.catalog.active_role_id() {
                sql_bail!("current role cannot be dropped");
            }
            for role in scx.catalog.get_roles() {
                for (member_id, grantor_id) in role.membership() {
                    if &id == grantor_id {
                        let member_role = scx.catalog.get_role(member_id);
                        sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
                            name.as_str(),
                            role.name(),
                            member_role.name()
                        );
                    }
                }
            }
            Ok(Some(role.id()))
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

fn plan_drop_cluster(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
    cascade: bool,
) -> Result<Option<ClusterId>, PlanError> {
    Ok(match resolve_cluster(scx, name, if_exists)? {
        Some(cluster) => {
            if !cascade && !cluster.bound_objects().is_empty() {
                return Err(PlanError::DependentObjectsStillExist {
                    object_type: "cluster".to_string(),
                    object_name: cluster.name().to_string(),
                    dependents: Vec::new(),
                });
            }
            Some(cluster.id())
        }
        None => None,
    })
}

fn plan_drop_network_policy(
    scx: &StatementContext,
    if_exists: bool,
    name: &Ident,
) -> Result<Option<NetworkPolicyId>, PlanError> {
    match scx.catalog.resolve_network_policy(name.as_str()) {
        Ok(policy) => {
            if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
                Err(PlanError::NetworkPolicyInUse)
            } else {
                Ok(Some(policy.id()))
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

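/// Reports whether the cluster contains objects (sources or sinks) that
/// require it to stay single-replica. Always `false` when the
/// `ENABLE_MULTI_REPLICA_SOURCES` dyncfg is enabled.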
fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
    if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
        false
    } else {
        cluster.bound_objects().iter().any(|id| {
            let item = scx.catalog.get_item(id);
            matches!(
                item.item_type(),
                CatalogItemType::Sink | CatalogItemType::Source
            )
        })
    }
}

fn plan_drop_cluster_replica(
    scx: &StatementContext,
    if_exists: bool,
    name: &QualifiedReplica,
) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
    let cluster = resolve_cluster_replica(scx, name, if_exists)?;
    Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
}

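/// Plans dropping a single catalog item, refusing to drop system items and,
/// without `CASCADE`, items that other objects still depend on (per
/// [`dependency_prevents_drop`]).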
fn plan_drop_item(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedItemName,
    cascade: bool,
) -> Result<Option<CatalogItemId>, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    Ok(match resolved {
        Some(catalog_item) => {
            if catalog_item.id().is_system() {
                sql_bail!(
                    "cannot drop {} {} because it is required by the database system",
                    catalog_item.item_type(),
                    scx.catalog.minimal_qualification(catalog_item.name()),
                );
            }

            if !cascade {
                for id in catalog_item.used_by() {
                    let dep = scx.catalog.get_item(id);
                    if dependency_prevents_drop(object_type, dep) {
                        return Err(PlanError::DependentObjectsStillExist {
                            object_type: catalog_item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .minimal_qualification(catalog_item.name())
                                .to_string(),
                            dependents: vec![(
                                dep.item_type().to_string(),
                                scx.catalog.minimal_qualification(dep.name()).to_string(),
                            )],
                        });
                    }
                }
            }
            Some(catalog_item.id())
        }
        None => None,
    })
}

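/// Reports whether the existence of `dep` prevents a non-`CASCADE` drop of an
/// object of type `object_type`. Dropping a type is always blocked by any
/// dependent; for other object types, every dependent except an index blocks
/// the drop (an index alone does not).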
fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
    match object_type {
        ObjectType::Type => true,
        _ => match dep.item_type() {
            CatalogItemType::Func
            | CatalogItemType::Table
            | CatalogItemType::Source
            | CatalogItemType::View
            | CatalogItemType::MaterializedView
            | CatalogItemType::Sink
            | CatalogItemType::Type
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => true,
            CatalogItemType::Index => false,
        },
    }
}

pub fn describe_alter_index_options(
    _: &StatementContext,
    _: AlterIndexStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_drop_owned(
    _: &StatementContext,
    _: DropOwnedStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_drop_owned(
    scx: &StatementContext,
    drop: DropOwnedStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cascade = drop.cascade();
    let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
    let mut drop_ids = Vec::new();
    let mut privilege_revokes = Vec::new();
    let mut default_privilege_revokes = Vec::new();

    fn update_privilege_revokes(
        object_id: SystemObjectId,
        privileges: &PrivilegeMap,
        role_ids: &BTreeSet<RoleId>,
        privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
    ) {
        privilege_revokes.extend(iter::zip(
            iter::repeat(object_id),
            privileges
                .all_values()
                .filter(|privilege| role_ids.contains(&privilege.grantee))
                .cloned(),
        ));
    }

    for replica in scx.catalog.get_cluster_replicas() {
        if role_ids.contains(&replica.owner_id()) {
            drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
        }
    }

    for cluster in scx.catalog.get_clusters() {
        if role_ids.contains(&cluster.owner_id()) {
            if !cascade {
                let non_owned_bound_objects: Vec<_> = cluster
                    .bound_objects()
                    .into_iter()
                    .map(|item_id| scx.catalog.get_item(item_id))
                    .filter(|item| !role_ids.contains(&item.owner_id()))
                    .collect();
                if !non_owned_bound_objects.is_empty() {
                    let names: Vec<_> = non_owned_bound_objects
                        .into_iter()
                        .map(|item| {
                            (
                                item.item_type().to_string(),
                                scx.catalog.resolve_full_name(item.name()).to_string(),
                            )
                        })
                        .collect();
                    return Err(PlanError::DependentObjectsStillExist {
                        object_type: "cluster".to_string(),
                        object_name: cluster.name().to_string(),
                        dependents: names,
                    });
                }
            }
            drop_ids.push(cluster.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(cluster.id().into()),
            cluster.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    for item in scx.catalog.get_items() {
        if role_ids.contains(&item.owner_id()) {
            if !cascade {
                let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
                    let non_owned_dependencies: Vec<_> = used_by
                        .into_iter()
                        .map(|item_id| scx.catalog.get_item(item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let names: Vec<_> = non_owned_dependencies
                            .into_iter()
                            .map(|item| {
                                let item_typ = item.item_type().to_string();
                                let item_name =
                                    scx.catalog.resolve_full_name(item.name()).to_string();
                                (item_typ, item_name)
                            })
                            .collect();
                        Err(PlanError::DependentObjectsStillExist {
                            object_type: item.item_type().to_string(),
                            object_name: scx
                                .catalog
                                .resolve_full_name(item.name())
                                .to_string(),
5803 dependents: names,
5804 })
5805 } else {
5806 Ok(())
5807 }
5808 };
5809
                if let Some(id) = item.progress_id() {
                    let progress_item = scx.catalog.get_item(&id);
                    check_if_dependents_exist(progress_item.used_by())?;
                }
                check_if_dependents_exist(item.used_by())?;
            }
            drop_ids.push(item.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(item.id().into()),
            item.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    for schema in scx.catalog.get_schemas() {
        if !schema.id().is_temporary() {
            if role_ids.contains(&schema.owner_id()) {
                if !cascade {
                    let non_owned_dependencies: Vec<_> = schema
                        .item_ids()
                        .map(|item_id| scx.catalog.get_item(&item_id))
                        .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
                        .filter(|item| !role_ids.contains(&item.owner_id()))
                        .collect();
                    if !non_owned_dependencies.is_empty() {
                        let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
                        sql_bail!(
                            "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
                            full_schema_name.to_string().quoted()
                        );
                    }
                }
                drop_ids.push((*schema.database(), *schema.id()).into())
            }
            update_privilege_revokes(
                SystemObjectId::Object((*schema.database(), *schema.id()).into()),
                schema.privileges(),
                &role_ids,
                &mut privilege_revokes,
            );
        }
    }

    for database in scx.catalog.get_databases() {
        if role_ids.contains(&database.owner_id()) {
            if !cascade {
                let non_owned_schemas: Vec<_> = database
                    .schemas()
                    .into_iter()
                    .filter(|schema| !role_ids.contains(&schema.owner_id()))
                    .collect();
                if !non_owned_schemas.is_empty() {
                    sql_bail!(
                        "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
                        database.name().quoted(),
                    );
                }
            }
            drop_ids.push(database.id().into());
        }
        update_privilege_revokes(
            SystemObjectId::Object(database.id().into()),
            database.privileges(),
            &role_ids,
            &mut privilege_revokes,
        );
    }

    update_privilege_revokes(
        SystemObjectId::System,
        scx.catalog.get_system_privileges(),
        &role_ids,
        &mut privilege_revokes,
    );

    for (default_privilege_object, default_privilege_acl_items) in
        scx.catalog.get_default_privileges()
    {
        for default_privilege_acl_item in default_privilege_acl_items {
            if role_ids.contains(&default_privilege_object.role_id)
                || role_ids.contains(&default_privilege_acl_item.grantee)
            {
                default_privilege_revokes.push((
                    default_privilege_object.clone(),
                    default_privilege_acl_item.clone(),
                ));
            }
        }
    }

    let drop_ids = scx.catalog.object_dependents(&drop_ids);

    let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
    if !system_ids.is_empty() {
        let mut owners = system_ids
            .into_iter()
            .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
            .collect::<BTreeSet<_>>()
            .into_iter()
            .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
        sql_bail!(
            "cannot drop objects owned by role {} because they are required by the database system",
            owners.join(", "),
        );
    }

    Ok(Plan::DropOwned(DropOwnedPlan {
        role_ids: role_ids.into_iter().collect(),
        drop_ids,
        privilege_revokes,
        default_privilege_revokes,
    }))
}

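/// Plans an optional `RETAIN HISTORY` option value, deferring to
/// [`plan_retain_history`] when a value is present.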
fn plan_retain_history_option(
    scx: &StatementContext,
    retain_history: Option<OptionalDuration>,
) -> Result<Option<CompactionWindow>, PlanError> {
    if let Some(OptionalDuration(lcw)) = retain_history {
        Ok(Some(plan_retain_history(scx, lcw)?))
    } else {
        Ok(None)
    }
}

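/// Plans a `RETAIN HISTORY` duration as a [`CompactionWindow`].
///
/// A zero duration is rejected as an internal error, a duration below the
/// default logical compaction window requires `ENABLE_UNLIMITED_RETAIN_HISTORY`,
/// and an absent duration disables compaction entirely (also behind that
/// flag). The feature as a whole is gated on `ENABLE_LOGICAL_COMPACTION_WINDOW`.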
fn plan_retain_history(
    scx: &StatementContext,
    lcw: Option<Duration>,
) -> Result<CompactionWindow, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
    match lcw {
        Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
            option_name: "RETAIN HISTORY".to_string(),
            err: Box::new(PlanError::Unstructured(
                "internal error: unexpectedly zero".to_string(),
            )),
        }),
        Some(duration) => {
            if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
                && scx
                    .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                    .is_err()
            {
                return Err(PlanError::RetainHistoryLow {
                    limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
                });
            }
            Ok(duration.try_into()?)
        }
        None => {
            if scx
                .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
                .is_err()
            {
                Err(PlanError::RetainHistoryRequired)
            } else {
                Ok(CompactionWindow::DisableCompaction)
            }
        }
    }
}

generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));

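/// Plans the `WITH (...)` options of `CREATE INDEX`. Only `RETAIN HISTORY`
/// is understood, and supplying any option at all requires the
/// `ENABLE_INDEX_OPTIONS` feature flag.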
fn plan_index_options(
    scx: &StatementContext,
    with_opts: Vec<IndexOption<Aug>>,
) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
    if !with_opts.is_empty() {
        scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
    }

    let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::IndexOption::RetainHistory(cw));
    }
    Ok(out)
}

generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);

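/// Plans the `WITH (...)` options of `CREATE TABLE`. `PARTITION BY` and the
/// redaction test option are validated behind their feature flags, but only
/// `RETAIN HISTORY` currently yields a planned option.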
fn plan_table_options(
    scx: &StatementContext,
    desc: &RelationDesc,
    with_opts: Vec<TableOption<Aug>>,
) -> Result<Vec<crate::plan::TableOption>, PlanError> {
    let TableOptionExtracted {
        partition_by,
        retain_history,
        redacted_test,
        ..
    }: TableOptionExtracted = with_opts.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(desc, partition_by)?;
    }

    if redacted_test.is_some() {
        scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
    }

    let mut out = Vec::with_capacity(1);
    if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
        out.push(crate::plan::TableOption::RetainHistory(cw));
    }
    Ok(out)
}

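/// Plans `ALTER INDEX ... SET/RESET (...)`. `RETAIN HISTORY` is the only
/// supported option and must appear alone, e.g. something like
/// `ALTER INDEX i SET (RETAIN HISTORY = FOR '30d')` (illustrative syntax).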
pub fn plan_alter_index_options(
    scx: &mut StatementContext,
    AlterIndexStatement {
        index_name,
        if_exists,
        action,
    }: AlterIndexStatement<Aug>,
) -> Result<Plan, PlanError> {
    let object_type = ObjectType::Index;
    match action {
        AlterIndexAction::ResetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            None,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
        AlterIndexAction::SetOptions(options) => {
            let mut options = options.into_iter();
            if let Some(opt) = options.next() {
                match opt.name {
                    IndexOptionName::RetainHistory => {
                        if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
                        }
                        return alter_retain_history(
                            scx,
                            object_type,
                            if_exists,
                            UnresolvedObjectName::Item(index_name),
                            opt.value,
                        );
                    }
                }
            }
            sql_bail!("expected option");
        }
    }
}

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

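/// Plans `ALTER CLUSTER`. Managed and unmanaged clusters accept disjoint sets
/// of options, so most of the work here is validating which options are legal
/// for the cluster's (possibly changing) management mode.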
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
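                // The unmanaged branch above rejects DISK, so the cluster is
                // managed here; when no new SIZE accompanies the option,
                // validate against the current managed size.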
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

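/// Plans `ALTER <object> ... SET CLUSTER`, e.g. something like
/// `ALTER MATERIALIZED VIEW mv SET CLUSTER other` (illustrative). Only
/// materialized views are currently supported; indexes, sinks, and sources
/// are recognized but rejected as unsupported.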
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

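/// Plans `ALTER <object> RENAME TO`, dispatching to the item-, cluster-,
/// cluster-replica-, or schema-specific planner by object type.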
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

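/// Plans `ALTER SCHEMA ... RENAME TO`. Temporary and system schemas cannot be
/// renamed, and the new name must not already exist in the same database.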
pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let normalized = normalize::unresolved_schema_name(name.clone())?;
    if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!(
            "cannot rename schemas in the ambient database: {:?}",
            mz_repr::namespaces::MZ_TEMP_SCHEMA
        );
    }

    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

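/// Plans `ALTER SCHEMA ... SWAP WITH ...`. The swap is staged through a
/// temporary schema name; `gen_temp_suffix` must produce a suffix for which
/// the supplied availability check returns `true`.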
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
    if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
    {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }
    let name_b_str = normalize::ident_ref(&name_b);
    if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
        sql_bail!("cannot swap schemas that are in the ambient database");
    }

    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

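/// Plans `ALTER <item> RENAME TO`. The proposed name is checked against both
/// the item and type namespaces, since some item types also conflict with
/// type names.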
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: normalize::ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

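/// Plans `ALTER CLUSTER ... SWAP WITH ...`, staging the rename through a
/// temporary cluster name chosen via `gen_temp_suffix`.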
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

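    // Probe for a temporary cluster name that is not already in use.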
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            Err(CatalogError::UnknownCluster(_)) => true,
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

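/// Plans `ALTER CLUSTER REPLICA ... RENAME TO`. Renaming is only permitted on
/// replicas of unmanaged clusters.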
pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

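/// Plans `ALTER ... SWAP WITH ...` for schemas and clusters. The closure below
/// retries randomly generated temporary suffixes (up to ten times) until the
/// object-specific availability check accepts one.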
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

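/// Shared planning for `ALTER ... SET/RESET (RETAIN HISTORY ...)` across
/// tables, sources, indexes, and materialized views. A reset (absent value)
/// restores the default logical compaction window.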
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

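/// Plans `ALTER CONNECTION`. `ROTATE KEYS` (SSH connections only) must appear
/// alone; otherwise the SET/DROP actions are validated against the connection
/// type, checked for SET-versus-DROP conflicts, and planned as an option
/// alteration that optionally revalidates the connection.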
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }
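
        // Touching any option in a mutually exclusive set invalidates its
        // siblings, so mark the entire set as dropped; any SET values are
        // carried separately in `set_options`.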
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

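/// Plans `ALTER SINK`. Changing the sink's input relation is implemented by
/// re-planning the sink's original `CREATE SINK` statement with the new `FROM`
/// relation substituted in, then bumping the sink's version.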
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            plan.sink.version += 1;

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

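/// Plans `ALTER SOURCE`. Only `RETAIN HISTORY` may be set or reset after
/// creation; subsource additions and reference refreshes are handled during
/// purification and must never reach this planner.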
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    }
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs, scx)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

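/// Plans `ALTER TABLE ... ADD COLUMN`, gated on the
/// `ENABLE_ALTER_TABLE_ADD_COLUMN` feature flag. Added columns are always
/// nullable, since existing rows carry no value for them.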
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.relation_desc().expect("table has desc").into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    let column_type = scalar_type.nullable(true);
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_alter_materialized_view_apply_replacement(
    _: &StatementContext,
    _: AlterMaterializedViewApplyReplacementStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

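/// Plans `ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT`, verifying that the
/// named replacement was created as a replacement targeting this exact
/// materialized view.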
pub fn plan_alter_materialized_view_apply_replacement(
    scx: &StatementContext,
    stmt: AlterMaterializedViewApplyReplacementStatement,
) -> Result<Plan, PlanError> {
    let AlterMaterializedViewApplyReplacementStatement {
        if_exists,
        name,
        replacement_name,
    } = stmt;

    scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;

    let object_type = ObjectType::MaterializedView;
    let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
        .expect("if_exists not set");

    if replacement.replacement_target() != Some(mv.id()) {
        return Err(PlanError::InvalidReplacement {
            item_type: mv.item_type(),
            item_name: scx.catalog.minimal_qualification(mv.name()),
            replacement_type: replacement.item_type(),
            replacement_name: scx.catalog.minimal_qualification(replacement.name()),
        });
    }

    Ok(Plan::AlterMaterializedViewApplyReplacement(
        AlterMaterializedViewApplyReplacementPlan {
            id: mv.id(),
            replacement_id: replacement.id(),
        },
    ))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

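/// Plans `COMMENT ON ...`. Comments are capped at `MAX_COMMENT_LENGTH` bytes,
/// and column comments are addressed by a one-based column position carried in
/// the plan's `sub_component`.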
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => {
            if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
                sql_bail!(
                    "cannot comment on schema {} because it is a temporary schema",
                    mz_repr::namespaces::MZ_TEMP_SCHEMA
                );
            }
            (
                CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
                None,
            )
        }
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

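    // Column comment positions are stored as `i32`s, so reject any position
    // that does not fit.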
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

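/// Resolves a cluster by name, mapping resolution failure to `Ok(None)` when
/// `if_exists` is set. The `resolve_*` helpers below follow the same pattern
/// for replicas, databases, schemas, and network policies.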
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

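/// Resolves an item (or, for `ObjectType::Type`, a type) by name, verifying
/// that the resolved item's kind matches the statement's object type and
/// honoring `if_exists` on resolution failure.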
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

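/// Returns an error if the identified cluster is managed; some operations,
/// such as renaming a replica, are only permitted on unmanaged clusters.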
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}