1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::num::NonZeroU32;
19use std::time::Duration;
20
21use itertools::Itertools;
22use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
23use mz_arrow_util::builder::ArrowBuilder;
24use mz_auth::password::Password;
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_expr::{CollectionPlan, UnmaterializableFunc};
27use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
28use mz_ore::cast::{CastFrom, TryCastFrom};
29use mz_ore::collections::{CollectionExt, HashSet};
30use mz_ore::num::NonNeg;
31use mz_ore::soft_panic_or_log;
32use mz_ore::str::StrExt;
33use mz_proto::RustType;
34use mz_repr::adt::interval::Interval;
35use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
36use mz_repr::network_policy_id::NetworkPolicyId;
37use mz_repr::optimize::OptimizerFeatureOverrides;
38use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
39use mz_repr::role_id::RoleId;
40use mz_repr::{
41 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
42 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
43 preserves_order, strconv,
44};
45use mz_sql_parser::ast::{
46 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
47 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
48 AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement,
49 AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement,
50 AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement,
51 AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption,
52 AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement,
53 AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema,
54 AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName,
55 ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName,
56 ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName,
57 ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement,
58 ConnectionOption, ConnectionOptionName, CreateClusterReplicaStatement, CreateClusterStatement,
59 CreateConnectionOption, CreateConnectionOptionName, CreateConnectionStatement,
60 CreateConnectionType, CreateDatabaseStatement, CreateIndexStatement,
61 CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement,
62 CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption,
63 CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption,
64 CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption,
65 CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement,
66 CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName,
67 CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement,
68 CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
69 CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName,
70 DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format,
71 FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption,
72 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
73 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
74 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
75 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
76 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
77 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
78 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
79 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
80 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
81 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
82 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
83};
84use mz_sql_parser::ident;
85use mz_sql_parser::parser::StatementParseResult;
86use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
87use mz_storage_types::connections::{Connection, KafkaTopicOptions};
88use mz_storage_types::sinks::{
89 IcebergSinkConnection, KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType,
90 SinkEnvelope, StorageSinkConnection, iceberg_type_overrides,
91};
92use mz_storage_types::sources::encoding::{
93 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
94 SourceDataEncoding, included_column_desc,
95};
96use mz_storage_types::sources::envelope::{
97 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
98};
99use mz_storage_types::sources::kafka::{
100 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
101};
102use mz_storage_types::sources::load_generator::{
103 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
104 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
105};
106use mz_storage_types::sources::mysql::{
107 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
108};
109use mz_storage_types::sources::postgres::{
110 PostgresSourceConnection, PostgresSourcePublicationDetails,
111 ProtoPostgresSourcePublicationDetails,
112};
113use mz_storage_types::sources::sql_server::{
114 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
115};
116use mz_storage_types::sources::{
117 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
118 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
119 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
120 SqlServerSourceExtras, Timeline,
121};
122use prost::Message;
123
124use crate::ast::display::AstDisplay;
125use crate::catalog::{
126 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
127 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
128};
129use crate::iceberg::IcebergSinkConfigOptionExtracted;
130use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
131use crate::names::{
132 Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
133 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
134 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
135};
136use crate::normalize::{self, ident};
137use crate::plan::error::PlanError;
138use crate::plan::query::{
139 ExprContext, QueryLifetime, plan_expr, scalar_type_from_catalog, scalar_type_from_sql,
140};
141use crate::plan::scope::Scope;
142use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
143use crate::plan::statement::{StatementContext, StatementDesc, scl};
144use crate::plan::typeconv::CastContext;
145use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
146use crate::plan::{
147 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
148 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
149 AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan,
150 AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan,
151 AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan,
152 AlterSourceTimestampIntervalPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
153 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
154 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
155 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
156 CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
157 CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan,
158 CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc,
159 DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule,
160 NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice,
161 PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type,
162 VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders,
163 WebhookValidation, literal, plan_utils, query, transform_ast,
164};
165use crate::session::vars::{
166 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
167 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
168 ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS,
169};
170use crate::{names, parse};
171
172mod connection;
173
/// Maximum number of columns accepted by the column-count check in this
/// module: `plan_create_webhook_source` rejects a webhook source whose column
/// list is longer than this with `PlanError::TooManyColumns`.
const MAX_NUM_COLUMNS: usize = 256;
179
/// Lazily compiled pattern matching a name that consists of a lowercase `r`
/// followed by one or more digits and nothing else (e.g. `r1`, `r42`).
///
/// The capture group wraps a single digit, so after a match it holds only the
/// last digit of the name, not the whole number.
// NOTE(review): judging by the name this is the naming scheme of replicas in
// managed clusters — confirm against the code that uses this static.
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
182
183fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
187 if partition_by.len() > desc.len() {
188 tracing::error!(
189 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
190 desc.len(),
191 partition_by.len()
192 );
193 partition_by.truncate(desc.len());
194 }
195
196 let desc_prefix = desc.iter().take(partition_by.len());
197 for (idx, ((desc_name, desc_type), partition_name)) in
198 desc_prefix.zip_eq(partition_by).enumerate()
199 {
200 let partition_name = normalize::column_name(partition_name);
201 if *desc_name != partition_name {
202 sql_bail!(
203 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
204 );
205 }
206 if !preserves_order(&desc_type.scalar_type) {
207 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
208 }
209 }
210 Ok(())
211}
212
213pub fn describe_create_database(
214 _: &StatementContext,
215 _: CreateDatabaseStatement,
216) -> Result<StatementDesc, PlanError> {
217 Ok(StatementDesc::new(None))
218}
219
220pub fn plan_create_database(
221 _: &StatementContext,
222 CreateDatabaseStatement {
223 name,
224 if_not_exists,
225 }: CreateDatabaseStatement,
226) -> Result<Plan, PlanError> {
227 Ok(Plan::CreateDatabase(CreateDatabasePlan {
228 name: normalize::ident(name.0),
229 if_not_exists,
230 }))
231}
232
233pub fn describe_create_schema(
234 _: &StatementContext,
235 _: CreateSchemaStatement,
236) -> Result<StatementDesc, PlanError> {
237 Ok(StatementDesc::new(None))
238}
239
240pub fn plan_create_schema(
241 scx: &StatementContext,
242 CreateSchemaStatement {
243 mut name,
244 if_not_exists,
245 }: CreateSchemaStatement,
246) -> Result<Plan, PlanError> {
247 if name.0.len() > 2 {
248 sql_bail!("schema name {} has more than two components", name);
249 }
250 let schema_name = normalize::ident(
251 name.0
252 .pop()
253 .expect("names always have at least one component"),
254 );
255 let database_spec = match name.0.pop() {
256 None => match scx.catalog.active_database() {
257 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
258 None => sql_bail!("no database specified and no active database"),
259 },
260 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
261 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
262 Err(_) => sql_bail!("invalid database {}", n.as_str()),
263 },
264 };
265 Ok(Plan::CreateSchema(CreateSchemaPlan {
266 database_spec,
267 schema_name,
268 if_not_exists,
269 }))
270}
271
272pub fn describe_create_table(
273 _: &StatementContext,
274 _: CreateTableStatement<Aug>,
275) -> Result<StatementDesc, PlanError> {
276 Ok(StatementDesc::new(None))
277}
278
279pub fn plan_create_table(
280 scx: &StatementContext,
281 stmt: CreateTableStatement<Aug>,
282) -> Result<Plan, PlanError> {
283 let CreateTableStatement {
284 name,
285 columns,
286 constraints,
287 if_not_exists,
288 temporary,
289 with_options,
290 } = &stmt;
291
292 let names: Vec<_> = columns
293 .iter()
294 .filter(|c| {
295 let is_versioned = c
299 .options
300 .iter()
301 .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
302 !is_versioned
303 })
304 .map(|c| normalize::column_name(c.name.clone()))
305 .collect();
306
307 if let Some(dup) = names.iter().duplicates().next() {
308 sql_bail!("column {} specified more than once", dup.quoted());
309 }
310
311 let mut column_types = Vec::with_capacity(columns.len());
314 let mut defaults = Vec::with_capacity(columns.len());
315 let mut changes = BTreeMap::new();
316 let mut keys = Vec::new();
317
318 for (i, c) in columns.into_iter().enumerate() {
319 let aug_data_type = &c.data_type;
320 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
321 let mut nullable = true;
322 let mut default = Expr::null();
323 let mut versioned = false;
324 for option in &c.options {
325 match &option.option {
326 ColumnOption::NotNull => nullable = false,
327 ColumnOption::Default(expr) => {
328 let mut expr = expr.clone();
331 transform_ast::transform(scx, &mut expr)?;
332 let _ = query::plan_default_expr(scx, &expr, &ty)?;
333 default = expr.clone();
334 }
335 ColumnOption::Unique { is_primary } => {
336 keys.push(vec![i]);
337 if *is_primary {
338 nullable = false;
339 }
340 }
341 ColumnOption::Versioned { action, version } => {
342 let version = RelationVersion::from(*version);
343 versioned = true;
344
345 let name = normalize::column_name(c.name.clone());
346 let typ = ty.clone().nullable(nullable);
347
348 changes.insert(version, (action.clone(), name, typ));
349 }
350 other => {
351 bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
352 }
353 }
354 }
355 if !versioned {
358 column_types.push(ty.nullable(nullable));
359 }
360 defaults.push(default);
361 }
362
363 let mut seen_primary = false;
364 'c: for constraint in constraints {
365 match constraint {
366 TableConstraint::Unique {
367 name: _,
368 columns,
369 is_primary,
370 nulls_not_distinct,
371 } => {
372 if seen_primary && *is_primary {
373 sql_bail!(
374 "multiple primary keys for table {} are not allowed",
375 name.to_ast_string_stable()
376 );
377 }
378 seen_primary = *is_primary || seen_primary;
379
380 let mut key = vec![];
381 for column in columns {
382 let column = normalize::column_name(column.clone());
383 match names.iter().position(|name| *name == column) {
384 None => sql_bail!("unknown column in constraint: {}", column),
385 Some(i) => {
386 let nullable = &mut column_types[i].nullable;
387 if *is_primary {
388 if *nulls_not_distinct {
389 sql_bail!(
390 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
391 );
392 }
393
394 *nullable = false;
395 } else if !(*nulls_not_distinct || !*nullable) {
396 break 'c;
399 }
400
401 key.push(i);
402 }
403 }
404 }
405
406 if *is_primary {
407 keys.insert(0, key);
408 } else {
409 keys.push(key);
410 }
411 }
412 TableConstraint::ForeignKey { .. } => {
413 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
416 }
417 TableConstraint::Check { .. } => {
418 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
421 }
422 }
423 }
424
425 if !keys.is_empty() {
426 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
429 }
430
431 let typ = SqlRelationType::new(column_types).with_keys(keys);
432
433 let temporary = *temporary;
434 let name = if temporary {
435 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
436 } else {
437 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
438 };
439
440 let full_name = scx.catalog.resolve_full_name(&name);
442 let partial_name = PartialItemName::from(full_name.clone());
443 if let (false, Ok(item)) = (
446 if_not_exists,
447 scx.catalog.resolve_item_or_type(&partial_name),
448 ) {
449 return Err(PlanError::ItemAlreadyExists {
450 name: full_name.to_string(),
451 item_type: item.item_type(),
452 });
453 }
454
455 let desc = RelationDesc::new(typ, names);
456 let mut desc = VersionedRelationDesc::new(desc);
457 for (version, (_action, name, typ)) in changes.into_iter() {
458 let new_version = desc.add_column(name, typ);
459 if version != new_version {
460 return Err(PlanError::InvalidTable {
461 name: full_name.item,
462 });
463 }
464 }
465
466 let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
467
468 let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
474 let options = plan_table_options(scx, &original_desc, with_options.clone())?;
475
476 let compaction_window = options.iter().find_map(|o| {
477 #[allow(irrefutable_let_patterns)]
478 if let crate::plan::TableOption::RetainHistory(lcw) = o {
479 Some(lcw.clone())
480 } else {
481 None
482 }
483 });
484
485 let table = Table {
486 create_sql,
487 desc,
488 temporary,
489 compaction_window,
490 data_source: TableDataSource::TableWrites { defaults },
491 };
492 Ok(Plan::CreateTable(CreateTablePlan {
493 name,
494 table,
495 if_not_exists: *if_not_exists,
496 }))
497}
498
499pub fn describe_create_table_from_source(
500 _: &StatementContext,
501 _: CreateTableFromSourceStatement<Aug>,
502) -> Result<StatementDesc, PlanError> {
503 Ok(StatementDesc::new(None))
504}
505
506pub fn describe_create_webhook_source(
507 _: &StatementContext,
508 _: CreateWebhookSourceStatement<Aug>,
509) -> Result<StatementDesc, PlanError> {
510 Ok(StatementDesc::new(None))
511}
512
513pub fn describe_create_source(
514 _: &StatementContext,
515 _: CreateSourceStatement<Aug>,
516) -> Result<StatementDesc, PlanError> {
517 Ok(StatementDesc::new(None))
518}
519
520pub fn describe_create_subsource(
521 _: &StatementContext,
522 _: CreateSubsourceStatement<Aug>,
523) -> Result<StatementDesc, PlanError> {
524 Ok(StatementDesc::new(None))
525}
526
// Typed extraction of the `WITH` options of `CREATE SOURCE`. The generated
// `CreateSourceOptionExtracted` is built with `try_from` in
// `plan_create_source`, which destructures it into `timestamp_interval`,
// `retain_history` and `seen`.
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

// Typed extraction of the PostgreSQL source options. `TextColumns` and
// `ExcludeColumns` default to an empty list when not specified.
// NOTE(review): presumably generates `PgConfigOptionExtracted`, by analogy
// with `CreateSourceOptionExtracted` — confirm against the macro definition.
generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Typed extraction of the MySQL source options. `TextColumns` and
// `ExcludeColumns` default to an empty list when not specified.
generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

// Typed extraction of the SQL Server source options. `TextColumns` and
// `ExcludeColumns` default to an empty list when not specified.
generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);
554
555pub fn plan_create_webhook_source(
556 scx: &StatementContext,
557 mut stmt: CreateWebhookSourceStatement<Aug>,
558) -> Result<Plan, PlanError> {
559 if stmt.is_table {
560 scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
561 }
562
563 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
566 let create_sql =
567 normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
568
569 let CreateWebhookSourceStatement {
570 name,
571 if_not_exists,
572 body_format,
573 include_headers,
574 validate_using,
575 is_table,
576 in_cluster: _,
578 } = stmt;
579
580 let validate_using = validate_using
581 .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
582 .transpose()?;
583 if let Some(WebhookValidation { expression, .. }) = &validate_using {
584 if !expression.contains_column() {
587 return Err(PlanError::WebhookValidationDoesNotUseColumns);
588 }
589 if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
593 return Err(PlanError::WebhookValidationNonDeterministic);
594 }
595 }
596
597 let body_format = match body_format {
598 Format::Bytes => WebhookBodyFormat::Bytes,
599 Format::Json { array } => WebhookBodyFormat::Json { array },
600 Format::Text => WebhookBodyFormat::Text,
601 ty => {
603 return Err(PlanError::Unsupported {
604 feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
605 discussion_no: None,
606 });
607 }
608 };
609
610 let mut column_ty = vec![
611 SqlColumnType {
613 scalar_type: SqlScalarType::from(body_format),
614 nullable: false,
615 },
616 ];
617 let mut column_names = vec!["body".to_string()];
618
619 let mut headers = WebhookHeaders::default();
620
621 if let Some(filters) = include_headers.column {
623 column_ty.push(SqlColumnType {
624 scalar_type: SqlScalarType::Map {
625 value_type: Box::new(SqlScalarType::String),
626 custom_id: None,
627 },
628 nullable: false,
629 });
630 column_names.push("headers".to_string());
631
632 let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
633 filters.into_iter().partition_map(|filter| {
634 if filter.block {
635 itertools::Either::Right(filter.header_name)
636 } else {
637 itertools::Either::Left(filter.header_name)
638 }
639 });
640 headers.header_column = Some(WebhookHeaderFilters { allow, block });
641 }
642
643 for header in include_headers.mappings {
645 let scalar_type = header
646 .use_bytes
647 .then_some(SqlScalarType::Bytes)
648 .unwrap_or(SqlScalarType::String);
649 column_ty.push(SqlColumnType {
650 scalar_type,
651 nullable: true,
652 });
653 column_names.push(header.column_name.into_string());
654
655 let column_idx = column_ty.len() - 1;
656 assert_eq!(
658 column_idx,
659 column_names.len() - 1,
660 "header column names and types don't match"
661 );
662 headers
663 .mapped_headers
664 .insert(column_idx, (header.header_name, header.use_bytes));
665 }
666
667 let mut unique_check = HashSet::with_capacity(column_names.len());
669 for name in &column_names {
670 if !unique_check.insert(name) {
671 return Err(PlanError::AmbiguousColumn(name.clone().into()));
672 }
673 }
674 if column_names.len() > MAX_NUM_COLUMNS {
675 return Err(PlanError::TooManyColumns {
676 max_num_columns: MAX_NUM_COLUMNS,
677 req_num_columns: column_names.len(),
678 });
679 }
680
681 let typ = SqlRelationType::new(column_ty);
682 let desc = RelationDesc::new(typ, column_names);
683
684 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
686 let full_name = scx.catalog.resolve_full_name(&name);
687 let partial_name = PartialItemName::from(full_name.clone());
688 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
689 return Err(PlanError::ItemAlreadyExists {
690 name: full_name.to_string(),
691 item_type: item.item_type(),
692 });
693 }
694
695 let timeline = Timeline::EpochMilliseconds;
698
699 let plan = if is_table {
700 let data_source = DataSourceDesc::Webhook {
701 validate_using,
702 body_format,
703 headers,
704 cluster_id: Some(in_cluster.id()),
705 };
706 let data_source = TableDataSource::DataSource {
707 desc: data_source,
708 timeline,
709 };
710 Plan::CreateTable(CreateTablePlan {
711 name,
712 if_not_exists,
713 table: Table {
714 create_sql,
715 desc: VersionedRelationDesc::new(desc),
716 temporary: false,
717 compaction_window: None,
718 data_source,
719 },
720 })
721 } else {
722 let data_source = DataSourceDesc::Webhook {
723 validate_using,
724 body_format,
725 headers,
726 cluster_id: None,
728 };
729 Plan::CreateSource(CreateSourcePlan {
730 name,
731 source: Source {
732 create_sql,
733 data_source,
734 desc,
735 compaction_window: None,
736 },
737 if_not_exists,
738 timeline,
739 in_cluster: Some(in_cluster.id()),
740 })
741 };
742
743 Ok(plan)
744}
745
/// Plans a `CREATE SOURCE` statement into a [`Plan::CreateSource`].
///
/// Steps, in order:
///
/// 1. validate the combination of connection type, envelope, format and
///    `INCLUDE` metadata;
/// 2. plan the external connection and extract the `WITH` options;
/// 3. derive the relation description, envelope and encoding, apply column
///    renames and reject duplicate column names;
/// 4. apply an optional `PRIMARY KEY NOT ENFORCED` constraint;
/// 5. determine the timestamp interval;
/// 6. build the data source description, which depends on whether a progress
///    subsource is present;
/// 7. allocate the name, check for an existing item or type, resolve the
///    cluster, normalize the statement and assemble the plan.
///
/// # Errors
///
/// Returns a `PlanError` when any of the validations above fails, when a
/// required feature flag is disabled, when the timestamp interval is out of
/// bounds, or when an item or type of the same name exists and
/// `IF NOT EXISTS` was not given.
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // Borrow the statement's fields; `stmt` itself is still needed further
    // below (its `in_cluster` is passed mutably to the cluster helper and the
    // whole statement is then moved into `normalize::create_statement`).
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    // Both system variables must be on for the restriction below to apply.
    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    if force_source_table_syntax {
        // Under the forced table syntax, ENVELOPE, FORMAT and INCLUDE are not
        // accepted on CREATE SOURCE.
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    // An omitted envelope is treated as ENVELOPE NONE.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    // INCLUDE HEADERS is only accepted for Kafka connections.
    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    // Any other INCLUDE metadata is accepted for Kafka and load generator
    // connections only.
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    if !include_metadata.is_empty()
        && !matches!(
            envelope,
            ast::SourceEnvelope::Upsert { .. }
                | ast::SourceEnvelope::None
                | ast::SourceEnvelope::Debezium
        )
    {
        sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
    }

    let external_connection =
        plan_generic_source_connection(scx, source_connection, include_metadata)?;

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    // Only Kafka connections contribute metadata columns.
    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    // From here on `envelope` is the planned envelope returned by the helper,
    // shadowing the AST envelope above.
    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    // Optional PRIMARY KEY NOT ENFORCED constraint, gated behind a feature
    // flag.
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        // A column may appear at most once in the constraint.
        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        // Translate the key's column names into indices, rejecting unknown
        // and ambiguous names.
        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        // The user-declared key is only accepted when the description does
        // not already carry a key.
        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            // The bounds are enforced only when `scx.pcx` is present.
            // NOTE(review): presumably this exempts re-planning of statements
            // already stored in the catalog — confirm.
            if scx.pcx.is_some() {
                let min = scx.catalog.system_vars().min_timestamp_interval();
                let max = scx.catalog.system_vars().max_timestamp_interval();
                if duration < min || duration > max {
                    return Err(PlanError::InvalidTimestampInterval {
                        min,
                        max,
                        requested: duration,
                    });
                }
            }
            duration
        }
        None => scx.catalog.system_vars().default_timestamp_interval(),
    };

    let (desc, data_source) = match progress_subsource {
        // With a progress subsource the source keeps the description computed
        // above and becomes an `OldSyntaxIngestion`.
        Some(name) => {
            let DeferredItemName::Named(name) = name else {
                sql_bail!("[internal error] progress subsource must be named during purification");
            };
            let ResolvedItemName::Item { id, .. } = name else {
                sql_bail!("[internal error] invalid target id");
            };

            // Export details depend on the kind of connection.
            let details = match external_connection {
                GenericSourceConnection::Kafka(ref c) => {
                    SourceExportDetails::Kafka(KafkaSourceExportDetails {
                        metadata_columns: c.metadata_columns.clone(),
                    })
                }
                GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
                    LoadGenerator::Auction
                    | LoadGenerator::Marketing
                    | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
                    LoadGenerator::Counter { .. }
                    | LoadGenerator::Clock
                    | LoadGenerator::Datums
                    | LoadGenerator::KeyValue(_) => {
                        SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                            output: LoadGeneratorOutput::Default,
                        })
                    }
                },
                GenericSourceConnection::Postgres(_)
                | GenericSourceConnection::MySql(_)
                | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
            };

            let data_source = DataSourceDesc::OldSyntaxIngestion {
                desc: SourceDesc {
                    connection: external_connection,
                    timestamp_interval,
                },
                progress_subsource: *id,
                data_config: SourceExportDataConfig {
                    encoding,
                    envelope: envelope.clone(),
                },
                details,
            };
            (desc, data_source)
        }
        // Without one, the description computed above is discarded in favor
        // of the connection's `timestamp_desc()`.
        None => {
            let desc = external_connection.timestamp_desc();
            let data_source = DataSourceDesc::Ingestion(SourceDesc {
                connection: external_connection,
                timestamp_interval,
            });
            (desc, data_source)
        }
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // Reject the statement if an item *or type* with this name already
    // exists, unless IF NOT EXISTS was specified.
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    // The cluster helper receives `stmt.in_cluster` mutably and runs before
    // the statement is normalized, so whatever it writes there ends up in
    // `create_sql`.
    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    // A CdcV2 envelope gets an external timeline named after the source;
    // every other envelope uses the epoch-milliseconds timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}
1014
1015pub fn plan_generic_source_connection(
1016 scx: &StatementContext<'_>,
1017 source_connection: &CreateSourceConnection<Aug>,
1018 include_metadata: &Vec<SourceIncludeMetadata>,
1019) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1020 Ok(match source_connection {
1021 CreateSourceConnection::Kafka {
1022 connection,
1023 options,
1024 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1025 scx,
1026 connection,
1027 options,
1028 include_metadata,
1029 )?),
1030 CreateSourceConnection::Postgres {
1031 connection,
1032 options,
1033 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1034 scx, connection, options,
1035 )?),
1036 CreateSourceConnection::SqlServer {
1037 connection,
1038 options,
1039 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1040 scx, connection, options,
1041 )?),
1042 CreateSourceConnection::MySql {
1043 connection,
1044 options,
1045 } => {
1046 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1047 }
1048 CreateSourceConnection::LoadGenerator { generator, options } => {
1049 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1050 scx,
1051 generator,
1052 options,
1053 include_metadata,
1054 )?)
1055 }
1056 })
1057}
1058
1059fn plan_load_generator_source_connection(
1060 scx: &StatementContext<'_>,
1061 generator: &ast::LoadGenerator,
1062 options: &Vec<LoadGeneratorOption<Aug>>,
1063 include_metadata: &Vec<SourceIncludeMetadata>,
1064) -> Result<LoadGeneratorSourceConnection, PlanError> {
1065 let load_generator =
1066 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1067 let LoadGeneratorOptionExtracted {
1068 tick_interval,
1069 as_of,
1070 up_to,
1071 ..
1072 } = options.clone().try_into()?;
1073 let tick_micros = match tick_interval {
1074 Some(interval) => Some(interval.as_micros().try_into()?),
1075 None => None,
1076 };
1077 if up_to < as_of {
1078 sql_bail!("UP TO cannot be less than AS OF");
1079 }
1080 Ok(LoadGeneratorSourceConnection {
1081 load_generator,
1082 tick_micros,
1083 as_of,
1084 up_to,
1085 })
1086}
1087
1088fn plan_mysql_source_connection(
1089 scx: &StatementContext<'_>,
1090 connection: &ResolvedItemName,
1091 options: &Vec<MySqlConfigOption<Aug>>,
1092) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1093 let connection_item = scx.get_item_by_resolved_name(connection)?;
1094 match connection_item.connection()? {
1095 Connection::MySql(connection) => connection,
1096 _ => sql_bail!(
1097 "{} is not a MySQL connection",
1098 scx.catalog.resolve_full_name(connection_item.name())
1099 ),
1100 };
1101 let MySqlConfigOptionExtracted {
1102 details,
1103 text_columns: _,
1107 exclude_columns: _,
1108 seen: _,
1109 } = options.clone().try_into()?;
1110 let details = details
1111 .as_ref()
1112 .ok_or_else(|| internal_err!("MySQL source missing details"))?;
1113 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1114 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1115 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1116 Ok(MySqlSourceConnection {
1117 connection: connection_item.id(),
1118 connection_id: connection_item.id(),
1119 details,
1120 })
1121}
1122
1123fn plan_sqlserver_source_connection(
1124 scx: &StatementContext<'_>,
1125 connection: &ResolvedItemName,
1126 options: &Vec<SqlServerConfigOption<Aug>>,
1127) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1128 let connection_item = scx.get_item_by_resolved_name(connection)?;
1129 match connection_item.connection()? {
1130 Connection::SqlServer(connection) => connection,
1131 _ => sql_bail!(
1132 "{} is not a SQL Server connection",
1133 scx.catalog.resolve_full_name(connection_item.name())
1134 ),
1135 };
1136 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1137 let details = details
1138 .as_ref()
1139 .ok_or_else(|| internal_err!("SQL Server source missing details"))?;
1140 let extras = hex::decode(details)
1141 .map_err(|e| sql_err!("{e}"))
1142 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1143 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1144 Ok(SqlServerSourceConnection {
1145 connection_id: connection_item.id(),
1146 connection: connection_item.id(),
1147 extras,
1148 })
1149}
1150
1151fn plan_postgres_source_connection(
1152 scx: &StatementContext<'_>,
1153 connection: &ResolvedItemName,
1154 options: &Vec<PgConfigOption<Aug>>,
1155) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1156 let connection_item = scx.get_item_by_resolved_name(connection)?;
1157 let PgConfigOptionExtracted {
1158 details,
1159 publication,
1160 text_columns: _,
1164 exclude_columns: _,
1168 seen: _,
1169 } = options.clone().try_into()?;
1170 let details = details
1171 .as_ref()
1172 .ok_or_else(|| internal_err!("Postgres source missing details"))?;
1173 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1174 let details =
1175 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1176 let publication_details =
1177 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1178 Ok(PostgresSourceConnection {
1179 connection: connection_item.id(),
1180 connection_id: connection_item.id(),
1181 publication: publication.ok_or_else(|| internal_err!("PUBLICATION option is required"))?,
1183 publication_details,
1184 })
1185}
1186
1187fn plan_kafka_source_connection(
1188 scx: &StatementContext<'_>,
1189 connection_name: &ResolvedItemName,
1190 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1191 include_metadata: &Vec<SourceIncludeMetadata>,
1192) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1193 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1194 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1195 sql_bail!(
1196 "{} is not a kafka connection",
1197 scx.catalog.resolve_full_name(connection_item.name())
1198 )
1199 }
1200 let KafkaSourceConfigOptionExtracted {
1201 group_id_prefix,
1202 topic,
1203 topic_metadata_refresh_interval,
1204 start_timestamp: _, start_offset,
1206 seen: _,
1207 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1208 let topic = topic.ok_or_else(|| internal_err!("TOPIC option is required"))?;
1210 let mut start_offsets = BTreeMap::new();
1211 if let Some(offsets) = start_offset {
1212 for (part, offset) in offsets.iter().enumerate() {
1213 if *offset < 0 {
1214 sql_bail!("START OFFSET must be a nonnegative integer");
1215 }
1216 start_offsets.insert(i32::try_from(part)?, *offset);
1217 }
1218 }
1219 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1220 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1223 }
1224 let metadata_columns = include_metadata
1225 .into_iter()
1226 .flat_map(|item| match item {
1227 SourceIncludeMetadata::Timestamp { alias } => {
1228 let name = match alias {
1229 Some(name) => name.to_string(),
1230 None => "timestamp".to_owned(),
1231 };
1232 Some((name, KafkaMetadataKind::Timestamp))
1233 }
1234 SourceIncludeMetadata::Partition { alias } => {
1235 let name = match alias {
1236 Some(name) => name.to_string(),
1237 None => "partition".to_owned(),
1238 };
1239 Some((name, KafkaMetadataKind::Partition))
1240 }
1241 SourceIncludeMetadata::Offset { alias } => {
1242 let name = match alias {
1243 Some(name) => name.to_string(),
1244 None => "offset".to_owned(),
1245 };
1246 Some((name, KafkaMetadataKind::Offset))
1247 }
1248 SourceIncludeMetadata::Headers { alias } => {
1249 let name = match alias {
1250 Some(name) => name.to_string(),
1251 None => "headers".to_owned(),
1252 };
1253 Some((name, KafkaMetadataKind::Headers))
1254 }
1255 SourceIncludeMetadata::Header {
1256 alias,
1257 key,
1258 use_bytes,
1259 } => Some((
1260 alias.to_string(),
1261 KafkaMetadataKind::Header {
1262 key: key.clone(),
1263 use_bytes: *use_bytes,
1264 },
1265 )),
1266 SourceIncludeMetadata::Key { .. } => {
1267 None
1269 }
1270 })
1271 .collect();
1272 Ok(KafkaSourceConnection {
1273 connection: connection_item.id(),
1274 connection_id: connection_item.id(),
1275 topic,
1276 start_offsets,
1277 group_id_prefix,
1278 topic_metadata_refresh_interval,
1279 metadata_columns,
1280 })
1281}
1282
/// Combines a source's raw key/value schema with the requested `FORMAT` and
/// `ENVELOPE` to produce the final relation description.
///
/// Returns the resulting [`RelationDesc`], the planned [`SourceEnvelope`],
/// and the data encoding (`None` when no `format` was given).
///
/// # Errors
///
/// Fails when the format cannot be planned, when a format is given but the
/// incoming `value_desc` is not a single `bytes` column, when `INCLUDE KEY`
/// is combined with `ENVELOPE DEBEZIUM`, when the envelope's requirements on
/// the format are not met, or when a required feature flag is disabled.
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, SqlColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    // An encoding only exists when the statement specified a format.
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            // Decoding is only applied to a value schema that consists of
            // exactly one column of type `bytes`.
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    SqlScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            // From here on the schemas are the ones the encoding produces,
            // not the ones passed in by the caller.
            let (key_desc, value_desc) = encoding.desc()?;

            // For Kafka sources with `ENVELOPE NONE` every key column is
            // marked nullable; all other combinations keep the key schema
            // as produced by the encoding.
            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        // Without an encoding the caller-provided schemas are used unchanged.
        None => (key_desc, value_desc),
    };

    // True only for `KEY VALUE` load generator sources; passed on to
    // `get_key_envelope` and used below to let `ENVELOPE UPSERT` proceed
    // without a key encoding.
    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    // `ENVELOPE DEBEZIUM` is only accepted when no key envelope was requested.
    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    // Translate the AST envelope into its unplanned counterpart.
    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            // The value schema must type check as a Debezium record. If it
            // does not and the value encoding is not Avro, report the
            // missing Avro format instead of the type error.
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            // A key encoding is required unless the source is one of the
            // connections that may omit it (see `key_envelope_no_encoding`).
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            // When no key envelope was requested explicitly, fall back to
            // the one derived from the key encoding.
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            // No error policy selects the default style; a single inline
            // policy is feature-flagged and names its error column `error`
            // unless an alias is given; anything else is unsupported.
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            // Only a bare Avro format is accepted with this envelope.
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    // Let the envelope assemble the final description from the key, value
    // and metadata column descriptions.
    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}
1461
1462fn plan_source_export_desc(
1465 scx: &StatementContext,
1466 name: &UnresolvedItemName,
1467 columns: &Vec<ColumnDef<Aug>>,
1468 constraints: &Vec<TableConstraint<Aug>>,
1469) -> Result<RelationDesc, PlanError> {
1470 let names: Vec<_> = columns
1471 .iter()
1472 .map(|c| normalize::column_name(c.name.clone()))
1473 .collect();
1474
1475 if let Some(dup) = names.iter().duplicates().next() {
1476 sql_bail!("column {} specified more than once", dup.quoted());
1477 }
1478
1479 let mut column_types = Vec::with_capacity(columns.len());
1482 let mut keys = Vec::new();
1483
1484 for (i, c) in columns.into_iter().enumerate() {
1485 let aug_data_type = &c.data_type;
1486 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1487 let mut nullable = true;
1488 for option in &c.options {
1489 match &option.option {
1490 ColumnOption::NotNull => nullable = false,
1491 ColumnOption::Default(_) => {
1492 bail_unsupported!("Source export with default value")
1493 }
1494 ColumnOption::Unique { is_primary } => {
1495 keys.push(vec![i]);
1496 if *is_primary {
1497 nullable = false;
1498 }
1499 }
1500 other => {
1501 bail_unsupported!(format!("Source export with column constraint: {}", other))
1502 }
1503 }
1504 }
1505 column_types.push(ty.nullable(nullable));
1506 }
1507
1508 let mut seen_primary = false;
1509 'c: for constraint in constraints {
1510 match constraint {
1511 TableConstraint::Unique {
1512 name: _,
1513 columns,
1514 is_primary,
1515 nulls_not_distinct,
1516 } => {
1517 if seen_primary && *is_primary {
1518 sql_bail!(
1519 "multiple primary keys for source export {} are not allowed",
1520 name.to_ast_string_stable()
1521 );
1522 }
1523 seen_primary = *is_primary || seen_primary;
1524
1525 let mut key = vec![];
1526 for column in columns {
1527 let column = normalize::column_name(column.clone());
1528 match names.iter().position(|name| *name == column) {
1529 None => sql_bail!("unknown column in constraint: {}", column),
1530 Some(i) => {
1531 let nullable = &mut column_types[i].nullable;
1532 if *is_primary {
1533 if *nulls_not_distinct {
1534 sql_bail!(
1535 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1536 );
1537 }
1538 *nullable = false;
1539 } else if !(*nulls_not_distinct || !*nullable) {
1540 break 'c;
1543 }
1544
1545 key.push(i);
1546 }
1547 }
1548 }
1549
1550 if *is_primary {
1551 keys.insert(0, key);
1552 } else {
1553 keys.push(key);
1554 }
1555 }
1556 TableConstraint::ForeignKey { .. } => {
1557 bail_unsupported!("Source export with a foreign key")
1558 }
1559 TableConstraint::Check { .. } => {
1560 bail_unsupported!("Source export with a check constraint")
1561 }
1562 }
1563 }
1564
1565 let typ = SqlRelationType::new(column_types).with_keys(keys);
1566 let desc = RelationDesc::new(typ, names);
1567 Ok(desc)
1568}
1569
// Options accepted in the `WITH (...)` clause of `CREATE SUBSOURCE`.
//
// `plan_create_subsource` converts the statement's options into the
// `CreateSubsourceOptionExtracted` struct declared here. Entries with a
// `Default(..)` are read there as plain values; the other entries are read
// as `Option`s.
generate_extracted_config!(
    CreateSubsourceOption,
    // Marks a progress subsource; planned as `DataSourceDesc::Progress`.
    (Progress, bool, Default(false)),
    // Name of the upstream object; copied into the planned ingestion export.
    (ExternalReference, UnresolvedItemName),
    // Passed to `plan_retain_history_option` to derive the compaction window.
    (RetainHistory, OptionalDuration),
    // Used for Postgres column casts and for MySQL / SQL Server export details.
    (TextColumns, Vec::<Ident>, Default(vec![])),
    // Used for MySQL / SQL Server export details.
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    // Hex-encoded `ProtoSourceExportStatementDetails`, decoded by the planner.
    (Details, String)
);
1579
1580pub fn plan_create_subsource(
1581 scx: &StatementContext,
1582 stmt: CreateSubsourceStatement<Aug>,
1583) -> Result<Plan, PlanError> {
1584 let CreateSubsourceStatement {
1585 name,
1586 columns,
1587 of_source,
1588 constraints,
1589 if_not_exists,
1590 with_options,
1591 } = &stmt;
1592
1593 let CreateSubsourceOptionExtracted {
1594 progress,
1595 retain_history,
1596 external_reference,
1597 text_columns,
1598 exclude_columns,
1599 details,
1600 seen: _,
1601 } = with_options.clone().try_into()?;
1602
1603 if !(progress ^ (external_reference.is_some() && of_source.is_some())) {
1608 bail_internal!(
1609 "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1610 );
1611 }
1612
1613 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1614
1615 let data_source = if let Some(source_reference) = of_source {
1616 if scx.catalog.system_vars().enable_create_table_from_source()
1619 && scx.catalog.system_vars().force_source_table_syntax()
1620 {
1621 Err(PlanError::UseTablesForSources(
1622 "CREATE SUBSOURCE".to_string(),
1623 ))?;
1624 }
1625
1626 let ingestion_id = *source_reference.item_id();
1629 let external_reference = external_reference.ok_or_else(|| {
1630 sql_err!("CREATE SUBSOURCE with REFERENCES requires EXTERNAL REFERENCE option")
1631 })?;
1632
1633 let details = details
1636 .as_ref()
1637 .ok_or_else(|| internal_err!("source-export subsource missing details"))?;
1638 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1639 let details =
1640 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1641 let details =
1642 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1643 let details = match details {
1644 SourceExportStatementDetails::Postgres { table } => {
1645 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1646 column_casts: crate::pure::postgres::generate_column_casts(
1647 scx,
1648 &table,
1649 &text_columns,
1650 )?,
1651 table,
1652 })
1653 }
1654 SourceExportStatementDetails::MySql {
1655 table,
1656 initial_gtid_set,
1657 binlog_full_metadata,
1658 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1659 table,
1660 initial_gtid_set,
1661 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1662 exclude_columns: exclude_columns
1663 .into_iter()
1664 .map(|c| c.into_string())
1665 .collect(),
1666 binlog_full_metadata,
1667 }),
1668 SourceExportStatementDetails::SqlServer {
1669 table,
1670 capture_instance,
1671 initial_lsn,
1672 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1673 capture_instance,
1674 table,
1675 initial_lsn,
1676 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1677 exclude_columns: exclude_columns
1678 .into_iter()
1679 .map(|c| c.into_string())
1680 .collect(),
1681 }),
1682 SourceExportStatementDetails::LoadGenerator { output } => {
1683 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1684 }
1685 SourceExportStatementDetails::Kafka {} => {
1686 bail_unsupported!("subsources cannot reference Kafka sources")
1687 }
1688 };
1689 DataSourceDesc::IngestionExport {
1690 ingestion_id,
1691 external_reference,
1692 details,
1693 data_config: SourceExportDataConfig {
1695 envelope: SourceEnvelope::None(NoneEnvelope {
1696 key_envelope: KeyEnvelope::None,
1697 key_arity: 0,
1698 }),
1699 encoding: None,
1700 },
1701 }
1702 } else if progress {
1703 DataSourceDesc::Progress
1704 } else {
1705 sql_bail!("CREATE SUBSOURCE must specify one of PROGRESS or REFERENCES option")
1706 };
1707
1708 let if_not_exists = *if_not_exists;
1709 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1710
1711 let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1712
1713 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1714 let source = Source {
1715 create_sql,
1716 data_source,
1717 desc,
1718 compaction_window,
1719 };
1720
1721 Ok(Plan::CreateSource(CreateSourcePlan {
1722 name,
1723 source,
1724 if_not_exists,
1725 timeline: Timeline::EpochMilliseconds,
1726 in_cluster: None,
1727 }))
1728}
1729
// Options accepted in the `WITH (...)` clause of `CREATE TABLE ... FROM SOURCE`.
//
// `plan_create_table_from_source` converts the statement's options into the
// `TableFromSourceOptionExtracted` struct declared here. Entries with a
// `Default(..)` are read there as plain values; the other entries are read
// as `Option`s.
generate_extracted_config!(
    TableFromSourceOption,
    // Used for Postgres column casts and for MySQL / SQL Server export details.
    (TextColumns, Vec::<Ident>, Default(vec![])),
    // Used for MySQL / SQL Server export details.
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    // Feature-flagged; validated against the planned description by
    // `check_partition_by`.
    (PartitionBy, Vec<Ident>),
    // Passed to `plan_retain_history_option` to derive the compaction window.
    (RetainHistory, OptionalDuration),
    // Hex-encoded `ProtoSourceExportStatementDetails`, decoded by the planner.
    (Details, String)
);
1738
/// Plans a `CREATE TABLE ... FROM SOURCE` statement into a
/// [`Plan::CreateTable`] whose table is backed by an ingestion export of the
/// referenced source.
///
/// The planner decodes the source-specific `DETAILS` option, validates the
/// `INCLUDE ...` metadata clauses against the source type and envelope,
/// determines the key/value schema (from the declared columns or from the
/// source connection's defaults), and applies the requested format and
/// envelope to obtain the final relation description.
///
/// # Errors
///
/// Fails if the feature is disabled, if options or details are missing or
/// undecodable, if metadata clauses are not supported for the source type or
/// envelope, if the referenced item is not a source, if the resulting
/// description has duplicate column names, or if any of the delegated
/// planning steps fails.
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    // The whole statement is gated behind a system variable.
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    // An omitted envelope is treated as `ENVELOPE NONE`.
    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    // Decode the `DETAILS` option:
    // hex string -> protobuf bytes -> protobuf message -> Rust type.
    let details = details
        .as_ref()
        .ok_or_else(|| internal_err!("source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    // `INCLUDE HEADERS` is only accepted for Kafka details.
    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    // Any other `INCLUDE ...` clause is only accepted for Kafka and load
    // generator details.
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    // Turn the statement-level details into the export details for the
    // matching source type.
    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
            binlog_full_metadata,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
            binlog_full_metadata,
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            // Metadata columns are only accepted with the NONE, UPSERT and
            // DEBEZIUM envelopes.
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            // Each metadata clause becomes a `(column name, kind)` pair; the
            // column name is the user's alias when given and a fixed default
            // otherwise. `INCLUDE KEY` produces no metadata column here.
            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    // The referenced item must be a source; its connection supplies the
    // default schemas below and is consulted when applying the envelope.
    let source_connection = &source_item
        .source_desc()?
        .ok_or_else(|| sql_err!("item is not a source"))?
        .connection;

    // When the statement defines columns (or constraints), the value schema
    // is planned from them and there is no separate key schema. Otherwise
    // the source connection's default key and value schemas are used.
    let (key_desc, value_desc) =
        if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
            let columns = match columns {
                TableFromSourceColumns::Defined(columns) => columns,
                // Constraints without column definitions are not expected.
                _ => bail_internal!("expected column definitions to be present"),
            };
            let desc = plan_source_export_desc(scx, name, columns, constraints)?;
            (None, desc)
        } else {
            let key_desc = source_connection.default_key_desc();
            let value_desc = source_connection.default_value_desc();
            (Some(key_desc), value_desc)
        };

    // Only Kafka exports contribute metadata columns to the description.
    let metadata_columns_desc = match &details {
        SourceExportDetails::Kafka(KafkaSourceExportDetails {
            metadata_columns, ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        key_desc,
        value_desc,
        include_metadata,
        metadata_columns_desc,
        source_connection,
    )?;
    // If the statement only lists column names, hand them to
    // `maybe_rename_columns` together with the planned description.
    if let TableFromSourceColumns::Named(col_names) = columns {
        plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
    }

    // The final description must not contain duplicate column names.
    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    // The CdcV2 envelope gets an external timeline named after the table's
    // full name; every other envelope uses the epoch-milliseconds timeline.
    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    // `PARTITION BY` is feature-flagged and checked against the description.
    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let data_source = DataSourceDesc::IngestionExport {
        ingestion_id,
        external_reference: external_reference
            .as_ref()
            .ok_or_else(|| sql_err!("EXTERNAL REFERENCE is required"))?
            .clone(),
        details,
        data_config: SourceExportDataConfig { envelope, encoding },
    };

    // Read the last value borrowed from `stmt` before the statement is moved
    // into `create_statement` below.
    let if_not_exists = *if_not_exists;

    let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let table = Table {
        create_sql,
        desc: VersionedRelationDesc::new(desc),
        temporary: false,
        compaction_window,
        data_source: TableDataSource::DataSource {
            desc: data_source,
            timeline,
        },
    };

    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists,
    }))
}
2008
// Options accepted by `LOAD GENERATOR` sources.
//
// The planners in this file convert the statement's options into the
// `LoadGeneratorOptionExtracted` struct declared here. Entries with a
// `Default(..)` are read as plain values; the other entries are read as
// `Option`s. Which options a given generator accepts is enforced by
// `LoadGeneratorOptionExtracted::ensure_only_valid_options`.
generate_extracted_config!(
    LoadGeneratorOption,
    // Converted to whole microseconds for the planned connection.
    (TickInterval, Duration),
    // The planner rejects `UP TO` values smaller than `AS OF`.
    (AsOf, u64, Default(0_u64)),
    (UpTo, u64, Default(u64::MAX)),
    // Permitted for the TPCH generator only.
    (ScaleFactor, f64),
    // Permitted for the COUNTER generator only.
    (MaxCardinality, u64),
    // The remaining options are permitted for the KEY VALUE generator only.
    (Keys, u64),
    (SnapshotRounds, u64),
    (TransactionalSnapshot, bool),
    (ValueSize, u64),
    (Seed, u64),
    (Partitions, u64),
    (BatchSize, u64)
);
2024
2025impl LoadGeneratorOptionExtracted {
2026 pub(super) fn ensure_only_valid_options(
2027 &self,
2028 loadgen: &ast::LoadGenerator,
2029 ) -> Result<(), PlanError> {
2030 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2031
2032 let mut options = self.seen.clone();
2033
2034 let permitted_options: &[_] = match loadgen {
2035 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2036 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2037 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2038 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2039 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2040 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2041 ast::LoadGenerator::KeyValue => &[
2042 TickInterval,
2043 Keys,
2044 SnapshotRounds,
2045 TransactionalSnapshot,
2046 ValueSize,
2047 Seed,
2048 Partitions,
2049 BatchSize,
2050 ],
2051 };
2052
2053 for o in permitted_options {
2054 options.remove(o);
2055 }
2056
2057 if !options.is_empty() {
2058 sql_bail!(
2059 "{} load generators do not support {} values",
2060 loadgen,
2061 options.iter().join(", ")
2062 )
2063 }
2064
2065 Ok(())
2066 }
2067}
2068
2069pub(crate) fn load_generator_ast_to_generator(
2070 scx: &StatementContext,
2071 loadgen: &ast::LoadGenerator,
2072 options: &[LoadGeneratorOption<Aug>],
2073 include_metadata: &[SourceIncludeMetadata],
2074) -> Result<LoadGenerator, PlanError> {
2075 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2076 extracted.ensure_only_valid_options(loadgen)?;
2077
2078 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2079 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2080 }
2081
2082 let load_generator = match loadgen {
2083 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2084 ast::LoadGenerator::Clock => {
2085 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_CLOCK)?;
2086 LoadGenerator::Clock
2087 }
2088 ast::LoadGenerator::Counter => {
2089 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_COUNTER)?;
2090 let LoadGeneratorOptionExtracted {
2091 max_cardinality, ..
2092 } = extracted;
2093 LoadGenerator::Counter { max_cardinality }
2094 }
2095 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2096 ast::LoadGenerator::Datums => {
2097 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_DATUMS)?;
2098 LoadGenerator::Datums
2099 }
2100 ast::LoadGenerator::Tpch => {
2101 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2102
2103 let sf: f64 = scale_factor.unwrap_or(0.01);
2105 if !sf.is_finite() || sf < 0.0 {
2106 sql_bail!("unsupported scale factor {sf}");
2107 }
2108
2109 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2110 let total = (sf * multiplier).floor();
2111 let mut i = i64::try_cast_from(total)
2112 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2113 if i < 1 {
2114 i = 1;
2115 }
2116 Ok(i)
2117 };
2118
2119 let count_supplier = f_to_i(10_000f64)?;
2122 let count_part = f_to_i(200_000f64)?;
2123 let count_customer = f_to_i(150_000f64)?;
2124 let count_orders = f_to_i(150_000f64 * 10f64)?;
2125 let count_clerk = f_to_i(1_000f64)?;
2126
2127 LoadGenerator::Tpch {
2128 count_supplier,
2129 count_part,
2130 count_customer,
2131 count_orders,
2132 count_clerk,
2133 }
2134 }
2135 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2136 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2137 let LoadGeneratorOptionExtracted {
2138 keys,
2139 snapshot_rounds,
2140 transactional_snapshot,
2141 value_size,
2142 tick_interval,
2143 seed,
2144 partitions,
2145 batch_size,
2146 ..
2147 } = extracted;
2148
2149 let mut include_offset = None;
2150 for im in include_metadata {
2151 match im {
2152 SourceIncludeMetadata::Offset { alias } => {
2153 include_offset = match alias {
2154 Some(alias) => Some(alias.to_string()),
2155 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2156 }
2157 }
2158 SourceIncludeMetadata::Key { .. } => continue,
2159
2160 _ => {
2161 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2162 }
2163 };
2164 }
2165
2166 let lgkv = KeyValueLoadGenerator {
2167 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2168 snapshot_rounds: snapshot_rounds
2169 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2170 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2172 value_size: value_size
2173 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2174 partitions: partitions
2175 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2176 tick_interval,
2177 batch_size: batch_size
2178 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2179 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2180 include_offset,
2181 };
2182
2183 if lgkv.keys == 0
2184 || lgkv.partitions == 0
2185 || lgkv.value_size == 0
2186 || lgkv.batch_size == 0
2187 {
2188 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2189 }
2190
2191 if lgkv.keys % lgkv.partitions != 0 {
2192 sql_bail!("KEYS must be a multiple of PARTITIONS")
2193 }
2194
2195 if lgkv.batch_size > lgkv.keys {
2196 sql_bail!("KEYS must be larger than BATCH SIZE")
2197 }
2198
2199 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2202 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2203 }
2204
2205 if lgkv.snapshot_rounds == 0 {
2206 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2207 }
2208
2209 LoadGenerator::KeyValue(lgkv)
2210 }
2211 };
2212
2213 Ok(load_generator)
2214}
2215
2216fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2217 let before = value_desc.get_by_name(&"before".into());
2218 let (after_idx, after_ty) = value_desc
2219 .get_by_name(&"after".into())
2220 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2221 let before_idx = if let Some((before_idx, before_ty)) = before {
2222 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2223 sql_bail!("'before' column must be of type record");
2224 }
2225 if before_ty != after_ty {
2226 sql_bail!("'before' type differs from 'after' column");
2227 }
2228 Some(before_idx)
2229 } else {
2230 None
2231 };
2232 Ok((before_idx, after_idx))
2233}
2234
2235fn get_encoding(
2236 scx: &StatementContext,
2237 format: &FormatSpecifier<Aug>,
2238 envelope: &ast::SourceEnvelope,
2239) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2240 let encoding = match format {
2241 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2242 FormatSpecifier::KeyValue { key, value } => {
2243 let key = {
2244 let encoding = get_encoding_inner(scx, key)?;
2245 Some(encoding.key.unwrap_or(encoding.value))
2246 };
2247 let value = get_encoding_inner(scx, value)?.value;
2248 SourceDataEncoding { key, value }
2249 }
2250 };
2251
2252 let requires_keyvalue = matches!(
2253 envelope,
2254 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2255 );
2256 let is_keyvalue = encoding.key.is_some();
2257 if requires_keyvalue && !is_keyvalue {
2258 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2259 };
2260
2261 Ok(encoding)
2262}
2263
2264fn source_sink_cluster_config<'a, 'ctx>(
2270 scx: &'a StatementContext<'ctx>,
2271 in_cluster: &mut Option<ResolvedClusterName>,
2272) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2273 let cluster = match in_cluster {
2274 None => {
2275 let cluster = scx.catalog.resolve_cluster(None)?;
2276 *in_cluster = Some(ResolvedClusterName {
2277 id: cluster.id(),
2278 print_name: None,
2279 });
2280 cluster
2281 }
2282 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2283 };
2284
2285 Ok(cluster)
2286}
2287
// Generates `AvroSchemaOptionExtracted`, whose `confluent_wire_format` field is read when
// planning inline Avro schemas in `get_encoding_inner`.
// NOTE(review): `Default(true)` presumably makes the option default to `true` when it is not
// specified — confirm against the `generate_extracted_config!` definition.
generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2289
/// The pieces of an Avro schema specification gathered while planning a `FORMAT AVRO` clause,
/// before they are turned into key/value `AvroEncoding`s.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the key, if one is available. Inline schemas never provide one.
    pub key_schema: Option<String>,
    /// Schema for the value.
    pub value_schema: String,
    /// Schemas referenced by the key schema. Empty for inline schemas.
    pub key_reference_schemas: Vec<String>,
    /// Schemas referenced by the value schema. Empty for inline schemas.
    pub value_reference_schemas: Vec<String>,
    /// The schema registry connection the schemas came from; `None` for inline schemas.
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    /// Whether the encoded data uses the Confluent wire format. Always `true` for schemas
    /// obtained through a schema registry connection; taken from the `WITH` options for
    /// inline schemas.
    pub confluent_wire_format: bool,
}
2301
2302fn get_encoding_inner(
2303 scx: &StatementContext,
2304 format: &Format<Aug>,
2305) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2306 let value = match format {
2307 Format::Bytes => DataEncoding::Bytes,
2308 Format::Avro(schema) => {
2309 let Schema {
2310 key_schema,
2311 value_schema,
2312 key_reference_schemas,
2313 value_reference_schemas,
2314 csr_connection,
2315 confluent_wire_format,
2316 } = match schema {
2317 AvroSchema::InlineSchema {
2320 schema: ast::Schema { schema },
2321 with_options,
2322 } => {
2323 let AvroSchemaOptionExtracted {
2324 confluent_wire_format,
2325 ..
2326 } = with_options.clone().try_into()?;
2327
2328 Schema {
2329 key_schema: None,
2330 value_schema: schema.clone(),
2331 key_reference_schemas: vec![],
2332 value_reference_schemas: vec![],
2333 csr_connection: None,
2334 confluent_wire_format,
2335 }
2336 }
2337 AvroSchema::Csr {
2338 csr_connection:
2339 CsrConnectionAvro {
2340 connection,
2341 seed,
2342 key_strategy: _,
2343 value_strategy: _,
2344 },
2345 } => {
2346 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2347 let csr_connection = match item.connection()? {
2348 Connection::Csr(_) => item.id(),
2349 _ => {
2350 sql_bail!(
2351 "{} is not a schema registry connection",
2352 scx.catalog
2353 .resolve_full_name(item.name())
2354 .to_string()
2355 .quoted()
2356 )
2357 }
2358 };
2359
2360 if let Some(seed) = seed {
2361 Schema {
2362 key_schema: seed.key_schema.clone(),
2363 value_schema: seed.value_schema.clone(),
2364 key_reference_schemas: seed.key_reference_schemas.clone(),
2365 value_reference_schemas: seed.value_reference_schemas.clone(),
2366 csr_connection: Some(csr_connection),
2367 confluent_wire_format: true,
2368 }
2369 } else {
2370 sql_bail!("Avro CSR seed resolution has not been performed")
2371 }
2372 }
2373 };
2374
2375 if let Some(key_schema) = key_schema {
2376 return Ok(SourceDataEncoding {
2377 key: Some(DataEncoding::Avro(AvroEncoding {
2378 schema: key_schema,
2379 reference_schemas: key_reference_schemas,
2380 csr_connection: csr_connection.clone(),
2381 confluent_wire_format,
2382 })),
2383 value: DataEncoding::Avro(AvroEncoding {
2384 schema: value_schema,
2385 reference_schemas: value_reference_schemas,
2386 csr_connection,
2387 confluent_wire_format,
2388 }),
2389 });
2390 } else {
2391 DataEncoding::Avro(AvroEncoding {
2392 schema: value_schema,
2393 reference_schemas: value_reference_schemas,
2394 csr_connection,
2395 confluent_wire_format,
2396 })
2397 }
2398 }
2399 Format::Protobuf(schema) => match schema {
2400 ProtobufSchema::Csr {
2401 csr_connection:
2402 CsrConnectionProtobuf {
2403 connection:
2404 CsrConnection {
2405 connection,
2406 options,
2407 },
2408 seed,
2409 },
2410 } => {
2411 if let Some(CsrSeedProtobuf { key, value }) = seed {
2412 let item = scx.get_item_by_resolved_name(connection)?;
2413 let _ = match item.connection()? {
2414 Connection::Csr(connection) => connection,
2415 _ => {
2416 sql_bail!(
2417 "{} is not a schema registry connection",
2418 scx.catalog
2419 .resolve_full_name(item.name())
2420 .to_string()
2421 .quoted()
2422 )
2423 }
2424 };
2425
2426 if !options.is_empty() {
2427 sql_bail!("Protobuf CSR connections do not support any options");
2428 }
2429
2430 let value = DataEncoding::Protobuf(ProtobufEncoding {
2431 descriptors: strconv::parse_bytes(&value.schema)?,
2432 message_name: value.message_name.clone(),
2433 confluent_wire_format: true,
2434 });
2435 if let Some(key) = key {
2436 return Ok(SourceDataEncoding {
2437 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2438 descriptors: strconv::parse_bytes(&key.schema)?,
2439 message_name: key.message_name.clone(),
2440 confluent_wire_format: true,
2441 })),
2442 value,
2443 });
2444 }
2445 value
2446 } else {
2447 sql_bail!("Protobuf CSR seed resolution has not been performed")
2448 }
2449 }
2450 ProtobufSchema::InlineSchema {
2451 message_name,
2452 schema: ast::Schema { schema },
2453 } => {
2454 let descriptors = strconv::parse_bytes(schema)?;
2455
2456 DataEncoding::Protobuf(ProtobufEncoding {
2457 descriptors,
2458 message_name: message_name.to_owned(),
2459 confluent_wire_format: false,
2460 })
2461 }
2462 },
2463 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2464 regex: mz_repr::adt::regex::Regex::new(regex, false)
2465 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2466 }),
2467 Format::Csv { columns, delimiter } => {
2468 let columns = match columns {
2469 CsvColumns::Header { names } => {
2470 if names.is_empty() {
2471 sql_bail!("[internal error] column spec should get names in purify")
2472 }
2473 ColumnSpec::Header {
2474 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2475 }
2476 }
2477 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2478 };
2479 DataEncoding::Csv(CsvEncoding {
2480 columns,
2481 delimiter: u8::try_from(*delimiter)
2482 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2483 })
2484 }
2485 Format::Json { array: false } => DataEncoding::Json,
2486 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2487 Format::Text => DataEncoding::Text,
2488 };
2489 Ok(SourceDataEncoding { key: None, value })
2490}
2491
2492fn get_key_envelope(
2494 included_items: &[SourceIncludeMetadata],
2495 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2496 key_envelope_no_encoding: bool,
2497) -> Result<KeyEnvelope, PlanError> {
2498 let key_definition = included_items
2499 .iter()
2500 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2501 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2502 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2503 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2504 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2505 (Some(name), _) if key_envelope_no_encoding => {
2506 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2507 }
2508 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2509 (_, None) => {
2510 sql_bail!(
2514 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2515 got bare FORMAT"
2516 );
2517 }
2518 }
2519 } else {
2520 Ok(KeyEnvelope::None)
2521 }
2522}
2523
2524fn get_unnamed_key_envelope(
2527 key: Option<&DataEncoding<ReferencedConnection>>,
2528) -> Result<KeyEnvelope, PlanError> {
2529 let is_composite = match key {
2533 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2534 Some(
2535 DataEncoding::Avro(_)
2536 | DataEncoding::Csv(_)
2537 | DataEncoding::Protobuf(_)
2538 | DataEncoding::Regex { .. },
2539 ) => true,
2540 None => false,
2541 };
2542
2543 if is_composite {
2544 Ok(KeyEnvelope::Flattened)
2545 } else {
2546 Ok(KeyEnvelope::Named("key".to_string()))
2547 }
2548}
2549
2550pub fn describe_create_view(
2551 _: &StatementContext,
2552 _: CreateViewStatement<Aug>,
2553) -> Result<StatementDesc, PlanError> {
2554 Ok(StatementDesc::new(None))
2555}
2556
2557pub fn plan_view(
2558 scx: &StatementContext,
2559 def: &mut ViewDefinition<Aug>,
2560 temporary: bool,
2561) -> Result<(QualifiedItemName, View), PlanError> {
2562 let create_sql = normalize::create_statement(
2563 scx,
2564 Statement::CreateView(CreateViewStatement {
2565 if_exists: IfExistsBehavior::Error,
2566 temporary,
2567 definition: def.clone(),
2568 }),
2569 )?;
2570
2571 let ViewDefinition {
2572 name,
2573 columns,
2574 query,
2575 } = def;
2576
2577 let query::PlannedRootQuery {
2578 expr,
2579 mut desc,
2580 finishing,
2581 scope: _,
2582 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2583 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2589 &finishing,
2590 expr.arity()
2591 ));
2592 if expr.contains_parameters()? {
2593 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2594 }
2595
2596 let dependencies = expr
2597 .depends_on()
2598 .into_iter()
2599 .map(|gid| scx.catalog.resolve_item_id(&gid))
2600 .collect();
2601
2602 let name = if temporary {
2603 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2604 } else {
2605 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2606 };
2607
2608 plan_utils::maybe_rename_columns(
2609 format!("view {}", scx.catalog.resolve_full_name(&name)),
2610 &mut desc,
2611 columns,
2612 )?;
2613 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2614
2615 if let Some(dup) = names.iter().duplicates().next() {
2616 sql_bail!("column {} specified more than once", dup.quoted());
2617 }
2618
2619 let view = View {
2620 create_sql,
2621 expr,
2622 dependencies,
2623 column_names: names,
2624 temporary,
2625 };
2626
2627 Ok((name, view))
2628}
2629
2630pub fn plan_create_view(
2631 scx: &StatementContext,
2632 mut stmt: CreateViewStatement<Aug>,
2633) -> Result<Plan, PlanError> {
2634 let CreateViewStatement {
2635 temporary,
2636 if_exists,
2637 definition,
2638 } = &mut stmt;
2639 let (name, view) = plan_view(scx, definition, *temporary)?;
2640
2641 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2644
2645 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2646 let if_exists = true;
2647 let cascade = false;
2648 let maybe_item_to_drop = plan_drop_item(
2649 scx,
2650 ObjectType::View,
2651 if_exists,
2652 definition.name.clone(),
2653 cascade,
2654 )?;
2655
2656 if let Some(id) = maybe_item_to_drop {
2658 let dependencies = view.expr.depends_on();
2659 let invalid_drop = scx
2660 .get_item(&id)
2661 .global_ids()
2662 .any(|gid| dependencies.contains(&gid));
2663 if invalid_drop {
2664 let item = scx.catalog.get_item(&id);
2665 sql_bail!(
2666 "cannot replace view {0}: depended upon by new {0} definition",
2667 scx.catalog.resolve_full_name(item.name())
2668 );
2669 }
2670
2671 Some(id)
2672 } else {
2673 None
2674 }
2675 } else {
2676 None
2677 };
2678 let drop_ids = replace
2679 .map(|id| {
2680 scx.catalog
2681 .item_dependents(id)
2682 .into_iter()
2683 .map(|id| id.unwrap_item_id())
2684 .collect()
2685 })
2686 .unwrap_or_default();
2687
2688 validate_view_dependencies(scx, &view.dependencies.0)?;
2689
2690 let full_name = scx.catalog.resolve_full_name(&name);
2692 let partial_name = PartialItemName::from(full_name.clone());
2693 if let (Ok(item), IfExistsBehavior::Error, false) = (
2696 scx.catalog.resolve_item_or_type(&partial_name),
2697 *if_exists,
2698 ignore_if_exists_errors,
2699 ) {
2700 return Err(PlanError::ItemAlreadyExists {
2701 name: full_name.to_string(),
2702 item_type: item.item_type(),
2703 });
2704 }
2705
2706 Ok(Plan::CreateView(CreateViewPlan {
2707 name,
2708 view,
2709 replace,
2710 drop_ids,
2711 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2712 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2713 }))
2714}
2715
2716fn validate_view_dependencies(
2718 scx: &StatementContext,
2719 dependencies: &BTreeSet<CatalogItemId>,
2720) -> Result<(), PlanError> {
2721 for id in dependencies {
2722 let item = scx.catalog.get_item(id);
2723 if item.replacement_target().is_some() {
2724 let name = scx.catalog.minimal_qualification(item.name());
2725 return Err(PlanError::InvalidDependency {
2726 name: name.to_string(),
2727 item_type: format!("replacement {}", item.item_type()),
2728 });
2729 }
2730 }
2731
2732 Ok(())
2733}
2734
2735pub fn describe_create_materialized_view(
2736 _: &StatementContext,
2737 _: CreateMaterializedViewStatement<Aug>,
2738) -> Result<StatementDesc, PlanError> {
2739 Ok(StatementDesc::new(None))
2740}
2741
2742pub fn describe_create_network_policy(
2743 _: &StatementContext,
2744 _: CreateNetworkPolicyStatement<Aug>,
2745) -> Result<StatementDesc, PlanError> {
2746 Ok(StatementDesc::new(None))
2747}
2748
2749pub fn describe_alter_network_policy(
2750 _: &StatementContext,
2751 _: AlterNetworkPolicyStatement<Aug>,
2752) -> Result<StatementDesc, PlanError> {
2753 Ok(StatementDesc::new(None))
2754}
2755
2756pub fn plan_create_materialized_view(
2757 scx: &StatementContext,
2758 mut stmt: CreateMaterializedViewStatement<Aug>,
2759) -> Result<Plan, PlanError> {
2760 let cluster_id =
2761 crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2762 stmt.in_cluster = Some(ResolvedClusterName {
2763 id: cluster_id,
2764 print_name: None,
2765 });
2766
2767 let target_replica = match &stmt.in_cluster_replica {
2768 Some(replica_name) => {
2769 scx.require_feature_flag(&ENABLE_REPLICA_TARGETED_MATERIALIZED_VIEWS)?;
2770
2771 let cluster = scx.catalog.get_cluster(cluster_id);
2772 let replica_id = cluster
2773 .replica_ids()
2774 .get(replica_name.as_str())
2775 .copied()
2776 .ok_or_else(|| {
2777 CatalogError::UnknownClusterReplica(replica_name.as_str().to_string())
2778 })?;
2779 Some(replica_id)
2780 }
2781 None => None,
2782 };
2783
2784 let create_sql =
2785 normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2786
2787 let partial_name = normalize::unresolved_item_name(stmt.name)?;
2788 let name = scx.allocate_qualified_name(partial_name.clone())?;
2789
2790 let query::PlannedRootQuery {
2791 expr,
2792 mut desc,
2793 finishing,
2794 scope: _,
2795 } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2796 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2798 &finishing,
2799 expr.arity()
2800 ));
2801 if expr.contains_parameters()? {
2802 return Err(PlanError::ParameterNotAllowed(
2803 "materialized views".to_string(),
2804 ));
2805 }
2806
2807 plan_utils::maybe_rename_columns(
2808 format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2809 &mut desc,
2810 &stmt.columns,
2811 )?;
2812 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2813
2814 let MaterializedViewOptionExtracted {
2815 assert_not_null,
2816 partition_by,
2817 retain_history,
2818 refresh,
2819 seen: _,
2820 }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2821
2822 if let Some(partition_by) = partition_by {
2823 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2824 check_partition_by(&desc, partition_by)?;
2825 }
2826
2827 let refresh_schedule = {
2828 let mut refresh_schedule = RefreshSchedule::default();
2829 let mut on_commits_seen = 0;
2830 for refresh_option_value in refresh {
2831 if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2832 scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2833 }
2834 match refresh_option_value {
2835 RefreshOptionValue::OnCommit => {
2836 on_commits_seen += 1;
2837 }
2838 RefreshOptionValue::AtCreation => {
2839 soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2840 sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2841 }
2842 RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2843 transform_ast::transform(scx, &mut time)?; let ecx = &ExprContext {
2845 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2846 name: "REFRESH AT",
2847 scope: &Scope::empty(),
2848 relation_type: &SqlRelationType::empty(),
2849 allow_aggregates: false,
2850 allow_subqueries: false,
2851 allow_parameters: false,
2852 allow_windows: false,
2853 };
2854 let hir = plan_expr(ecx, &time)?.cast_to(
2855 ecx,
2856 CastContext::Assignment,
2857 &SqlScalarType::MzTimestamp,
2858 )?;
2859 let timestamp = hir
2861 .into_literal_mz_timestamp()
2862 .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2863 refresh_schedule.ats.push(timestamp);
2864 }
2865 RefreshOptionValue::Every(RefreshEveryOptionValue {
2866 interval,
2867 aligned_to,
2868 }) => {
2869 let interval = Interval::try_from_value(Value::Interval(interval))?;
2870 if interval.as_microseconds() <= 0 {
2871 sql_bail!("REFRESH interval must be positive; got: {}", interval);
2872 }
2873 if interval.months != 0 {
2874 sql_bail!("REFRESH interval must not involve units larger than days");
2879 }
2880 let interval = interval.duration()?;
2881 if u64::try_from(interval.as_millis()).is_err() {
2882 sql_bail!("REFRESH interval too large");
2883 }
2884
2885 let mut aligned_to = match aligned_to {
2886 Some(aligned_to) => aligned_to,
2887 None => {
2888 soft_panic_or_log!(
2889 "ALIGNED TO should have been filled in by purification"
2890 );
2891 sql_bail!(
2892 "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2893 )
2894 }
2895 };
2896
2897 transform_ast::transform(scx, &mut aligned_to)?;
2899
2900 let ecx = &ExprContext {
2901 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2902 name: "REFRESH EVERY ... ALIGNED TO",
2903 scope: &Scope::empty(),
2904 relation_type: &SqlRelationType::empty(),
2905 allow_aggregates: false,
2906 allow_subqueries: false,
2907 allow_parameters: false,
2908 allow_windows: false,
2909 };
2910 let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2911 ecx,
2912 CastContext::Assignment,
2913 &SqlScalarType::MzTimestamp,
2914 )?;
2915 let aligned_to_const = aligned_to_hir
2917 .into_literal_mz_timestamp()
2918 .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2919
2920 refresh_schedule.everies.push(RefreshEvery {
2921 interval,
2922 aligned_to: aligned_to_const,
2923 });
2924 }
2925 }
2926 }
2927
2928 if on_commits_seen > 1 {
2929 sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2930 }
2931 if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2932 sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2933 }
2934
2935 if refresh_schedule == RefreshSchedule::default() {
2936 None
2937 } else {
2938 Some(refresh_schedule)
2939 }
2940 };
2941
2942 let as_of = stmt.as_of.map(Timestamp::from);
2943 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2944 let mut non_null_assertions = assert_not_null
2945 .into_iter()
2946 .map(normalize::column_name)
2947 .map(|assertion_name| {
2948 column_names
2949 .iter()
2950 .position(|col| col == &assertion_name)
2951 .ok_or_else(|| {
2952 sql_err!(
2953 "column {} in ASSERT NOT NULL option not found",
2954 assertion_name.quoted()
2955 )
2956 })
2957 })
2958 .collect::<Result<Vec<_>, _>>()?;
2959 non_null_assertions.sort();
2960 if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2961 let dup = &column_names[*dup];
2962 sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2963 }
2964
2965 if let Some(dup) = column_names.iter().duplicates().next() {
2966 sql_bail!("column {} specified more than once", dup.quoted());
2967 }
2968
2969 let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2972 Ok(true) => IfExistsBehavior::Skip,
2973 _ => stmt.if_exists,
2974 };
2975
2976 let mut replace = None;
2977 let mut if_not_exists = false;
2978 match if_exists {
2979 IfExistsBehavior::Replace => {
2980 let if_exists = true;
2981 let cascade = false;
2982 let replace_id = plan_drop_item(
2983 scx,
2984 ObjectType::MaterializedView,
2985 if_exists,
2986 partial_name.clone().into(),
2987 cascade,
2988 )?;
2989
2990 if let Some(id) = replace_id {
2992 let dependencies = expr.depends_on();
2993 let invalid_drop = scx
2994 .get_item(&id)
2995 .global_ids()
2996 .any(|gid| dependencies.contains(&gid));
2997 if invalid_drop {
2998 let item = scx.catalog.get_item(&id);
2999 sql_bail!(
3000 "cannot replace materialized view {0}: depended upon by new {0} definition",
3001 scx.catalog.resolve_full_name(item.name())
3002 );
3003 }
3004 replace = Some(id);
3005 }
3006 }
3007 IfExistsBehavior::Skip => if_not_exists = true,
3008 IfExistsBehavior::Error => (),
3009 }
3010 let drop_ids = replace
3011 .map(|id| {
3012 scx.catalog
3013 .item_dependents(id)
3014 .into_iter()
3015 .map(|id| id.unwrap_item_id())
3016 .collect()
3017 })
3018 .unwrap_or_default();
3019 let mut dependencies: BTreeSet<_> = expr
3020 .depends_on()
3021 .into_iter()
3022 .map(|gid| scx.catalog.resolve_item_id(&gid))
3023 .collect();
3024
3025 let mut replacement_target = None;
3027 if let Some(target_name) = &stmt.replacement_for {
3028 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
3029
3030 let target = scx.get_item_by_resolved_name(target_name)?;
3031 if target.item_type() != CatalogItemType::MaterializedView {
3032 return Err(PlanError::InvalidReplacement {
3033 item_type: target.item_type(),
3034 item_name: scx.catalog.minimal_qualification(target.name()),
3035 replacement_type: CatalogItemType::MaterializedView,
3036 replacement_name: partial_name,
3037 });
3038 }
3039 if target.id().is_system() {
3040 sql_bail!(
3041 "cannot replace {} because it is required by the database system",
3042 scx.catalog.minimal_qualification(target.name()),
3043 );
3044 }
3045
3046 for dependent in scx.catalog.item_dependents(target.id()) {
3048 if let ObjectId::Item(id) = dependent
3049 && dependencies.contains(&id)
3050 {
3051 sql_bail!(
3052 "replacement would cause {} to depend on itself",
3053 scx.catalog.minimal_qualification(target.name()),
3054 );
3055 }
3056 }
3057
3058 dependencies.insert(target.id());
3059
3060 for use_id in target.used_by() {
3061 let use_item = scx.get_item(use_id);
3062 if use_item.replacement_target() == Some(target.id()) {
3063 sql_bail!(
3064 "cannot replace {} because it already has a replacement: {}",
3065 scx.catalog.minimal_qualification(target.name()),
3066 scx.catalog.minimal_qualification(use_item.name()),
3067 );
3068 }
3069 }
3070
3071 replacement_target = Some(target.id());
3072 }
3073
3074 validate_view_dependencies(scx, &dependencies)?;
3075
3076 let full_name = scx.catalog.resolve_full_name(&name);
3078 let partial_name = PartialItemName::from(full_name.clone());
3079 if let (IfExistsBehavior::Error, Ok(item)) =
3082 (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
3083 {
3084 return Err(PlanError::ItemAlreadyExists {
3085 name: full_name.to_string(),
3086 item_type: item.item_type(),
3087 });
3088 }
3089
3090 Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
3091 name,
3092 materialized_view: MaterializedView {
3093 create_sql,
3094 expr,
3095 dependencies: DependencyIds(dependencies),
3096 column_names,
3097 replacement_target,
3098 cluster_id,
3099 target_replica,
3100 non_null_assertions,
3101 compaction_window,
3102 refresh_schedule,
3103 as_of,
3104 },
3105 replace,
3106 drop_ids,
3107 if_not_exists,
3108 ambiguous_columns: *scx.ambiguous_columns.borrow(),
3109 }))
3110}
3111
// Generates `MaterializedViewOptionExtracted`, which `plan_create_materialized_view`
// destructures into `assert_not_null`, `partition_by`, `retain_history`, `refresh` and `seen`.
// `AssertNotNull` and `Refresh` are declared `AllowMultiple`; the planner iterates over both.
generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);
3119
3120pub fn describe_create_sink(
3121 _: &StatementContext,
3122 _: CreateSinkStatement<Aug>,
3123) -> Result<StatementDesc, PlanError> {
3124 Ok(StatementDesc::new(None))
3125}
3126
// Declares the `WITH` options recognized for `CREATE SINK` and their value types.
// NOTE(review): by analogy with the other `generate_extracted_config!` invocations in this
// file, this presumably generates a `CreateSinkOptionExtracted` struct — confirm at the use
// site.
generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64),
    (CommitInterval, Duration)
);
3134
3135pub fn plan_create_sink(
3136 scx: &StatementContext,
3137 stmt: CreateSinkStatement<Aug>,
3138) -> Result<Plan, PlanError> {
3139 let Some(name) = stmt.name.clone() else {
3141 return Err(PlanError::MissingName(CatalogItemType::Sink));
3142 };
3143 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3144 let full_name = scx.catalog.resolve_full_name(&name);
3145 let partial_name = PartialItemName::from(full_name.clone());
3146 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3147 return Err(PlanError::ItemAlreadyExists {
3148 name: full_name.to_string(),
3149 item_type: item.item_type(),
3150 });
3151 }
3152
3153 plan_sink(scx, stmt)
3154}
3155
3156fn plan_sink(
3161 scx: &StatementContext,
3162 mut stmt: CreateSinkStatement<Aug>,
3163) -> Result<Plan, PlanError> {
3164 let CreateSinkStatement {
3165 name,
3166 in_cluster: _,
3167 from,
3168 connection,
3169 format,
3170 envelope,
3171 mode,
3172 if_not_exists,
3173 with_options,
3174 } = stmt.clone();
3175
3176 let Some(name) = name else {
3177 return Err(PlanError::MissingName(CatalogItemType::Sink));
3178 };
3179 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3180
3181 let envelope = match (&connection, envelope, mode) {
3182 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Upsert), None) => {
3184 SinkEnvelope::Upsert
3185 }
3186 (CreateSinkConnection::Kafka { .. }, Some(ast::SinkEnvelope::Debezium), None) => {
3187 SinkEnvelope::Debezium
3188 }
3189 (CreateSinkConnection::Kafka { .. }, None, None) => {
3190 sql_bail!("ENVELOPE clause is required")
3191 }
3192 (CreateSinkConnection::Kafka { .. }, _, Some(_)) => {
3193 sql_bail!("MODE is not supported for Kafka sinks, use ENVELOPE instead")
3194 }
3195 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
3197 SinkEnvelope::Upsert
3198 }
3199 (CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
3200 SinkEnvelope::Append
3201 }
3202 (CreateSinkConnection::Iceberg { .. }, None, None) => {
3203 sql_bail!("MODE clause is required")
3204 }
3205 (CreateSinkConnection::Iceberg { .. }, Some(_), _) => {
3206 sql_bail!("ENVELOPE is not supported for Iceberg sinks, use MODE instead")
3207 }
3208 };
3209
3210 let from_name = &from;
3211 let from = scx.get_item_by_resolved_name(&from)?;
3212
3213 {
3214 use CatalogItemType::*;
3215 match from.item_type() {
3216 Table | Source | MaterializedView => {
3217 if from.replacement_target().is_some() {
3218 let name = scx.catalog.minimal_qualification(from.name());
3219 return Err(PlanError::InvalidSinkFrom {
3220 name: name.to_string(),
3221 item_type: format!("replacement {}", from.item_type()),
3222 });
3223 }
3224 }
3225 Sink | View | Index | Type | Func | Secret | Connection => {
3226 let name = scx.catalog.minimal_qualification(from.name());
3227 return Err(PlanError::InvalidSinkFrom {
3228 name: name.to_string(),
3229 item_type: from.item_type().to_string(),
3230 });
3231 }
3232 }
3233 }
3234
3235 if from.id().is_system() {
3236 bail_unsupported!("creating a sink directly on a catalog object");
3237 }
3238
3239 let desc = from
3240 .relation_desc()
3241 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
3242 let key_indices = match &connection {
3243 CreateSinkConnection::Kafka { key: Some(key), .. }
3244 | CreateSinkConnection::Iceberg { key: Some(key), .. } => {
3245 let key_columns = key
3246 .key_columns
3247 .clone()
3248 .into_iter()
3249 .map(normalize::column_name)
3250 .collect::<Vec<_>>();
3251 let mut uniq = BTreeSet::new();
3252 for col in key_columns.iter() {
3253 if !uniq.insert(col) {
3254 sql_bail!("duplicate column referenced in KEY: {}", col);
3255 }
3256 }
3257 let indices = key_columns
3258 .iter()
3259 .map(|col| {
3260 let name_idx =
3261 desc.get_by_name(col)
3262 .map(|(idx, _type)| idx)
3263 .ok_or_else(|| {
3264 sql_err!("column referenced in KEY does not exist: {}", col)
3265 })?;
3266 if desc.get_unambiguous_name(name_idx).is_none() {
3267 sql_bail!("column referenced in KEY is ambiguous: {}", col);
3268 }
3269 Ok(name_idx)
3270 })
3271 .collect::<Result<Vec<_>, _>>()?;
3272
3273 if matches!(&connection, CreateSinkConnection::Iceberg { .. }) {
3276 let cols: Vec<_> = desc.iter().collect();
3277 for &idx in &indices {
3278 let (col_name, col_type) = cols[idx];
3279 let scalar = &col_type.scalar_type;
3280 let is_valid = matches!(
3281 scalar,
3282 SqlScalarType::Bool
3284 | SqlScalarType::Int16
3285 | SqlScalarType::Int32
3286 | SqlScalarType::Int64
3287 | SqlScalarType::UInt16
3288 | SqlScalarType::UInt32
3289 | SqlScalarType::UInt64
3290 | SqlScalarType::Numeric { .. }
3292 | SqlScalarType::Date
3294 | SqlScalarType::Time
3295 | SqlScalarType::Timestamp { .. }
3296 | SqlScalarType::TimestampTz { .. }
3297 | SqlScalarType::Interval
3298 | SqlScalarType::MzTimestamp
3299 | SqlScalarType::String
3301 | SqlScalarType::Char { .. }
3302 | SqlScalarType::VarChar { .. }
3303 | SqlScalarType::PgLegacyChar
3304 | SqlScalarType::PgLegacyName
3305 | SqlScalarType::Bytes
3306 | SqlScalarType::Jsonb
3307 | SqlScalarType::Uuid
3309 | SqlScalarType::Oid
3310 | SqlScalarType::RegProc
3311 | SqlScalarType::RegType
3312 | SqlScalarType::RegClass
3313 | SqlScalarType::MzAclItem
3314 | SqlScalarType::AclItem
3315 | SqlScalarType::Int2Vector
3316 | SqlScalarType::Range { .. }
3318 );
3319 if !is_valid {
3320 return Err(PlanError::IcebergSinkUnsupportedKeyType {
3321 column: col_name.to_string(),
3322 column_type: format!("{:?}", scalar),
3323 });
3324 }
3325 }
3326 }
3327
3328 let is_valid_key = desc
3329 .typ()
3330 .keys
3331 .iter()
3332 .any(|key_columns| key_columns.iter().all(|column| indices.contains(column)));
3333
3334 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3335 if key.not_enforced {
3336 scx.catalog
3337 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3338 key: key_columns.clone(),
3339 name: name.item.clone(),
3340 })
3341 } else {
3342 return Err(PlanError::UpsertSinkWithInvalidKey {
3343 name: from_name.full_name_str(),
3344 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3345 valid_keys: desc
3346 .typ()
3347 .keys
3348 .iter()
3349 .map(|key| {
3350 key.iter()
3351 .map(|col| desc.get_name(*col).as_str().into())
3352 .collect()
3353 })
3354 .collect(),
3355 });
3356 }
3357 }
3358 Some(indices)
3359 }
3360 CreateSinkConnection::Kafka { key: None, .. }
3361 | CreateSinkConnection::Iceberg { key: None, .. } => None,
3362 };
3363
3364 if key_indices.is_some() && envelope == SinkEnvelope::Append {
3365 sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
3366 }
3367
3368 if envelope == SinkEnvelope::Append {
3370 if let CreateSinkConnection::Iceberg { .. } = &connection {
3371 use mz_storage_types::sinks::{
3372 ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
3373 };
3374 for (col_name, _) in desc.iter() {
3375 if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
3376 || col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
3377 {
3378 sql_bail!(
3379 "column {} conflicts with the system column that MODE APPEND \
3380 adds to the Iceberg table",
3381 col_name.quoted()
3382 );
3383 }
3384 }
3385 }
3386 }
3387
3388 let headers_index = match &connection {
3389 CreateSinkConnection::Kafka {
3390 headers: Some(headers),
3391 ..
3392 } => {
3393 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3394
3395 match envelope {
3396 SinkEnvelope::Upsert | SinkEnvelope::Append => (),
3397 SinkEnvelope::Debezium => {
3398 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3399 }
3400 };
3401
3402 let headers = normalize::column_name(headers.clone());
3403 let (idx, ty) = desc
3404 .get_by_name(&headers)
3405 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3406
3407 if desc.get_unambiguous_name(idx).is_none() {
3408 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3409 }
3410
3411 match &ty.scalar_type {
3412 SqlScalarType::Map { value_type, .. }
3413 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3414 _ => sql_bail!(
3415 "HEADERS column must have type map[text => text] or map[text => bytea]"
3416 ),
3417 }
3418
3419 Some(idx)
3420 }
3421 _ => None,
3422 };
3423
3424 let relation_key_indices = desc.typ().keys.get(0).cloned();
3426
3427 let key_desc_and_indices = key_indices.map(|key_indices| {
3428 let cols = desc
3429 .iter()
3430 .map(|(name, ty)| (name.clone(), ty.clone()))
3431 .collect::<Vec<_>>();
3432 let (names, types): (Vec<_>, Vec<_>) =
3433 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3434 let typ = SqlRelationType::new(types);
3435 (RelationDesc::new(typ, names), key_indices)
3436 });
3437
3438 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3439 return Err(PlanError::UpsertSinkWithoutKey);
3440 }
3441
3442 let CreateSinkOptionExtracted {
3443 snapshot,
3444 version,
3445 partition_strategy: _,
3446 seen: _,
3447 commit_interval,
3448 } = with_options.try_into()?;
3449
3450 let connection_builder = match connection {
3451 CreateSinkConnection::Kafka {
3452 connection,
3453 options,
3454 ..
3455 } => kafka_sink_builder(
3456 scx,
3457 connection,
3458 options,
3459 format,
3460 relation_key_indices,
3461 key_desc_and_indices,
3462 headers_index,
3463 desc.into_owned(),
3464 envelope,
3465 from.id(),
3466 commit_interval,
3467 )?,
3468 CreateSinkConnection::Iceberg {
3469 connection,
3470 aws_connection,
3471 options,
3472 ..
3473 } => iceberg_sink_builder(
3474 scx,
3475 connection,
3476 aws_connection,
3477 options,
3478 relation_key_indices,
3479 key_desc_and_indices,
3480 commit_interval,
3481 &desc,
3482 )?,
3483 };
3484
3485 let with_snapshot = snapshot.unwrap_or(true);
3487 let version = version.unwrap_or(0);
3489
3490 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3494 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3495
3496 Ok(Plan::CreateSink(CreateSinkPlan {
3497 name,
3498 sink: Sink {
3499 create_sql,
3500 from: from.global_id(),
3501 connection: connection_builder,
3502 envelope,
3503 version,
3504 commit_interval,
3505 },
3506 with_snapshot,
3507 if_not_exists,
3508 in_cluster: in_cluster.id(),
3509 }))
3510}
3511
3512fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3513 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3514
3515 let existing_keys = desc
3516 .typ()
3517 .keys
3518 .iter()
3519 .map(|key_columns| {
3520 key_columns
3521 .iter()
3522 .map(|col| desc.get_name(*col).as_str())
3523 .join(", ")
3524 })
3525 .join(", ");
3526
3527 sql_err!(
3528 "Key constraint ({}) conflicts with existing key ({})",
3529 user_keys,
3530 existing_keys
3531 )
3532}
3533
/// Schema-registry (`CsrConfigOption`) options collected into a typed struct.
///
/// Built via the `TryFrom<Vec<CsrConfigOption<Aug>>>` implementation for this
/// type. Options that were not specified keep their `Default` value.
#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    /// Names of the options encountered so far; used to reject an option that
    /// is specified more than once.
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    /// Value of the `AvroKeyFullname` option, if given.
    pub(crate) avro_key_fullname: Option<String>,
    /// Value of the `AvroValueFullname` option, if given.
    pub(crate) avro_value_fullname: Option<String>,
    /// Value of the `NullDefaults` option; `false` when the option is absent.
    pub(crate) null_defaults: bool,
    /// Doc comments that apply to the value schema, keyed by their target.
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    /// Doc comments that apply to the key schema, keyed by their target.
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    /// Value of the `KeyCompatibilityLevel` option, if given.
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    /// Value of the `ValueCompatibilityLevel` option, if given.
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}
3547
3548impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3549 type Error = crate::plan::PlanError;
3550 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3551 let mut extracted = CsrConfigOptionExtracted::default();
3552 let mut common_doc_comments = BTreeMap::new();
3553 for option in v {
3554 if !extracted.seen.insert(option.name.clone()) {
3555 return Err(PlanError::Unstructured({
3556 format!("{} specified more than once", option.name)
3557 }));
3558 }
3559 let option_name = option.name.clone();
3560 let option_name_str = option_name.to_ast_string_simple();
3561 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3562 option_name: option_name.to_ast_string_simple(),
3563 err: e.into(),
3564 };
3565 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3566 val.map(|s| match s {
3567 WithOptionValue::Value(Value::String(s)) => {
3568 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3569 }
3570 _ => Err("must be a string".to_string()),
3571 })
3572 .transpose()
3573 .map_err(PlanError::Unstructured)
3574 .map_err(better_error)
3575 };
3576 match option.name {
3577 CsrConfigOptionName::AvroKeyFullname => {
3578 extracted.avro_key_fullname =
3579 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3580 }
3581 CsrConfigOptionName::AvroValueFullname => {
3582 extracted.avro_value_fullname =
3583 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3584 }
3585 CsrConfigOptionName::NullDefaults => {
3586 extracted.null_defaults =
3587 <bool>::try_from_value(option.value).map_err(better_error)?;
3588 }
3589 CsrConfigOptionName::AvroDocOn(doc_on) => {
3590 let value = String::try_from_value(option.value.ok_or_else(|| {
3591 PlanError::InvalidOptionValue {
3592 option_name: option_name_str,
3593 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3594 }
3595 })?)
3596 .map_err(better_error)?;
3597 let key = match doc_on.identifier {
3598 DocOnIdentifier::Column(ast::ColumnName {
3599 relation: ResolvedItemName::Item { id, .. },
3600 column: ResolvedColumnReference::Column { name, index: _ },
3601 }) => DocTarget::Field {
3602 object_id: id,
3603 column_name: name,
3604 },
3605 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3606 DocTarget::Type(id)
3607 }
3608 _ => sql_bail!("invalid DOC ON identifier"),
3609 };
3610
3611 match doc_on.for_schema {
3612 DocOnSchema::KeyOnly => {
3613 extracted.key_doc_options.insert(key, value);
3614 }
3615 DocOnSchema::ValueOnly => {
3616 extracted.value_doc_options.insert(key, value);
3617 }
3618 DocOnSchema::All => {
3619 common_doc_comments.insert(key, value);
3620 }
3621 }
3622 }
3623 CsrConfigOptionName::KeyCompatibilityLevel => {
3624 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3625 }
3626 CsrConfigOptionName::ValueCompatibilityLevel => {
3627 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3628 }
3629 }
3630 }
3631
3632 for (key, value) in common_doc_comments {
3633 if !extracted.key_doc_options.contains_key(&key) {
3634 extracted.key_doc_options.insert(key.clone(), value.clone());
3635 }
3636 if !extracted.value_doc_options.contains_key(&key) {
3637 extracted.value_doc_options.insert(key, value);
3638 }
3639 }
3640 Ok(extracted)
3641 }
3642}
3643
3644fn iceberg_sink_builder(
3645 scx: &StatementContext,
3646 catalog_connection: ResolvedItemName,
3647 aws_connection: ResolvedItemName,
3648 options: Vec<IcebergSinkConfigOption<Aug>>,
3649 relation_key_indices: Option<Vec<usize>>,
3650 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3651 commit_interval: Option<Duration>,
3652 desc: &RelationDesc,
3653) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3654 ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
3658 .map_err(|e| sql_err!("{}", e))?;
3659 let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
3660 let catalog_connection_id = catalog_connection_item.id();
3661 let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3662 let aws_connection_id = aws_connection_item.id();
3663 if !matches!(
3664 catalog_connection_item.connection()?,
3665 Connection::IcebergCatalog(_)
3666 ) {
3667 sql_bail!(
3668 "{} is not an iceberg catalog connection",
3669 scx.catalog
3670 .resolve_full_name(catalog_connection_item.name())
3671 .to_string()
3672 .quoted()
3673 );
3674 };
3675
3676 if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3677 sql_bail!(
3678 "{} is not an AWS connection",
3679 scx.catalog
3680 .resolve_full_name(aws_connection_item.name())
3681 .to_string()
3682 .quoted()
3683 );
3684 }
3685
3686 let IcebergSinkConfigOptionExtracted {
3687 table,
3688 namespace,
3689 seen: _,
3690 }: IcebergSinkConfigOptionExtracted = options.try_into()?;
3691
3692 let Some(table) = table else {
3693 sql_bail!("Iceberg sink must specify TABLE");
3694 };
3695 let Some(namespace) = namespace else {
3696 sql_bail!("Iceberg sink must specify NAMESPACE");
3697 };
3698 if commit_interval.is_none() {
3699 sql_bail!("Iceberg sink must specify COMMIT INTERVAL");
3700 }
3701
3702 Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
3703 catalog_connection_id,
3704 catalog_connection: catalog_connection_id,
3705 aws_connection_id,
3706 aws_connection: aws_connection_id,
3707 table,
3708 namespace,
3709 relation_key_indices,
3710 key_desc_and_indices,
3711 }))
3712}
3713
/// Validates the pieces of a Kafka sink definition and assembles the
/// corresponding `StorageSinkConnection::Kafka`.
///
/// - `connection` must resolve to a Kafka connection.
/// - `options` are the Kafka-specific sink options (topic, id prefixes, ...).
/// - `format` is the requested key/value encoding; it is required.
/// - `key_desc_and_indices` describes the sink key, if any; its description is
///   used to derive the key format.
/// - `value_desc` describes the rows being written and is used to derive the
///   value format and to plan `PARTITION BY`.
/// - `sink_from` is handed to the Avro schema generator.
/// - `commit_interval` must be `None`; it is rejected for Kafka sinks.
///
/// # Errors
///
/// Returns a `PlanError` for any invalid or unsupported combination; see the
/// individual checks in the body for the exact messages.
fn kafka_sink_builder(
    scx: &StatementContext,
    connection: ResolvedItemName,
    options: Vec<KafkaSinkConfigOption<Aug>>,
    format: Option<FormatSpecifier<Aug>>,
    relation_key_indices: Option<Vec<usize>>,
    key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
    headers_index: Option<usize>,
    value_desc: RelationDesc,
    envelope: SinkEnvelope,
    sink_from: CatalogItemId,
    commit_interval: Option<Duration>,
) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
    // The referenced item must be a Kafka connection.
    let connection_item = scx.get_item_by_resolved_name(&connection)?;
    let connection_id = connection_item.id();
    match connection_item.connection()? {
        Connection::Kafka(_) => (),
        _ => sql_bail!(
            "{} is not a kafka connection",
            scx.catalog.resolve_full_name(connection_item.name())
        ),
    };

    if commit_interval.is_some() {
        sql_bail!("COMMIT INTERVAL option is not supported with KAFKA sinks");
    }

    let KafkaSinkConfigOptionExtracted {
        topic,
        compression_type,
        partition_by,
        progress_group_id_prefix,
        transactional_id_prefix,
        legacy_ids,
        topic_config,
        topic_metadata_refresh_interval,
        topic_partition_count,
        topic_replication_factor,
        seen: _,
    }: KafkaSinkConfigOptionExtracted = options.try_into()?;

    // `LEGACY IDS` is mutually exclusive with an explicit prefix. Without
    // `LEGACY IDS = true` the (possibly absent) prefix is used as-is.
    let transactional_id = match (transactional_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    // Same rule as above, applied to the progress group id.
    let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
        (Some(_), Some(true)) => {
            sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
        }
        (None, Some(true)) => KafkaIdStyle::Legacy,
        (prefix, _) => KafkaIdStyle::Prefix(prefix),
    };

    let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;

    // Upper bound of one hour on the metadata refresh interval.
    if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
        sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
    }

    // Rejects zero and negative values, then converts the remaining value (if
    // any) into a `NonNeg`. `name` is only used in the error message.
    let assert_positive = |val: Option<i32>, name: &str| {
        if let Some(val) = val {
            if val <= 0 {
                sql_bail!("{} must be a positive integer", name);
            }
        }
        val.map(NonNeg::try_from)
            .transpose()
            .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
    };
    let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
    let topic_replication_factor =
        assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;

    // Validates an Avro/CSR format clause for use in a sink and returns the id
    // of the schema registry connection together with its extracted options.
    let gen_avro_schema_options = |conn| {
        let CsrConnectionAvro {
            connection:
                CsrConnection {
                    connection,
                    options,
                },
            seed,
            key_strategy,
            value_strategy,
        } = conn;
        // These clauses are rejected outright for sinks.
        if seed.is_some() {
            sql_bail!("SEED option does not make sense with sinks");
        }
        if key_strategy.is_some() {
            sql_bail!("KEY STRATEGY option does not make sense with sinks");
        }
        if value_strategy.is_some() {
            sql_bail!("VALUE STRATEGY option does not make sense with sinks");
        }

        let item = scx.get_item_by_resolved_name(&connection)?;
        let csr_connection = match item.connection()? {
            Connection::Csr(_) => item.id(),
            _ => {
                sql_bail!(
                    "{} is not a schema registry connection",
                    scx.catalog
                        .resolve_full_name(item.name())
                        .to_string()
                        .quoted()
                )
            }
        };
        let extracted_options: CsrConfigOptionExtracted = options.try_into()?;

        // A key fullname is meaningless when the sink has no key.
        if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
            sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
        }

        // With a key, the two fullnames must be given together or not at all.
        if key_desc_and_indices.is_some()
            && (extracted_options.avro_key_fullname.is_some()
                ^ extracted_options.avro_value_fullname.is_some())
        {
            sql_bail!(
                "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
            );
        }

        Ok((csr_connection, extracted_options))
    };

    // Translates one `Format` into the sink's format type for the relation
    // described by `desc`. `is_key` selects the key-side Avro settings.
    let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
        Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
        // BYTES and TEXT are only accepted for single-column relations.
        Format::Bytes if desc.arity() == 1 => {
            let col_type = &desc.typ().column_types[0].scalar_type;
            if !mz_pgrepr::Value::can_encode_binary(col_type) {
                bail_unsupported!(format!(
                    "BYTES format with non-encodable type: {:?}",
                    col_type
                ));
            }

            Ok(KafkaSinkFormatType::Bytes)
        }
        Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
        Format::Bytes | Format::Text => {
            bail_unsupported!("BYTES or TEXT format with multiple columns")
        }
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
            // Key and value schemas differ in their doc comments, default
            // fullname ("row" vs. "envelope"), and generator flags.
            let schema = if is_key {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    false,
                    options.key_doc_options,
                    options.avro_key_fullname.as_deref().unwrap_or("row"),
                    options.null_defaults,
                    Some(sink_from),
                    false,
                )?
                .schema()
                .to_string()
            } else {
                AvroSchemaGenerator::new(
                    desc.clone(),
                    matches!(envelope, SinkEnvelope::Debezium),
                    options.value_doc_options,
                    options.avro_value_fullname.as_deref().unwrap_or("envelope"),
                    options.null_defaults,
                    Some(sink_from),
                    true,
                )?
                .schema()
                .to_string()
            };
            Ok(KafkaSinkFormatType::Avro {
                schema,
                compatibility_level: if is_key {
                    options.key_compatibility_level
                } else {
                    options.value_compatibility_level
                },
                csr_connection,
            })
        }
        format => bail_unsupported!(format!("sink format {:?}", format)),
    };

    // Plan the optional `PARTITION BY` expression against the value columns.
    let partition_by = match &partition_by {
        Some(partition_by) => {
            let mut scope = Scope::from_source(None, value_desc.iter_names());

            match envelope {
                SinkEnvelope::Upsert | SinkEnvelope::Append => (),
                SinkEnvelope::Debezium => {
                    // With ENVELOPE DEBEZIUM, referencing any column that is
                    // not part of the sink key is turned into an error.
                    let key_indices: HashSet<_> = key_desc_and_indices
                        .as_ref()
                        .map(|(_desc, indices)| indices.as_slice())
                        .unwrap_or_default()
                        .into_iter()
                        .collect();
                    for (i, item) in scope.items.iter_mut().enumerate() {
                        if !key_indices.contains(&i) {
                            item.error_if_referenced = Some(|_table, column| {
                                PlanError::InvalidPartitionByEnvelopeDebezium {
                                    column_name: column.to_string(),
                                }
                            });
                        }
                    }
                }
            };

            let ecx = &ExprContext {
                qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                name: "PARTITION BY",
                scope: &scope,
                relation_type: value_desc.typ(),
                allow_aggregates: false,
                allow_subqueries: false,
                allow_parameters: false,
                allow_windows: false,
            };
            // The expression is cast (assignment context) to `UInt64`.
            let expr = plan_expr(ecx, partition_by)?.cast_to(
                ecx,
                CastContext::Assignment,
                &SqlScalarType::UInt64,
            )?;
            let expr = expr.lower_uncorrelated(scx.catalog.system_vars())?;

            Some(expr)
        }
        _ => None,
    };

    // Derive the key and value formats. A key format is only produced when
    // the sink has a key; a bare format is applied to both key and value.
    let format = match format {
        Some(FormatSpecifier::KeyValue { key, value }) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(key, desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(value, &value_desc, false)?,
                key_format,
            }
        }
        Some(FormatSpecifier::Bare(format)) => {
            let key_format = match key_desc_and_indices.as_ref() {
                Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
                None => None,
            };
            KafkaSinkFormat {
                value_format: map_format(format, &value_desc, false)?,
                key_format,
            }
        }
        None => bail_unsupported!("sink without format"),
    };

    Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
        connection_id,
        connection: connection_id,
        format,
        topic: topic_name,
        relation_key_indices,
        key_desc_and_indices,
        headers_index,
        value_desc,
        partition_by,
        compression_type,
        progress_group_id,
        transactional_id,
        topic_options: KafkaTopicOptions {
            partition_count: topic_partition_count,
            replication_factor: topic_replication_factor,
            topic_config: topic_config.unwrap_or_default(),
        },
        topic_metadata_refresh_interval,
    }))
}
3999
4000pub fn describe_create_index(
4001 _: &StatementContext,
4002 _: CreateIndexStatement<Aug>,
4003) -> Result<StatementDesc, PlanError> {
4004 Ok(StatementDesc::new(None))
4005}
4006
4007pub fn plan_create_index(
4008 scx: &StatementContext,
4009 mut stmt: CreateIndexStatement<Aug>,
4010) -> Result<Plan, PlanError> {
4011 let CreateIndexStatement {
4012 name,
4013 on_name,
4014 in_cluster,
4015 key_parts,
4016 with_options,
4017 if_not_exists,
4018 } = &mut stmt;
4019 let on = scx.get_item_by_resolved_name(on_name)?;
4020
4021 {
4022 use CatalogItemType::*;
4023 match on.item_type() {
4024 Table | Source | View | MaterializedView => {
4025 if on.replacement_target().is_some() {
4026 sql_bail!(
4027 "index cannot be created on {} because it is a replacement {}",
4028 on_name.full_name_str(),
4029 on.item_type(),
4030 );
4031 }
4032 }
4033 Sink | Index | Type | Func | Secret | Connection => {
4034 sql_bail!(
4035 "index cannot be created on {} because it is a {}",
4036 on_name.full_name_str(),
4037 on.item_type(),
4038 );
4039 }
4040 }
4041 }
4042
4043 let on_desc = on
4044 .relation_desc()
4045 .ok_or_else(|| sql_err!("item does not have a relation description"))?;
4046
4047 let filled_key_parts = match key_parts {
4048 Some(kp) => kp.to_vec(),
4049 None => {
4050 let mut name_counts = BTreeMap::new();
4055 for name in on_desc.iter_names() {
4056 *name_counts.entry(name).or_insert(0usize) += 1;
4057 }
4058 let key = on_desc.typ().default_key();
4059 key.iter()
4060 .map(|i| {
4061 let name = on_desc.get_name(*i);
4062 if name_counts.get(name).copied() == Some(1) {
4063 Expr::Identifier(vec![name.clone().into()])
4064 } else {
4065 Expr::Value(Value::Number((i + 1).to_string()))
4066 }
4067 })
4068 .collect()
4069 }
4070 };
4071 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4072
4073 let index_name = if let Some(name) = name {
4074 QualifiedItemName {
4075 qualifiers: on.name().qualifiers.clone(),
4076 item: normalize::ident(name.clone()),
4077 }
4078 } else {
4079 let mut idx_name = QualifiedItemName {
4080 qualifiers: on.name().qualifiers.clone(),
4081 item: on.name().item.clone(),
4082 };
4083 if key_parts.is_none() {
4084 idx_name.item += "_primary_idx";
4086 } else {
4087 let index_name_col_suffix = keys
4090 .iter()
4091 .map(|k| match k {
4092 mz_expr::MirScalarExpr::Column(i, name) => {
4093 match (on_desc.get_unambiguous_name(*i), &name.0) {
4094 (Some(col_name), _) => col_name.to_string(),
4095 (None, Some(name)) => name.to_string(),
4096 (None, None) => format!("{}", i + 1),
4097 }
4098 }
4099 _ => "expr".to_string(),
4100 })
4101 .join("_");
4102 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4103 .expect("write on strings cannot fail");
4104 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4105 }
4106
4107 if !*if_not_exists {
4108 scx.catalog.find_available_name(idx_name)
4109 } else {
4110 idx_name
4111 }
4112 };
4113
4114 let full_name = scx.catalog.resolve_full_name(&index_name);
4116 let partial_name = PartialItemName::from(full_name.clone());
4117 if let (Ok(item), false, false) = (
4125 scx.catalog.resolve_item_or_type(&partial_name),
4126 *if_not_exists,
4127 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4128 ) {
4129 return Err(PlanError::ItemAlreadyExists {
4130 name: full_name.to_string(),
4131 item_type: item.item_type(),
4132 });
4133 }
4134
4135 let options = plan_index_options(scx, with_options.clone())?;
4136 let cluster_id = match in_cluster {
4137 None => scx.resolve_cluster(None)?.id(),
4138 Some(in_cluster) => in_cluster.id,
4139 };
4140
4141 *in_cluster = Some(ResolvedClusterName {
4142 id: cluster_id,
4143 print_name: None,
4144 });
4145
4146 *name = Some(Ident::new(index_name.item.clone())?);
4148 *key_parts = Some(filled_key_parts);
4149 let if_not_exists = *if_not_exists;
4150
4151 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4152 let compaction_window = options.iter().find_map(|o| {
4153 #[allow(irrefutable_let_patterns)]
4154 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4155 Some(lcw.clone())
4156 } else {
4157 None
4158 }
4159 });
4160
4161 Ok(Plan::CreateIndex(CreateIndexPlan {
4162 name: index_name,
4163 index: Index {
4164 create_sql,
4165 on: on.global_id(),
4166 keys,
4167 cluster_id,
4168 compaction_window,
4169 },
4170 if_not_exists,
4171 }))
4172}
4173
4174pub fn describe_create_type(
4175 _: &StatementContext,
4176 _: CreateTypeStatement<Aug>,
4177) -> Result<StatementDesc, PlanError> {
4178 Ok(StatementDesc::new(None))
4179}
4180
4181pub fn plan_create_type(
4182 scx: &StatementContext,
4183 stmt: CreateTypeStatement<Aug>,
4184) -> Result<Plan, PlanError> {
4185 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4186 let CreateTypeStatement { name, as_type, .. } = stmt;
4187
4188 fn validate_data_type(
4189 scx: &StatementContext,
4190 data_type: ResolvedDataType,
4191 as_type: &str,
4192 key: &str,
4193 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4194 let (id, modifiers) = match data_type {
4195 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4196 _ => sql_bail!(
4197 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4198 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4199 as_type,
4200 key,
4201 data_type.human_readable_name(),
4202 ),
4203 };
4204
4205 let item = scx.catalog.get_item(&id);
4206 match item.type_details() {
4207 None => sql_bail!(
4208 "{} must be of class type, but received {} which is of class {}",
4209 key,
4210 scx.catalog.resolve_full_name(item.name()),
4211 item.item_type()
4212 ),
4213 Some(CatalogTypeDetails {
4214 typ: CatalogType::Char,
4215 ..
4216 }) => {
4217 bail_unsupported!("embedding char type in a list or map")
4218 }
4219 _ => {
4220 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4222
4223 Ok((id, modifiers))
4224 }
4225 }
4226 }
4227
4228 let inner = match as_type {
4229 CreateTypeAs::List { options } => {
4230 let CreateTypeListOptionExtracted {
4231 element_type,
4232 seen: _,
4233 } = CreateTypeListOptionExtracted::try_from(options)?;
4234 let element_type =
4235 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4236 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4237 CatalogType::List {
4238 element_reference: id,
4239 element_modifiers: modifiers,
4240 }
4241 }
4242 CreateTypeAs::Map { options } => {
4243 let CreateTypeMapOptionExtracted {
4244 key_type,
4245 value_type,
4246 seen: _,
4247 } = CreateTypeMapOptionExtracted::try_from(options)?;
4248 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4249 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4250 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4251 let (value_id, value_modifiers) =
4252 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4253 CatalogType::Map {
4254 key_reference: key_id,
4255 key_modifiers,
4256 value_reference: value_id,
4257 value_modifiers,
4258 }
4259 }
4260 CreateTypeAs::Record { column_defs } => {
4261 let mut fields = vec![];
4262 for column_def in column_defs {
4263 let data_type = column_def.data_type;
4264 let key = ident(column_def.name.clone());
4265 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4266 fields.push(CatalogRecordField {
4267 name: ColumnName::from(key.clone()),
4268 type_reference: id,
4269 type_modifiers: modifiers,
4270 });
4271 }
4272 CatalogType::Record { fields }
4273 }
4274 };
4275
4276 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4277
4278 let full_name = scx.catalog.resolve_full_name(&name);
4280 let partial_name = PartialItemName::from(full_name.clone());
4281 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4284 if item.item_type().conflicts_with_type() {
4285 return Err(PlanError::ItemAlreadyExists {
4286 name: full_name.to_string(),
4287 item_type: item.item_type(),
4288 });
4289 }
4290 }
4291
4292 Ok(Plan::CreateType(CreateTypePlan {
4293 name,
4294 typ: Type { create_sql, inner },
4295 }))
4296}
4297
// Declares the single option accepted by `CREATE TYPE ... AS LIST`:
// `ElementType`, carrying a `ResolvedDataType`. `plan_create_type` consumes
// the result as `CreateTypeListOptionExtracted { element_type, seen }`.
// NOTE(review): the macro is defined elsewhere; presumably it generates that
// struct and its `TryFrom` impl — confirm against the macro definition.
generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4299
// Declares the options accepted by `CREATE TYPE ... AS MAP`: `KeyType` and
// `ValueType`, each carrying a `ResolvedDataType`. `plan_create_type` consumes
// the result as `CreateTypeMapOptionExtracted { key_type, value_type, seen }`.
// NOTE(review): the macro is defined elsewhere; presumably it generates that
// struct and its `TryFrom` impl — confirm against the macro definition.
generate_extracted_config!(
    CreateTypeMapOption,
    (KeyType, ResolvedDataType),
    (ValueType, ResolvedDataType)
);
4305
/// A planned role alteration: either a set of role attributes or a change to
/// one role variable.
// NOTE(review): presumably the planned form of `ALTER ROLE`'s option — the
// code that constructs this enum is not visible here.
#[derive(Debug)]
pub enum PlannedAlterRoleOption {
    /// Planned role attributes (see `PlannedRoleAttributes`).
    Attributes(PlannedRoleAttributes),
    /// A planned `SET`/`RESET` of a role variable (see `PlannedRoleVariable`).
    Variable(PlannedRoleVariable),
}
4311
/// Role attributes produced by `plan_role_attributes`.
///
/// Every field is `None` unless the corresponding attribute was specified.
#[derive(Debug, Clone)]
pub struct PlannedRoleAttributes {
    /// `Some(true)` for `Inherit`, `Some(false)` for `NoInherit`.
    pub inherit: Option<bool>,
    /// The password, when a `Password` attribute carrying a value was given.
    pub password: Option<Password>,
    /// Set together with `password`, from the `scram_iterations` system
    /// variable at planning time.
    pub scram_iterations: Option<NonZeroU32>,
    /// `Some(true)` when a `Password` attribute without a value was given.
    pub nopassword: Option<bool>,
    /// `Some(true)` for `SuperUser`, `Some(false)` for `NoSuperUser`.
    pub superuser: Option<bool>,
    /// `Some(true)` for `Login`, `Some(false)` for `NoLogin`.
    pub login: Option<bool>,
}
4324
4325fn plan_role_attributes(
4326 options: Vec<RoleAttribute>,
4327 scx: &StatementContext,
4328) -> Result<PlannedRoleAttributes, PlanError> {
4329 let mut planned_attributes = PlannedRoleAttributes {
4330 inherit: None,
4331 password: None,
4332 scram_iterations: None,
4333 superuser: None,
4334 login: None,
4335 nopassword: None,
4336 };
4337
4338 for option in options {
4339 match option {
4340 RoleAttribute::Inherit | RoleAttribute::NoInherit
4341 if planned_attributes.inherit.is_some() =>
4342 {
4343 sql_bail!("conflicting or redundant options");
4344 }
4345 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4346 bail_never_supported!(
4347 "CREATECLUSTER attribute",
4348 "sql/create-role/#details",
4349 "Use system privileges instead."
4350 );
4351 }
4352 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4353 bail_never_supported!(
4354 "CREATEDB attribute",
4355 "sql/create-role/#details",
4356 "Use system privileges instead."
4357 );
4358 }
4359 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4360 bail_never_supported!(
4361 "CREATEROLE attribute",
4362 "sql/create-role/#details",
4363 "Use system privileges instead."
4364 );
4365 }
4366 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4367 sql_bail!("conflicting or redundant options");
4368 }
4369
4370 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4371 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4372 RoleAttribute::Password(password) => {
4373 if let Some(password) = password {
4374 planned_attributes.password = Some(password.into());
4375 planned_attributes.scram_iterations =
4376 Some(scx.catalog.system_vars().scram_iterations())
4377 } else {
4378 planned_attributes.nopassword = Some(true);
4379 }
4380 }
4381 RoleAttribute::SuperUser => {
4382 if planned_attributes.superuser == Some(false) {
4383 sql_bail!("conflicting or redundant options");
4384 }
4385 planned_attributes.superuser = Some(true);
4386 }
4387 RoleAttribute::NoSuperUser => {
4388 if planned_attributes.superuser == Some(true) {
4389 sql_bail!("conflicting or redundant options");
4390 }
4391 planned_attributes.superuser = Some(false);
4392 }
4393 RoleAttribute::Login => {
4394 if planned_attributes.login == Some(false) {
4395 sql_bail!("conflicting or redundant options");
4396 }
4397 planned_attributes.login = Some(true);
4398 }
4399 RoleAttribute::NoLogin => {
4400 if planned_attributes.login == Some(true) {
4401 sql_bail!("conflicting or redundant options");
4402 }
4403 planned_attributes.login = Some(false);
4404 }
4405 }
4406 }
4407 if planned_attributes.inherit == Some(false) {
4408 bail_unsupported!("non inherit roles");
4409 }
4410
4411 Ok(planned_attributes)
4412}
4413
/// A planned change to a role's variable, produced by `plan_role_variable`
/// from a `SetRoleVar`.
#[derive(Debug)]
pub enum PlannedRoleVariable {
    /// Assign `value` to the variable called `name`.
    Set { name: String, value: VariableValue },
    /// Reset the variable called `name`.
    Reset { name: String },
}
4419
4420impl PlannedRoleVariable {
4421 pub fn name(&self) -> &str {
4422 match self {
4423 PlannedRoleVariable::Set { name, .. } => name,
4424 PlannedRoleVariable::Reset { name } => name,
4425 }
4426 }
4427}
4428
4429fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4430 let plan = match variable {
4431 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4432 name: name.to_string(),
4433 value: scl::plan_set_variable_to(value)?,
4434 },
4435 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4436 name: name.to_string(),
4437 },
4438 };
4439 Ok(plan)
4440}
4441
4442pub fn describe_create_role(
4443 _: &StatementContext,
4444 _: CreateRoleStatement,
4445) -> Result<StatementDesc, PlanError> {
4446 Ok(StatementDesc::new(None))
4447}
4448
4449pub fn plan_create_role(
4450 scx: &StatementContext,
4451 CreateRoleStatement { name, options }: CreateRoleStatement,
4452) -> Result<Plan, PlanError> {
4453 let attributes = plan_role_attributes(options, scx)?;
4454 Ok(Plan::CreateRole(CreateRolePlan {
4455 name: normalize::ident(name),
4456 attributes: attributes.into(),
4457 }))
4458}
4459
4460pub fn plan_create_network_policy(
4461 ctx: &StatementContext,
4462 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4463) -> Result<Plan, PlanError> {
4464 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4465 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4466
4467 let Some(rule_defs) = policy_options.rules else {
4468 sql_bail!("RULES must be specified when creating network policies.");
4469 };
4470
4471 let mut rules = vec![];
4472 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4473 let NetworkPolicyRuleOptionExtracted {
4474 seen: _,
4475 direction,
4476 action,
4477 address,
4478 } = options.try_into()?;
4479 let (direction, action, address) = match (direction, action, address) {
4480 (Some(direction), Some(action), Some(address)) => (
4481 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4482 NetworkPolicyRuleAction::try_from(action.as_str())?,
4483 PolicyAddress::try_from(address.as_str())?,
4484 ),
4485 (_, _, _) => {
4486 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4487 }
4488 };
4489 rules.push(NetworkPolicyRule {
4490 name: normalize::ident(name),
4491 direction,
4492 action,
4493 address,
4494 });
4495 }
4496
4497 if rules.len()
4498 > ctx
4499 .catalog
4500 .system_vars()
4501 .max_rules_per_network_policy()
4502 .try_into()?
4503 {
4504 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4505 }
4506
4507 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4508 name: normalize::ident(name),
4509 rules,
4510 }))
4511}
4512
4513pub fn plan_alter_network_policy(
4514 ctx: &StatementContext,
4515 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4516) -> Result<Plan, PlanError> {
4517 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4518
4519 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4520 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4521
4522 let Some(rule_defs) = policy_options.rules else {
4523 sql_bail!("RULES must be specified when creating network policies.");
4524 };
4525
4526 let mut rules = vec![];
4527 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4528 let NetworkPolicyRuleOptionExtracted {
4529 seen: _,
4530 direction,
4531 action,
4532 address,
4533 } = options.try_into()?;
4534
4535 let (direction, action, address) = match (direction, action, address) {
4536 (Some(direction), Some(action), Some(address)) => (
4537 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4538 NetworkPolicyRuleAction::try_from(action.as_str())?,
4539 PolicyAddress::try_from(address.as_str())?,
4540 ),
4541 (_, _, _) => {
4542 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4543 }
4544 };
4545 rules.push(NetworkPolicyRule {
4546 name: normalize::ident(name),
4547 direction,
4548 action,
4549 address,
4550 });
4551 }
4552 if rules.len()
4553 > ctx
4554 .catalog
4555 .system_vars()
4556 .max_rules_per_network_policy()
4557 .try_into()?
4558 {
4559 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4560 }
4561
4562 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4563 id: policy.id(),
4564 name: normalize::ident(name),
4565 rules,
4566 }))
4567}
4568
4569pub fn describe_create_cluster(
4570 _: &StatementContext,
4571 _: CreateClusterStatement<Aug>,
4572) -> Result<StatementDesc, PlanError> {
4573 Ok(StatementDesc::new(None))
4574}
4575
// Declares the typed extraction for `ClusterOption` values. The resulting
// `ClusterOptionExtracted` is consumed by `plan_create_cluster_inner` and
// rebuilt by `unplan_create_cluster`; each entry pairs an option name with the
// Rust type its value is extracted as.
generate_extracted_config!(
    ClusterOption,
    (AvailabilityZones, Vec<String>),
    (Disk, bool),
    (IntrospectionDebugging, bool),
    (IntrospectionInterval, OptionalDuration),
    (Managed, bool),
    (Replicas, Vec<ReplicaDefinition<Aug>>),
    (ReplicationFactor, u32),
    (Size, String),
    (Schedule, ClusterScheduleOptionValue),
    (WorkloadClass, OptionalString)
);
4594
// Declares the typed extraction for `NetworkPolicyOption` values; the
// resulting `NetworkPolicyOptionExtracted` exposes the optional `rules` list
// read by the network policy planners.
generate_extracted_config!(
    NetworkPolicyOption,
    (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
);
4599
// Declares the typed extraction for the options of a single network policy
// rule. All three values are extracted as plain strings; the planners convert
// them into their typed forms afterwards.
generate_extracted_config!(
    NetworkPolicyRuleOption,
    (Direction, String),
    (Action, String),
    (Address, String)
);
4606
// Declares the typed extraction for `ClusterAlterOption`, whose only entry is
// the `Wait` option.
generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4608
// Declares the typed extraction for `ClusterAlterUntilReadyOption`: a timeout
// duration plus a string naming what to do when it elapses.
// NOTE(review): the accepted `OnTimeout` strings are not visible here.
generate_extracted_config!(
    ClusterAlterUntilReadyOption,
    (Timeout, Duration),
    (OnTimeout, String)
);
4614
// Declares the typed extraction for per-cluster `ClusterFeature` flags. Every
// flag is an `Option<bool>` defaulting to `None`; `plan_create_cluster_inner`
// copies them into an `OptimizerFeatureOverrides`.
generate_extracted_config!(
    ClusterFeature,
    (ReoptimizeImportedViews, Option<bool>, Default(None)),
    (EnableEagerDeltaJoins, Option<bool>, Default(None)),
    (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
    (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
    (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
    (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
    (
        EnableProjectionPushdownAfterRelationCse,
        Option<bool>,
        Default(None)
    )
);
4629
4630pub fn plan_create_cluster(
4634 scx: &StatementContext,
4635 stmt: CreateClusterStatement<Aug>,
4636) -> Result<Plan, PlanError> {
4637 let plan = plan_create_cluster_inner(scx, stmt)?;
4638
4639 if let CreateClusterVariant::Managed(_) = &plan.variant {
4641 let stmt = unplan_create_cluster(scx, plan.clone())
4642 .map_err(|e| PlanError::Replan(e.to_string()))?;
4643 let create_sql = stmt.to_ast_string_stable();
4644 let stmt = parse::parse(&create_sql)
4645 .map_err(|e| PlanError::Replan(e.to_string()))?
4646 .into_element()
4647 .ast;
4648 let (stmt, _resolved_ids) =
4649 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4650 let stmt = match stmt {
4651 Statement::CreateCluster(stmt) => stmt,
4652 stmt => {
4653 return Err(PlanError::Replan(format!(
4654 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4655 )));
4656 }
4657 };
4658 let replan =
4659 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4660 if plan != replan {
4661 return Err(PlanError::Replan(format!(
4662 "replan does not match: plan={plan:?}, replan={replan:?}"
4663 )));
4664 }
4665 }
4666
4667 Ok(Plan::CreateCluster(plan))
4668}
4669
4670pub fn plan_create_cluster_inner(
4671 scx: &StatementContext,
4672 CreateClusterStatement {
4673 name,
4674 options,
4675 features,
4676 }: CreateClusterStatement<Aug>,
4677) -> Result<CreateClusterPlan, PlanError> {
4678 let ClusterOptionExtracted {
4679 availability_zones,
4680 introspection_debugging,
4681 introspection_interval,
4682 managed,
4683 replicas,
4684 replication_factor,
4685 seen: _,
4686 size,
4687 disk,
4688 schedule,
4689 workload_class,
4690 }: ClusterOptionExtracted = options.try_into()?;
4691
4692 let managed = managed.unwrap_or_else(|| replicas.is_none());
4693
4694 if !scx.catalog.active_role_id().is_system() {
4695 if !features.is_empty() {
4696 sql_bail!("FEATURES not supported for non-system users");
4697 }
4698 if workload_class.is_some() {
4699 sql_bail!("WORKLOAD CLASS not supported for non-system users");
4700 }
4701 }
4702
4703 let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4704 let workload_class = workload_class.and_then(|v| v.0);
4705
4706 if managed {
4707 if replicas.is_some() {
4708 sql_bail!("REPLICAS not supported for managed clusters");
4709 }
4710 let Some(size) = size else {
4711 sql_bail!("SIZE must be specified for managed clusters");
4712 };
4713
4714 if disk.is_some() {
4715 if scx.catalog.is_cluster_size_cc(&size) {
4719 sql_bail!(
4720 "DISK option not supported for modern cluster sizes because disk is always enabled"
4721 );
4722 }
4723
4724 scx.catalog
4725 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4726 }
4727
4728 let compute = plan_compute_replica_config(
4729 introspection_interval,
4730 introspection_debugging.unwrap_or(false),
4731 )?;
4732
4733 let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4734 replication_factor.unwrap_or_else(|| {
4735 scx.catalog
4736 .system_vars()
4737 .default_cluster_replication_factor()
4738 })
4739 } else {
4740 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4741 if replication_factor.is_some() {
4742 sql_bail!(
4743 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4744 );
4745 }
4746 0
4750 };
4751 let availability_zones = availability_zones.unwrap_or_default();
4752
4753 if !availability_zones.is_empty() {
4754 scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4755 }
4756
4757 let ClusterFeatureExtracted {
4759 reoptimize_imported_views,
4760 enable_eager_delta_joins,
4761 enable_new_outer_join_lowering,
4762 enable_variadic_left_join_lowering,
4763 enable_letrec_fixpoint_analysis,
4764 enable_join_prioritize_arranged,
4765 enable_projection_pushdown_after_relation_cse,
4766 seen: _,
4767 } = ClusterFeatureExtracted::try_from(features)?;
4768 let optimizer_feature_overrides = OptimizerFeatureOverrides {
4769 reoptimize_imported_views,
4770 enable_eager_delta_joins,
4771 enable_new_outer_join_lowering,
4772 enable_variadic_left_join_lowering,
4773 enable_letrec_fixpoint_analysis,
4774 enable_join_prioritize_arranged,
4775 enable_projection_pushdown_after_relation_cse,
4776 ..Default::default()
4777 };
4778
4779 let schedule = plan_cluster_schedule(schedule)?;
4780
4781 Ok(CreateClusterPlan {
4782 name: normalize::ident(name),
4783 variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4784 replication_factor,
4785 size,
4786 availability_zones,
4787 compute,
4788 optimizer_feature_overrides,
4789 schedule,
4790 }),
4791 workload_class,
4792 })
4793 } else {
4794 let Some(replica_defs) = replicas else {
4795 sql_bail!("REPLICAS must be specified for unmanaged clusters");
4796 };
4797 if availability_zones.is_some() {
4798 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4799 }
4800 if replication_factor.is_some() {
4801 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4802 }
4803 if introspection_debugging.is_some() {
4804 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4805 }
4806 if introspection_interval.is_some() {
4807 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4808 }
4809 if size.is_some() {
4810 sql_bail!("SIZE not supported for unmanaged clusters");
4811 }
4812 if disk.is_some() {
4813 sql_bail!("DISK not supported for unmanaged clusters");
4814 }
4815 if !features.is_empty() {
4816 sql_bail!("FEATURES not supported for unmanaged clusters");
4817 }
4818 if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4819 sql_bail!(
4820 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4821 );
4822 }
4823
4824 let mut replicas = vec![];
4825 for ReplicaDefinition { name, options } in replica_defs {
4826 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4827 }
4828
4829 Ok(CreateClusterPlan {
4830 name: normalize::ident(name),
4831 variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4832 workload_class,
4833 })
4834 }
4835}
4836
4837pub fn unplan_create_cluster(
4841 scx: &StatementContext,
4842 CreateClusterPlan {
4843 name,
4844 variant,
4845 workload_class,
4846 }: CreateClusterPlan,
4847) -> Result<CreateClusterStatement<Aug>, PlanError> {
4848 match variant {
4849 CreateClusterVariant::Managed(CreateClusterManagedPlan {
4850 replication_factor,
4851 size,
4852 availability_zones,
4853 compute,
4854 optimizer_feature_overrides,
4855 schedule,
4856 }) => {
4857 let schedule = unplan_cluster_schedule(schedule);
4858 let OptimizerFeatureOverrides {
4859 enable_consolidate_after_union_negate: _,
4860 enable_reduce_mfp_fusion: _,
4861 enable_cardinality_estimates: _,
4862 persist_fast_path_limit: _,
4863 reoptimize_imported_views,
4864 enable_eager_delta_joins,
4865 enable_new_outer_join_lowering,
4866 enable_variadic_left_join_lowering,
4867 enable_letrec_fixpoint_analysis,
4868 enable_join_prioritize_arranged,
4869 enable_projection_pushdown_after_relation_cse,
4870 enable_less_reduce_in_eqprop: _,
4871 enable_dequadratic_eqprop_map: _,
4872 enable_eq_classes_withholding_errors: _,
4873 enable_fast_path_plan_insights: _,
4874 enable_cast_elimination: _,
4875 enable_case_literal_transform: _,
4876 enable_simplify_quantified_comparisons: _,
4877 enable_coalesce_case_transform: _,
4878 } = optimizer_feature_overrides;
4879 let features_extracted = ClusterFeatureExtracted {
4881 seen: Default::default(),
4883 reoptimize_imported_views,
4884 enable_eager_delta_joins,
4885 enable_new_outer_join_lowering,
4886 enable_variadic_left_join_lowering,
4887 enable_letrec_fixpoint_analysis,
4888 enable_join_prioritize_arranged,
4889 enable_projection_pushdown_after_relation_cse,
4890 };
4891 let features = features_extracted.into_values(scx.catalog);
4892 let availability_zones = if availability_zones.is_empty() {
4893 None
4894 } else {
4895 Some(availability_zones)
4896 };
4897 let (introspection_interval, introspection_debugging) =
4898 unplan_compute_replica_config(compute);
4899 let replication_factor = match &schedule {
4902 ClusterScheduleOptionValue::Manual => Some(replication_factor),
4903 ClusterScheduleOptionValue::Refresh { .. } => {
4904 assert!(
4905 replication_factor <= 1,
4906 "replication factor, {replication_factor:?}, must be <= 1"
4907 );
4908 None
4909 }
4910 };
4911 let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4912 let options_extracted = ClusterOptionExtracted {
4913 seen: Default::default(),
4915 availability_zones,
4916 disk: None,
4917 introspection_debugging: Some(introspection_debugging),
4918 introspection_interval,
4919 managed: Some(true),
4920 replicas: None,
4921 replication_factor,
4922 size: Some(size),
4923 schedule: Some(schedule),
4924 workload_class,
4925 };
4926 let options = options_extracted.into_values(scx.catalog);
4927 let name = Ident::new_unchecked(name);
4928 Ok(CreateClusterStatement {
4929 name,
4930 options,
4931 features,
4932 })
4933 }
4934 CreateClusterVariant::Unmanaged(_) => {
4935 bail_unsupported!("SHOW CREATE for unmanaged clusters")
4936 }
4937 }
4938}
4939
// Declares the typed extraction for `ReplicaOption` values, consumed by
// `plan_replica_config`. `Internal` and `IntrospectionDebugging` default to
// `false`; all other options are optional.
generate_extracted_config!(
    ReplicaOption,
    (AvailabilityZone, String),
    (BilledAs, String),
    (ComputeAddresses, Vec<String>),
    (ComputectlAddresses, Vec<String>),
    (Disk, bool),
    (Internal, bool, Default(false)),
    (IntrospectionDebugging, bool, Default(false)),
    (IntrospectionInterval, OptionalDuration),
    (Size, String),
    (StorageAddresses, Vec<String>),
    (StoragectlAddresses, Vec<String>),
    (Workers, u16)
);
4955
4956fn plan_replica_config(
4957 scx: &StatementContext,
4958 options: Vec<ReplicaOption<Aug>>,
4959) -> Result<ReplicaConfig, PlanError> {
4960 let ReplicaOptionExtracted {
4961 availability_zone,
4962 billed_as,
4963 computectl_addresses,
4964 disk,
4965 internal,
4966 introspection_debugging,
4967 introspection_interval,
4968 size,
4969 storagectl_addresses,
4970 ..
4971 }: ReplicaOptionExtracted = options.try_into()?;
4972
4973 let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4974
4975 match (
4976 size,
4977 availability_zone,
4978 billed_as,
4979 storagectl_addresses,
4980 computectl_addresses,
4981 ) {
4982 (None, _, None, None, None) => {
4984 sql_bail!("SIZE option must be specified");
4987 }
4988 (Some(size), availability_zone, billed_as, None, None) => {
4989 if disk.is_some() {
4990 if scx.catalog.is_cluster_size_cc(&size) {
4994 sql_bail!(
4995 "DISK option not supported for modern cluster sizes because disk is always enabled"
4996 );
4997 }
4998
4999 scx.catalog
5000 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
5001 }
5002
5003 Ok(ReplicaConfig::Orchestrated {
5004 size,
5005 availability_zone,
5006 compute,
5007 billed_as,
5008 internal,
5009 })
5010 }
5011
5012 (None, None, None, storagectl_addresses, computectl_addresses) => {
5013 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
5014
5015 let Some(storagectl_addrs) = storagectl_addresses else {
5019 sql_bail!("missing STORAGECTL ADDRESSES option");
5020 };
5021 let Some(computectl_addrs) = computectl_addresses else {
5022 sql_bail!("missing COMPUTECTL ADDRESSES option");
5023 };
5024
5025 if storagectl_addrs.len() != computectl_addrs.len() {
5026 sql_bail!(
5027 "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
5028 );
5029 }
5030
5031 if disk.is_some() {
5032 sql_bail!("DISK can't be specified for unorchestrated clusters");
5033 }
5034
5035 Ok(ReplicaConfig::Unorchestrated {
5036 storagectl_addrs,
5037 computectl_addrs,
5038 compute,
5039 })
5040 }
5041 _ => {
5042 sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
5045 }
5046 }
5047}
5048
5049fn plan_compute_replica_config(
5053 introspection_interval: Option<OptionalDuration>,
5054 introspection_debugging: bool,
5055) -> Result<ComputeReplicaConfig, PlanError> {
5056 let introspection_interval = introspection_interval
5057 .map(|OptionalDuration(i)| i)
5058 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5059 let introspection = match introspection_interval {
5060 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5061 interval,
5062 debugging: introspection_debugging,
5063 }),
5064 None if introspection_debugging => {
5065 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5066 }
5067 None => None,
5068 };
5069 let compute = ComputeReplicaConfig { introspection };
5070 Ok(compute)
5071}
5072
5073fn unplan_compute_replica_config(
5077 compute_replica_config: ComputeReplicaConfig,
5078) -> (Option<OptionalDuration>, bool) {
5079 match compute_replica_config.introspection {
5080 Some(ComputeReplicaIntrospectionConfig {
5081 debugging,
5082 interval,
5083 }) => (Some(OptionalDuration(Some(interval))), debugging),
5084 None => (Some(OptionalDuration(None)), false),
5085 }
5086}
5087
5088fn plan_cluster_schedule(
5092 schedule: ClusterScheduleOptionValue,
5093) -> Result<ClusterSchedule, PlanError> {
5094 Ok(match schedule {
5095 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5096 ClusterScheduleOptionValue::Refresh {
5098 hydration_time_estimate: None,
5099 } => ClusterSchedule::Refresh {
5100 hydration_time_estimate: Duration::from_millis(0),
5101 },
5102 ClusterScheduleOptionValue::Refresh {
5104 hydration_time_estimate: Some(interval_value),
5105 } => {
5106 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5107 if interval.as_microseconds() < 0 {
5108 sql_bail!(
5109 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5110 interval
5111 );
5112 }
5113 if interval.months != 0 {
5114 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5118 }
5119 let duration = interval.duration()?;
5120 if u64::try_from(duration.as_millis()).is_err()
5121 || Interval::from_duration(&duration).is_err()
5122 {
5123 sql_bail!("HYDRATION TIME ESTIMATE too large");
5124 }
5125 ClusterSchedule::Refresh {
5126 hydration_time_estimate: duration,
5127 }
5128 }
5129 })
5130}
5131
5132fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5136 match schedule {
5137 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5138 ClusterSchedule::Refresh {
5139 hydration_time_estimate,
5140 } => {
5141 let interval = Interval::from_duration(&hydration_time_estimate)
5142 .expect("planning ensured that this is convertible back to Interval");
5143 let interval_value = literal::unplan_interval(&interval);
5144 ClusterScheduleOptionValue::Refresh {
5145 hydration_time_estimate: Some(interval_value),
5146 }
5147 }
5148 }
5149}
5150
5151pub fn describe_create_cluster_replica(
5152 _: &StatementContext,
5153 _: CreateClusterReplicaStatement<Aug>,
5154) -> Result<StatementDesc, PlanError> {
5155 Ok(StatementDesc::new(None))
5156}
5157
5158pub fn plan_create_cluster_replica(
5159 scx: &StatementContext,
5160 CreateClusterReplicaStatement {
5161 definition: ReplicaDefinition { name, options },
5162 of_cluster,
5163 }: CreateClusterReplicaStatement<Aug>,
5164) -> Result<Plan, PlanError> {
5165 let cluster = scx
5166 .catalog
5167 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5168
5169 let config = plan_replica_config(scx, options)?;
5170
5171 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5172 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5173 return Err(PlanError::MangedReplicaName(name.into_string()));
5174 }
5175 } else {
5176 ensure_cluster_is_not_managed(scx, cluster.id())?;
5177 }
5178
5179 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5180 name: normalize::ident(name),
5181 cluster_id: cluster.id(),
5182 config,
5183 }))
5184}
5185
5186pub fn describe_create_secret(
5187 _: &StatementContext,
5188 _: CreateSecretStatement<Aug>,
5189) -> Result<StatementDesc, PlanError> {
5190 Ok(StatementDesc::new(None))
5191}
5192
5193pub fn plan_create_secret(
5194 scx: &StatementContext,
5195 stmt: CreateSecretStatement<Aug>,
5196) -> Result<Plan, PlanError> {
5197 let CreateSecretStatement {
5198 name,
5199 if_not_exists,
5200 value,
5201 } = &stmt;
5202
5203 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5204 let mut create_sql_statement = stmt.clone();
5205 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5206 let create_sql =
5207 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5208 let secret_as = query::plan_secret_as(scx, value.clone())?;
5209
5210 let secret = Secret {
5211 create_sql,
5212 secret_as,
5213 };
5214
5215 Ok(Plan::CreateSecret(CreateSecretPlan {
5216 name,
5217 secret,
5218 if_not_exists: *if_not_exists,
5219 }))
5220}
5221
5222pub fn describe_create_connection(
5223 _: &StatementContext,
5224 _: CreateConnectionStatement<Aug>,
5225) -> Result<StatementDesc, PlanError> {
5226 Ok(StatementDesc::new(None))
5227}
5228
// Declares the typed extraction for `CreateConnectionOption`, whose only entry
// is the boolean `Validate` option read by `plan_create_connection`.
generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5230
5231pub fn plan_create_connection(
5232 scx: &StatementContext,
5233 mut stmt: CreateConnectionStatement<Aug>,
5234) -> Result<Plan, PlanError> {
5235 let CreateConnectionStatement {
5236 name,
5237 connection_type,
5238 values,
5239 if_not_exists,
5240 with_options,
5241 } = stmt.clone();
5242 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5243 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5244 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5245
5246 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5247 if options.validate.is_some() {
5248 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5249 }
5250 let validate = match options.validate {
5251 Some(val) => val,
5252 None => {
5253 scx.catalog
5254 .system_vars()
5255 .enable_default_connection_validation()
5256 && details.to_connection().validate_by_default()
5257 }
5258 };
5259
5260 let full_name = scx.catalog.resolve_full_name(&name);
5262 let partial_name = PartialItemName::from(full_name.clone());
5263 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5264 return Err(PlanError::ItemAlreadyExists {
5265 name: full_name.to_string(),
5266 item_type: item.item_type(),
5267 });
5268 }
5269
5270 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5273 stmt.values.retain(|v| {
5274 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5275 });
5276 stmt.values.push(ConnectionOption {
5277 name: ConnectionOptionName::PublicKey1,
5278 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5279 });
5280 stmt.values.push(ConnectionOption {
5281 name: ConnectionOptionName::PublicKey2,
5282 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5283 });
5284 }
5285 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5286
5287 let plan = CreateConnectionPlan {
5288 name,
5289 if_not_exists,
5290 connection: crate::plan::Connection {
5291 create_sql,
5292 details,
5293 },
5294 validate,
5295 };
5296 Ok(Plan::CreateConnection(plan))
5297}
5298
5299fn plan_drop_database(
5300 scx: &StatementContext,
5301 if_exists: bool,
5302 name: &UnresolvedDatabaseName,
5303 cascade: bool,
5304) -> Result<Option<DatabaseId>, PlanError> {
5305 Ok(match resolve_database(scx, name, if_exists)? {
5306 Some(database) => {
5307 if !cascade && database.has_schemas() {
5308 sql_bail!(
5309 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5310 name,
5311 );
5312 }
5313 Some(database.id())
5314 }
5315 None => None,
5316 })
5317}
5318
5319pub fn describe_drop_objects(
5320 _: &StatementContext,
5321 _: DropObjectsStatement,
5322) -> Result<StatementDesc, PlanError> {
5323 Ok(StatementDesc::new(None))
5324}
5325
5326pub fn plan_drop_objects(
5327 scx: &mut StatementContext,
5328 DropObjectsStatement {
5329 object_type,
5330 if_exists,
5331 names,
5332 cascade,
5333 }: DropObjectsStatement,
5334) -> Result<Plan, PlanError> {
5335 if object_type == mz_sql_parser::ast::ObjectType::Func {
5336 bail_unsupported!("DROP FUNCTION");
5337 }
5338 let object_type = object_type.into();
5339
5340 let mut referenced_ids = Vec::new();
5341 for name in names {
5342 let id = match &name {
5343 UnresolvedObjectName::Cluster(name) => {
5344 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5345 }
5346 UnresolvedObjectName::ClusterReplica(name) => {
5347 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5348 }
5349 UnresolvedObjectName::Database(name) => {
5350 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5351 }
5352 UnresolvedObjectName::Schema(name) => {
5353 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5354 }
5355 UnresolvedObjectName::Role(name) => {
5356 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5357 }
5358 UnresolvedObjectName::Item(name) => {
5359 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5360 .map(ObjectId::Item)
5361 }
5362 UnresolvedObjectName::NetworkPolicy(name) => {
5363 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5364 }
5365 };
5366 match id {
5367 Some(id) => referenced_ids.push(id),
5368 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5369 name: name.to_ast_string_simple(),
5370 object_type,
5371 }),
5372 }
5373 }
5374 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5375
5376 Ok(Plan::DropObjects(DropObjectsPlan {
5377 referenced_ids,
5378 drop_ids,
5379 object_type,
5380 }))
5381}
5382
5383fn plan_drop_schema(
5384 scx: &StatementContext,
5385 if_exists: bool,
5386 name: &UnresolvedSchemaName,
5387 cascade: bool,
5388) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5389 let normalized = normalize::unresolved_schema_name(name.clone())?;
5393 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
5394 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5395 }
5396
5397 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5398 Some((database_spec, schema_spec)) => {
5399 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5400 sql_bail!(
5401 "cannot drop schema {name} because it is required by the database system",
5402 );
5403 }
5404 if let SchemaSpecifier::Temporary = schema_spec {
5405 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5406 }
5407 let schema = scx.get_schema(&database_spec, &schema_spec);
5408 if !cascade && schema.has_items() {
5409 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5410 sql_bail!(
5411 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5412 full_schema_name
5413 );
5414 }
5415 Some((database_spec, schema_spec))
5416 }
5417 None => None,
5418 })
5419}
5420
5421fn plan_drop_role(
5422 scx: &StatementContext,
5423 if_exists: bool,
5424 name: &Ident,
5425) -> Result<Option<RoleId>, PlanError> {
5426 match scx.catalog.resolve_role(name.as_str()) {
5427 Ok(role) => {
5428 let id = role.id();
5429 if &id == scx.catalog.active_role_id() {
5430 sql_bail!("current role cannot be dropped");
5431 }
5432 for role in scx.catalog.get_roles() {
5433 for (member_id, grantor_id) in role.membership() {
5434 if &id == grantor_id {
5435 let member_role = scx.catalog.get_role(member_id);
5436 sql_bail!(
5437 "cannot drop role {}: still depended up by membership of role {} in role {}",
5438 name.as_str(),
5439 role.name(),
5440 member_role.name()
5441 );
5442 }
5443 }
5444 }
5445 Ok(Some(role.id()))
5446 }
5447 Err(_) if if_exists => Ok(None),
5448 Err(e) => Err(e.into()),
5449 }
5450}
5451
5452fn plan_drop_cluster(
5453 scx: &StatementContext,
5454 if_exists: bool,
5455 name: &Ident,
5456 cascade: bool,
5457) -> Result<Option<ClusterId>, PlanError> {
5458 Ok(match resolve_cluster(scx, name, if_exists)? {
5459 Some(cluster) => {
5460 if !cascade && !cluster.bound_objects().is_empty() {
5461 return Err(PlanError::DependentObjectsStillExist {
5462 object_type: "cluster".to_string(),
5463 object_name: cluster.name().to_string(),
5464 dependents: Vec::new(),
5465 });
5466 }
5467 Some(cluster.id())
5468 }
5469 None => None,
5470 })
5471}
5472
5473fn plan_drop_network_policy(
5474 scx: &StatementContext,
5475 if_exists: bool,
5476 name: &Ident,
5477) -> Result<Option<NetworkPolicyId>, PlanError> {
5478 match scx.catalog.resolve_network_policy(name.as_str()) {
5479 Ok(policy) => {
5480 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5483 Err(PlanError::NetworkPolicyInUse)
5484 } else {
5485 Ok(Some(policy.id()))
5486 }
5487 }
5488 Err(_) if if_exists => Ok(None),
5489 Err(e) => Err(e.into()),
5490 }
5491}
5492
5493fn plan_drop_cluster_replica(
5494 scx: &StatementContext,
5495 if_exists: bool,
5496 name: &QualifiedReplica,
5497) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5498 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5499 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5500}
5501
5502fn plan_drop_item(
5504 scx: &StatementContext,
5505 object_type: ObjectType,
5506 if_exists: bool,
5507 name: UnresolvedItemName,
5508 cascade: bool,
5509) -> Result<Option<CatalogItemId>, PlanError> {
5510 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5511 Ok(r) => r,
5512 Err(PlanError::MismatchedObjectType {
5514 name,
5515 is_type: ObjectType::MaterializedView,
5516 expected_type: ObjectType::View,
5517 }) => {
5518 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5519 }
5520 e => e?,
5521 };
5522
5523 Ok(match resolved {
5524 Some(catalog_item) => {
5525 if catalog_item.id().is_system() {
5526 sql_bail!(
5527 "cannot drop {} {} because it is required by the database system",
5528 catalog_item.item_type(),
5529 scx.catalog.minimal_qualification(catalog_item.name()),
5530 );
5531 }
5532
5533 if !cascade {
5534 for id in catalog_item.used_by() {
5535 let dep = scx.catalog.get_item(id);
5536 if dependency_prevents_drop(object_type, dep) {
5537 return Err(PlanError::DependentObjectsStillExist {
5538 object_type: catalog_item.item_type().to_string(),
5539 object_name: scx
5540 .catalog
5541 .minimal_qualification(catalog_item.name())
5542 .to_string(),
5543 dependents: vec![(
5544 dep.item_type().to_string(),
5545 scx.catalog.minimal_qualification(dep.name()).to_string(),
5546 )],
5547 });
5548 }
5549 }
5550 }
5553 Some(catalog_item.id())
5554 }
5555 None => None,
5556 })
5557}
5558
5559fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5561 match object_type {
5562 ObjectType::Type => true,
5563 ObjectType::Table
5564 | ObjectType::View
5565 | ObjectType::MaterializedView
5566 | ObjectType::Source
5567 | ObjectType::Sink
5568 | ObjectType::Index
5569 | ObjectType::Role
5570 | ObjectType::Cluster
5571 | ObjectType::ClusterReplica
5572 | ObjectType::Secret
5573 | ObjectType::Connection
5574 | ObjectType::Database
5575 | ObjectType::Schema
5576 | ObjectType::Func
5577 | ObjectType::NetworkPolicy => match dep.item_type() {
5578 CatalogItemType::Func
5579 | CatalogItemType::Table
5580 | CatalogItemType::Source
5581 | CatalogItemType::View
5582 | CatalogItemType::MaterializedView
5583 | CatalogItemType::Sink
5584 | CatalogItemType::Type
5585 | CatalogItemType::Secret
5586 | CatalogItemType::Connection => true,
5587 CatalogItemType::Index => false,
5588 },
5589 }
5590}
5591
5592pub fn describe_alter_index_options(
5593 _: &StatementContext,
5594 _: AlterIndexStatement<Aug>,
5595) -> Result<StatementDesc, PlanError> {
5596 Ok(StatementDesc::new(None))
5597}
5598
5599pub fn describe_drop_owned(
5600 _: &StatementContext,
5601 _: DropOwnedStatement<Aug>,
5602) -> Result<StatementDesc, PlanError> {
5603 Ok(StatementDesc::new(None))
5604}
5605
5606pub fn plan_drop_owned(
5607 scx: &StatementContext,
5608 drop: DropOwnedStatement<Aug>,
5609) -> Result<Plan, PlanError> {
5610 let cascade = drop.cascade();
5611 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5612 let mut drop_ids = Vec::new();
5613 let mut privilege_revokes = Vec::new();
5614 let mut default_privilege_revokes = Vec::new();
5615
5616 fn update_privilege_revokes(
5617 object_id: SystemObjectId,
5618 privileges: &PrivilegeMap,
5619 role_ids: &BTreeSet<RoleId>,
5620 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5621 ) {
5622 privilege_revokes.extend(iter::zip(
5623 iter::repeat(object_id),
5624 privileges
5625 .all_values()
5626 .filter(|privilege| role_ids.contains(&privilege.grantee))
5627 .cloned(),
5628 ));
5629 }
5630
5631 for replica in scx.catalog.get_cluster_replicas() {
5633 if role_ids.contains(&replica.owner_id()) {
5634 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5635 }
5636 }
5637
5638 for cluster in scx.catalog.get_clusters() {
5640 if role_ids.contains(&cluster.owner_id()) {
5641 if !cascade {
5643 let non_owned_bound_objects: Vec<_> = cluster
5644 .bound_objects()
5645 .into_iter()
5646 .map(|item_id| scx.catalog.get_item(item_id))
5647 .filter(|item| !role_ids.contains(&item.owner_id()))
5648 .collect();
5649 if !non_owned_bound_objects.is_empty() {
5650 let names: Vec<_> = non_owned_bound_objects
5651 .into_iter()
5652 .map(|item| {
5653 (
5654 item.item_type().to_string(),
5655 scx.catalog.resolve_full_name(item.name()).to_string(),
5656 )
5657 })
5658 .collect();
5659 return Err(PlanError::DependentObjectsStillExist {
5660 object_type: "cluster".to_string(),
5661 object_name: cluster.name().to_string(),
5662 dependents: names,
5663 });
5664 }
5665 }
5666 drop_ids.push(cluster.id().into());
5667 }
5668 update_privilege_revokes(
5669 SystemObjectId::Object(cluster.id().into()),
5670 cluster.privileges(),
5671 &role_ids,
5672 &mut privilege_revokes,
5673 );
5674 }
5675
5676 for item in scx.catalog.get_items() {
5678 if role_ids.contains(&item.owner_id()) {
5679 if !cascade {
5680 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5682 let non_owned_dependencies: Vec<_> = used_by
5683 .into_iter()
5684 .map(|item_id| scx.catalog.get_item(item_id))
5685 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5686 .filter(|item| !role_ids.contains(&item.owner_id()))
5687 .collect();
5688 if !non_owned_dependencies.is_empty() {
5689 let names: Vec<_> = non_owned_dependencies
5690 .into_iter()
5691 .map(|item| {
5692 let item_typ = item.item_type().to_string();
5693 let item_name =
5694 scx.catalog.resolve_full_name(item.name()).to_string();
5695 (item_typ, item_name)
5696 })
5697 .collect();
5698 Err(PlanError::DependentObjectsStillExist {
5699 object_type: item.item_type().to_string(),
5700 object_name: scx
5701 .catalog
5702 .resolve_full_name(item.name())
5703 .to_string()
5704 .to_string(),
5705 dependents: names,
5706 })
5707 } else {
5708 Ok(())
5709 }
5710 };
5711
5712 if let Some(id) = item.progress_id() {
5715 let progress_item = scx.catalog.get_item(&id);
5716 check_if_dependents_exist(progress_item.used_by())?;
5717 }
5718 check_if_dependents_exist(item.used_by())?;
5719 }
5720 drop_ids.push(item.id().into());
5721 }
5722 update_privilege_revokes(
5723 SystemObjectId::Object(item.id().into()),
5724 item.privileges(),
5725 &role_ids,
5726 &mut privilege_revokes,
5727 );
5728 }
5729
5730 for schema in scx.catalog.get_schemas() {
5732 if !schema.id().is_temporary() {
5733 if role_ids.contains(&schema.owner_id()) {
5734 if !cascade {
5735 let non_owned_dependencies: Vec<_> = schema
5736 .item_ids()
5737 .map(|item_id| scx.catalog.get_item(&item_id))
5738 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5739 .filter(|item| !role_ids.contains(&item.owner_id()))
5740 .collect();
5741 if !non_owned_dependencies.is_empty() {
5742 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5743 sql_bail!(
5744 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5745 full_schema_name.to_string().quoted()
5746 );
5747 }
5748 }
5749 drop_ids.push((*schema.database(), *schema.id()).into())
5750 }
5751 update_privilege_revokes(
5752 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5753 schema.privileges(),
5754 &role_ids,
5755 &mut privilege_revokes,
5756 );
5757 }
5758 }
5759
5760 for database in scx.catalog.get_databases() {
5762 if role_ids.contains(&database.owner_id()) {
5763 if !cascade {
5764 let non_owned_schemas: Vec<_> = database
5765 .schemas()
5766 .into_iter()
5767 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5768 .collect();
5769 if !non_owned_schemas.is_empty() {
5770 sql_bail!(
5771 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5772 database.name().quoted(),
5773 );
5774 }
5775 }
5776 drop_ids.push(database.id().into());
5777 }
5778 update_privilege_revokes(
5779 SystemObjectId::Object(database.id().into()),
5780 database.privileges(),
5781 &role_ids,
5782 &mut privilege_revokes,
5783 );
5784 }
5785
5786 for network_policy in scx.catalog.get_network_policies() {
5788 if role_ids.contains(&network_policy.owner_id()) {
5789 drop_ids.push(ObjectId::NetworkPolicy(network_policy.id()));
5790 }
5791 update_privilege_revokes(
5792 SystemObjectId::Object(ObjectId::NetworkPolicy(network_policy.id())),
5793 network_policy.privileges(),
5794 &role_ids,
5795 &mut privilege_revokes,
5796 );
5797 }
5798
5799 update_privilege_revokes(
5801 SystemObjectId::System,
5802 scx.catalog.get_system_privileges(),
5803 &role_ids,
5804 &mut privilege_revokes,
5805 );
5806
5807 for (default_privilege_object, default_privilege_acl_items) in
5808 scx.catalog.get_default_privileges()
5809 {
5810 for default_privilege_acl_item in default_privilege_acl_items {
5811 if role_ids.contains(&default_privilege_object.role_id)
5812 || role_ids.contains(&default_privilege_acl_item.grantee)
5813 {
5814 default_privilege_revokes.push((
5815 default_privilege_object.clone(),
5816 default_privilege_acl_item.clone(),
5817 ));
5818 }
5819 }
5820 }
5821
5822 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5823
5824 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5825 if !system_ids.is_empty() {
5826 let mut owners = system_ids
5827 .into_iter()
5828 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5829 .collect::<BTreeSet<_>>()
5830 .into_iter()
5831 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5832 sql_bail!(
5833 "cannot drop objects owned by role {} because they are required by the database system",
5834 owners.join(", "),
5835 );
5836 }
5837
5838 Ok(Plan::DropOwned(DropOwnedPlan {
5839 role_ids: role_ids.into_iter().collect(),
5840 drop_ids,
5841 privilege_revokes,
5842 default_privilege_revokes,
5843 }))
5844}
5845
5846fn plan_retain_history_option(
5847 scx: &StatementContext,
5848 retain_history: Option<OptionalDuration>,
5849) -> Result<Option<CompactionWindow>, PlanError> {
5850 if let Some(OptionalDuration(lcw)) = retain_history {
5851 Ok(Some(plan_retain_history(scx, lcw)?))
5852 } else {
5853 Ok(None)
5854 }
5855}
5856
5857fn plan_retain_history(
5863 scx: &StatementContext,
5864 lcw: Option<Duration>,
5865) -> Result<CompactionWindow, PlanError> {
5866 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5867 match lcw {
5868 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5873 option_name: "RETAIN HISTORY".to_string(),
5874 err: Box::new(PlanError::Unstructured(
5875 "internal error: unexpectedly zero".to_string(),
5876 )),
5877 }),
5878 Some(duration) => {
5879 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5882 && scx
5883 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5884 .is_err()
5885 {
5886 return Err(PlanError::RetainHistoryLow {
5887 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5888 });
5889 }
5890 Ok(duration.try_into()?)
5891 }
5892 None => {
5895 if scx
5896 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5897 .is_err()
5898 {
5899 Err(PlanError::RetainHistoryRequired)
5900 } else {
5901 Ok(CompactionWindow::DisableCompaction)
5902 }
5903 }
5904 }
5905}
5906
// Declares the extracted form of `IndexOption`: `plan_index_options` converts a
// `Vec<IndexOption<Aug>>` into `IndexOptionExtracted` via `try_into` and reads its
// `retain_history` field.
generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5908
5909fn plan_index_options(
5910 scx: &StatementContext,
5911 with_opts: Vec<IndexOption<Aug>>,
5912) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5913 if !with_opts.is_empty() {
5914 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5916 }
5917
5918 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5919
5920 let mut out = Vec::with_capacity(1);
5921 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5922 out.push(crate::plan::IndexOption::RetainHistory(cw));
5923 }
5924 Ok(out)
5925}
5926
// Declares the extracted form of `TableOption`: `plan_table_options` converts a
// `Vec<TableOption<Aug>>` into `TableOptionExtracted` via `try_into` and reads its
// `partition_by`, `retain_history` and `redacted_test` fields.
generate_extracted_config!(
    TableOption,
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (RedactedTest, String)
);
5933
5934fn plan_table_options(
5935 scx: &StatementContext,
5936 desc: &RelationDesc,
5937 with_opts: Vec<TableOption<Aug>>,
5938) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5939 let TableOptionExtracted {
5940 partition_by,
5941 retain_history,
5942 redacted_test,
5943 ..
5944 }: TableOptionExtracted = with_opts.try_into()?;
5945
5946 if let Some(partition_by) = partition_by {
5947 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5948 check_partition_by(desc, partition_by)?;
5949 }
5950
5951 if redacted_test.is_some() {
5952 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5953 }
5954
5955 let mut out = Vec::with_capacity(1);
5956 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5957 out.push(crate::plan::TableOption::RetainHistory(cw));
5958 }
5959 Ok(out)
5960}
5961
5962pub fn plan_alter_index_options(
5963 scx: &mut StatementContext,
5964 AlterIndexStatement {
5965 index_name,
5966 if_exists,
5967 action,
5968 }: AlterIndexStatement<Aug>,
5969) -> Result<Plan, PlanError> {
5970 let object_type = ObjectType::Index;
5971 match action {
5972 AlterIndexAction::ResetOptions(options) => {
5973 let mut options = options.into_iter();
5974 if let Some(opt) = options.next() {
5975 match opt {
5976 IndexOptionName::RetainHistory => {
5977 if options.next().is_some() {
5978 sql_bail!("RETAIN HISTORY must be only option");
5979 }
5980 return alter_retain_history(
5981 scx,
5982 object_type,
5983 if_exists,
5984 UnresolvedObjectName::Item(index_name),
5985 None,
5986 );
5987 }
5988 }
5989 }
5990 sql_bail!("expected option");
5991 }
5992 AlterIndexAction::SetOptions(options) => {
5993 let mut options = options.into_iter();
5994 if let Some(opt) = options.next() {
5995 match opt.name {
5996 IndexOptionName::RetainHistory => {
5997 if options.next().is_some() {
5998 sql_bail!("RETAIN HISTORY must be only option");
5999 }
6000 return alter_retain_history(
6001 scx,
6002 object_type,
6003 if_exists,
6004 UnresolvedObjectName::Item(index_name),
6005 opt.value,
6006 );
6007 }
6008 }
6009 }
6010 sql_bail!("expected option");
6011 }
6012 }
6013}
6014
6015pub fn describe_alter_cluster_set_options(
6016 _: &StatementContext,
6017 _: AlterClusterStatement<Aug>,
6018) -> Result<StatementDesc, PlanError> {
6019 Ok(StatementDesc::new(None))
6020}
6021
6022pub fn plan_alter_cluster(
6023 scx: &mut StatementContext,
6024 AlterClusterStatement {
6025 name,
6026 action,
6027 if_exists,
6028 }: AlterClusterStatement<Aug>,
6029) -> Result<Plan, PlanError> {
6030 let cluster = match resolve_cluster(scx, &name, if_exists)? {
6031 Some(entry) => entry,
6032 None => {
6033 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6034 name: name.to_ast_string_simple(),
6035 object_type: ObjectType::Cluster,
6036 });
6037
6038 return Ok(Plan::AlterNoop(AlterNoopPlan {
6039 object_type: ObjectType::Cluster,
6040 }));
6041 }
6042 };
6043
6044 let mut options: PlanClusterOption = Default::default();
6045 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
6046
6047 match action {
6048 AlterClusterAction::SetOptions {
6049 options: set_options,
6050 with_options,
6051 } => {
6052 let ClusterOptionExtracted {
6053 availability_zones,
6054 introspection_debugging,
6055 introspection_interval,
6056 managed,
6057 replicas: replica_defs,
6058 replication_factor,
6059 seen: _,
6060 size,
6061 disk,
6062 schedule,
6063 workload_class,
6064 }: ClusterOptionExtracted = set_options.try_into()?;
6065
6066 if !scx.catalog.active_role_id().is_system() {
6067 if workload_class.is_some() {
6068 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6069 }
6070 }
6071
6072 match managed.unwrap_or_else(|| cluster.is_managed()) {
6073 true => {
6074 let alter_strategy_extracted =
6075 ClusterAlterOptionExtracted::try_from(with_options)?;
6076 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
6077
6078 match alter_strategy {
6079 AlterClusterPlanStrategy::None => {}
6080 _ => {
6081 scx.require_feature_flag(
6082 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
6083 )?;
6084 }
6085 }
6086
6087 if replica_defs.is_some() {
6088 sql_bail!("REPLICAS not supported for managed clusters");
6089 }
6090 if schedule.is_some()
6091 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6092 {
6093 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6094 }
6095
6096 if replication_factor.is_some() {
6097 if schedule.is_some()
6098 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6099 {
6100 sql_bail!(
6101 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6102 );
6103 }
6104 if let Some(current_schedule) = cluster.schedule() {
6105 if !matches!(current_schedule, ClusterSchedule::Manual) {
6106 sql_bail!(
6107 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6108 );
6109 }
6110 }
6111 }
6112 }
6113 false => {
6114 if !alter_strategy.is_none() {
6115 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6116 }
6117 if availability_zones.is_some() {
6118 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6119 }
6120 if replication_factor.is_some() {
6121 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6122 }
6123 if introspection_debugging.is_some() {
6124 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6125 }
6126 if introspection_interval.is_some() {
6127 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6128 }
6129 if size.is_some() {
6130 sql_bail!("SIZE not supported for unmanaged clusters");
6131 }
6132 if disk.is_some() {
6133 sql_bail!("DISK not supported for unmanaged clusters");
6134 }
6135 if schedule.is_some()
6136 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6137 {
6138 sql_bail!(
6139 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6140 );
6141 }
6142 if let Some(current_schedule) = cluster.schedule() {
6143 if !matches!(current_schedule, ClusterSchedule::Manual)
6144 && schedule.is_none()
6145 {
6146 sql_bail!(
6147 "when switching a cluster to unmanaged, if the managed \
6148 cluster's SCHEDULE is anything other than MANUAL, you have to \
6149 explicitly set the SCHEDULE to MANUAL"
6150 );
6151 }
6152 }
6153 }
6154 }
6155
6156 let mut replicas = vec![];
6157 for ReplicaDefinition { name, options } in
6158 replica_defs.into_iter().flat_map(Vec::into_iter)
6159 {
6160 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6161 }
6162
6163 if let Some(managed) = managed {
6164 options.managed = AlterOptionParameter::Set(managed);
6165 }
6166 if let Some(replication_factor) = replication_factor {
6167 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6168 }
6169 if let Some(size) = &size {
6170 options.size = AlterOptionParameter::Set(size.clone());
6171 }
6172 if let Some(availability_zones) = availability_zones {
6173 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6174 }
6175 if let Some(introspection_debugging) = introspection_debugging {
6176 options.introspection_debugging =
6177 AlterOptionParameter::Set(introspection_debugging);
6178 }
6179 if let Some(introspection_interval) = introspection_interval {
6180 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6181 }
6182 if disk.is_some() {
6183 let size = match size.as_deref() {
6187 Some(s) => s,
6188 None => cluster
6189 .managed_size()
6190 .ok_or_else(|| sql_err!("cluster is not managed"))?,
6191 };
6192 if scx.catalog.is_cluster_size_cc(size) {
6193 sql_bail!(
6194 "DISK option not supported for modern cluster sizes because disk is always enabled"
6195 );
6196 }
6197
6198 scx.catalog
6199 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6200 }
6201 if !replicas.is_empty() {
6202 options.replicas = AlterOptionParameter::Set(replicas);
6203 }
6204 if let Some(schedule) = schedule {
6205 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6206 }
6207 if let Some(workload_class) = workload_class {
6208 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6209 }
6210 }
6211 AlterClusterAction::ResetOptions(reset_options) => {
6212 use AlterOptionParameter::Reset;
6213 use ClusterOptionName::*;
6214
6215 if !scx.catalog.active_role_id().is_system() {
6216 if reset_options.contains(&WorkloadClass) {
6217 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6218 }
6219 }
6220
6221 for option in reset_options {
6222 match option {
6223 AvailabilityZones => options.availability_zones = Reset,
6224 Disk => scx
6225 .catalog
6226 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6227 IntrospectionInterval => options.introspection_interval = Reset,
6228 IntrospectionDebugging => options.introspection_debugging = Reset,
6229 Managed => options.managed = Reset,
6230 Replicas => options.replicas = Reset,
6231 ReplicationFactor => options.replication_factor = Reset,
6232 Size => options.size = Reset,
6233 Schedule => options.schedule = Reset,
6234 WorkloadClass => options.workload_class = Reset,
6235 }
6236 }
6237 }
6238 }
6239 Ok(Plan::AlterCluster(AlterClusterPlan {
6240 id: cluster.id(),
6241 name: cluster.name().to_string(),
6242 options,
6243 strategy: alter_strategy,
6244 }))
6245}
6246
6247pub fn describe_alter_set_cluster(
6248 _: &StatementContext,
6249 _: AlterSetClusterStatement<Aug>,
6250) -> Result<StatementDesc, PlanError> {
6251 Ok(StatementDesc::new(None))
6252}
6253
6254pub fn plan_alter_item_set_cluster(
6255 scx: &StatementContext,
6256 AlterSetClusterStatement {
6257 if_exists,
6258 set_cluster: in_cluster_name,
6259 name,
6260 object_type,
6261 }: AlterSetClusterStatement<Aug>,
6262) -> Result<Plan, PlanError> {
6263 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6264
6265 let object_type = object_type.into();
6266
6267 match object_type {
6269 ObjectType::MaterializedView => {}
6270 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6271 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6272 }
6273 ObjectType::Table
6274 | ObjectType::View
6275 | ObjectType::Type
6276 | ObjectType::Role
6277 | ObjectType::Cluster
6278 | ObjectType::ClusterReplica
6279 | ObjectType::Secret
6280 | ObjectType::Connection
6281 | ObjectType::Database
6282 | ObjectType::Schema
6283 | ObjectType::Func
6284 | ObjectType::NetworkPolicy => {
6285 bail_never_supported!(
6286 format!("ALTER {object_type} SET CLUSTER"),
6287 "sql/alter-set-cluster/",
6288 format!("{object_type} has no associated cluster")
6289 )
6290 }
6291 }
6292
6293 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6294
6295 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6296 Some(entry) => {
6297 let current_cluster = entry.cluster_id();
6298 let Some(current_cluster) = current_cluster else {
6299 sql_bail!("No cluster associated with {name}");
6300 };
6301
6302 if current_cluster == in_cluster.id() {
6303 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6304 } else {
6305 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6306 id: entry.id(),
6307 set_cluster: in_cluster.id(),
6308 }))
6309 }
6310 }
6311 None => {
6312 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6313 name: name.to_ast_string_simple(),
6314 object_type,
6315 });
6316
6317 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6318 }
6319 }
6320}
6321
6322pub fn describe_alter_object_rename(
6323 _: &StatementContext,
6324 _: AlterObjectRenameStatement,
6325) -> Result<StatementDesc, PlanError> {
6326 Ok(StatementDesc::new(None))
6327}
6328
6329pub fn plan_alter_object_rename(
6330 scx: &mut StatementContext,
6331 AlterObjectRenameStatement {
6332 name,
6333 object_type,
6334 to_item_name,
6335 if_exists,
6336 }: AlterObjectRenameStatement,
6337) -> Result<Plan, PlanError> {
6338 let object_type = object_type.into();
6339 match (object_type, name) {
6340 (
6341 ObjectType::View
6342 | ObjectType::MaterializedView
6343 | ObjectType::Table
6344 | ObjectType::Source
6345 | ObjectType::Index
6346 | ObjectType::Sink
6347 | ObjectType::Secret
6348 | ObjectType::Connection,
6349 UnresolvedObjectName::Item(name),
6350 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6351 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6352 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6353 }
6354 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6355 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6356 }
6357 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6358 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6359 }
6360 (object_type, name) => {
6361 bail_internal!("invalid object type '{object_type}' for ALTER RENAME with name {name}")
6364 }
6365 }
6366}
6367
6368pub fn plan_alter_schema_rename(
6369 scx: &mut StatementContext,
6370 name: UnresolvedSchemaName,
6371 to_schema_name: Ident,
6372 if_exists: bool,
6373) -> Result<Plan, PlanError> {
6374 let normalized = normalize::unresolved_schema_name(name.clone())?;
6378 if normalized.database.is_none() && normalized.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6379 sql_bail!(
6380 "cannot rename schemas in the ambient database: {:?}",
6381 mz_repr::namespaces::MZ_TEMP_SCHEMA
6382 );
6383 }
6384
6385 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6386 let object_type = ObjectType::Schema;
6387 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6388 name: name.to_ast_string_simple(),
6389 object_type,
6390 });
6391 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6392 };
6393
6394 if scx
6396 .resolve_schema_in_database(&db_spec, &to_schema_name)
6397 .is_ok()
6398 {
6399 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6400 to_schema_name.clone().into_string(),
6401 )));
6402 }
6403
6404 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6406 if schema.id().is_system() {
6407 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6408 }
6409
6410 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6411 cur_schema_spec: (db_spec, schema_spec),
6412 new_schema_name: to_schema_name.into_string(),
6413 }))
6414}
6415
6416pub fn plan_alter_schema_swap<F>(
6417 scx: &mut StatementContext,
6418 name_a: UnresolvedSchemaName,
6419 name_b: Ident,
6420 if_exists: bool,
6421 gen_temp_suffix: F,
6422) -> Result<Plan, PlanError>
6423where
6424 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6425{
6426 let normalized_a = normalize::unresolved_schema_name(name_a.clone())?;
6430 if normalized_a.database.is_none() && normalized_a.schema == mz_repr::namespaces::MZ_TEMP_SCHEMA
6431 {
6432 sql_bail!("cannot swap schemas that are in the ambient database");
6433 }
6434 let name_b_str = normalize::ident_ref(&name_b);
6436 if name_b_str == mz_repr::namespaces::MZ_TEMP_SCHEMA {
6437 sql_bail!("cannot swap schemas that are in the ambient database");
6438 }
6439
6440 let schema_a = match scx.resolve_schema(name_a.clone()) {
6441 Ok(schema) => schema,
6442 Err(_) if if_exists => {
6443 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6444 name: name_a.to_ast_string_simple(),
6445 object_type: ObjectType::Schema,
6446 });
6447 return Ok(Plan::AlterNoop(AlterNoopPlan {
6448 object_type: ObjectType::Schema,
6449 }));
6450 }
6451 Err(e) => return Err(e),
6452 };
6453
6454 let db_spec = schema_a.database().clone();
6455 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6456 sql_bail!("cannot swap schemas that are in the ambient database");
6457 };
6458 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6459
6460 if schema_a.id().is_system() || schema_b.id().is_system() {
6462 bail_never_supported!("swapping a system schema".to_string())
6463 }
6464
6465 const SCHEMA_SWAP_PREFIX: &str = "mz_schema_swap_";
6469 let check = |temp_suffix: &str| {
6470 let mut temp_name = ident!(SCHEMA_SWAP_PREFIX);
6471 temp_name.append_lossy(temp_suffix);
6472 scx.resolve_schema_in_database(&db_spec, &temp_name)
6473 .is_err()
6474 };
6475 let temp_suffix = gen_temp_suffix(&check)?;
6476 let name_temp = format!("{SCHEMA_SWAP_PREFIX}{temp_suffix}");
6477
6478 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6479 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6480 schema_a_name: schema_a.name().schema.to_string(),
6481 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6482 schema_b_name: schema_b.name().schema.to_string(),
6483 name_temp,
6484 }))
6485}
6486
6487pub fn plan_alter_item_rename(
6488 scx: &mut StatementContext,
6489 object_type: ObjectType,
6490 name: UnresolvedItemName,
6491 to_item_name: Ident,
6492 if_exists: bool,
6493) -> Result<Plan, PlanError> {
6494 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6495 Ok(r) => r,
6496 Err(PlanError::MismatchedObjectType {
6498 name,
6499 is_type: ObjectType::MaterializedView,
6500 expected_type: ObjectType::View,
6501 }) => {
6502 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6503 }
6504 e => e?,
6505 };
6506
6507 match resolved {
6508 Some(entry) => {
6509 let full_name = scx.catalog.resolve_full_name(entry.name());
6510 let item_type = entry.item_type();
6511
6512 let proposed_name = QualifiedItemName {
6513 qualifiers: entry.name().qualifiers.clone(),
6514 item: to_item_name.clone().into_string(),
6515 };
6516
6517 let conflicting_type_exists;
6521 let conflicting_item_exists;
6522 if item_type == CatalogItemType::Type {
6523 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6524 conflicting_item_exists = scx
6525 .catalog
6526 .get_item_by_name(&proposed_name)
6527 .map(|item| item.item_type().conflicts_with_type())
6528 .unwrap_or(false);
6529 } else {
6530 conflicting_type_exists = item_type.conflicts_with_type()
6531 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6532 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6533 };
6534 if conflicting_type_exists || conflicting_item_exists {
6535 sql_bail!("catalog item '{}' already exists", to_item_name);
6536 }
6537
6538 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6539 id: entry.id(),
6540 current_full_name: full_name,
6541 to_name: normalize::ident(to_item_name),
6542 object_type,
6543 }))
6544 }
6545 None => {
6546 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6547 name: name.to_ast_string_simple(),
6548 object_type,
6549 });
6550
6551 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6552 }
6553 }
6554}
6555
6556pub fn plan_alter_cluster_rename(
6557 scx: &mut StatementContext,
6558 object_type: ObjectType,
6559 name: Ident,
6560 to_name: Ident,
6561 if_exists: bool,
6562) -> Result<Plan, PlanError> {
6563 match resolve_cluster(scx, &name, if_exists)? {
6564 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6565 id: entry.id(),
6566 name: entry.name().to_string(),
6567 to_name: ident(to_name),
6568 })),
6569 None => {
6570 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6571 name: name.to_ast_string_simple(),
6572 object_type,
6573 });
6574
6575 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6576 }
6577 }
6578}
6579
6580pub fn plan_alter_cluster_swap<F>(
6581 scx: &mut StatementContext,
6582 name_a: Ident,
6583 name_b: Ident,
6584 if_exists: bool,
6585 gen_temp_suffix: F,
6586) -> Result<Plan, PlanError>
6587where
6588 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6589{
6590 let cluster_a = match scx.resolve_cluster(Some(&name_a)) {
6591 Ok(cluster) => cluster,
6592 Err(_) if if_exists => {
6593 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6594 name: name_a.to_ast_string_simple(),
6595 object_type: ObjectType::Cluster,
6596 });
6597 return Ok(Plan::AlterNoop(AlterNoopPlan {
6598 object_type: ObjectType::Cluster,
6599 }));
6600 }
6601 Err(e) => return Err(e),
6602 };
6603 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6604
6605 const CLUSTER_SWAP_PREFIX: &str = "mz_cluster_swap_";
6606 let check = |temp_suffix: &str| {
6607 let mut temp_name = ident!(CLUSTER_SWAP_PREFIX);
6608 temp_name.append_lossy(temp_suffix);
6609 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6610 Err(CatalogError::UnknownCluster(_)) => true,
6612 Ok(_) | Err(_) => false,
6614 }
6615 };
6616 let temp_suffix = gen_temp_suffix(&check)?;
6617 let name_temp = format!("{CLUSTER_SWAP_PREFIX}{temp_suffix}");
6618
6619 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6620 id_a: cluster_a.id(),
6621 id_b: cluster_b.id(),
6622 name_a: name_a.into_string(),
6623 name_b: name_b.into_string(),
6624 name_temp,
6625 }))
6626}
6627
6628pub fn plan_alter_cluster_replica_rename(
6629 scx: &mut StatementContext,
6630 object_type: ObjectType,
6631 name: QualifiedReplica,
6632 to_item_name: Ident,
6633 if_exists: bool,
6634) -> Result<Plan, PlanError> {
6635 match resolve_cluster_replica(scx, &name, if_exists)? {
6636 Some((cluster, replica)) => {
6637 ensure_cluster_is_not_managed(scx, cluster.id())?;
6638 Ok(Plan::AlterClusterReplicaRename(
6639 AlterClusterReplicaRenamePlan {
6640 cluster_id: cluster.id(),
6641 replica_id: replica,
6642 name: QualifiedReplica {
6643 cluster: Ident::new(cluster.name())?,
6644 replica: name.replica,
6645 },
6646 to_name: normalize::ident(to_item_name),
6647 },
6648 ))
6649 }
6650 None => {
6651 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6652 name: name.to_ast_string_simple(),
6653 object_type,
6654 });
6655
6656 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6657 }
6658 }
6659}
6660
6661pub fn describe_alter_object_swap(
6662 _: &StatementContext,
6663 _: AlterObjectSwapStatement,
6664) -> Result<StatementDesc, PlanError> {
6665 Ok(StatementDesc::new(None))
6666}
6667
6668pub fn plan_alter_object_swap(
6669 scx: &mut StatementContext,
6670 stmt: AlterObjectSwapStatement,
6671) -> Result<Plan, PlanError> {
6672 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6673
6674 let AlterObjectSwapStatement {
6675 object_type,
6676 if_exists,
6677 name_a,
6678 name_b,
6679 } = stmt;
6680 let object_type = object_type.into();
6681
6682 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6684 let mut attempts = 0;
6685 let name_temp = loop {
6686 attempts += 1;
6687 if attempts > 10 {
6688 tracing::warn!("Unable to generate temp id for swapping");
6689 sql_bail!("unable to swap!");
6690 }
6691
6692 let short_id = mz_ore::id_gen::temp_id();
6694 if check_fn(&short_id) {
6695 break short_id;
6696 }
6697 };
6698
6699 Ok(name_temp)
6700 };
6701
6702 match (object_type, name_a, name_b) {
6703 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6704 plan_alter_schema_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6705 }
6706 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6707 plan_alter_cluster_swap(scx, name_a, name_b, if_exists, gen_temp_suffix)
6708 }
6709 (ObjectType::Schema | ObjectType::Cluster, _, _) => {
6710 bail_internal!("name type does not match object type for ALTER SWAP")
6711 }
6712 (
6713 ObjectType::Table
6714 | ObjectType::View
6715 | ObjectType::MaterializedView
6716 | ObjectType::Source
6717 | ObjectType::Sink
6718 | ObjectType::Index
6719 | ObjectType::Type
6720 | ObjectType::Role
6721 | ObjectType::ClusterReplica
6722 | ObjectType::Secret
6723 | ObjectType::Connection
6724 | ObjectType::Database
6725 | ObjectType::Func
6726 | ObjectType::NetworkPolicy,
6727 _,
6728 _,
6729 ) => Err(PlanError::Unsupported {
6730 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6731 discussion_no: None,
6732 }),
6733 }
6734}
6735
6736pub fn describe_alter_retain_history(
6737 _: &StatementContext,
6738 _: AlterRetainHistoryStatement<Aug>,
6739) -> Result<StatementDesc, PlanError> {
6740 Ok(StatementDesc::new(None))
6741}
6742
6743pub fn plan_alter_retain_history(
6744 scx: &StatementContext,
6745 AlterRetainHistoryStatement {
6746 object_type,
6747 if_exists,
6748 name,
6749 history,
6750 }: AlterRetainHistoryStatement<Aug>,
6751) -> Result<Plan, PlanError> {
6752 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6753}
6754
6755fn alter_retain_history(
6756 scx: &StatementContext,
6757 object_type: ObjectType,
6758 if_exists: bool,
6759 name: UnresolvedObjectName,
6760 history: Option<WithOptionValue<Aug>>,
6761) -> Result<Plan, PlanError> {
6762 let name = match (object_type, name) {
6763 (
6764 ObjectType::View
6766 | ObjectType::MaterializedView
6767 | ObjectType::Table
6768 | ObjectType::Source
6769 | ObjectType::Index,
6770 UnresolvedObjectName::Item(name),
6771 ) => name,
6772 (object_type, _) => {
6773 bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
6774 }
6775 };
6776 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6777 Some(entry) => {
6778 let full_name = scx.catalog.resolve_full_name(entry.name());
6779 let item_type = entry.item_type();
6780
6781 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6783 return Err(PlanError::AlterViewOnMaterializedView(
6784 full_name.to_string(),
6785 ));
6786 } else if object_type == ObjectType::View {
6787 sql_bail!("{object_type} does not support RETAIN HISTORY")
6788 } else if object_type != item_type {
6789 sql_bail!(
6790 "\"{}\" is a {} not a {}",
6791 full_name,
6792 entry.item_type(),
6793 format!("{object_type}").to_lowercase()
6794 )
6795 }
6796
6797 let (value, lcw) = match &history {
6799 Some(WithOptionValue::RetainHistoryFor(value)) => {
6800 let window = OptionalDuration::try_from_value(value.clone())?;
6801 (Some(value.clone()), window.0)
6802 }
6803 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6805 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6806 };
6807 let window = plan_retain_history(scx, lcw)?;
6808
6809 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6810 id: entry.id(),
6811 value,
6812 window,
6813 object_type,
6814 }))
6815 }
6816 None => {
6817 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6818 name: name.to_ast_string_simple(),
6819 object_type,
6820 });
6821
6822 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6823 }
6824 }
6825}
6826
6827fn alter_source_timestamp_interval(
6828 scx: &StatementContext,
6829 if_exists: bool,
6830 source_name: UnresolvedItemName,
6831 value: Option<WithOptionValue<Aug>>,
6832) -> Result<Plan, PlanError> {
6833 let object_type = ObjectType::Source;
6834 match resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)? {
6835 Some(entry) => {
6836 let full_name = scx.catalog.resolve_full_name(entry.name());
6837 if entry.item_type() != CatalogItemType::Source {
6838 sql_bail!(
6839 "\"{}\" is a {} not a {}",
6840 full_name,
6841 entry.item_type(),
6842 format!("{object_type}").to_lowercase()
6843 )
6844 }
6845
6846 match value {
6847 Some(val) => {
6848 let val = match val {
6849 WithOptionValue::Value(v) => v,
6850 _ => sql_bail!("TIMESTAMP INTERVAL requires an interval value"),
6851 };
6852 let duration = Duration::try_from_value(val.clone())?;
6853
6854 let min = scx.catalog.system_vars().min_timestamp_interval();
6855 let max = scx.catalog.system_vars().max_timestamp_interval();
6856 if duration < min || duration > max {
6857 return Err(PlanError::InvalidTimestampInterval {
6858 min,
6859 max,
6860 requested: duration,
6861 });
6862 }
6863
6864 Ok(Plan::AlterSourceTimestampInterval(
6865 AlterSourceTimestampIntervalPlan {
6866 id: entry.id(),
6867 value: Some(val),
6868 interval: duration,
6869 },
6870 ))
6871 }
6872 None => {
6873 let interval = scx.catalog.system_vars().default_timestamp_interval();
6874 Ok(Plan::AlterSourceTimestampInterval(
6875 AlterSourceTimestampIntervalPlan {
6876 id: entry.id(),
6877 value: None,
6878 interval,
6879 },
6880 ))
6881 }
6882 }
6883 }
6884 None => {
6885 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6886 name: source_name.to_ast_string_simple(),
6887 object_type,
6888 });
6889
6890 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6891 }
6892 }
6893}
6894
6895pub fn describe_alter_secret_options(
6896 _: &StatementContext,
6897 _: AlterSecretStatement<Aug>,
6898) -> Result<StatementDesc, PlanError> {
6899 Ok(StatementDesc::new(None))
6900}
6901
6902pub fn plan_alter_secret(
6903 scx: &mut StatementContext,
6904 stmt: AlterSecretStatement<Aug>,
6905) -> Result<Plan, PlanError> {
6906 let AlterSecretStatement {
6907 name,
6908 if_exists,
6909 value,
6910 } = stmt;
6911 let object_type = ObjectType::Secret;
6912 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6913 Some(entry) => entry.id(),
6914 None => {
6915 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6916 name: name.to_string(),
6917 object_type,
6918 });
6919
6920 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6921 }
6922 };
6923
6924 let secret_as = query::plan_secret_as(scx, value)?;
6925
6926 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6927}
6928
6929pub fn describe_alter_connection(
6930 _: &StatementContext,
6931 _: AlterConnectionStatement<Aug>,
6932) -> Result<StatementDesc, PlanError> {
6933 Ok(StatementDesc::new(None))
6934}
6935
// Generates `AlterConnectionOptionExtracted`, which `plan_alter_connection` builds from the
// statement's `WITH (...)` options via `try_from` and whose `validate` field it reads as an
// optional `bool`.
generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6937
/// Plans an `ALTER CONNECTION` statement.
///
/// Two mutually exclusive shapes are handled:
///
/// * `ROTATE KEYS` – must be the only action, accepts no `WITH (...)` options, and is only
///   valid for SSH connections. Produces `AlterConnectionAction::RotateKeys`.
/// * `SET` / `DROP` options – validated against the connection's type and combined into
///   `AlterConnectionAction::AlterOptions`.
///
/// When the connection does not resolve and `if_exists` is set, a notice is recorded and a
/// no-op plan is returned.
///
/// # Errors
///
/// Returns an error when the name cannot be normalized or resolved (without `if_exists`), the
/// resolved item is not a connection, an inalterable option is named, the options are invalid
/// for the connection type, or the same option (or options from one mutually exclusive set)
/// is both set and dropped.
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        // Under IF EXISTS every resolution error is treated as "does not exist".
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    // Fails if the resolved item is not a connection.
    let connection = entry.connection()?;

    // ROTATE KEYS is planned on its own and returns early.
    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    // An explicit VALIDATE option is gated behind a feature flag.
    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    // Without an explicit option, validate only when both the system variable and the
    // connection itself default to validation.
    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Names of every option mentioned by a SET or DROP action.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => Ok(option.name.clone()),
            AlterConnectionAction::DropOption(name) => Ok(name.clone()),
            AlterConnectionAction::RotateKeys => {
                Err(internal_err!("RotateKeys is handled separately above"))
            }
        })
        .collect::<Result<_, PlanError>>()?;

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Split the actions into the options being set and the names being dropped.
    let mut set_options_vec: Vec<_> = Vec::new();
    let mut drop_options: BTreeSet<_> = BTreeSet::new();
    for action in actions {
        match action {
            AlterConnectionAction::SetOption(option) => set_options_vec.push(option),
            AlterConnectionAction::DropOption(name) => {
                drop_options.insert(name);
            }
            AlterConnectionAction::RotateKeys => {
                bail_internal!("RotateKeys is handled separately above")
            }
        }
    }

    // Name -> value map handed to the plan; built from a clone because the vector itself is
    // consumed by the conversion below.
    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // NOTE(review): presumably this conversion also type-checks the values and rejects
    // repeated SET options — confirm against `ConnectionOptionExtracted`. Here it is used for
    // its `seen` set of option names.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    // An option may not be both set and dropped in one statement.
    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Setting one member of the set while dropping another is rejected.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // Touching any member adds every member of the set to `drop_options`, including
        // members that are also present in `set_options`.
        // NOTE(review): presumably the consumer of this plan applies drops before sets so the
        // newly set member survives — confirm.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }

    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}
7129
7130pub fn describe_alter_sink(
7131 _: &StatementContext,
7132 _: AlterSinkStatement<Aug>,
7133) -> Result<StatementDesc, PlanError> {
7134 Ok(StatementDesc::new(None))
7135}
7136
7137pub fn plan_alter_sink(
7138 scx: &mut StatementContext,
7139 stmt: AlterSinkStatement<Aug>,
7140) -> Result<Plan, PlanError> {
7141 let AlterSinkStatement {
7142 sink_name,
7143 if_exists,
7144 action,
7145 } = stmt;
7146
7147 let object_type = ObjectType::Sink;
7148 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
7149
7150 let Some(item) = item else {
7151 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7152 name: sink_name.to_string(),
7153 object_type,
7154 });
7155
7156 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7157 };
7158 let item = item.at_version(RelationVersionSelector::Latest);
7160
7161 match action {
7162 AlterSinkAction::ChangeRelation(new_from) => {
7163 let create_sql = item.create_sql();
7165 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
7166 let [stmt]: [StatementParseResult; 1] = stmts
7167 .try_into()
7168 .map_err(|_| internal_err!("create SQL of sink was not exactly one statement"))?;
7169 let Statement::CreateSink(stmt) = stmt.ast else {
7170 bail_internal!("create SQL of sink is not a CREATE SINK statement");
7171 };
7172
7173 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
7175 stmt.from = new_from;
7176
7177 let Plan::CreateSink(mut plan) = plan_sink(scx, stmt)? else {
7179 bail_internal!("plan_sink did not produce a CreateSink plan");
7180 };
7181
7182 plan.sink.version += 1;
7183
7184 Ok(Plan::AlterSink(AlterSinkPlan {
7185 item_id: item.id(),
7186 global_id: item.global_id(),
7187 sink: plan.sink,
7188 with_snapshot: plan.with_snapshot,
7189 in_cluster: plan.in_cluster,
7190 }))
7191 }
7192 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
7193 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
7194 }
7195}
7196
7197pub fn describe_alter_source(
7198 _: &StatementContext,
7199 _: AlterSourceStatement<Aug>,
7200) -> Result<StatementDesc, PlanError> {
7201 Ok(StatementDesc::new(None))
7203}
7204
// Declares the options recognized for `AlterSourceAddSubsourceOption`: `TextColumns` and
// `ExcludeColumns` are lists of item names defaulting to an empty vector, and `Details` is a
// string with no default given here.
// NOTE(review): presumably this generates an `AlterSourceAddSubsourceOptionExtracted` type, by
// analogy with how `AlterConnectionOptionExtracted` is used in this file — confirm against
// the macro definition.
generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);
7211
7212pub fn plan_alter_source(
7213 scx: &mut StatementContext,
7214 stmt: AlterSourceStatement<Aug>,
7215) -> Result<Plan, PlanError> {
7216 let AlterSourceStatement {
7217 source_name,
7218 if_exists,
7219 action,
7220 } = stmt;
7221 let object_type = ObjectType::Source;
7222
7223 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7224 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7225 name: source_name.to_string(),
7226 object_type,
7227 });
7228
7229 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7230 }
7231
7232 match action {
7233 AlterSourceAction::SetOptions(options) => {
7234 let mut options = options.into_iter();
7235 let option = options
7236 .next()
7237 .ok_or_else(|| sql_err!("ALTER SOURCE SET requires at least one option"))?;
7238 if option.name == CreateSourceOptionName::RetainHistory {
7239 if options.next().is_some() {
7240 sql_bail!("RETAIN HISTORY must be only option");
7241 }
7242 return alter_retain_history(
7243 scx,
7244 object_type,
7245 if_exists,
7246 UnresolvedObjectName::Item(source_name),
7247 option.value,
7248 );
7249 }
7250 if option.name == CreateSourceOptionName::TimestampInterval {
7251 if options.next().is_some() {
7252 sql_bail!("TIMESTAMP INTERVAL must be only option");
7253 }
7254 return alter_source_timestamp_interval(scx, if_exists, source_name, option.value);
7255 }
7256 sql_bail!(
7259 "Cannot modify the {} of a SOURCE.",
7260 option.name.to_ast_string_simple()
7261 );
7262 }
7263 AlterSourceAction::ResetOptions(reset) => {
7264 let mut options = reset.into_iter();
7265 let option = options
7266 .next()
7267 .ok_or_else(|| sql_err!("ALTER SOURCE RESET requires at least one option"))?;
7268 if option == CreateSourceOptionName::RetainHistory {
7269 if options.next().is_some() {
7270 sql_bail!("RETAIN HISTORY must be only option");
7271 }
7272 return alter_retain_history(
7273 scx,
7274 object_type,
7275 if_exists,
7276 UnresolvedObjectName::Item(source_name),
7277 None,
7278 );
7279 }
7280 if option == CreateSourceOptionName::TimestampInterval {
7281 if options.next().is_some() {
7282 sql_bail!("TIMESTAMP INTERVAL must be only option");
7283 }
7284 return alter_source_timestamp_interval(scx, if_exists, source_name, None);
7285 }
7286 sql_bail!(
7287 "Cannot modify the {} of a SOURCE.",
7288 option.to_ast_string_simple()
7289 );
7290 }
7291 AlterSourceAction::DropSubsources { .. } => {
7292 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7293 }
7294 AlterSourceAction::AddSubsources { .. } => {
7295 sql_bail!("ALTER SOURCE...ADD SUBSOURCE must be purified before planning")
7296 }
7297 AlterSourceAction::RefreshReferences => {
7298 sql_bail!("ALTER SOURCE...REFRESH REFERENCES must be purified before planning")
7299 }
7300 };
7301}
7302
7303pub fn describe_alter_system_set(
7304 _: &StatementContext,
7305 _: AlterSystemSetStatement,
7306) -> Result<StatementDesc, PlanError> {
7307 Ok(StatementDesc::new(None))
7308}
7309
7310pub fn plan_alter_system_set(
7311 _: &StatementContext,
7312 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7313) -> Result<Plan, PlanError> {
7314 let name = name.to_string();
7315 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7316 name,
7317 value: scl::plan_set_variable_to(to)?,
7318 }))
7319}
7320
7321pub fn describe_alter_system_reset(
7322 _: &StatementContext,
7323 _: AlterSystemResetStatement,
7324) -> Result<StatementDesc, PlanError> {
7325 Ok(StatementDesc::new(None))
7326}
7327
7328pub fn plan_alter_system_reset(
7329 _: &StatementContext,
7330 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7331) -> Result<Plan, PlanError> {
7332 let name = name.to_string();
7333 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7334}
7335
7336pub fn describe_alter_system_reset_all(
7337 _: &StatementContext,
7338 _: AlterSystemResetAllStatement,
7339) -> Result<StatementDesc, PlanError> {
7340 Ok(StatementDesc::new(None))
7341}
7342
7343pub fn plan_alter_system_reset_all(
7344 _: &StatementContext,
7345 _: AlterSystemResetAllStatement,
7346) -> Result<Plan, PlanError> {
7347 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7348}
7349
7350pub fn describe_alter_role(
7351 _: &StatementContext,
7352 _: AlterRoleStatement<Aug>,
7353) -> Result<StatementDesc, PlanError> {
7354 Ok(StatementDesc::new(None))
7355}
7356
7357pub fn plan_alter_role(
7358 scx: &StatementContext,
7359 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7360) -> Result<Plan, PlanError> {
7361 let option = match option {
7362 AlterRoleOption::Attributes(attrs) => {
7363 let attrs = plan_role_attributes(attrs, scx)?;
7364 PlannedAlterRoleOption::Attributes(attrs)
7365 }
7366 AlterRoleOption::Variable(variable) => {
7367 let var = plan_role_variable(variable)?;
7368 PlannedAlterRoleOption::Variable(var)
7369 }
7370 };
7371
7372 Ok(Plan::AlterRole(AlterRolePlan {
7373 id: name.id,
7374 name: name.name,
7375 option,
7376 }))
7377}
7378
7379pub fn describe_alter_table_add_column(
7380 _: &StatementContext,
7381 _: AlterTableAddColumnStatement<Aug>,
7382) -> Result<StatementDesc, PlanError> {
7383 Ok(StatementDesc::new(None))
7384}
7385
7386pub fn plan_alter_table_add_column(
7387 scx: &StatementContext,
7388 stmt: AlterTableAddColumnStatement<Aug>,
7389) -> Result<Plan, PlanError> {
7390 let AlterTableAddColumnStatement {
7391 if_exists,
7392 name,
7393 if_col_not_exist,
7394 column_name,
7395 data_type,
7396 } = stmt;
7397 let object_type = ObjectType::Table;
7398
7399 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7400
7401 let (relation_id, item_name, desc) =
7402 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7403 Some(item) => {
7404 let item_name = scx.catalog.resolve_full_name(item.name());
7406 let item = item.at_version(RelationVersionSelector::Latest);
7407 let desc = item
7408 .relation_desc()
7409 .ok_or_else(|| sql_err!("item does not have a relation description"))?
7410 .into_owned();
7411 (item.id(), item_name, desc)
7412 }
7413 None => {
7414 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7415 name: name.to_ast_string_simple(),
7416 object_type,
7417 });
7418 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7419 }
7420 };
7421
7422 let column_name = ColumnName::from(column_name.as_str());
7423 if desc.get_by_name(&column_name).is_some() {
7424 if if_col_not_exist {
7425 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7426 column_name: column_name.to_string(),
7427 object_name: item_name.item,
7428 });
7429 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7430 } else {
7431 return Err(PlanError::ColumnAlreadyExists {
7432 column_name,
7433 object_name: item_name.item,
7434 });
7435 }
7436 }
7437
7438 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7439 let column_type = scalar_type.nullable(true);
7441 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7443
7444 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7445 relation_id,
7446 column_name,
7447 column_type,
7448 raw_sql_type,
7449 }))
7450}
7451
7452pub fn describe_alter_materialized_view_apply_replacement(
7453 _: &StatementContext,
7454 _: AlterMaterializedViewApplyReplacementStatement,
7455) -> Result<StatementDesc, PlanError> {
7456 Ok(StatementDesc::new(None))
7457}
7458
7459pub fn plan_alter_materialized_view_apply_replacement(
7460 scx: &StatementContext,
7461 stmt: AlterMaterializedViewApplyReplacementStatement,
7462) -> Result<Plan, PlanError> {
7463 let AlterMaterializedViewApplyReplacementStatement {
7464 if_exists,
7465 name,
7466 replacement_name,
7467 } = stmt;
7468
7469 scx.require_feature_flag(&vars::ENABLE_REPLACEMENT_MATERIALIZED_VIEWS)?;
7470
7471 let object_type = ObjectType::MaterializedView;
7472 let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else {
7473 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7474 name: name.to_ast_string_simple(),
7475 object_type,
7476 });
7477 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7478 };
7479
7480 let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
7481 .ok_or_else(|| sql_err!("replacement materialized view does not exist"))?;
7482
7483 if replacement.replacement_target() != Some(mv.id()) {
7484 return Err(PlanError::InvalidReplacement {
7485 item_type: mv.item_type(),
7486 item_name: scx.catalog.minimal_qualification(mv.name()),
7487 replacement_type: replacement.item_type(),
7488 replacement_name: scx.catalog.minimal_qualification(replacement.name()),
7489 });
7490 }
7491
7492 Ok(Plan::AlterMaterializedViewApplyReplacement(
7493 AlterMaterializedViewApplyReplacementPlan {
7494 id: mv.id(),
7495 replacement_id: replacement.id(),
7496 },
7497 ))
7498}
7499
7500pub fn describe_comment(
7501 _: &StatementContext,
7502 _: CommentStatement<Aug>,
7503) -> Result<StatementDesc, PlanError> {
7504 Ok(StatementDesc::new(None))
7505}
7506
7507pub fn plan_comment(
7508 scx: &mut StatementContext,
7509 stmt: CommentStatement<Aug>,
7510) -> Result<Plan, PlanError> {
7511 const MAX_COMMENT_LENGTH: usize = 1024;
7512
7513 let CommentStatement { object, comment } = stmt;
7514
7515 if let Some(c) = &comment {
7517 if c.len() > 1024 {
7518 return Err(PlanError::CommentTooLong {
7519 length: c.len(),
7520 max_size: MAX_COMMENT_LENGTH,
7521 });
7522 }
7523 }
7524
7525 let (object_id, column_pos) = match &object {
7526 com_ty @ CommentObjectType::Table { name }
7527 | com_ty @ CommentObjectType::View { name }
7528 | com_ty @ CommentObjectType::MaterializedView { name }
7529 | com_ty @ CommentObjectType::Index { name }
7530 | com_ty @ CommentObjectType::Func { name }
7531 | com_ty @ CommentObjectType::Connection { name }
7532 | com_ty @ CommentObjectType::Source { name }
7533 | com_ty @ CommentObjectType::Sink { name }
7534 | com_ty @ CommentObjectType::Secret { name } => {
7535 let item = scx.get_item_by_resolved_name(name)?;
7536 match (com_ty, item.item_type()) {
7537 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7538 (CommentObjectId::Table(item.id()), None)
7539 }
7540 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7541 (CommentObjectId::View(item.id()), None)
7542 }
7543 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7544 (CommentObjectId::MaterializedView(item.id()), None)
7545 }
7546 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7547 (CommentObjectId::Index(item.id()), None)
7548 }
7549 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7550 (CommentObjectId::Func(item.id()), None)
7551 }
7552 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7553 (CommentObjectId::Connection(item.id()), None)
7554 }
7555 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7556 (CommentObjectId::Source(item.id()), None)
7557 }
7558 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7559 (CommentObjectId::Sink(item.id()), None)
7560 }
7561 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7562 (CommentObjectId::Secret(item.id()), None)
7563 }
7564 (com_ty, cat_ty) => {
7565 let expected_type = match com_ty {
7566 CommentObjectType::Table { .. } => ObjectType::Table,
7567 CommentObjectType::View { .. } => ObjectType::View,
7568 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7569 CommentObjectType::Index { .. } => ObjectType::Index,
7570 CommentObjectType::Func { .. } => ObjectType::Func,
7571 CommentObjectType::Connection { .. } => ObjectType::Connection,
7572 CommentObjectType::Source { .. } => ObjectType::Source,
7573 CommentObjectType::Sink { .. } => ObjectType::Sink,
7574 CommentObjectType::Secret { .. } => ObjectType::Secret,
7575 _ => sql_bail!("cannot comment on this object type"),
7576 };
7577
7578 return Err(PlanError::InvalidObjectType {
7579 expected_type: SystemObjectType::Object(expected_type),
7580 actual_type: SystemObjectType::Object(cat_ty.into()),
7581 object_name: item.name().item.clone(),
7582 });
7583 }
7584 }
7585 }
7586 CommentObjectType::Type { ty } => match ty {
7587 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7588 sql_bail!("cannot comment on anonymous list or map type");
7589 }
7590 ResolvedDataType::Named { id, modifiers, .. } => {
7591 if !modifiers.is_empty() {
7592 sql_bail!("cannot comment on type with modifiers");
7593 }
7594 (CommentObjectId::Type(*id), None)
7595 }
7596 ResolvedDataType::Error => bail_internal!("unresolved data type"),
7597 },
7598 CommentObjectType::Column { name } => {
7599 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7600 match item.item_type() {
7601 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7602 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7603 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7604 CatalogItemType::MaterializedView => {
7605 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7606 }
7607 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7608 r => {
7609 return Err(PlanError::Unsupported {
7610 feature: format!("Specifying comments on a column of {r}"),
7611 discussion_no: None,
7612 });
7613 }
7614 }
7615 }
7616 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7617 CommentObjectType::Database { name } => {
7618 (CommentObjectId::Database(*name.database_id()), None)
7619 }
7620 CommentObjectType::Schema { name } => {
7621 if matches!(name.schema_spec(), SchemaSpecifier::Temporary) {
7625 sql_bail!(
7626 "cannot comment on schema {} because it is a temporary schema",
7627 mz_repr::namespaces::MZ_TEMP_SCHEMA
7628 );
7629 }
7630 (
7631 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7632 None,
7633 )
7634 }
7635 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7636 CommentObjectType::ClusterReplica { name } => {
7637 let replica = scx.catalog.resolve_cluster_replica(name)?;
7638 (
7639 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7640 None,
7641 )
7642 }
7643 CommentObjectType::NetworkPolicy { name } => {
7644 (CommentObjectId::NetworkPolicy(name.id), None)
7645 }
7646 };
7647
7648 if let Some(p) = column_pos {
7654 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7655 max_num_columns: MAX_NUM_COLUMNS,
7656 req_num_columns: p,
7657 })?;
7658 }
7659
7660 Ok(Plan::Comment(CommentPlan {
7661 object_id,
7662 sub_component: column_pos,
7663 comment,
7664 }))
7665}
7666
7667pub(crate) fn resolve_cluster<'a>(
7668 scx: &'a StatementContext,
7669 name: &'a Ident,
7670 if_exists: bool,
7671) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7672 match scx.resolve_cluster(Some(name)) {
7673 Ok(cluster) => Ok(Some(cluster)),
7674 Err(_) if if_exists => Ok(None),
7675 Err(e) => Err(e),
7676 }
7677}
7678
7679pub(crate) fn resolve_cluster_replica<'a>(
7680 scx: &'a StatementContext,
7681 name: &QualifiedReplica,
7682 if_exists: bool,
7683) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7684 match scx.resolve_cluster(Some(&name.cluster)) {
7685 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7686 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7687 None if if_exists => Ok(None),
7688 None => Err(sql_err!(
7689 "CLUSTER {} has no CLUSTER REPLICA named {}",
7690 cluster.name(),
7691 name.replica.as_str().quoted(),
7692 )),
7693 },
7694 Err(_) if if_exists => Ok(None),
7695 Err(e) => Err(e),
7696 }
7697}
7698
7699pub(crate) fn resolve_database<'a>(
7700 scx: &'a StatementContext,
7701 name: &'a UnresolvedDatabaseName,
7702 if_exists: bool,
7703) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7704 match scx.resolve_database(name) {
7705 Ok(database) => Ok(Some(database)),
7706 Err(_) if if_exists => Ok(None),
7707 Err(e) => Err(e),
7708 }
7709}
7710
7711pub(crate) fn resolve_schema<'a>(
7712 scx: &'a StatementContext,
7713 name: UnresolvedSchemaName,
7714 if_exists: bool,
7715) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7716 match scx.resolve_schema(name) {
7717 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7718 Err(_) if if_exists => Ok(None),
7719 Err(e) => Err(e),
7720 }
7721}
7722
7723pub(crate) fn resolve_network_policy<'a>(
7724 scx: &'a StatementContext,
7725 name: Ident,
7726 if_exists: bool,
7727) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7728 match scx.catalog.resolve_network_policy(&name.to_string()) {
7729 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7730 id: policy.id(),
7731 name: policy.name().to_string(),
7732 })),
7733 Err(_) if if_exists => Ok(None),
7734 Err(e) => Err(e.into()),
7735 }
7736}
7737
7738pub(crate) fn resolve_item_or_type<'a>(
7739 scx: &'a StatementContext,
7740 object_type: ObjectType,
7741 name: UnresolvedItemName,
7742 if_exists: bool,
7743) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7744 let name = normalize::unresolved_item_name(name)?;
7745 let catalog_item = match object_type {
7746 ObjectType::Type => scx.catalog.resolve_type(&name),
7747 ObjectType::Table
7748 | ObjectType::View
7749 | ObjectType::MaterializedView
7750 | ObjectType::Source
7751 | ObjectType::Sink
7752 | ObjectType::Index
7753 | ObjectType::Role
7754 | ObjectType::Cluster
7755 | ObjectType::ClusterReplica
7756 | ObjectType::Secret
7757 | ObjectType::Connection
7758 | ObjectType::Database
7759 | ObjectType::Schema
7760 | ObjectType::Func
7761 | ObjectType::NetworkPolicy => scx.catalog.resolve_item(&name),
7762 };
7763
7764 match catalog_item {
7765 Ok(item) => {
7766 let is_type = ObjectType::from(item.item_type());
7767 if object_type == is_type {
7768 Ok(Some(item))
7769 } else {
7770 Err(PlanError::MismatchedObjectType {
7771 name: scx.catalog.minimal_qualification(item.name()),
7772 is_type,
7773 expected_type: object_type,
7774 })
7775 }
7776 }
7777 Err(_) if if_exists => Ok(None),
7778 Err(e) => Err(e.into()),
7779 }
7780}
7781
7782fn ensure_cluster_is_not_managed(
7784 scx: &StatementContext,
7785 cluster_id: ClusterId,
7786) -> Result<(), PlanError> {
7787 let cluster = scx.catalog.get_cluster(cluster_id);
7788 if cluster.is_managed() {
7789 Err(PlanError::ManagedCluster {
7790 cluster_name: cluster.name().to_string(),
7791 })
7792 } else {
7793 Ok(())
7794 }
7795}