use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_ore::vec::VecExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, ColumnType, RelationDesc, RelationType, RelationVersion,
    RelationVersionSelector, ScalarType, Timestamp, VersionedRelationDesc, preserves_order,
    strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
    StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::SqlServerSourceExportDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSource, SqlServerSourceExtras,
    Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    Ingestion, MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction,
    NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice, PolicyAddress, QueryContext,
    ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View,
    WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal,
    plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

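// Maximum number of columns allowed in a planned relation; checked, for
// example, when planning webhook source columns below.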
const MAX_NUM_COLUMNS: usize = 256;

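// Matches replica names of the form `r<digits>` (e.g. `r1`), the naming
// scheme used for replicas of managed clusters.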
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

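/// Checks that the `PARTITION BY` columns are a prefix of the relation's
/// columns and that each named column has an order-preserving type.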
fn check_partition_by(desc: &RelationDesc, partition_by: Vec<Ident>) -> Result<(), PlanError> {
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc.iter().zip(partition_by.into_iter()).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

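/// Plans a `CREATE DATABASE` statement.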
pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

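/// Plans a `CREATE SCHEMA` statement, resolving the target database from the
/// (possibly qualified) schema name or from the session's active database.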
pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

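/// Plans a `CREATE TABLE` statement: derives the relation description from
/// the column definitions and table constraints, validates keys and default
/// expressions, and handles versioned (`ADD COLUMN`) column changes.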
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = RelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

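// Each `generate_extracted_config!` invocation below derives an
// `*OptionExtracted` struct that collects the corresponding statement's
// `WITH (...)` options into typed fields.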
generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

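/// Plans a `CREATE SOURCE ... FROM WEBHOOK` statement (or its table variant),
/// building the relation description from the body format and any included
/// headers, and validating the optional `CHECK` expression.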
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        ColumnType {
            scalar_type: ScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    if let Some(filters) = include_headers.column {
        column_ty.push(ColumnType {
            scalar_type: ScalarType::Map {
                value_type: Box::new(ScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        column_ty.push(ColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = RelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

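/// Plans a `CREATE SOURCE` statement for Kafka, PostgreSQL/Yugabyte, SQL
/// Server, MySQL, and load generator connections, producing the source's
/// relation description, envelope, and ingestion configuration.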
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

    let external_connection = match source_connection {
        CreateSourceConnection::Kafka {
            connection: connection_name,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection_name)?;
            if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
                sql_bail!(
                    "{} is not a kafka connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let KafkaSourceConfigOptionExtracted {
                group_id_prefix,
                topic,
                topic_metadata_refresh_interval,
                start_timestamp: _,
                start_offset,
                seen: _,
            }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;

            let topic = topic.expect("validated exists during purification");

            let mut start_offsets = BTreeMap::new();
            if let Some(offsets) = start_offset {
                for (part, offset) in offsets.iter().enumerate() {
                    if *offset < 0 {
                        sql_bail!("START OFFSET must be a nonnegative integer");
                    }
                    start_offsets.insert(i32::try_from(part)?, *offset);
                }
            }

            if !start_offsets.is_empty() && envelope.requires_all_input() {
                sql_bail!("START OFFSET is not supported with ENVELOPE {}", envelope)
            }

            if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
                sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
            }

            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => None,
                })
                .collect();

            let connection = KafkaSourceConnection::<ReferencedConnection> {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                topic,
                start_offsets,
                group_id_prefix,
                topic_metadata_refresh_interval,
                metadata_columns,
            };

            GenericSourceConnection::Kafka(connection)
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        }
        | CreateSourceConnection::Yugabyte {
            connection,
            options,
        } => {
            let (source_flavor, flavor_name) = match source_connection {
                CreateSourceConnection::Postgres { .. } => (PostgresFlavor::Vanilla, "PostgreSQL"),
                CreateSourceConnection::Yugabyte { .. } => (PostgresFlavor::Yugabyte, "Yugabyte"),
                _ => unreachable!(),
            };

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection_mismatch = match connection_item.connection()? {
                Connection::Postgres(conn) => source_flavor != conn.flavor,
                _ => true,
            };
            if connection_mismatch {
                sql_bail!(
                    "{} is not a {flavor_name} connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let PgConfigOptionExtracted {
                details,
                publication,
                text_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details = ProtoPostgresSourcePublicationDetails::decode(&*details)
                .map_err(|e| sql_err!("{}", e))?;

            let publication_details = PostgresSourcePublicationDetails::from_proto(details)
                .map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(PostgresSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    publication: publication.expect("validated exists during purification"),
                    publication_details,
                });

            connection
        }
        CreateSourceConnection::SqlServer {
            connection,
            options: _,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::SqlServer(connection) => connection,
                _ => sql_bail!(
                    "{} is not a SQL Server connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            let extras = SqlServerSourceExtras {};
            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(SqlServerSource {
                    catalog_id: connection_item.id(),
                    connection: connection_item.id(),
                    extras,
                });

            connection
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::MySql(connection) => connection,
                _ => sql_bail!(
                    "{} is not a MySQL connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            let MySqlConfigOptionExtracted {
                details,
                text_columns: _,
                exclude_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details =
                ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
            let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(MySqlSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    details,
                });

            connection
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(scx, generator, options, include_metadata)?;

            let LoadGeneratorOptionExtracted {
                tick_interval,
                as_of,
                up_to,
                ..
            } = options.clone().try_into()?;
            let tick_micros = match tick_interval {
                Some(interval) => Some(interval.as_micros().try_into()?),
                None => None,
            };

            if up_to < as_of {
                sql_bail!("UP TO cannot be less than AS OF");
            }

            let connection = GenericSourceConnection::from(LoadGeneratorSourceConnection {
                load_generator,
                tick_micros,
                as_of,
                up_to,
            });

            connection
        }
    };

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (primary_export, primary_export_details) = if force_source_table_syntax {
        (
            SourceExportDataConfig {
                encoding: None,
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
            },
            SourceExportDetails::None,
        )
    } else {
        (
            SourceExportDataConfig {
                encoding,
                envelope: envelope.clone(),
            },
            external_connection.primary_export_details(),
        )
    };
    let source_desc = SourceDesc::<ReferencedConnection> {
        connection: external_connection,
        primary_export,
        primary_export_details,
        timestamp_interval,
    };

    let progress_subsource = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Named(name) => match name {
                ResolvedItemName::Item { id, .. } => *id,
                ResolvedItemName::Cte { .. }
                | ResolvedItemName::ContinualTask { .. }
                | ResolvedItemName::Error => {
                    sql_bail!("[internal error] invalid target id")
                }
            },
            DeferredItemName::Deferred(_) => {
                sql_bail!("[internal error] progress subsource must be named during purification")
            }
        },
        _ => sql_bail!("[internal error] progress subsource must be named during purification"),
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source: DataSourceDesc::Ingestion(Ingestion {
            desc: source_desc,
            progress_subsource,
        }),
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

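/// Applies the declared format and envelope to a source's default key/value
/// relation descriptions, returning the planned relation description, the
/// planned envelope, and the data encoding (if a format was specified).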
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, ColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    ScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

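/// Builds the relation description for a source export (a subsource or a
/// table from a source) from explicit column definitions and constraints.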
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = RelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

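/// Plans a `CREATE SUBSOURCE` statement: either a progress subsource or an
/// ingestion export referencing a table of an upstream source.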
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window: None,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (Details, String)
);

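/// Plans a `CREATE TABLE ... FROM SOURCE` statement, which exports a single
/// external reference from an existing source into a new table.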
1696pub fn plan_create_table_from_source(
1697 scx: &StatementContext,
1698 stmt: CreateTableFromSourceStatement<Aug>,
1699) -> Result<Plan, PlanError> {
1700 if !scx.catalog.system_vars().enable_create_table_from_source() {
1701 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1702 }
1703
1704 let CreateTableFromSourceStatement {
1705 name,
1706 columns,
1707 constraints,
1708 if_not_exists,
1709 source,
1710 external_reference,
1711 envelope,
1712 format,
1713 include_metadata,
1714 with_options,
1715 } = &stmt;
1716
1717 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1718
1719 let TableFromSourceOptionExtracted {
1720 text_columns,
1721 exclude_columns,
1722 partition_by,
1723 details,
1724 seen: _,
1725 } = with_options.clone().try_into()?;
1726
1727 let source_item = scx.get_item_by_resolved_name(source)?;
1728 let ingestion_id = source_item.id();
1729
1730 let details = details
1733 .as_ref()
1734 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1735 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1736 let details =
1737 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1738 let details =
1739 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1740
1741 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1742 && include_metadata
1743 .iter()
1744 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1745 {
1746 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1748 }
1749 if !matches!(
1750 details,
1751 SourceExportStatementDetails::Kafka { .. }
1752 | SourceExportStatementDetails::LoadGenerator { .. }
1753 ) && !include_metadata.is_empty()
1754 {
1755 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1756 }
1757
1758 let details = match details {
1759 SourceExportStatementDetails::Postgres { table } => {
1760 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1761 column_casts: crate::pure::postgres::generate_column_casts(
1762 scx,
1763 &table,
1764 &text_columns,
1765 )?,
1766 table,
1767 })
1768 }
1769 SourceExportStatementDetails::MySql {
1770 table,
1771 initial_gtid_set,
1772 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1773 table,
1774 initial_gtid_set,
1775 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1776 exclude_columns: exclude_columns
1777 .into_iter()
1778 .map(|c| c.into_string())
1779 .collect(),
1780 }),
1781 SourceExportStatementDetails::SqlServer {
1782 table,
1783 capture_instance,
1784 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1785 table,
1786 capture_instance,
1787 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1788 exclude_columns: exclude_columns
1789 .into_iter()
1790 .map(|c| c.into_string())
1791 .collect(),
1792 }),
1793 SourceExportStatementDetails::LoadGenerator { output } => {
1794 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1795 }
1796 SourceExportStatementDetails::Kafka {} => {
1797 if !include_metadata.is_empty()
1798 && !matches!(
1799 envelope,
1800 ast::SourceEnvelope::Upsert { .. }
1801 | ast::SourceEnvelope::None
1802 | ast::SourceEnvelope::Debezium
1803 )
1804 {
1805 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1807 }
1808
1809 let metadata_columns = include_metadata
1810 .into_iter()
1811 .flat_map(|item| match item {
1812 SourceIncludeMetadata::Timestamp { alias } => {
1813 let name = match alias {
1814 Some(name) => name.to_string(),
1815 None => "timestamp".to_owned(),
1816 };
1817 Some((name, KafkaMetadataKind::Timestamp))
1818 }
1819 SourceIncludeMetadata::Partition { alias } => {
1820 let name = match alias {
1821 Some(name) => name.to_string(),
1822 None => "partition".to_owned(),
1823 };
1824 Some((name, KafkaMetadataKind::Partition))
1825 }
1826 SourceIncludeMetadata::Offset { alias } => {
1827 let name = match alias {
1828 Some(name) => name.to_string(),
1829 None => "offset".to_owned(),
1830 };
1831 Some((name, KafkaMetadataKind::Offset))
1832 }
1833 SourceIncludeMetadata::Headers { alias } => {
1834 let name = match alias {
1835 Some(name) => name.to_string(),
1836 None => "headers".to_owned(),
1837 };
1838 Some((name, KafkaMetadataKind::Headers))
1839 }
1840 SourceIncludeMetadata::Header {
1841 alias,
1842 key,
1843 use_bytes,
1844 } => Some((
1845 alias.to_string(),
1846 KafkaMetadataKind::Header {
1847 key: key.clone(),
1848 use_bytes: *use_bytes,
1849 },
1850 )),
1851 SourceIncludeMetadata::Key { .. } => {
1852 None
1854 }
1855 })
1856 .collect();
1857
1858 SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1859 }
1860 };
1861
1862 let source_connection = &source_item.source_desc()?.expect("is source").connection;
1863
1864 let (key_desc, value_desc) =
1869 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1870 let columns = match columns {
1871 TableFromSourceColumns::Defined(columns) => columns,
1872 _ => unreachable!(),
1873 };
1874 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1875 (None, desc)
1876 } else {
1877 let key_desc = source_connection.default_key_desc();
1878 let value_desc = source_connection.default_value_desc();
1879 (Some(key_desc), value_desc)
1880 };
1881
1882 let metadata_columns_desc = match &details {
1883 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1884 metadata_columns, ..
1885 }) => kafka_metadata_columns_desc(metadata_columns),
1886 _ => vec![],
1887 };
1888
1889 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1890 scx,
1891 &envelope,
1892 format,
1893 key_desc,
1894 value_desc,
1895 include_metadata,
1896 metadata_columns_desc,
1897 source_connection,
1898 )?;
1899 if let TableFromSourceColumns::Named(col_names) = columns {
1900 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1901 }
1902
1903 let names: Vec<_> = desc.iter_names().cloned().collect();
1904 if let Some(dup) = names.iter().duplicates().next() {
1905 sql_bail!("column {} specified more than once", dup.quoted());
1906 }
1907
1908 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1909
1910 let timeline = match envelope {
1913 SourceEnvelope::CdcV2 => {
1914 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1915 }
1916 _ => Timeline::EpochMilliseconds,
1917 };
1918
1919 if let Some(partition_by) = partition_by {
1920 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1921 check_partition_by(&desc, partition_by)?;
1922 }
1923
1924 let data_source = DataSourceDesc::IngestionExport {
1925 ingestion_id,
1926 external_reference: external_reference
1927 .as_ref()
1928 .expect("populated in purification")
1929 .clone(),
1930 details,
1931 data_config: SourceExportDataConfig { envelope, encoding },
1932 };
1933
1934 let if_not_exists = *if_not_exists;
1935
1936 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1937
1938 let table = Table {
1939 create_sql,
1940 desc: VersionedRelationDesc::new(desc),
1941 temporary: false,
1942 compaction_window: None,
1943 data_source: TableDataSource::DataSource {
1944 desc: data_source,
1945 timeline,
1946 },
1947 };
1948
1949 Ok(Plan::CreateTable(CreateTablePlan {
1950 name,
1951 table,
1952 if_not_exists,
1953 }))
1954}
1955
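// Generates `LoadGeneratorOptionExtracted`, with one typed field per option
// (applying the declared defaults) and a `seen` set recording which options
// were specified explicitly.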
1956generate_extracted_config!(
1957 LoadGeneratorOption,
1958 (TickInterval, Duration),
1959 (AsOf, u64, Default(0_u64)),
1960 (UpTo, u64, Default(u64::MAX)),
1961 (ScaleFactor, f64),
1962 (MaxCardinality, u64),
1963 (Keys, u64),
1964 (SnapshotRounds, u64),
1965 (TransactionalSnapshot, bool),
1966 (ValueSize, u64),
1967 (Seed, u64),
1968 (Partitions, u64),
1969 (BatchSize, u64)
1970);
1971
1972impl LoadGeneratorOptionExtracted {
1973 pub(super) fn ensure_only_valid_options(
1974 &self,
1975 loadgen: &ast::LoadGenerator,
1976 ) -> Result<(), PlanError> {
1977 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
1978
1979 let mut options = self.seen.clone();
1980
1981 let permitted_options: &[_] = match loadgen {
1982 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
1983 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
1984 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
1985 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
1986 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
1987 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
1988 ast::LoadGenerator::KeyValue => &[
1989 TickInterval,
1990 Keys,
1991 SnapshotRounds,
1992 TransactionalSnapshot,
1993 ValueSize,
1994 Seed,
1995 Partitions,
1996 BatchSize,
1997 ],
1998 };
1999
2000 for o in permitted_options {
2001 options.remove(o);
2002 }
2003
2004 if !options.is_empty() {
2005 sql_bail!(
2006 "{} load generators do not support {} values",
2007 loadgen,
2008 options.iter().join(", ")
2009 )
2010 }
2011
2012 Ok(())
2013 }
2014}
2015
2016pub(crate) fn load_generator_ast_to_generator(
2017 scx: &StatementContext,
2018 loadgen: &ast::LoadGenerator,
2019 options: &[LoadGeneratorOption<Aug>],
2020 include_metadata: &[SourceIncludeMetadata],
2021) -> Result<LoadGenerator, PlanError> {
2022 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2023 extracted.ensure_only_valid_options(loadgen)?;
2024
2025 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2026 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2027 }
2028
2029 let load_generator = match loadgen {
2030 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2031 ast::LoadGenerator::Clock => {
2032 scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
2033 LoadGenerator::Clock
2034 }
2035 ast::LoadGenerator::Counter => {
2036 let LoadGeneratorOptionExtracted {
2037 max_cardinality, ..
2038 } = extracted;
2039 LoadGenerator::Counter { max_cardinality }
2040 }
2041 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2042 ast::LoadGenerator::Datums => LoadGenerator::Datums,
2043 ast::LoadGenerator::Tpch => {
2044 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2045
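            // Default to a small scale factor so the generated data set stays
            // lightweight.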
2046 let sf: f64 = scale_factor.unwrap_or(0.01);
2048 if !sf.is_finite() || sf < 0.0 {
2049 sql_bail!("unsupported scale factor {sf}");
2050 }
2051
2052 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2053 let total = (sf * multiplier).floor();
2054 let mut i = i64::try_cast_from(total)
2055 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2056 if i < 1 {
2057 i = 1;
2058 }
2059 Ok(i)
2060 };
2061
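            // TPC-H base cardinalities at scale factor 1.0, scaled by `sf` and
            // clamped to at least 1 by `f_to_i`.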
2062 let count_supplier = f_to_i(10_000f64)?;
2065 let count_part = f_to_i(200_000f64)?;
2066 let count_customer = f_to_i(150_000f64)?;
2067 let count_orders = f_to_i(150_000f64 * 10f64)?;
2068 let count_clerk = f_to_i(1_000f64)?;
2069
2070 LoadGenerator::Tpch {
2071 count_supplier,
2072 count_part,
2073 count_customer,
2074 count_orders,
2075 count_clerk,
2076 }
2077 }
2078 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2079 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2080 let LoadGeneratorOptionExtracted {
2081 keys,
2082 snapshot_rounds,
2083 transactional_snapshot,
2084 value_size,
2085 tick_interval,
2086 seed,
2087 partitions,
2088 batch_size,
2089 ..
2090 } = extracted;
2091
2092 let mut include_offset = None;
2093 for im in include_metadata {
2094 match im {
2095 SourceIncludeMetadata::Offset { alias } => {
2096 include_offset = match alias {
2097 Some(alias) => Some(alias.to_string()),
2098 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2099 }
2100 }
2101 SourceIncludeMetadata::Key { .. } => continue,
2102
2103 _ => {
 2104                         sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
2105 }
2106 };
2107 }
2108
2109 let lgkv = KeyValueLoadGenerator {
2110 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2111 snapshot_rounds: snapshot_rounds
2112 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2113 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2115 value_size: value_size
2116 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2117 partitions: partitions
2118 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2119 tick_interval,
2120 batch_size: batch_size
2121 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2122 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2123 include_offset,
2124 };
2125
2126 if lgkv.keys == 0
2127 || lgkv.partitions == 0
2128 || lgkv.value_size == 0
2129 || lgkv.batch_size == 0
2130 {
2131 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2132 }
2133
2134 if lgkv.keys % lgkv.partitions != 0 {
2135 sql_bail!("KEYS must be a multiple of PARTITIONS")
2136 }
2137
2138 if lgkv.batch_size > lgkv.keys {
 2139                 sql_bail!("BATCH SIZE cannot be larger than KEYS")
2140 }
2141
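            // Each partition's share of the key space must divide evenly into
            // batches.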
2142 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2145 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2146 }
2147
2148 if lgkv.snapshot_rounds == 0 {
2149 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2150 }
2151
2152 LoadGenerator::KeyValue(lgkv)
2153 }
2154 };
2155
2156 Ok(load_generator)
2157}
2158
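/// Validates that a Debezium-decoded relation has the expected `before` and
/// `after` record columns, returning their column indexes.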
2159fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2160 let before = value_desc.get_by_name(&"before".into());
2161 let (after_idx, after_ty) = value_desc
2162 .get_by_name(&"after".into())
2163 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2164 let before_idx = if let Some((before_idx, before_ty)) = before {
2165 if !matches!(before_ty.scalar_type, ScalarType::Record { .. }) {
2166 sql_bail!("'before' column must be of type record");
2167 }
2168 if before_ty != after_ty {
2169 sql_bail!("'before' type differs from 'after' column");
2170 }
2171 Some(before_idx)
2172 } else {
2173 None
2174 };
2175 Ok((before_idx, after_idx))
2176}
2177
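/// Determines the key and value encodings requested by a FORMAT specifier,
/// verifying that envelopes which need a key (DEBEZIUM, UPSERT) have one.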
2178fn get_encoding(
2179 scx: &StatementContext,
2180 format: &FormatSpecifier<Aug>,
2181 envelope: &ast::SourceEnvelope,
2182) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2183 let encoding = match format {
2184 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2185 FormatSpecifier::KeyValue { key, value } => {
2186 let key = {
2187 let encoding = get_encoding_inner(scx, key)?;
2188 Some(encoding.key.unwrap_or(encoding.value))
2189 };
2190 let value = get_encoding_inner(scx, value)?.value;
2191 SourceDataEncoding { key, value }
2192 }
2193 };
2194
2195 let requires_keyvalue = matches!(
2196 envelope,
2197 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2198 );
2199 let is_keyvalue = encoding.key.is_some();
2200 if requires_keyvalue && !is_keyvalue {
2201 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2202 };
2203
2204 Ok(encoding)
2205}
2206
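/// Resolves the cluster in which a source or sink will run, defaulting to the
/// active cluster and writing the resolution back into `in_cluster`. Because
/// this mutates the statement, it must run before the statement's
/// `create_sql` is normalized.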
2207fn source_sink_cluster_config<'a, 'ctx>(
2213 scx: &'a StatementContext<'ctx>,
2214 in_cluster: &mut Option<ResolvedClusterName>,
2215) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2216 let cluster = match in_cluster {
2217 None => {
2218 let cluster = scx.catalog.resolve_cluster(None)?;
2219 *in_cluster = Some(ResolvedClusterName {
2220 id: cluster.id(),
2221 print_name: None,
2222 });
2223 cluster
2224 }
2225 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2226 };
2227
2228 Ok(cluster)
2229}
2230
2231generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2232
2233#[derive(Debug)]
2234pub struct Schema {
2235 pub key_schema: Option<String>,
2236 pub value_schema: String,
2237 pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2238 pub confluent_wire_format: bool,
2239}
2240
2241fn get_encoding_inner(
2242 scx: &StatementContext,
2243 format: &Format<Aug>,
2244) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2245 let value = match format {
2246 Format::Bytes => DataEncoding::Bytes,
2247 Format::Avro(schema) => {
2248 let Schema {
2249 key_schema,
2250 value_schema,
2251 csr_connection,
2252 confluent_wire_format,
2253 } = match schema {
2254 AvroSchema::InlineSchema {
2257 schema: ast::Schema { schema },
2258 with_options,
2259 } => {
2260 let AvroSchemaOptionExtracted {
2261 confluent_wire_format,
2262 ..
2263 } = with_options.clone().try_into()?;
2264
2265 Schema {
2266 key_schema: None,
2267 value_schema: schema.clone(),
2268 csr_connection: None,
2269 confluent_wire_format,
2270 }
2271 }
2272 AvroSchema::Csr {
2273 csr_connection:
2274 CsrConnectionAvro {
2275 connection,
2276 seed,
2277 key_strategy: _,
2278 value_strategy: _,
2279 },
2280 } => {
2281 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2282 let csr_connection = match item.connection()? {
2283 Connection::Csr(_) => item.id(),
2284 _ => {
2285 sql_bail!(
2286 "{} is not a schema registry connection",
2287 scx.catalog
2288 .resolve_full_name(item.name())
2289 .to_string()
2290 .quoted()
2291 )
2292 }
2293 };
2294
2295 if let Some(seed) = seed {
2296 Schema {
2297 key_schema: seed.key_schema.clone(),
2298 value_schema: seed.value_schema.clone(),
2299 csr_connection: Some(csr_connection),
2300 confluent_wire_format: true,
2301 }
2302 } else {
2303 unreachable!("CSR seed resolution should already have been called: Avro")
2304 }
2305 }
2306 };
2307
2308 if let Some(key_schema) = key_schema {
2309 return Ok(SourceDataEncoding {
2310 key: Some(DataEncoding::Avro(AvroEncoding {
2311 schema: key_schema,
2312 csr_connection: csr_connection.clone(),
2313 confluent_wire_format,
2314 })),
2315 value: DataEncoding::Avro(AvroEncoding {
2316 schema: value_schema,
2317 csr_connection,
2318 confluent_wire_format,
2319 }),
2320 });
2321 } else {
2322 DataEncoding::Avro(AvroEncoding {
2323 schema: value_schema,
2324 csr_connection,
2325 confluent_wire_format,
2326 })
2327 }
2328 }
2329 Format::Protobuf(schema) => match schema {
2330 ProtobufSchema::Csr {
2331 csr_connection:
2332 CsrConnectionProtobuf {
2333 connection:
2334 CsrConnection {
2335 connection,
2336 options,
2337 },
2338 seed,
2339 },
2340 } => {
2341 if let Some(CsrSeedProtobuf { key, value }) = seed {
2342 let item = scx.get_item_by_resolved_name(connection)?;
2343 let _ = match item.connection()? {
2344 Connection::Csr(connection) => connection,
2345 _ => {
2346 sql_bail!(
2347 "{} is not a schema registry connection",
2348 scx.catalog
2349 .resolve_full_name(item.name())
2350 .to_string()
2351 .quoted()
2352 )
2353 }
2354 };
2355
2356 if !options.is_empty() {
2357 sql_bail!("Protobuf CSR connections do not support any options");
2358 }
2359
2360 let value = DataEncoding::Protobuf(ProtobufEncoding {
2361 descriptors: strconv::parse_bytes(&value.schema)?,
2362 message_name: value.message_name.clone(),
2363 confluent_wire_format: true,
2364 });
2365 if let Some(key) = key {
2366 return Ok(SourceDataEncoding {
2367 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2368 descriptors: strconv::parse_bytes(&key.schema)?,
2369 message_name: key.message_name.clone(),
2370 confluent_wire_format: true,
2371 })),
2372 value,
2373 });
2374 }
2375 value
2376 } else {
2377 unreachable!("CSR seed resolution should already have been called: Proto")
2378 }
2379 }
2380 ProtobufSchema::InlineSchema {
2381 message_name,
2382 schema: ast::Schema { schema },
2383 } => {
2384 let descriptors = strconv::parse_bytes(schema)?;
2385
2386 DataEncoding::Protobuf(ProtobufEncoding {
2387 descriptors,
2388 message_name: message_name.to_owned(),
2389 confluent_wire_format: false,
2390 })
2391 }
2392 },
2393 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2394 regex: mz_repr::adt::regex::Regex::new(regex, false)
2395 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2396 }),
2397 Format::Csv { columns, delimiter } => {
2398 let columns = match columns {
2399 CsvColumns::Header { names } => {
2400 if names.is_empty() {
2401 sql_bail!("[internal error] column spec should get names in purify")
2402 }
2403 ColumnSpec::Header {
2404 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2405 }
2406 }
2407 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2408 };
2409 DataEncoding::Csv(CsvEncoding {
2410 columns,
2411 delimiter: u8::try_from(*delimiter)
2412 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2413 })
2414 }
2415 Format::Json { array: false } => DataEncoding::Json,
2416 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2417 Format::Text => DataEncoding::Text,
2418 };
2419 Ok(SourceDataEncoding { key: None, value })
2420}
2421
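/// Extracts the key envelope requested via `INCLUDE KEY`, if any.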
2422fn get_key_envelope(
2424 included_items: &[SourceIncludeMetadata],
2425 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2426 key_envelope_no_encoding: bool,
2427) -> Result<KeyEnvelope, PlanError> {
2428 let key_definition = included_items
2429 .iter()
2430 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2431 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2432 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2433 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2434 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2435 (Some(name), _) if key_envelope_no_encoding => {
2436 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2437 }
2438 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2439 (_, None) => {
2440 sql_bail!(
2444 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2445 got bare FORMAT"
2446 );
2447 }
2448 }
2449 } else {
2450 Ok(KeyEnvelope::None)
2451 }
2452}
2453
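/// Chooses the envelope for an unnamed `INCLUDE KEY`: composite encodings are
/// flattened into the row, while single-column encodings become a column
/// named "key".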
2454fn get_unnamed_key_envelope(
2457 key: Option<&DataEncoding<ReferencedConnection>>,
2458) -> Result<KeyEnvelope, PlanError> {
2459 let is_composite = match key {
2463 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2464 Some(
2465 DataEncoding::Avro(_)
2466 | DataEncoding::Csv(_)
2467 | DataEncoding::Protobuf(_)
2468 | DataEncoding::Regex { .. },
2469 ) => true,
2470 None => false,
2471 };
2472
2473 if is_composite {
2474 Ok(KeyEnvelope::Flattened)
2475 } else {
2476 Ok(KeyEnvelope::Named("key".to_string()))
2477 }
2478}
2479
2480pub fn describe_create_view(
2481 _: &StatementContext,
2482 _: CreateViewStatement<Aug>,
2483) -> Result<StatementDesc, PlanError> {
2484 Ok(StatementDesc::new(None))
2485}
2486
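/// Plans the view definition shared by `CREATE VIEW` and
/// `CREATE OR REPLACE VIEW`, returning the view's qualified name and plan.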
2487pub fn plan_view(
2488 scx: &StatementContext,
2489 def: &mut ViewDefinition<Aug>,
2490 temporary: bool,
2491) -> Result<(QualifiedItemName, View), PlanError> {
2492 let create_sql = normalize::create_statement(
2493 scx,
2494 Statement::CreateView(CreateViewStatement {
2495 if_exists: IfExistsBehavior::Error,
2496 temporary,
2497 definition: def.clone(),
2498 }),
2499 )?;
2500
2501 let ViewDefinition {
2502 name,
2503 columns,
2504 query,
2505 } = def;
2506
2507 let query::PlannedRootQuery {
2508 expr,
2509 mut desc,
2510 finishing,
2511 scope: _,
2512 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2513 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2519 &finishing,
2520 expr.arity()
2521 ));
2522 if expr.contains_parameters()? {
2523 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2524 }
2525
2526 let dependencies = expr
2527 .depends_on()
2528 .into_iter()
2529 .map(|gid| scx.catalog.resolve_item_id(&gid))
2530 .collect();
2531
2532 let name = if temporary {
2533 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2534 } else {
2535 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2536 };
2537
2538 plan_utils::maybe_rename_columns(
2539 format!("view {}", scx.catalog.resolve_full_name(&name)),
2540 &mut desc,
2541 columns,
2542 )?;
2543 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2544
2545 if let Some(dup) = names.iter().duplicates().next() {
2546 sql_bail!("column {} specified more than once", dup.quoted());
2547 }
2548
2549 let view = View {
2550 create_sql,
2551 expr,
2552 dependencies,
2553 column_names: names,
2554 temporary,
2555 };
2556
2557 Ok((name, view))
2558}
2559
2560pub fn plan_create_view(
2561 scx: &StatementContext,
2562 mut stmt: CreateViewStatement<Aug>,
2563) -> Result<Plan, PlanError> {
2564 let CreateViewStatement {
2565 temporary,
2566 if_exists,
2567 definition,
2568 } = &mut stmt;
2569 let (name, view) = plan_view(scx, definition, *temporary)?;
2570
2571 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2574
2575 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2576 let if_exists = true;
2577 let cascade = false;
2578 let maybe_item_to_drop = plan_drop_item(
2579 scx,
2580 ObjectType::View,
2581 if_exists,
2582 definition.name.clone(),
2583 cascade,
2584 )?;
2585
2586 if let Some(id) = maybe_item_to_drop {
2588 let dependencies = view.expr.depends_on();
2589 let invalid_drop = scx
2590 .get_item(&id)
2591 .global_ids()
2592 .any(|gid| dependencies.contains(&gid));
2593 if invalid_drop {
2594 let item = scx.catalog.get_item(&id);
2595 sql_bail!(
2596 "cannot replace view {0}: depended upon by new {0} definition",
2597 scx.catalog.resolve_full_name(item.name())
2598 );
2599 }
2600
2601 Some(id)
2602 } else {
2603 None
2604 }
2605 } else {
2606 None
2607 };
2608 let drop_ids = replace
2609 .map(|id| {
2610 scx.catalog
2611 .item_dependents(id)
2612 .into_iter()
2613 .map(|id| id.unwrap_item_id())
2614 .collect()
2615 })
2616 .unwrap_or_default();
2617
2618 let full_name = scx.catalog.resolve_full_name(&name);
2620 let partial_name = PartialItemName::from(full_name.clone());
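    // Unless IF EXISTS semantics apply, planning fails when an item with this
    // name already exists.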
2621 if let (Ok(item), IfExistsBehavior::Error, false) = (
2624 scx.catalog.resolve_item_or_type(&partial_name),
2625 *if_exists,
2626 ignore_if_exists_errors,
2627 ) {
2628 return Err(PlanError::ItemAlreadyExists {
2629 name: full_name.to_string(),
2630 item_type: item.item_type(),
2631 });
2632 }
2633
2634 Ok(Plan::CreateView(CreateViewPlan {
2635 name,
2636 view,
2637 replace,
2638 drop_ids,
2639 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2640 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2641 }))
2642}
2643
2644pub fn describe_create_materialized_view(
2645 _: &StatementContext,
2646 _: CreateMaterializedViewStatement<Aug>,
2647) -> Result<StatementDesc, PlanError> {
2648 Ok(StatementDesc::new(None))
2649}
2650
2651pub fn describe_create_continual_task(
2652 _: &StatementContext,
2653 _: CreateContinualTaskStatement<Aug>,
2654) -> Result<StatementDesc, PlanError> {
2655 Ok(StatementDesc::new(None))
2656}
2657
2658pub fn describe_create_network_policy(
2659 _: &StatementContext,
2660 _: CreateNetworkPolicyStatement<Aug>,
2661) -> Result<StatementDesc, PlanError> {
2662 Ok(StatementDesc::new(None))
2663}
2664
2665pub fn describe_alter_network_policy(
2666 _: &StatementContext,
2667 _: AlterNetworkPolicyStatement<Aug>,
2668) -> Result<StatementDesc, PlanError> {
2669 Ok(StatementDesc::new(None))
2670}
2671
2672pub fn plan_create_materialized_view(
2673 scx: &StatementContext,
2674 mut stmt: CreateMaterializedViewStatement<Aug>,
2675) -> Result<Plan, PlanError> {
2676 let cluster_id =
2677 crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2678 stmt.in_cluster = Some(ResolvedClusterName {
2679 id: cluster_id,
2680 print_name: None,
2681 });
2682
2683 let create_sql =
2684 normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2685
2686 let partial_name = normalize::unresolved_item_name(stmt.name)?;
2687 let name = scx.allocate_qualified_name(partial_name.clone())?;
2688
2689 let query::PlannedRootQuery {
2690 expr,
2691 mut desc,
2692 finishing,
2693 scope: _,
2694 } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2695 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2697 &finishing,
2698 expr.arity()
2699 ));
2700 if expr.contains_parameters()? {
2701 return Err(PlanError::ParameterNotAllowed(
2702 "materialized views".to_string(),
2703 ));
2704 }
2705
2706 plan_utils::maybe_rename_columns(
2707 format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2708 &mut desc,
2709 &stmt.columns,
2710 )?;
2711 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2712
2713 let MaterializedViewOptionExtracted {
2714 assert_not_null,
2715 partition_by,
2716 retain_history,
2717 refresh,
2718 seen: _,
2719 }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2720
2721 if let Some(partition_by) = partition_by {
2722 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2723 check_partition_by(&desc, partition_by)?;
2724 }
2725
2726 let refresh_schedule = {
2727 let mut refresh_schedule = RefreshSchedule::default();
2728 let mut on_commits_seen = 0;
2729 for refresh_option_value in refresh {
2730 if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2731 scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2732 }
2733 match refresh_option_value {
2734 RefreshOptionValue::OnCommit => {
2735 on_commits_seen += 1;
2736 }
2737 RefreshOptionValue::AtCreation => {
2738 soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2739 sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2740 }
2741 RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
 2742                     transform_ast::transform(scx, &mut time)?;
                          let ecx = &ExprContext {
2744 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2745 name: "REFRESH AT",
2746 scope: &Scope::empty(),
2747 relation_type: &RelationType::empty(),
2748 allow_aggregates: false,
2749 allow_subqueries: false,
2750 allow_parameters: false,
2751 allow_windows: false,
2752 };
2753 let hir = plan_expr(ecx, &time)?.cast_to(
2754 ecx,
2755 CastContext::Assignment,
2756 &ScalarType::MzTimestamp,
2757 )?;
2758 let timestamp = hir
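                        // The expression must const-fold to a literal
                        // timestamp; anything else is rejected.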
2760 .into_literal_mz_timestamp()
2761 .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2762 refresh_schedule.ats.push(timestamp);
2763 }
2764 RefreshOptionValue::Every(RefreshEveryOptionValue {
2765 interval,
2766 aligned_to,
2767 }) => {
2768 let interval = Interval::try_from_value(Value::Interval(interval))?;
2769 if interval.as_microseconds() <= 0 {
2770 sql_bail!("REFRESH interval must be positive; got: {}", interval);
2771 }
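                        // Months (and larger units) have no fixed duration, so
                        // they cannot describe a refresh period.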
2772 if interval.months != 0 {
2773 sql_bail!("REFRESH interval must not involve units larger than days");
2778 }
2779 let interval = interval.duration()?;
2780 if u64::try_from(interval.as_millis()).is_err() {
2781 sql_bail!("REFRESH interval too large");
2782 }
2783
2784 let mut aligned_to = match aligned_to {
2785 Some(aligned_to) => aligned_to,
2786 None => {
2787 soft_panic_or_log!(
2788 "ALIGNED TO should have been filled in by purification"
2789 );
2790 sql_bail!(
2791 "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2792 )
2793 }
2794 };
2795
2796 transform_ast::transform(scx, &mut aligned_to)?;
2798
2799 let ecx = &ExprContext {
2800 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2801 name: "REFRESH EVERY ... ALIGNED TO",
2802 scope: &Scope::empty(),
2803 relation_type: &RelationType::empty(),
2804 allow_aggregates: false,
2805 allow_subqueries: false,
2806 allow_parameters: false,
2807 allow_windows: false,
2808 };
2809 let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2810 ecx,
2811 CastContext::Assignment,
2812 &ScalarType::MzTimestamp,
2813 )?;
2814 let aligned_to_const = aligned_to_hir
2816 .into_literal_mz_timestamp()
2817 .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2818
2819 refresh_schedule.everies.push(RefreshEvery {
2820 interval,
2821 aligned_to: aligned_to_const,
2822 });
2823 }
2824 }
2825 }
2826
2827 if on_commits_seen > 1 {
2828 sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2829 }
2830 if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2831 sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2832 }
2833
2834 if refresh_schedule == RefreshSchedule::default() {
2835 None
2836 } else {
2837 Some(refresh_schedule)
2838 }
2839 };
2840
2841 let as_of = stmt.as_of.map(Timestamp::from);
2842 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2843 let mut non_null_assertions = assert_not_null
2844 .into_iter()
2845 .map(normalize::column_name)
2846 .map(|assertion_name| {
2847 column_names
2848 .iter()
2849 .position(|col| col == &assertion_name)
2850 .ok_or_else(|| {
2851 sql_err!(
2852 "column {} in ASSERT NOT NULL option not found",
2853 assertion_name.quoted()
2854 )
2855 })
2856 })
2857 .collect::<Result<Vec<_>, _>>()?;
2858 non_null_assertions.sort();
2859 if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2860 let dup = &column_names[*dup];
2861 sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2862 }
2863
2864 if let Some(dup) = column_names.iter().duplicates().next() {
2865 sql_bail!("column {} specified more than once", dup.quoted());
2866 }
2867
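    // When the planning context ignores IF EXISTS errors, plan as though
    // IF NOT EXISTS had been specified, whatever the statement says.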
2868 let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2871 Ok(true) => IfExistsBehavior::Skip,
2872 _ => stmt.if_exists,
2873 };
2874
2875 let mut replace = None;
2876 let mut if_not_exists = false;
2877 match if_exists {
2878 IfExistsBehavior::Replace => {
2879 let if_exists = true;
2880 let cascade = false;
2881 let replace_id = plan_drop_item(
2882 scx,
2883 ObjectType::MaterializedView,
2884 if_exists,
2885 partial_name.into(),
2886 cascade,
2887 )?;
2888
2889 if let Some(id) = replace_id {
2891 let dependencies = expr.depends_on();
2892 let invalid_drop = scx
2893 .get_item(&id)
2894 .global_ids()
2895 .any(|gid| dependencies.contains(&gid));
2896 if invalid_drop {
2897 let item = scx.catalog.get_item(&id);
2898 sql_bail!(
2899 "cannot replace materialized view {0}: depended upon by new {0} definition",
2900 scx.catalog.resolve_full_name(item.name())
2901 );
2902 }
2903 replace = Some(id);
2904 }
2905 }
2906 IfExistsBehavior::Skip => if_not_exists = true,
2907 IfExistsBehavior::Error => (),
2908 }
2909 let drop_ids = replace
2910 .map(|id| {
2911 scx.catalog
2912 .item_dependents(id)
2913 .into_iter()
2914 .map(|id| id.unwrap_item_id())
2915 .collect()
2916 })
2917 .unwrap_or_default();
2918 let dependencies = expr
2919 .depends_on()
2920 .into_iter()
2921 .map(|gid| scx.catalog.resolve_item_id(&gid))
2922 .collect();
2923
2924 let full_name = scx.catalog.resolve_full_name(&name);
2926 let partial_name = PartialItemName::from(full_name.clone());
2927 if let (IfExistsBehavior::Error, Ok(item)) =
2930 (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2931 {
2932 return Err(PlanError::ItemAlreadyExists {
2933 name: full_name.to_string(),
2934 item_type: item.item_type(),
2935 });
2936 }
2937
2938 Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2939 name,
2940 materialized_view: MaterializedView {
2941 create_sql,
2942 expr,
2943 dependencies,
2944 column_names,
2945 cluster_id,
2946 non_null_assertions,
2947 compaction_window,
2948 refresh_schedule,
2949 as_of,
2950 },
2951 replace,
2952 drop_ids,
2953 if_not_exists,
2954 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2955 }))
2956}
2957
2958generate_extracted_config!(
2959 MaterializedViewOption,
2960 (AssertNotNull, Ident, AllowMultiple),
2961 (PartitionBy, Vec<Ident>),
2962 (RetainHistory, OptionalDuration),
2963 (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
2964);
2965
2966pub fn plan_create_continual_task(
2967 scx: &StatementContext,
2968 mut stmt: CreateContinualTaskStatement<Aug>,
2969) -> Result<Plan, PlanError> {
2970 match &stmt.sugar {
2971 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
2972 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
2973 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
2974 }
2975 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
2976 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
2977 }
2978 };
2979 let cluster_id = match &stmt.in_cluster {
2980 None => scx.catalog.resolve_cluster(None)?.id(),
2981 Some(in_cluster) => in_cluster.id,
2982 };
2983 stmt.in_cluster = Some(ResolvedClusterName {
2984 id: cluster_id,
2985 print_name: None,
2986 });
2987
2988 let create_sql =
2989 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
2990
2991 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
2992
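    // If the user declared columns, use them (currently all non-nullable);
    // otherwise the description is inferred from the task's statements below.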
2993 let mut desc = match stmt.columns {
2998 None => None,
2999 Some(columns) => {
 3000             let mut desc_columns = Vec::with_capacity(columns.len());
3001 for col in columns.iter() {
3002 desc_columns.push((
3003 normalize::column_name(col.name.clone()),
3004 ColumnType {
3005 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3006 nullable: false,
3007 },
3008 ));
3009 }
3010 Some(RelationDesc::from_names_and_types(desc_columns))
3011 }
3012 };
3013 let input = scx.get_item_by_resolved_name(&stmt.input)?;
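    // A continual task's input must be a storage-backed object.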
3014 match input.item_type() {
3015 CatalogItemType::ContinualTask
3018 | CatalogItemType::Table
3019 | CatalogItemType::MaterializedView
3020 | CatalogItemType::Source => {}
3021 CatalogItemType::Sink
3022 | CatalogItemType::View
3023 | CatalogItemType::Index
3024 | CatalogItemType::Type
3025 | CatalogItemType::Func
3026 | CatalogItemType::Secret
3027 | CatalogItemType::Connection => {
3028 sql_bail!(
3029 "CONTINUAL TASK cannot use {} as an input",
3030 input.item_type()
3031 );
3032 }
3033 }
3034
3035 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3036 let ct_name = stmt.name;
3037 let placeholder_id = match &ct_name {
3038 ResolvedItemName::ContinualTask { id, name } => {
3039 let desc = match desc.as_ref().cloned() {
3040 Some(x) => x,
3041 None => {
3042 let input_name = scx.catalog.resolve_full_name(input.name());
3047 input.desc(&input_name)?.into_owned()
3048 }
3049 };
3050 qcx.ctes.insert(
3051 *id,
3052 CteDesc {
3053 name: name.item.clone(),
3054 desc,
3055 },
3056 );
3057 Some(*id)
3058 }
3059 _ => None,
3060 };
3061
3062 let mut exprs = Vec::new();
3063 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3064 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3065 let query::PlannedRootQuery {
3066 expr,
3067 desc: desc_query,
3068 finishing,
3069 scope: _,
3070 } = query::plan_ct_query(&mut qcx, query)?;
3071 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3074 &finishing,
3075 expr.arity()
3076 ));
 3077         if expr.contains_parameters()? {
 3079             return Err(PlanError::ParameterNotAllowed(
 3080                 "continual tasks".to_string(),
 3081             ));
 3083         }
3084 let expr = match desc.as_mut() {
3085 None => {
3086 desc = Some(desc_query);
3087 expr
3088 }
3089 Some(desc) => {
3090 if desc_query.arity() > desc.arity() {
3093 sql_bail!(
3094 "statement {}: INSERT has more expressions than target columns",
3095 idx
3096 );
3097 }
3098 if desc_query.arity() < desc.arity() {
3099 sql_bail!(
3100 "statement {}: INSERT has more target columns than expressions",
3101 idx
3102 );
3103 }
3104 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3107 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3108 let expr = expr.map_err(|e| {
3109 sql_err!(
3110 "statement {}: column {} is of type {} but expression is of type {}",
3111 idx,
3112 desc.get_name(e.column).quoted(),
3113 qcx.humanize_scalar_type(&e.target_type, false),
3114 qcx.humanize_scalar_type(&e.source_type, false),
3115 )
3116 })?;
3117
3118 let zip_types = || desc.iter_types().zip(desc_query.iter_types());
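                // Widen the declared types to nullable wherever a statement's
                // output may produce NULLs.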
3121 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3122 if updated {
3123 let new_types = zip_types().map(|(ct, q)| {
3124 let mut ct = ct.clone();
3125 if q.nullable {
3126 ct.nullable = true;
3127 }
3128 ct
3129 });
3130 *desc = RelationDesc::from_names_and_types(
3131 desc.iter_names().cloned().zip(new_types),
3132 );
3133 }
3134
3135 expr
3136 }
3137 };
3138 match stmt {
3139 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3140 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3141 }
3142 }
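    // The task's overall expression is the union of its INSERT queries with
    // its DELETE queries (negated above).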
3143 let expr = exprs
3146 .into_iter()
3147 .reduce(|acc, expr| acc.union(expr))
3148 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3149 let dependencies = expr
3150 .depends_on()
3151 .into_iter()
3152 .map(|gid| scx.catalog.resolve_item_id(&gid))
3153 .collect();
3154
3155 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3156 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3157 if let Some(dup) = column_names.iter().duplicates().next() {
3158 sql_bail!("column {} specified more than once", dup.quoted());
3159 }
3160
3161 let name = match &ct_name {
3163 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3164 ResolvedItemName::ContinualTask { name, .. } => {
3165 let name = scx.allocate_qualified_name(name.clone())?;
3166 let full_name = scx.catalog.resolve_full_name(&name);
3167 let partial_name = PartialItemName::from(full_name.clone());
3168 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3171 return Err(PlanError::ItemAlreadyExists {
3172 name: full_name.to_string(),
3173 item_type: item.item_type(),
3174 });
3175 }
3176 name
3177 }
3178 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3179 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3180 };
3181
3182 let as_of = stmt.as_of.map(Timestamp::from);
3183 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3184 name,
3185 placeholder_id,
3186 desc,
3187 input_id: input.global_id(),
3188 with_snapshot: snapshot.unwrap_or(true),
3189 continual_task: MaterializedView {
3190 create_sql,
3191 expr,
3192 dependencies,
3193 column_names,
3194 cluster_id,
3195 non_null_assertions: Vec::new(),
3196 compaction_window: None,
3197 refresh_schedule: None,
3198 as_of,
3199 },
3200 }))
3201}
3202
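/// Desugars one continual task statement into the query it contributes: an
/// INSERT yields its source query, a DELETE yields
/// `SELECT * FROM <task> WHERE <selection>` (negated by the caller), and
/// unsupported shapes yield `None`.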
3203fn continual_task_query<'a>(
3204 ct_name: &ResolvedItemName,
3205 stmt: &'a ast::ContinualTaskStmt<Aug>,
3206) -> Option<ast::Query<Aug>> {
3207 match stmt {
3208 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3209 table_name: _,
3210 columns,
3211 source,
3212 returning,
3213 }) => {
3214 if !columns.is_empty() || !returning.is_empty() {
3215 return None;
3216 }
3217 match source {
3218 ast::InsertSource::Query(query) => Some(query.clone()),
3219 ast::InsertSource::DefaultValues => None,
3220 }
3221 }
3222 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3223 table_name: _,
3224 alias,
3225 using,
3226 selection,
3227 }) => {
3228 if !using.is_empty() {
3229 return None;
3230 }
3231 let from = ast::TableWithJoins {
3233 relation: ast::TableFactor::Table {
3234 name: ct_name.clone(),
3235 alias: alias.clone(),
3236 },
3237 joins: Vec::new(),
3238 };
3239 let select = ast::Select {
3240 from: vec![from],
3241 selection: selection.clone(),
3242 distinct: None,
3243 projection: vec![ast::SelectItem::Wildcard],
3244 group_by: Vec::new(),
3245 having: None,
3246 qualify: None,
3247 options: Vec::new(),
3248 };
3249 let query = ast::Query {
3250 ctes: ast::CteBlock::Simple(Vec::new()),
3251 body: ast::SetExpr::Select(Box::new(select)),
3252 order_by: Vec::new(),
3253 limit: None,
3254 offset: None,
3255 };
3256 Some(query)
3258 }
3259 }
3260}
3261
3262generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3263
3264pub fn describe_create_sink(
3265 _: &StatementContext,
3266 _: CreateSinkStatement<Aug>,
3267) -> Result<StatementDesc, PlanError> {
3268 Ok(StatementDesc::new(None))
3269}
3270
3271generate_extracted_config!(
3272 CreateSinkOption,
3273 (Snapshot, bool),
3274 (PartitionStrategy, String),
3275 (Version, u64)
3276);
3277
3278pub fn plan_create_sink(
3279 scx: &StatementContext,
3280 stmt: CreateSinkStatement<Aug>,
3281) -> Result<Plan, PlanError> {
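    // Verify name availability up front so the user sees a clear error before
    // the heavier sink planning below.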
3282 let Some(name) = stmt.name.clone() else {
3284 return Err(PlanError::MissingName(CatalogItemType::Sink));
3285 };
3286 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3287 let full_name = scx.catalog.resolve_full_name(&name);
3288 let partial_name = PartialItemName::from(full_name.clone());
3289 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3290 return Err(PlanError::ItemAlreadyExists {
3291 name: full_name.to_string(),
3292 item_type: item.item_type(),
3293 });
3294 }
3295
3296 plan_sink(scx, stmt)
3297}
3298
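/// Plans a sink without performing catalog pre-checks; callers such as
/// `plan_create_sink` are responsible for name-collision handling.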
3299fn plan_sink(
3304 scx: &StatementContext,
3305 mut stmt: CreateSinkStatement<Aug>,
3306) -> Result<Plan, PlanError> {
3307 let CreateSinkStatement {
3308 name,
3309 in_cluster: _,
3310 from,
3311 connection,
3312 format,
3313 envelope,
3314 if_not_exists,
3315 with_options,
3316 } = stmt.clone();
3317
3318 let Some(name) = name else {
3319 return Err(PlanError::MissingName(CatalogItemType::Sink));
3320 };
3321 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3322
3323 let envelope = match envelope {
3324 Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3325 Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3326 None => sql_bail!("ENVELOPE clause is required"),
3327 };
3328
3329 let from_name = &from;
3330 let from = scx.get_item_by_resolved_name(&from)?;
3331 if from.id().is_system() {
3332 bail_unsupported!("creating a sink directly on a catalog object");
3333 }
3334 let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3335 let key_indices = match &connection {
3336 CreateSinkConnection::Kafka { key, .. } => {
3337 if let Some(key) = key.clone() {
3338 let key_columns = key
3339 .key_columns
3340 .into_iter()
3341 .map(normalize::column_name)
3342 .collect::<Vec<_>>();
3343 let mut uniq = BTreeSet::new();
3344 for col in key_columns.iter() {
3345 if !uniq.insert(col) {
3346 sql_bail!("duplicate column referenced in KEY: {}", col);
3347 }
3348 }
3349 let indices = key_columns
3350 .iter()
3351 .map(|col| -> anyhow::Result<usize> {
3352 let name_idx =
3353 desc.get_by_name(col)
3354 .map(|(idx, _type)| idx)
3355 .ok_or_else(|| {
3356 sql_err!("column referenced in KEY does not exist: {}", col)
3357 })?;
3358 if desc.get_unambiguous_name(name_idx).is_none() {
 3359                         sql_bail!("column referenced in KEY is ambiguous: {}", col);
3360 }
3361 Ok(name_idx)
3362 })
3363 .collect::<Result<Vec<_>, _>>()?;
3364 let is_valid_key =
3365 desc.typ().keys.iter().any(|key_columns| {
3366 key_columns.iter().all(|column| indices.contains(column))
3367 });
3368
3369 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3370 if key.not_enforced {
3371 scx.catalog
3372 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3373 key: key_columns.clone(),
3374 name: name.item.clone(),
3375 })
3376 } else {
3377 return Err(PlanError::UpsertSinkWithInvalidKey {
3378 name: from_name.full_name_str(),
3379 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3380 valid_keys: desc
3381 .typ()
3382 .keys
3383 .iter()
3384 .map(|key| {
3385 key.iter()
3386 .map(|col| desc.get_name(*col).as_str().into())
3387 .collect()
3388 })
3389 .collect(),
3390 });
3391 }
3392 }
3393 Some(indices)
3394 } else {
3395 None
3396 }
3397 }
3398 };
3399
3400 let headers_index = match &connection {
3401 CreateSinkConnection::Kafka {
3402 headers: Some(headers),
3403 ..
3404 } => {
3405 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3406
3407 match envelope {
3408 SinkEnvelope::Upsert => (),
3409 SinkEnvelope::Debezium => {
3410 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3411 }
3412 };
3413
3414 let headers = normalize::column_name(headers.clone());
3415 let (idx, ty) = desc
3416 .get_by_name(&headers)
3417 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3418
3419 if desc.get_unambiguous_name(idx).is_none() {
3420 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3421 }
3422
3423 match &ty.scalar_type {
3424 ScalarType::Map { value_type, .. }
3425 if matches!(&**value_type, ScalarType::String | ScalarType::Bytes) => {}
3426 _ => sql_bail!(
3427 "HEADERS column must have type map[text => text] or map[text => bytea]"
3428 ),
3429 }
3430
3431 Some(idx)
3432 }
3433 _ => None,
3434 };
3435
 3436     let relation_key_indices = desc.typ().keys.first().cloned();
3438
3439 let key_desc_and_indices = key_indices.map(|key_indices| {
3440 let cols = desc
3441 .iter()
3442 .map(|(name, ty)| (name.clone(), ty.clone()))
3443 .collect::<Vec<_>>();
3444 let (names, types): (Vec<_>, Vec<_>) =
3445 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3446 let typ = RelationType::new(types);
3447 (RelationDesc::new(typ, names), key_indices)
3448 });
3449
3450 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3451 return Err(PlanError::UpsertSinkWithoutKey);
3452 }
3453
3454 let connection_builder = match connection {
3455 CreateSinkConnection::Kafka {
3456 connection,
3457 options,
3458 ..
3459 } => kafka_sink_builder(
3460 scx,
3461 connection,
3462 options,
3463 format,
3464 relation_key_indices,
3465 key_desc_and_indices,
3466 headers_index,
3467 desc.into_owned(),
3468 envelope,
3469 from.id(),
3470 )?,
3471 };
3472
3473 let CreateSinkOptionExtracted {
3474 snapshot,
3475 version,
3476 partition_strategy: _,
3477 seen: _,
3478 } = with_options.try_into()?;
3479
3480 let with_snapshot = snapshot.unwrap_or(true);
3482 let version = version.unwrap_or(0);
3484
3485 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3489 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3490
3491 Ok(Plan::CreateSink(CreateSinkPlan {
3492 name,
3493 sink: Sink {
3494 create_sql,
3495 from: from.global_id(),
3496 connection: connection_builder,
3497 envelope,
3498 version,
3499 },
3500 with_snapshot,
3501 if_not_exists,
3502 in_cluster: in_cluster.id(),
3503 }))
3504}
3505
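/// Builds the error reported when a user-declared KEY conflicts with the
/// relation's known keys.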
3506fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3507 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3508
3509 let existing_keys = desc
3510 .typ()
3511 .keys
3512 .iter()
3513 .map(|key_columns| {
3514 key_columns
3515 .iter()
3516 .map(|col| desc.get_name(*col).as_str())
3517 .join(", ")
3518 })
3519 .join(", ");
3520
3521 sql_err!(
3522 "Key constraint ({}) conflicts with existing key ({})",
3523 user_keys,
3524 existing_keys
3525 )
3526}
3527
3528#[derive(Debug, Default, PartialEq, Clone)]
3531pub struct CsrConfigOptionExtracted {
3532 seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3533 pub(crate) avro_key_fullname: Option<String>,
3534 pub(crate) avro_value_fullname: Option<String>,
3535 pub(crate) null_defaults: bool,
3536 pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3537 pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3538 pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3539 pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3540}
3541
3542impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3543 type Error = crate::plan::PlanError;
3544 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3545 let mut extracted = CsrConfigOptionExtracted::default();
3546 let mut common_doc_comments = BTreeMap::new();
3547 for option in v {
3548 if !extracted.seen.insert(option.name.clone()) {
3549 return Err(PlanError::Unstructured({
3550 format!("{} specified more than once", option.name)
3551 }));
3552 }
3553 let option_name = option.name.clone();
3554 let option_name_str = option_name.to_ast_string_simple();
3555 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3556 option_name: option_name.to_ast_string_simple(),
3557 err: e.into(),
3558 };
3559 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3560 val.map(|s| match s {
3561 WithOptionValue::Value(Value::String(s)) => {
3562 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3563 }
3564 _ => Err("must be a string".to_string()),
3565 })
3566 .transpose()
3567 .map_err(PlanError::Unstructured)
3568 .map_err(better_error)
3569 };
3570 match option.name {
3571 CsrConfigOptionName::AvroKeyFullname => {
3572 extracted.avro_key_fullname =
3573 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3574 }
3575 CsrConfigOptionName::AvroValueFullname => {
3576 extracted.avro_value_fullname =
3577 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3578 }
3579 CsrConfigOptionName::NullDefaults => {
3580 extracted.null_defaults =
3581 <bool>::try_from_value(option.value).map_err(better_error)?;
3582 }
3583 CsrConfigOptionName::AvroDocOn(doc_on) => {
3584 let value = String::try_from_value(option.value.ok_or_else(|| {
3585 PlanError::InvalidOptionValue {
3586 option_name: option_name_str,
3587 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3588 }
3589 })?)
3590 .map_err(better_error)?;
3591 let key = match doc_on.identifier {
3592 DocOnIdentifier::Column(ast::ColumnName {
3593 relation: ResolvedItemName::Item { id, .. },
3594 column: ResolvedColumnReference::Column { name, index: _ },
3595 }) => DocTarget::Field {
3596 object_id: id,
3597 column_name: name,
3598 },
3599 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3600 DocTarget::Type(id)
3601 }
3602 _ => unreachable!(),
3603 };
3604
3605 match doc_on.for_schema {
3606 DocOnSchema::KeyOnly => {
3607 extracted.key_doc_options.insert(key, value);
3608 }
3609 DocOnSchema::ValueOnly => {
3610 extracted.value_doc_options.insert(key, value);
3611 }
3612 DocOnSchema::All => {
3613 common_doc_comments.insert(key, value);
3614 }
3615 }
3616 }
3617 CsrConfigOptionName::KeyCompatibilityLevel => {
3618 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3619 }
3620 CsrConfigOptionName::ValueCompatibilityLevel => {
3621 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3622 }
3623 }
3624 }
3625
3626 for (key, value) in common_doc_comments {
3627 if !extracted.key_doc_options.contains_key(&key) {
3628 extracted.key_doc_options.insert(key.clone(), value.clone());
3629 }
3630 if !extracted.value_doc_options.contains_key(&key) {
3631 extracted.value_doc_options.insert(key, value);
3632 }
3633 }
3634 Ok(extracted)
3635 }
3636}
3637
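/// Assembles the storage-layer description of a Kafka sink from the parsed
/// connection, options, formats, and relation descriptions.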
3638fn kafka_sink_builder(
3639 scx: &StatementContext,
3640 connection: ResolvedItemName,
3641 options: Vec<KafkaSinkConfigOption<Aug>>,
3642 format: Option<FormatSpecifier<Aug>>,
3643 relation_key_indices: Option<Vec<usize>>,
3644 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3645 headers_index: Option<usize>,
3646 value_desc: RelationDesc,
3647 envelope: SinkEnvelope,
3648 sink_from: CatalogItemId,
3649) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3650 let connection_item = scx.get_item_by_resolved_name(&connection)?;
3652 let connection_id = connection_item.id();
3653 match connection_item.connection()? {
3654 Connection::Kafka(_) => (),
3655 _ => sql_bail!(
3656 "{} is not a kafka connection",
3657 scx.catalog.resolve_full_name(connection_item.name())
3658 ),
3659 };
3660
3661 let KafkaSinkConfigOptionExtracted {
3662 topic,
3663 compression_type,
3664 partition_by,
3665 progress_group_id_prefix,
3666 transactional_id_prefix,
3667 legacy_ids,
3668 topic_config,
3669 topic_metadata_refresh_interval,
3670 topic_partition_count,
3671 topic_replication_factor,
3672 seen: _,
3673 }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3674
3675 let transactional_id = match (transactional_id_prefix, legacy_ids) {
3676 (Some(_), Some(true)) => {
3677 sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3678 }
3679 (None, Some(true)) => KafkaIdStyle::Legacy,
3680 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3681 };
3682
3683 let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3684 (Some(_), Some(true)) => {
3685 sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3686 }
3687 (None, Some(true)) => KafkaIdStyle::Legacy,
3688 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3689 };
3690
3691 let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3692
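    // One hour is the maximum metadata refresh interval the underlying Kafka
    // client accepts.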
3693 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3694 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3697 }
3698
3699 let assert_positive = |val: Option<i32>, name: &str| {
3700 if let Some(val) = val {
3701 if val <= 0 {
3702 sql_bail!("{} must be a positive integer", name);
3703 }
3704 }
3705 val.map(NonNeg::try_from)
3706 .transpose()
3707 .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3708 };
3709 let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3710 let topic_replication_factor =
3711 assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3712
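    // Validates the CSR connection and options used to generate Avro schemas
    // for this sink; SEED and strategy options apply only to sources.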
3713 let gen_avro_schema_options = |conn| {
3716 let CsrConnectionAvro {
3717 connection:
3718 CsrConnection {
3719 connection,
3720 options,
3721 },
3722 seed,
3723 key_strategy,
3724 value_strategy,
3725 } = conn;
3726 if seed.is_some() {
3727 sql_bail!("SEED option does not make sense with sinks");
3728 }
3729 if key_strategy.is_some() {
3730 sql_bail!("KEY STRATEGY option does not make sense with sinks");
3731 }
3732 if value_strategy.is_some() {
3733 sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3734 }
3735
3736 let item = scx.get_item_by_resolved_name(&connection)?;
3737 let csr_connection = match item.connection()? {
3738 Connection::Csr(_) => item.id(),
3739 _ => {
3740 sql_bail!(
3741 "{} is not a schema registry connection",
3742 scx.catalog
3743 .resolve_full_name(item.name())
3744 .to_string()
3745 .quoted()
3746 )
3747 }
3748 };
3749 let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3750
3751 if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3752 sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3753 }
3754
3755 if key_desc_and_indices.is_some()
3756 && (extracted_options.avro_key_fullname.is_some()
3757 ^ extracted_options.avro_value_fullname.is_some())
3758 {
3759 sql_bail!(
3760 "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3761 );
3762 }
3763
3764 Ok((csr_connection, extracted_options))
3765 };
3766
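    // Maps a FORMAT specifier onto a concrete key or value format, generating
    // Avro schemas through the schema registry options when required.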
3767 let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3768 Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3769 Format::Bytes if desc.arity() == 1 => {
3770 let col_type = &desc.typ().column_types[0].scalar_type;
3771 if !mz_pgrepr::Value::can_encode_binary(col_type) {
3772 bail_unsupported!(format!(
3773 "BYTES format with non-encodable type: {:?}",
3774 col_type
3775 ));
3776 }
3777
3778 Ok(KafkaSinkFormatType::Bytes)
3779 }
3780 Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3781 Format::Bytes | Format::Text => {
3782 bail_unsupported!("BYTES or TEXT format with multiple columns")
3783 }
3784 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3785 Format::Avro(AvroSchema::Csr { csr_connection }) => {
3786 let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3787 let schema = if is_key {
3788 AvroSchemaGenerator::new(
3789 desc.clone(),
3790 false,
3791 options.key_doc_options,
3792 options.avro_key_fullname.as_deref().unwrap_or("row"),
3793 options.null_defaults,
3794 Some(sink_from),
3795 false,
3796 )?
3797 .schema()
3798 .to_string()
3799 } else {
3800 AvroSchemaGenerator::new(
3801 desc.clone(),
3802 matches!(envelope, SinkEnvelope::Debezium),
3803 options.value_doc_options,
3804 options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3805 options.null_defaults,
3806 Some(sink_from),
3807 true,
3808 )?
3809 .schema()
3810 .to_string()
3811 };
3812 Ok(KafkaSinkFormatType::Avro {
3813 schema,
3814 compatibility_level: if is_key {
3815 options.key_compatibility_level
3816 } else {
3817 options.value_compatibility_level
3818 },
3819 csr_connection,
3820 })
3821 }
3822 format => bail_unsupported!(format!("sink format {:?}", format)),
3823 };
3824
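    // PARTITION BY expressions are planned against the sink's value relation;
    // under ENVELOPE DEBEZIUM only key columns may be referenced.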
3825 let partition_by = match &partition_by {
3826 Some(partition_by) => {
3827 let mut scope = Scope::from_source(None, value_desc.iter_names());
3828
3829 match envelope {
3830 SinkEnvelope::Upsert => (),
3831 SinkEnvelope::Debezium => {
3832 let key_indices: HashSet<_> = key_desc_and_indices
3833 .as_ref()
3834 .map(|(_desc, indices)| indices.as_slice())
3835 .unwrap_or_default()
3836 .into_iter()
3837 .collect();
3838 for (i, item) in scope.items.iter_mut().enumerate() {
3839 if !key_indices.contains(&i) {
3840 item.error_if_referenced = Some(|_table, column| {
3841 PlanError::InvalidPartitionByEnvelopeDebezium {
3842 column_name: column.to_string(),
3843 }
3844 });
3845 }
3846 }
3847 }
3848 };
3849
3850 let ecx = &ExprContext {
3851 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3852 name: "PARTITION BY",
3853 scope: &scope,
3854 relation_type: value_desc.typ(),
3855 allow_aggregates: false,
3856 allow_subqueries: false,
3857 allow_parameters: false,
3858 allow_windows: false,
3859 };
3860 let expr = plan_expr(ecx, partition_by)?.cast_to(
3861 ecx,
3862 CastContext::Assignment,
3863 &ScalarType::UInt64,
3864 )?;
3865 let expr = expr.lower_uncorrelated()?;
3866
3867 Some(expr)
3868 }
3869 _ => None,
3870 };
3871
3872 let format = match format {
3874 Some(FormatSpecifier::KeyValue { key, value }) => {
3875 let key_format = match key_desc_and_indices.as_ref() {
3876 Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3877 None => None,
3878 };
3879 KafkaSinkFormat {
3880 value_format: map_format(value, &value_desc, false)?,
3881 key_format,
3882 }
3883 }
3884 Some(FormatSpecifier::Bare(format)) => {
3885 let key_format = match key_desc_and_indices.as_ref() {
3886 Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3887 None => None,
3888 };
3889 KafkaSinkFormat {
3890 value_format: map_format(format, &value_desc, false)?,
3891 key_format,
3892 }
3893 }
3894 None => bail_unsupported!("sink without format"),
3895 };
3896
3897 Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3898 connection_id,
3899 connection: connection_id,
3900 format,
3901 topic: topic_name,
3902 relation_key_indices,
3903 key_desc_and_indices,
3904 headers_index,
3905 value_desc,
3906 partition_by,
3907 compression_type,
3908 progress_group_id,
3909 transactional_id,
3910 topic_options: KafkaTopicOptions {
3911 partition_count: topic_partition_count,
3912 replication_factor: topic_replication_factor,
3913 topic_config: topic_config.unwrap_or_default(),
3914 },
3915 topic_metadata_refresh_interval,
3916 }))
3917}
3918
3919pub fn describe_create_index(
3920 _: &StatementContext,
3921 _: CreateIndexStatement<Aug>,
3922) -> Result<StatementDesc, PlanError> {
3923 Ok(StatementDesc::new(None))
3924}
3925
3926pub fn plan_create_index(
3927 scx: &StatementContext,
3928 mut stmt: CreateIndexStatement<Aug>,
3929) -> Result<Plan, PlanError> {
3930 let CreateIndexStatement {
3931 name,
3932 on_name,
3933 in_cluster,
3934 key_parts,
3935 with_options,
3936 if_not_exists,
3937 } = &mut stmt;
3938 let on = scx.get_item_by_resolved_name(on_name)?;
3939 let on_item_type = on.item_type();
3940
3941 if !matches!(
3942 on_item_type,
3943 CatalogItemType::View
3944 | CatalogItemType::MaterializedView
3945 | CatalogItemType::Source
3946 | CatalogItemType::Table
3947 ) {
3948 sql_bail!(
3949 "index cannot be created on {} because it is a {}",
3950 on_name.full_name_str(),
3951 on.item_type()
3952 )
3953 }
3954
3955 let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3956
3957 let filled_key_parts = match key_parts {
3958 Some(kp) => kp.to_vec(),
3959 None => {
3960 on.desc(&scx.catalog.resolve_full_name(on.name()))?
3962 .typ()
3963 .default_key()
3964 .iter()
3965 .map(|i| match on_desc.get_unambiguous_name(*i) {
3966 Some(n) => Expr::Identifier(vec![n.clone().into()]),
3967 _ => Expr::Value(Value::Number((i + 1).to_string())),
3968 })
3969 .collect()
3970 }
3971 };
3972 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
3973
3974 let index_name = if let Some(name) = name {
3975 QualifiedItemName {
3976 qualifiers: on.name().qualifiers.clone(),
3977 item: normalize::ident(name.clone()),
3978 }
3979 } else {
3980 let mut idx_name = QualifiedItemName {
3981 qualifiers: on.name().qualifiers.clone(),
3982 item: on.name().item.clone(),
3983 };
3984 if key_parts.is_none() {
3985 idx_name.item += "_primary_idx";
3987 } else {
3988 let index_name_col_suffix = keys
3991 .iter()
3992 .map(|k| match k {
3993 mz_expr::MirScalarExpr::Column(i, name) => {
3994 match (on_desc.get_unambiguous_name(*i), &name.0) {
3995 (Some(col_name), _) => col_name.to_string(),
3996 (None, Some(name)) => name.to_string(),
3997 (None, None) => format!("{}", i + 1),
3998 }
3999 }
4000 _ => "expr".to_string(),
4001 })
4002 .join("_");
4003 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4004 .expect("write on strings cannot fail");
4005 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4006 }
4007
4008 if !*if_not_exists {
4009 scx.catalog.find_available_name(idx_name)
4010 } else {
4011 idx_name
4012 }
4013 };
4014
4015 let full_name = scx.catalog.resolve_full_name(&index_name);
4017 let partial_name = PartialItemName::from(full_name.clone());
4018 if let (Ok(item), false, false) = (
4026 scx.catalog.resolve_item_or_type(&partial_name),
4027 *if_not_exists,
4028 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4029 ) {
4030 return Err(PlanError::ItemAlreadyExists {
4031 name: full_name.to_string(),
4032 item_type: item.item_type(),
4033 });
4034 }
4035
4036 let options = plan_index_options(scx, with_options.clone())?;
4037 let cluster_id = match in_cluster {
4038 None => scx.resolve_cluster(None)?.id(),
4039 Some(in_cluster) => in_cluster.id,
4040 };
4041
4042 *in_cluster = Some(ResolvedClusterName {
4043 id: cluster_id,
4044 print_name: None,
4045 });
4046
4047 *name = Some(Ident::new(index_name.item.clone())?);
4049 *key_parts = Some(filled_key_parts);
4050 let if_not_exists = *if_not_exists;
4051
4052 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4053 let compaction_window = options.iter().find_map(|o| {
4054 #[allow(irrefutable_let_patterns)]
4055 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4056 Some(lcw.clone())
4057 } else {
4058 None
4059 }
4060 });
4061
4062 Ok(Plan::CreateIndex(CreateIndexPlan {
4063 name: index_name,
4064 index: Index {
4065 create_sql,
4066 on: on.global_id(),
4067 keys,
4068 cluster_id,
4069 compaction_window,
4070 },
4071 if_not_exists,
4072 }))
4073}
4074
4075pub fn describe_create_type(
4076 _: &StatementContext,
4077 _: CreateTypeStatement<Aug>,
4078) -> Result<StatementDesc, PlanError> {
4079 Ok(StatementDesc::new(None))
4080}
4081
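/// Plans `CREATE TYPE`. Illustrative examples of the three variants handled
/// below (list, map, and record types):
///
/// ```sql
/// CREATE TYPE int4_list AS LIST (ELEMENT TYPE = int4);
/// CREATE TYPE int4_map AS MAP (KEY TYPE = text, VALUE TYPE = int4);
/// CREATE TYPE point AS (x float8, y float8);
/// ```
///
/// Element, key, value, and field types must themselves be named types.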
4082pub fn plan_create_type(
4083 scx: &StatementContext,
4084 stmt: CreateTypeStatement<Aug>,
4085) -> Result<Plan, PlanError> {
4086 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4087 let CreateTypeStatement { name, as_type, .. } = stmt;
4088
4089 fn validate_data_type(
4090 scx: &StatementContext,
4091 data_type: ResolvedDataType,
4092 as_type: &str,
4093 key: &str,
4094 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4095 let (id, modifiers) = match data_type {
4096 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4097 _ => sql_bail!(
4098 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4099 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4100 as_type,
4101 key,
4102 data_type.human_readable_name(),
4103 ),
4104 };
4105
4106 let item = scx.catalog.get_item(&id);
4107 match item.type_details() {
4108 None => sql_bail!(
4109 "{} must be of class type, but received {} which is of class {}",
4110 key,
4111 scx.catalog.resolve_full_name(item.name()),
4112 item.item_type()
4113 ),
4114 Some(CatalogTypeDetails {
4115 typ: CatalogType::Char,
4116 ..
4117 }) => {
4118 bail_unsupported!("embedding char type in a list or map")
4119 }
4120 _ => {
4121 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4123
4124 Ok((id, modifiers))
4125 }
4126 }
4127 }
4128
4129 let inner = match as_type {
4130 CreateTypeAs::List { options } => {
4131 let CreateTypeListOptionExtracted {
4132 element_type,
4133 seen: _,
4134 } = CreateTypeListOptionExtracted::try_from(options)?;
4135 let element_type =
4136 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4137 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4138 CatalogType::List {
4139 element_reference: id,
4140 element_modifiers: modifiers,
4141 }
4142 }
4143 CreateTypeAs::Map { options } => {
4144 let CreateTypeMapOptionExtracted {
4145 key_type,
4146 value_type,
4147 seen: _,
4148 } = CreateTypeMapOptionExtracted::try_from(options)?;
4149 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4150 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4151 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4152 let (value_id, value_modifiers) =
4153 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4154 CatalogType::Map {
4155 key_reference: key_id,
4156 key_modifiers,
4157 value_reference: value_id,
4158 value_modifiers,
4159 }
4160 }
4161 CreateTypeAs::Record { column_defs } => {
4162 let mut fields = vec![];
4163 for column_def in column_defs {
4164 let data_type = column_def.data_type;
4165 let key = ident(column_def.name.clone());
4166 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4167 fields.push(CatalogRecordField {
4168 name: ColumnName::from(key.clone()),
4169 type_reference: id,
4170 type_modifiers: modifiers,
4171 });
4172 }
4173 CatalogType::Record { fields }
4174 }
4175 };
4176
4177 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4178
4179 let full_name = scx.catalog.resolve_full_name(&name);
4181 let partial_name = PartialItemName::from(full_name.clone());
4182 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4185 if item.item_type().conflicts_with_type() {
4186 return Err(PlanError::ItemAlreadyExists {
4187 name: full_name.to_string(),
4188 item_type: item.item_type(),
4189 });
4190 }
4191 }
4192
4193 Ok(Plan::CreateType(CreateTypePlan {
4194 name,
4195 typ: Type { create_sql, inner },
4196 }))
4197}
4198
4199generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4200
4201generate_extracted_config!(
4202 CreateTypeMapOption,
4203 (KeyType, ResolvedDataType),
4204 (ValueType, ResolvedDataType)
4205);
4206
4207#[derive(Debug)]
4208pub enum PlannedAlterRoleOption {
4209 Attributes(PlannedRoleAttributes),
4210 Variable(PlannedRoleVariable),
4211}
4212
4213#[derive(Debug, Clone)]
4214pub struct PlannedRoleAttributes {
4215 pub inherit: Option<bool>,
4216 pub password: Option<Password>,
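    /// Set when `PASSWORD NULL` was specified, meaning any existing password
    /// should be cleared rather than left unchanged.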
4217 pub nopassword: Option<bool>,
4221 pub superuser: Option<bool>,
4222 pub login: Option<bool>,
4223}
4224
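/// Plans the attribute clauses of `CREATE ROLE` / `ALTER ROLE`, e.g.
/// (illustrative):
///
/// ```sql
/// CREATE ROLE r WITH LOGIN PASSWORD 'hunter2';
/// ```
///
/// Attributes that PostgreSQL supports but Materialize replaces with system
/// privileges (CREATEDB, CREATEROLE, CREATECLUSTER) are rejected outright.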
4225fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4226 let mut planned_attributes = PlannedRoleAttributes {
4227 inherit: None,
4228 password: None,
4229 superuser: None,
4230 login: None,
4231 nopassword: None,
4232 };
4233
4234 for option in options {
4235 match option {
4236 RoleAttribute::Inherit | RoleAttribute::NoInherit
4237 if planned_attributes.inherit.is_some() =>
4238 {
4239 sql_bail!("conflicting or redundant options");
4240 }
4241 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4242 bail_never_supported!(
4243 "CREATECLUSTER attribute",
4244 "sql/create-role/#details",
4245 "Use system privileges instead."
4246 );
4247 }
4248 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4249 bail_never_supported!(
4250 "CREATEDB attribute",
4251 "sql/create-role/#details",
4252 "Use system privileges instead."
4253 );
4254 }
4255 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4256 bail_never_supported!(
4257 "CREATEROLE attribute",
4258 "sql/create-role/#details",
4259 "Use system privileges instead."
4260 );
4261 }
4262 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4263 sql_bail!("conflicting or redundant options");
4264 }
4265
4266 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4267 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4268 RoleAttribute::Password(password) => {
4269 if let Some(password) = password {
4270 planned_attributes.password = Some(password.into());
4271 } else {
4272 planned_attributes.nopassword = Some(true);
4273 }
4274 }
4275 RoleAttribute::SuperUser => {
4276 if planned_attributes.superuser == Some(false) {
4277 sql_bail!("conflicting or redundant options");
4278 }
4279 planned_attributes.superuser = Some(true);
4280 }
4281 RoleAttribute::NoSuperUser => {
4282 if planned_attributes.superuser == Some(true) {
4283 sql_bail!("conflicting or redundant options");
4284 }
4285 planned_attributes.superuser = Some(false);
4286 }
4287 RoleAttribute::Login => {
4288 if planned_attributes.login == Some(false) {
4289 sql_bail!("conflicting or redundant options");
4290 }
4291 planned_attributes.login = Some(true);
4292 }
4293 RoleAttribute::NoLogin => {
4294 if planned_attributes.login == Some(true) {
4295 sql_bail!("conflicting or redundant options");
4296 }
4297 planned_attributes.login = Some(false);
4298 }
4299 }
4300 }
4301 if planned_attributes.inherit == Some(false) {
4302 bail_unsupported!("NOINHERIT roles");
4303 }
4304
4305 Ok(planned_attributes)
4306}
4307
4308#[derive(Debug)]
4309pub enum PlannedRoleVariable {
4310 Set { name: String, value: VariableValue },
4311 Reset { name: String },
4312}
4313
4314impl PlannedRoleVariable {
4315 pub fn name(&self) -> &str {
4316 match self {
4317 PlannedRoleVariable::Set { name, .. } => name,
4318 PlannedRoleVariable::Reset { name } => name,
4319 }
4320 }
4321}
4322
4323fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4324 let plan = match variable {
4325 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4326 name: name.to_string(),
4327 value: scl::plan_set_variable_to(value)?,
4328 },
4329 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4330 name: name.to_string(),
4331 },
4332 };
4333 Ok(plan)
4334}
4335
4336pub fn describe_create_role(
4337 _: &StatementContext,
4338 _: CreateRoleStatement,
4339) -> Result<StatementDesc, PlanError> {
4340 Ok(StatementDesc::new(None))
4341}
4342
4343pub fn plan_create_role(
4344 _: &StatementContext,
4345 CreateRoleStatement { name, options }: CreateRoleStatement,
4346) -> Result<Plan, PlanError> {
4347 let attributes = plan_role_attributes(options)?;
4348 Ok(Plan::CreateRole(CreateRolePlan {
4349 name: normalize::ident(name),
4350 attributes: attributes.into(),
4351 }))
4352}
4353
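/// Plans `CREATE NETWORK POLICY`. A sketch of the expected shape (names and
/// CIDR are illustrative):
///
/// ```sql
/// CREATE NETWORK POLICY office_policy (
///     RULES (
///         new_york (DIRECTION = 'ingress', ACTION = 'allow', ADDRESS = '1.2.3.4/28')
///     )
/// );
/// ```
///
/// Every rule must specify all of DIRECTION, ACTION, and ADDRESS.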
4354pub fn plan_create_network_policy(
4355 ctx: &StatementContext,
4356 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4357) -> Result<Plan, PlanError> {
4358 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4359 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4360
4361 let Some(rule_defs) = policy_options.rules else {
4362 sql_bail!("RULES must be specified when creating network policies.");
4363 };
4364
4365 let mut rules = vec![];
4366 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4367 let NetworkPolicyRuleOptionExtracted {
4368 seen: _,
4369 direction,
4370 action,
4371 address,
4372 } = options.try_into()?;
4373 let (direction, action, address) = match (direction, action, address) {
4374 (Some(direction), Some(action), Some(address)) => (
4375 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4376 NetworkPolicyRuleAction::try_from(action.as_str())?,
4377 PolicyAddress::try_from(address.as_str())?,
4378 ),
4379 (_, _, _) => {
4380 sql_bail!("Direction, Action, and Address must be specified when creating a rule")
4381 }
4382 };
4383 rules.push(NetworkPolicyRule {
4384 name: normalize::ident(name),
4385 direction,
4386 action,
4387 address,
4388 });
4389 }
4390
4391 if rules.len()
4392 > ctx
4393 .catalog
4394 .system_vars()
4395 .max_rules_per_network_policy()
4396 .try_into()?
4397 {
4398 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4399 }
4400
4401 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4402 name: normalize::ident(name),
4403 rules,
4404 }))
4405}
4406
4407pub fn plan_alter_network_policy(
4408 ctx: &StatementContext,
4409 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4410) -> Result<Plan, PlanError> {
4411 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4412
4413 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4414 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4415
4416 let Some(rule_defs) = policy_options.rules else {
4417 sql_bail!("RULES must be specified when altering network policies.");
4418 };
4419
4420 let mut rules = vec![];
4421 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4422 let NetworkPolicyRuleOptionExtracted {
4423 seen: _,
4424 direction,
4425 action,
4426 address,
4427 } = options.try_into()?;
4428
4429 let (direction, action, address) = match (direction, action, address) {
4430 (Some(direction), Some(action), Some(address)) => (
4431 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4432 NetworkPolicyRuleAction::try_from(action.as_str())?,
4433 PolicyAddress::try_from(address.as_str())?,
4434 ),
4435 (_, _, _) => {
4436 sql_bail!("Direction, Action, and Address must be specified when creating a rule")
4437 }
4438 };
4439 rules.push(NetworkPolicyRule {
4440 name: normalize::ident(name),
4441 direction,
4442 action,
4443 address,
4444 });
4445 }
4446 if rules.len()
4447 > ctx
4448 .catalog
4449 .system_vars()
4450 .max_rules_per_network_policy()
4451 .try_into()?
4452 {
4453 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4454 }
4455
4456 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4457 id: policy.id(),
4458 name: normalize::ident(name),
4459 rules,
4460 }))
4461}
4462
4463pub fn describe_create_cluster(
4464 _: &StatementContext,
4465 _: CreateClusterStatement<Aug>,
4466) -> Result<StatementDesc, PlanError> {
4467 Ok(StatementDesc::new(None))
4468}
4469
4470generate_extracted_config!(
4476 ClusterOption,
4477 (AvailabilityZones, Vec<String>),
4478 (Disk, bool),
4479 (IntrospectionDebugging, bool),
4480 (IntrospectionInterval, OptionalDuration),
4481 (Managed, bool),
4482 (Replicas, Vec<ReplicaDefinition<Aug>>),
4483 (ReplicationFactor, u32),
4484 (Size, String),
4485 (Schedule, ClusterScheduleOptionValue),
4486 (WorkloadClass, OptionalString)
4487);
4488
4489generate_extracted_config!(
4490 NetworkPolicyOption,
4491 (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4492);
4493
4494generate_extracted_config!(
4495 NetworkPolicyRuleOption,
4496 (Direction, String),
4497 (Action, String),
4498 (Address, String)
4499);
4500
4501generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4502
4503generate_extracted_config!(
4504 ClusterAlterUntilReadyOption,
4505 (Timeout, Duration),
4506 (OnTimeout, String)
4507);
4508
4509generate_extracted_config!(
4510 ClusterFeature,
4511 (ReoptimizeImportedViews, Option<bool>, Default(None)),
4512 (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4513 (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4514 (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4515 (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4516 (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4517 (
4518 EnableProjectionPushdownAfterRelationCse,
4519 Option<bool>,
4520 Default(None)
4521 )
4522);
4523
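/// Plans `CREATE CLUSTER`. For managed clusters this also round-trips the plan
/// through `unplan_create_cluster` and a re-parse, and fails with
/// `PlanError::Replan` if the second plan differs: a self-check that planning
/// and unplanning remain inverses of each other.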
4524pub fn plan_create_cluster(
4528 scx: &StatementContext,
4529 stmt: CreateClusterStatement<Aug>,
4530) -> Result<Plan, PlanError> {
4531 let plan = plan_create_cluster_inner(scx, stmt)?;
4532
4533 if let CreateClusterVariant::Managed(_) = &plan.variant {
4535 let stmt = unplan_create_cluster(scx, plan.clone())
4536 .map_err(|e| PlanError::Replan(e.to_string()))?;
4537 let create_sql = stmt.to_ast_string_stable();
4538 let stmt = parse::parse(&create_sql)
4539 .map_err(|e| PlanError::Replan(e.to_string()))?
4540 .into_element()
4541 .ast;
4542 let (stmt, _resolved_ids) =
4543 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4544 let stmt = match stmt {
4545 Statement::CreateCluster(stmt) => stmt,
4546 stmt => {
4547 return Err(PlanError::Replan(format!(
4548 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4549 )));
4550 }
4551 };
4552 let replan =
4553 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4554 if plan != replan {
4555 return Err(PlanError::Replan(format!(
4556 "replan does not match: plan={plan:?}, replan={replan:?}"
4557 )));
4558 }
4559 }
4560
4561 Ok(Plan::CreateCluster(plan))
4562}
4563
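/// Plans the body of `CREATE CLUSTER` proper. Illustrative examples of the two
/// variants (sizes and names are assumptions, not defaults):
///
/// ```sql
/// -- managed: Materialize provisions the replicas
/// CREATE CLUSTER c1 (SIZE '100cc', REPLICATION FACTOR 2);
/// -- unmanaged: replicas are listed explicitly
/// CREATE CLUSTER c2 (REPLICAS (r1 (SIZE '100cc')));
/// ```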
4564pub fn plan_create_cluster_inner(
4565 scx: &StatementContext,
4566 CreateClusterStatement {
4567 name,
4568 options,
4569 features,
4570 }: CreateClusterStatement<Aug>,
4571) -> Result<CreateClusterPlan, PlanError> {
4572 let ClusterOptionExtracted {
4573 availability_zones,
4574 introspection_debugging,
4575 introspection_interval,
4576 managed,
4577 replicas,
4578 replication_factor,
4579 seen: _,
4580 size,
4581 disk: disk_in,
4582 schedule,
4583 workload_class,
4584 }: ClusterOptionExtracted = options.try_into()?;
4585
4586 let managed = managed.unwrap_or_else(|| replicas.is_none());
4587
4588 if !scx.catalog.active_role_id().is_system() {
4589 if !features.is_empty() {
4590 sql_bail!("FEATURES not supported for non-system users");
4591 }
4592 if workload_class.is_some() {
4593 sql_bail!("WORKLOAD CLASS not supported for non-system users");
4594 }
4595 }
4596
4597 let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4598 let workload_class = workload_class.and_then(|v| v.0);
4599
4600 if managed {
4601 if replicas.is_some() {
4602 sql_bail!("REPLICAS not supported for managed clusters");
4603 }
4604 let Some(size) = size else {
4605 sql_bail!("SIZE must be specified for managed clusters");
4606 };
4607
4608 let mut disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
4609 if scx.catalog.is_cluster_size_cc(&size) {
4616 if disk_in == Some(false) {
4617 sql_bail!(
4618 "disabling the DISK option is not supported for non-legacy cluster sizes because disk is always enabled"
4619 );
4620 }
4621 disk_default = true;
4622 }
4623 if matches!(disk_in, Some(disk) if disk != disk_default) {
4626 scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
4627 }
4628 let disk = disk_in.unwrap_or(disk_default);
4629
4630 let compute = plan_compute_replica_config(
4631 introspection_interval,
4632 introspection_debugging.unwrap_or(false),
4633 )?;
4634
4635 let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4636 replication_factor.unwrap_or_else(|| {
4637 scx.catalog
4638 .system_vars()
4639 .default_cluster_replication_factor()
4640 })
4641 } else {
4642 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4643 if replication_factor.is_some() {
4644 sql_bail!(
4645 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4646 );
4647 }
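// With a non-MANUAL schedule, start with zero replicas; the scheduler
// brings replicas up and down as the schedule requires.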
4648 0
4652 };
4653 let availability_zones = availability_zones.unwrap_or_default();
4654
4655 if !availability_zones.is_empty() {
4656 scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4657 }
4658
4659 let ClusterFeatureExtracted {
4661 reoptimize_imported_views,
4662 enable_eager_delta_joins,
4663 enable_new_outer_join_lowering,
4664 enable_variadic_left_join_lowering,
4665 enable_letrec_fixpoint_analysis,
4666 enable_join_prioritize_arranged,
4667 enable_projection_pushdown_after_relation_cse,
4668 seen: _,
4669 } = ClusterFeatureExtracted::try_from(features)?;
4670 let optimizer_feature_overrides = OptimizerFeatureOverrides {
4671 reoptimize_imported_views,
4672 enable_eager_delta_joins,
4673 enable_new_outer_join_lowering,
4674 enable_variadic_left_join_lowering,
4675 enable_letrec_fixpoint_analysis,
4676 enable_join_prioritize_arranged,
4677 enable_projection_pushdown_after_relation_cse,
4678 ..Default::default()
4679 };
4680
4681 let schedule = plan_cluster_schedule(schedule)?;
4682
4683 Ok(CreateClusterPlan {
4684 name: normalize::ident(name),
4685 variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4686 replication_factor,
4687 size,
4688 availability_zones,
4689 compute,
4690 disk,
4691 optimizer_feature_overrides,
4692 schedule,
4693 }),
4694 workload_class,
4695 })
4696 } else {
4697 let Some(replica_defs) = replicas else {
4698 sql_bail!("REPLICAS must be specified for unmanaged clusters");
4699 };
4700 if availability_zones.is_some() {
4701 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4702 }
4703 if replication_factor.is_some() {
4704 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4705 }
4706 if introspection_debugging.is_some() {
4707 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4708 }
4709 if introspection_interval.is_some() {
4710 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4711 }
4712 if size.is_some() {
4713 sql_bail!("SIZE not supported for unmanaged clusters");
4714 }
4715 if disk_in.is_some() {
4716 sql_bail!("DISK not supported for unmanaged clusters");
4717 }
4718 if !features.is_empty() {
4719 sql_bail!("FEATURES not supported for unmanaged clusters");
4720 }
4721 if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4722 sql_bail!(
4723 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4724 );
4725 }
4726
4727 let mut replicas = vec![];
4728 for ReplicaDefinition { name, options } in replica_defs {
4729 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4730 }
4731
4732 Ok(CreateClusterPlan {
4733 name: normalize::ident(name),
4734 variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4735 workload_class,
4736 })
4737 }
4738}
4739
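/// Reverses `plan_create_cluster_inner` for managed clusters, reconstructing a
/// `CREATE CLUSTER` statement from a plan. Unmanaged clusters are not
/// supported, which is why `SHOW CREATE` bails for them below.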
4740pub fn unplan_create_cluster(
4744 scx: &StatementContext,
4745 CreateClusterPlan {
4746 name,
4747 variant,
4748 workload_class,
4749 }: CreateClusterPlan,
4750) -> Result<CreateClusterStatement<Aug>, PlanError> {
4751 match variant {
4752 CreateClusterVariant::Managed(CreateClusterManagedPlan {
4753 replication_factor,
4754 size,
4755 availability_zones,
4756 compute,
4757 disk,
4758 optimizer_feature_overrides,
4759 schedule,
4760 }) => {
4761 let schedule = unplan_cluster_schedule(schedule);
4762 let OptimizerFeatureOverrides {
4763 enable_consolidate_after_union_negate: _,
4764 enable_reduce_mfp_fusion: _,
4765 enable_cardinality_estimates: _,
4766 persist_fast_path_limit: _,
4767 reoptimize_imported_views,
4768 enable_eager_delta_joins,
4769 enable_new_outer_join_lowering,
4770 enable_variadic_left_join_lowering,
4771 enable_letrec_fixpoint_analysis,
4772 enable_reduce_reduction: _,
4773 enable_join_prioritize_arranged,
4774 enable_projection_pushdown_after_relation_cse,
4775 enable_less_reduce_in_eqprop: _,
4776 enable_dequadratic_eqprop_map: _,
4777 } = optimizer_feature_overrides;
4778 let features_extracted = ClusterFeatureExtracted {
4780 seen: Default::default(),
4782 reoptimize_imported_views,
4783 enable_eager_delta_joins,
4784 enable_new_outer_join_lowering,
4785 enable_variadic_left_join_lowering,
4786 enable_letrec_fixpoint_analysis,
4787 enable_join_prioritize_arranged,
4788 enable_projection_pushdown_after_relation_cse,
4789 };
4790 let features = features_extracted.into_values(scx.catalog);
4791 let availability_zones = if availability_zones.is_empty() {
4792 None
4793 } else {
4794 Some(availability_zones)
4795 };
4796 let (introspection_interval, introspection_debugging) =
4797 unplan_compute_replica_config(compute);
4798 let replication_factor = match &schedule {
4801 ClusterScheduleOptionValue::Manual => Some(replication_factor),
4802 ClusterScheduleOptionValue::Refresh { .. } => {
4803 assert!(
4804 replication_factor <= 1,
4805 "replication factor, {replication_factor:?}, must be <= 1"
4806 );
4807 None
4808 }
4809 };
4810 let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4811 let options_extracted = ClusterOptionExtracted {
4812 seen: Default::default(),
4814 availability_zones,
4815 disk: Some(disk),
4816 introspection_debugging: Some(introspection_debugging),
4817 introspection_interval,
4818 managed: Some(true),
4819 replicas: None,
4820 replication_factor,
4821 size: Some(size),
4822 schedule: Some(schedule),
4823 workload_class,
4824 };
4825 let options = options_extracted.into_values(scx.catalog);
4826 let name = Ident::new_unchecked(name);
4827 Ok(CreateClusterStatement {
4828 name,
4829 options,
4830 features,
4831 })
4832 }
4833 CreateClusterVariant::Unmanaged(_) => {
4834 bail_unsupported!("SHOW CREATE for unmanaged clusters")
4835 }
4836 }
4837}
4838
4839generate_extracted_config!(
4840 ReplicaOption,
4841 (AvailabilityZone, String),
4842 (BilledAs, String),
4843 (ComputeAddresses, Vec<String>),
4844 (ComputectlAddresses, Vec<String>),
4845 (Disk, bool),
4846 (Internal, bool, Default(false)),
4847 (IntrospectionDebugging, bool, Default(false)),
4848 (IntrospectionInterval, OptionalDuration),
4849 (Size, String),
4850 (StorageAddresses, Vec<String>),
4851 (StoragectlAddresses, Vec<String>),
4852 (Workers, u16)
4853);
4854
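/// Plans the options of a single replica definition. Orchestrated replicas are
/// specified by SIZE (plus optional AVAILABILITY ZONE and BILLED AS);
/// unorchestrated replicas instead supply the storagectl/storage/computectl/
/// compute address lists and WORKERS. Mixing the two option families is
/// rejected.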
4855fn plan_replica_config(
4856 scx: &StatementContext,
4857 options: Vec<ReplicaOption<Aug>>,
4858) -> Result<ReplicaConfig, PlanError> {
4859 let ReplicaOptionExtracted {
4860 availability_zone,
4861 billed_as,
4862 compute_addresses,
4863 computectl_addresses,
4864 disk: disk_in,
4865 internal,
4866 introspection_debugging,
4867 introspection_interval,
4868 size,
4869 storage_addresses,
4870 storagectl_addresses,
4871 workers,
4872 ..
4873 }: ReplicaOptionExtracted = options.try_into()?;
4874
4875 let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4876
4877 if disk_in.is_some() {
4878 scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
4879 }
4880
4881 match (
4882 size,
4883 availability_zone,
4884 billed_as,
4885 storagectl_addresses,
4886 storage_addresses,
4887 computectl_addresses,
4888 compute_addresses,
4889 workers,
4890 ) {
4891 (None, _, None, None, None, None, None, None) => {
4893 sql_bail!("SIZE option must be specified");
4896 }
4897 (Some(size), availability_zone, billed_as, None, None, None, None, None) => {
4898 let disk_default = scx.catalog.system_vars().disk_cluster_replicas_default();
4899 let mut disk = disk_in.unwrap_or(disk_default);
4900
4901 if scx.catalog.is_cluster_size_cc(&size) {
4909 if disk_in.is_some() {
4910 sql_bail!(
4911 "DISK option not supported for non-legacy cluster sizes because disk is always enabled"
4912 );
4913 }
4914 disk = true;
4915 }
4916
4917 Ok(ReplicaConfig::Orchestrated {
4918 size,
4919 availability_zone,
4920 compute,
4921 disk,
4922 billed_as,
4923 internal,
4924 })
4925 }
4926
4927 (
4928 None,
4929 None,
4930 None,
4931 storagectl_addresses,
4932 storage_addresses,
4933 computectl_addresses,
4934 compute_addresses,
4935 workers,
4936 ) => {
4937 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4938
4939 let Some(storagectl_addrs) = storagectl_addresses else {
4943 sql_bail!("missing STORAGECTL ADDRESSES option");
4944 };
4945 let Some(storage_addrs) = storage_addresses else {
4946 sql_bail!("missing STORAGE ADDRESSES option");
4947 };
4948 let Some(computectl_addrs) = computectl_addresses else {
4949 sql_bail!("missing COMPUTECTL ADDRESSES option");
4950 };
4951 let Some(compute_addrs) = compute_addresses else {
4952 sql_bail!("missing COMPUTE ADDRESSES option");
4953 };
4954 let workers = workers.unwrap_or(1);
4955
4956 if computectl_addrs.len() != compute_addrs.len() {
4957 sql_bail!("COMPUTECTL ADDRESSES and COMPUTE ADDRESSES must have the same length");
4958 }
4959 if storagectl_addrs.len() != storage_addrs.len() {
4960 sql_bail!("STORAGECTL ADDRESSES and STORAGE ADDRESSES must have the same length");
4961 }
4962 if storagectl_addrs.len() != computectl_addrs.len() {
4963 sql_bail!(
4964 "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4965 );
4966 }
4967
4968 if workers == 0 {
4969 sql_bail!("WORKERS must be greater than 0");
4970 }
4971
4972 if disk_in.is_some() {
4973 sql_bail!("DISK cannot be specified for unorchestrated clusters");
4974 }
4975
4976 Ok(ReplicaConfig::Unorchestrated {
4977 storagectl_addrs,
4978 storage_addrs,
4979 computectl_addrs,
4980 compute_addrs,
4981 workers: workers.into(),
4982 compute,
4983 })
4984 }
4985 _ => {
4986 sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4989 }
4990 }
4991}
4992
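/// Plans the introspection settings for a replica. An omitted INTROSPECTION
/// INTERVAL defaults to `DEFAULT_REPLICA_LOGGING_INTERVAL`; an interval of 0
/// (carried as `OptionalDuration(None)`) disables introspection entirely, in
/// which case INTROSPECTION DEBUGGING must not be enabled.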
4993fn plan_compute_replica_config(
4997 introspection_interval: Option<OptionalDuration>,
4998 introspection_debugging: bool,
4999) -> Result<ComputeReplicaConfig, PlanError> {
5000 let introspection_interval = introspection_interval
5001 .map(|OptionalDuration(i)| i)
5002 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
5003 let introspection = match introspection_interval {
5004 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
5005 interval,
5006 debugging: introspection_debugging,
5007 }),
5008 None if introspection_debugging => {
5009 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
5010 }
5011 None => None,
5012 };
5013 let compute = ComputeReplicaConfig { introspection };
5014 Ok(compute)
5015}
5016
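/// Reverses `plan_compute_replica_config`; disabled introspection maps back to
/// an explicit `INTROSPECTION INTERVAL` of 0.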
5017fn unplan_compute_replica_config(
5021 compute_replica_config: ComputeReplicaConfig,
5022) -> (Option<OptionalDuration>, bool) {
5023 match compute_replica_config.introspection {
5024 Some(ComputeReplicaIntrospectionConfig {
5025 debugging,
5026 interval,
5027 }) => (Some(OptionalDuration(Some(interval))), debugging),
5028 None => (Some(OptionalDuration(None)), false),
5029 }
5030}
5031
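/// Plans the SCHEDULE option, e.g. (illustrative):
///
/// ```sql
/// CREATE CLUSTER c (SIZE '100cc', SCHEDULE = ON REFRESH (HYDRATION TIME ESTIMATE = '1 hour'));
/// ```
///
/// The estimate must be non-negative, must not involve units larger than days
/// (whose lengths are ambiguous), and must survive a round trip back to an
/// `Interval`.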
5032fn plan_cluster_schedule(
5036 schedule: ClusterScheduleOptionValue,
5037) -> Result<ClusterSchedule, PlanError> {
5038 Ok(match schedule {
5039 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5040 ClusterScheduleOptionValue::Refresh {
5042 hydration_time_estimate: None,
5043 } => ClusterSchedule::Refresh {
5044 hydration_time_estimate: Duration::from_millis(0),
5045 },
5046 ClusterScheduleOptionValue::Refresh {
5048 hydration_time_estimate: Some(interval_value),
5049 } => {
5050 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5051 if interval.as_microseconds() < 0 {
5052 sql_bail!(
5053 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5054 interval
5055 );
5056 }
5057 if interval.months != 0 {
5058 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5062 }
5063 let duration = interval.duration()?;
5064 if u64::try_from(duration.as_millis()).is_err()
5065 || Interval::from_duration(&duration).is_err()
5066 {
5067 sql_bail!("HYDRATION TIME ESTIMATE too large");
5068 }
5069 ClusterSchedule::Refresh {
5070 hydration_time_estimate: duration,
5071 }
5072 }
5073 })
5074}
5075
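/// Reverses `plan_cluster_schedule`; planning guarantees the hydration time
/// estimate converts back to an `Interval`.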
5076fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5080 match schedule {
5081 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5082 ClusterSchedule::Refresh {
5083 hydration_time_estimate,
5084 } => {
5085 let interval = Interval::from_duration(&hydration_time_estimate)
5086 .expect("planning ensured that this is convertible back to Interval");
5087 let interval_value = literal::unplan_interval(&interval);
5088 ClusterScheduleOptionValue::Refresh {
5089 hydration_time_estimate: Some(interval_value),
5090 }
5091 }
5092 }
5093}
5094
5095pub fn describe_create_cluster_replica(
5096 _: &StatementContext,
5097 _: CreateClusterReplicaStatement<Aug>,
5098) -> Result<StatementDesc, PlanError> {
5099 Ok(StatementDesc::new(None))
5100}
5101
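/// Plans `CREATE CLUSTER REPLICA`. Adding a replica is refused when the
/// cluster already hosts objects that are limited to a single replica (see
/// `contains_single_replica_objects`).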
5102pub fn plan_create_cluster_replica(
5103 scx: &StatementContext,
5104 CreateClusterReplicaStatement {
5105 definition: ReplicaDefinition { name, options },
5106 of_cluster,
5107 }: CreateClusterReplicaStatement<Aug>,
5108) -> Result<Plan, PlanError> {
5109 let cluster = scx
5110 .catalog
5111 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5112 let current_replica_count = cluster.replica_ids().iter().count();
5113 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5114 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5115 return Err(PlanError::CreateReplicaFailStorageObjects {
5116 current_replica_count,
5117 internal_replica_count,
5118 hypothetical_replica_count: current_replica_count + 1,
5119 });
5120 }
5121
5122 let config = plan_replica_config(scx, options)?;
5123
5124 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5125 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5126 return Err(PlanError::MangedReplicaName(name.into_string()));
5127 }
5128 } else {
5129 ensure_cluster_is_not_managed(scx, cluster.id())?;
5130 }
5131
5132 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5133 name: normalize::ident(name),
5134 cluster_id: cluster.id(),
5135 config,
5136 }))
5137}
5138
5139pub fn describe_create_secret(
5140 _: &StatementContext,
5141 _: CreateSecretStatement<Aug>,
5142) -> Result<StatementDesc, PlanError> {
5143 Ok(StatementDesc::new(None))
5144}
5145
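/// Plans `CREATE SECRET`. The recorded `create_sql` replaces the secret value
/// with the literal `'********'` so the plaintext never reaches the catalog.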
5146pub fn plan_create_secret(
5147 scx: &StatementContext,
5148 stmt: CreateSecretStatement<Aug>,
5149) -> Result<Plan, PlanError> {
5150 let CreateSecretStatement {
5151 name,
5152 if_not_exists,
5153 value,
5154 } = &stmt;
5155
5156 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5157 let mut create_sql_statement = stmt.clone();
5158 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5159 let create_sql =
5160 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5161 let secret_as = query::plan_secret_as(scx, value.clone())?;
5162
5163 let secret = Secret {
5164 create_sql,
5165 secret_as,
5166 };
5167
5168 Ok(Plan::CreateSecret(CreateSecretPlan {
5169 name,
5170 secret,
5171 if_not_exists: *if_not_exists,
5172 }))
5173}
5174
5175pub fn describe_create_connection(
5176 _: &StatementContext,
5177 _: CreateConnectionStatement<Aug>,
5178) -> Result<StatementDesc, PlanError> {
5179 Ok(StatementDesc::new(None))
5180}
5181
5182generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5183
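/// Plans `CREATE CONNECTION`. For SSH connections, the generated key pair's
/// public halves are written back into the statement's PUBLIC KEY 1 / PUBLIC
/// KEY 2 options so `SHOW CREATE` can display them; VALIDATE defaults to the
/// system setting when not given explicitly.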
5184pub fn plan_create_connection(
5185 scx: &StatementContext,
5186 mut stmt: CreateConnectionStatement<Aug>,
5187) -> Result<Plan, PlanError> {
5188 let CreateConnectionStatement {
5189 name,
5190 connection_type,
5191 values,
5192 if_not_exists,
5193 with_options,
5194 } = stmt.clone();
5195 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5196 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5197 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5198
5199 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5200 if options.validate.is_some() {
5201 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5202 }
5203 let validate = match options.validate {
5204 Some(val) => val,
5205 None => {
5206 scx.catalog
5207 .system_vars()
5208 .enable_default_connection_validation()
5209 && details.to_connection().validate_by_default()
5210 }
5211 };
5212
5213 let full_name = scx.catalog.resolve_full_name(&name);
5215 let partial_name = PartialItemName::from(full_name.clone());
5216 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5217 return Err(PlanError::ItemAlreadyExists {
5218 name: full_name.to_string(),
5219 item_type: item.item_type(),
5220 });
5221 }
5222
5223 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5226 stmt.values.retain(|v| {
5227 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5228 });
5229 stmt.values.push(ConnectionOption {
5230 name: ConnectionOptionName::PublicKey1,
5231 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5232 });
5233 stmt.values.push(ConnectionOption {
5234 name: ConnectionOptionName::PublicKey2,
5235 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5236 });
5237 }
5238 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5239
5240 let plan = CreateConnectionPlan {
5241 name,
5242 if_not_exists,
5243 connection: crate::plan::Connection {
5244 create_sql,
5245 details,
5246 },
5247 validate,
5248 };
5249 Ok(Plan::CreateConnection(plan))
5250}
5251
5252fn plan_drop_database(
5253 scx: &StatementContext,
5254 if_exists: bool,
5255 name: &UnresolvedDatabaseName,
5256 cascade: bool,
5257) -> Result<Option<DatabaseId>, PlanError> {
5258 Ok(match resolve_database(scx, name, if_exists)? {
5259 Some(database) => {
5260 if !cascade && database.has_schemas() {
5261 sql_bail!(
5262 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5263 name,
5264 );
5265 }
5266 Some(database.id())
5267 }
5268 None => None,
5269 })
5270}
5271
5272pub fn describe_drop_objects(
5273 _: &StatementContext,
5274 _: DropObjectsStatement,
5275) -> Result<StatementDesc, PlanError> {
5276 Ok(StatementDesc::new(None))
5277}
5278
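/// Plans `DROP <object type> ...` for every object kind. With `IF EXISTS`,
/// names that do not resolve produce a notice rather than an error; the final
/// drop set is expanded to include all transitive dependents.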
5279pub fn plan_drop_objects(
5280 scx: &mut StatementContext,
5281 DropObjectsStatement {
5282 object_type,
5283 if_exists,
5284 names,
5285 cascade,
5286 }: DropObjectsStatement,
5287) -> Result<Plan, PlanError> {
5288 assert_ne!(
5289 object_type,
5290 mz_sql_parser::ast::ObjectType::Func,
5291 "rejected in parser"
5292 );
5293 let object_type = object_type.into();
5294
5295 let mut referenced_ids = Vec::new();
5296 for name in names {
5297 let id = match &name {
5298 UnresolvedObjectName::Cluster(name) => {
5299 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5300 }
5301 UnresolvedObjectName::ClusterReplica(name) => {
5302 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5303 }
5304 UnresolvedObjectName::Database(name) => {
5305 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5306 }
5307 UnresolvedObjectName::Schema(name) => {
5308 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5309 }
5310 UnresolvedObjectName::Role(name) => {
5311 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5312 }
5313 UnresolvedObjectName::Item(name) => {
5314 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5315 .map(ObjectId::Item)
5316 }
5317 UnresolvedObjectName::NetworkPolicy(name) => {
5318 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5319 }
5320 };
5321 match id {
5322 Some(id) => referenced_ids.push(id),
5323 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5324 name: name.to_ast_string_simple(),
5325 object_type,
5326 }),
5327 }
5328 }
5329 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5330
5331 Ok(Plan::DropObjects(DropObjectsPlan {
5332 referenced_ids,
5333 drop_ids,
5334 object_type,
5335 }))
5336}
5337
5338fn plan_drop_schema(
5339 scx: &StatementContext,
5340 if_exists: bool,
5341 name: &UnresolvedSchemaName,
5342 cascade: bool,
5343) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5344 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5345 Some((database_spec, schema_spec)) => {
5346 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5347 sql_bail!(
5348 "cannot drop schema {name} because it is required by the database system",
5349 );
5350 }
5351 if let SchemaSpecifier::Temporary = schema_spec {
5352 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5353 }
5354 let schema = scx.get_schema(&database_spec, &schema_spec);
5355 if !cascade && schema.has_items() {
5356 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5357 sql_bail!(
5358 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5359 full_schema_name
5360 );
5361 }
5362 Some((database_spec, schema_spec))
5363 }
5364 None => None,
5365 })
5366}
5367
5368fn plan_drop_role(
5369 scx: &StatementContext,
5370 if_exists: bool,
5371 name: &Ident,
5372) -> Result<Option<RoleId>, PlanError> {
5373 match scx.catalog.resolve_role(name.as_str()) {
5374 Ok(role) => {
5375 let id = role.id();
5376 if &id == scx.catalog.active_role_id() {
5377 sql_bail!("current role cannot be dropped");
5378 }
5379 for role in scx.catalog.get_roles() {
5380 for (member_id, grantor_id) in role.membership() {
5381 if &id == grantor_id {
5382 let member_role = scx.catalog.get_role(member_id);
5383 sql_bail!(
5384 "cannot drop role {}: still depended up by membership of role {} in role {}",
5385 name.as_str(),
5386 role.name(),
5387 member_role.name()
5388 );
5389 }
5390 }
5391 }
5392 Ok(Some(role.id()))
5393 }
5394 Err(_) if if_exists => Ok(None),
5395 Err(e) => Err(e.into()),
5396 }
5397}
5398
5399fn plan_drop_cluster(
5400 scx: &StatementContext,
5401 if_exists: bool,
5402 name: &Ident,
5403 cascade: bool,
5404) -> Result<Option<ClusterId>, PlanError> {
5405 Ok(match resolve_cluster(scx, name, if_exists)? {
5406 Some(cluster) => {
5407 if !cascade && !cluster.bound_objects().is_empty() {
5408 return Err(PlanError::DependentObjectsStillExist {
5409 object_type: "cluster".to_string(),
5410 object_name: cluster.name().to_string(),
5411 dependents: Vec::new(),
5412 });
5413 }
5414 Some(cluster.id())
5415 }
5416 None => None,
5417 })
5418}
5419
5420fn plan_drop_network_policy(
5421 scx: &StatementContext,
5422 if_exists: bool,
5423 name: &Ident,
5424) -> Result<Option<NetworkPolicyId>, PlanError> {
5425 match scx.catalog.resolve_network_policy(name.as_str()) {
5426 Ok(policy) => {
5427 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5430 Err(PlanError::NetworkPolicyInUse)
5431 } else {
5432 Ok(Some(policy.id()))
5433 }
5434 }
5435 Err(_) if if_exists => Ok(None),
5436 Err(e) => Err(e.into()),
5437 }
5438}
5439
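/// Returns whether the cluster contains objects (sources or sinks) that
/// require it to have at most one replica. Always false once multi-replica
/// sources are enabled via `ENABLE_MULTI_REPLICA_SOURCES`.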
5440fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5443 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5445 false
5446 } else {
5447 cluster.bound_objects().iter().any(|id| {
5449 let item = scx.catalog.get_item(id);
5450 matches!(
5451 item.item_type(),
5452 CatalogItemType::Sink | CatalogItemType::Source
5453 )
5454 })
5455 }
5456}
5457
5458fn plan_drop_cluster_replica(
5459 scx: &StatementContext,
5460 if_exists: bool,
5461 name: &QualifiedReplica,
5462) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5463 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5464 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5465}
5466
5467fn plan_drop_item(
5469 scx: &StatementContext,
5470 object_type: ObjectType,
5471 if_exists: bool,
5472 name: UnresolvedItemName,
5473 cascade: bool,
5474) -> Result<Option<CatalogItemId>, PlanError> {
5475 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5476 Ok(r) => r,
5477 Err(PlanError::MismatchedObjectType {
5479 name,
5480 is_type: ObjectType::MaterializedView,
5481 expected_type: ObjectType::View,
5482 }) => {
5483 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5484 }
5485 e => e?,
5486 };
5487
5488 Ok(match resolved {
5489 Some(catalog_item) => {
5490 if catalog_item.id().is_system() {
5491 sql_bail!(
5492 "cannot drop {} {} because it is required by the database system",
5493 catalog_item.item_type(),
5494 scx.catalog.minimal_qualification(catalog_item.name()),
5495 );
5496 }
5497
5498 if !cascade {
5499 for id in catalog_item.used_by() {
5500 let dep = scx.catalog.get_item(id);
5501 if dependency_prevents_drop(object_type, dep) {
5502 return Err(PlanError::DependentObjectsStillExist {
5503 object_type: catalog_item.item_type().to_string(),
5504 object_name: scx
5505 .catalog
5506 .minimal_qualification(catalog_item.name())
5507 .to_string(),
5508 dependents: vec![(
5509 dep.item_type().to_string(),
5510 scx.catalog.minimal_qualification(dep.name()).to_string(),
5511 )],
5512 });
5513 }
5514 }
5515 }
5518 Some(catalog_item.id())
5519 }
5520 None => None,
5521 })
5522}
5523
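/// Returns whether the dependent item `dep` must block dropping an object of
/// `object_type` absent CASCADE. Indexes never block a drop; they are dropped
/// alongside their parent object.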
5524fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5526 match object_type {
5527 ObjectType::Type => true,
5528 _ => match dep.item_type() {
5529 CatalogItemType::Func
5530 | CatalogItemType::Table
5531 | CatalogItemType::Source
5532 | CatalogItemType::View
5533 | CatalogItemType::MaterializedView
5534 | CatalogItemType::Sink
5535 | CatalogItemType::Type
5536 | CatalogItemType::Secret
5537 | CatalogItemType::Connection
5538 | CatalogItemType::ContinualTask => true,
5539 CatalogItemType::Index => false,
5540 },
5541 }
5542}
5543
5544pub fn describe_alter_index_options(
5545 _: &StatementContext,
5546 _: AlterIndexStatement<Aug>,
5547) -> Result<StatementDesc, PlanError> {
5548 Ok(StatementDesc::new(None))
5549}
5550
5551pub fn describe_drop_owned(
5552 _: &StatementContext,
5553 _: DropOwnedStatement<Aug>,
5554) -> Result<StatementDesc, PlanError> {
5555 Ok(StatementDesc::new(None))
5556}
5557
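/// Plans `DROP OWNED BY`, collecting every object owned by the given roles,
/// every privilege granted to them, and their default privileges, so all three
/// can be dropped or revoked in a single operation.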
5558pub fn plan_drop_owned(
5559 scx: &StatementContext,
5560 drop: DropOwnedStatement<Aug>,
5561) -> Result<Plan, PlanError> {
5562 let cascade = drop.cascade();
5563 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5564 let mut drop_ids = Vec::new();
5565 let mut privilege_revokes = Vec::new();
5566 let mut default_privilege_revokes = Vec::new();
5567
5568 fn update_privilege_revokes(
5569 object_id: SystemObjectId,
5570 privileges: &PrivilegeMap,
5571 role_ids: &BTreeSet<RoleId>,
5572 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5573 ) {
5574 privilege_revokes.extend(iter::zip(
5575 iter::repeat(object_id),
5576 privileges
5577 .all_values()
5578 .filter(|privilege| role_ids.contains(&privilege.grantee))
5579 .cloned(),
5580 ));
5581 }
5582
5583 for replica in scx.catalog.get_cluster_replicas() {
5585 if role_ids.contains(&replica.owner_id()) {
5586 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5587 }
5588 }
5589
5590 for cluster in scx.catalog.get_clusters() {
5592 if role_ids.contains(&cluster.owner_id()) {
5593 if !cascade {
5595 let non_owned_bound_objects: Vec<_> = cluster
5596 .bound_objects()
5597 .into_iter()
5598 .map(|item_id| scx.catalog.get_item(item_id))
5599 .filter(|item| !role_ids.contains(&item.owner_id()))
5600 .collect();
5601 if !non_owned_bound_objects.is_empty() {
5602 let names: Vec<_> = non_owned_bound_objects
5603 .into_iter()
5604 .map(|item| {
5605 (
5606 item.item_type().to_string(),
5607 scx.catalog.resolve_full_name(item.name()).to_string(),
5608 )
5609 })
5610 .collect();
5611 return Err(PlanError::DependentObjectsStillExist {
5612 object_type: "cluster".to_string(),
5613 object_name: cluster.name().to_string(),
5614 dependents: names,
5615 });
5616 }
5617 }
5618 drop_ids.push(cluster.id().into());
5619 }
5620 update_privilege_revokes(
5621 SystemObjectId::Object(cluster.id().into()),
5622 cluster.privileges(),
5623 &role_ids,
5624 &mut privilege_revokes,
5625 );
5626 }
5627
5628 for item in scx.catalog.get_items() {
5630 if role_ids.contains(&item.owner_id()) {
5631 if !cascade {
5632 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5634 let non_owned_dependencies: Vec<_> = used_by
5635 .into_iter()
5636 .map(|item_id| scx.catalog.get_item(item_id))
5637 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5638 .filter(|item| !role_ids.contains(&item.owner_id()))
5639 .collect();
5640 if !non_owned_dependencies.is_empty() {
5641 let names: Vec<_> = non_owned_dependencies
5642 .into_iter()
5643 .map(|item| {
5644 let item_typ = item.item_type().to_string();
5645 let item_name =
5646 scx.catalog.resolve_full_name(item.name()).to_string();
5647 (item_typ, item_name)
5648 })
5649 .collect();
5650 Err(PlanError::DependentObjectsStillExist {
5651 object_type: item.item_type().to_string(),
5652 object_name: scx
5653 .catalog
5654 .resolve_full_name(item.name())
5655 .to_string(),
5657 dependents: names,
5658 })
5659 } else {
5660 Ok(())
5661 }
5662 };
5663
5664 if let Some(id) = item.progress_id() {
5667 let progress_item = scx.catalog.get_item(&id);
5668 check_if_dependents_exist(progress_item.used_by())?;
5669 }
5670 check_if_dependents_exist(item.used_by())?;
5671 }
5672 drop_ids.push(item.id().into());
5673 }
5674 update_privilege_revokes(
5675 SystemObjectId::Object(item.id().into()),
5676 item.privileges(),
5677 &role_ids,
5678 &mut privilege_revokes,
5679 );
5680 }
5681
5682 for schema in scx.catalog.get_schemas() {
5684 if !schema.id().is_temporary() {
5685 if role_ids.contains(&schema.owner_id()) {
5686 if !cascade {
5687 let non_owned_dependencies: Vec<_> = schema
5688 .item_ids()
5689 .map(|item_id| scx.catalog.get_item(&item_id))
5690 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5691 .filter(|item| !role_ids.contains(&item.owner_id()))
5692 .collect();
5693 if !non_owned_dependencies.is_empty() {
5694 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5695 sql_bail!(
5696 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5697 full_schema_name.to_string().quoted()
5698 );
5699 }
5700 }
5701 drop_ids.push((*schema.database(), *schema.id()).into())
5702 }
5703 update_privilege_revokes(
5704 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5705 schema.privileges(),
5706 &role_ids,
5707 &mut privilege_revokes,
5708 );
5709 }
5710 }
5711
5712 for database in scx.catalog.get_databases() {
5714 if role_ids.contains(&database.owner_id()) {
5715 if !cascade {
5716 let non_owned_schemas: Vec<_> = database
5717 .schemas()
5718 .into_iter()
5719 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5720 .collect();
5721 if !non_owned_schemas.is_empty() {
5722 sql_bail!(
5723 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5724 database.name().quoted(),
5725 );
5726 }
5727 }
5728 drop_ids.push(database.id().into());
5729 }
5730 update_privilege_revokes(
5731 SystemObjectId::Object(database.id().into()),
5732 database.privileges(),
5733 &role_ids,
5734 &mut privilege_revokes,
5735 );
5736 }
5737
5738 update_privilege_revokes(
5740 SystemObjectId::System,
5741 scx.catalog.get_system_privileges(),
5742 &role_ids,
5743 &mut privilege_revokes,
5744 );
5745
5746 for (default_privilege_object, default_privilege_acl_items) in
5747 scx.catalog.get_default_privileges()
5748 {
5749 for default_privilege_acl_item in default_privilege_acl_items {
5750 if role_ids.contains(&default_privilege_object.role_id)
5751 || role_ids.contains(&default_privilege_acl_item.grantee)
5752 {
5753 default_privilege_revokes.push((
5754 default_privilege_object.clone(),
5755 default_privilege_acl_item.clone(),
5756 ));
5757 }
5758 }
5759 }
5760
5761 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5762
5763 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5764 if !system_ids.is_empty() {
5765 let mut owners = system_ids
5766 .into_iter()
5767 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5768 .collect::<BTreeSet<_>>()
5769 .into_iter()
5770 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5771 sql_bail!(
5772 "cannot drop objects owned by role {} because they are required by the database system",
5773 owners.join(", "),
5774 );
5775 }
5776
5777 Ok(Plan::DropOwned(DropOwnedPlan {
5778 role_ids: role_ids.into_iter().collect(),
5779 drop_ids,
5780 privilege_revokes,
5781 default_privilege_revokes,
5782 }))
5783}
5784
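/// Plans a `RETAIN HISTORY` option if one is present, e.g. (illustrative):
///
/// ```sql
/// CREATE TABLE t (a int) WITH (RETAIN HISTORY FOR '2 days');
/// ```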
5785fn plan_retain_history_option(
5786 scx: &StatementContext,
5787 retain_history: Option<OptionalDuration>,
5788) -> Result<Option<CompactionWindow>, PlanError> {
5789 if let Some(OptionalDuration(lcw)) = retain_history {
5790 Ok(Some(plan_retain_history(scx, lcw)?))
5791 } else {
5792 Ok(None)
5793 }
5794}
5795
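/// Plans a retain-history duration into a `CompactionWindow`. A literal zero
/// is an internal error (the parser carries 0 as `None`); durations below the
/// default compaction window require `ENABLE_UNLIMITED_RETAIN_HISTORY`, and a
/// `None` duration disables compaction entirely under the same flag.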
5796fn plan_retain_history(
5802 scx: &StatementContext,
5803 lcw: Option<Duration>,
5804) -> Result<CompactionWindow, PlanError> {
5805 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5806 match lcw {
5807 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5812 option_name: "RETAIN HISTORY".to_string(),
5813 err: Box::new(PlanError::Unstructured(
5814 "internal error: unexpectedly zero".to_string(),
5815 )),
5816 }),
5817 Some(duration) => {
5818 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5821 && scx
5822 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5823 .is_err()
5824 {
5825 return Err(PlanError::RetainHistoryLow {
5826 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5827 });
5828 }
5829 Ok(duration.try_into()?)
5830 }
5831 None => {
5834 if scx
5835 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5836 .is_err()
5837 {
5838 Err(PlanError::RetainHistoryRequired)
5839 } else {
5840 Ok(CompactionWindow::DisableCompaction)
5841 }
5842 }
5843 }
5844}
5845
5846generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5847
5848fn plan_index_options(
5849 scx: &StatementContext,
5850 with_opts: Vec<IndexOption<Aug>>,
5851) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5852 if !with_opts.is_empty() {
5853 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5855 }
5856
5857 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5858
5859 let mut out = Vec::with_capacity(1);
5860 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5861 out.push(crate::plan::IndexOption::RetainHistory(cw));
5862 }
5863 Ok(out)
5864}
5865
5866generate_extracted_config!(
5867 TableOption,
5868 (PartitionBy, Vec<Ident>),
5869 (RetainHistory, OptionalDuration),
5870 (RedactedTest, String)
5871);
5872
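/// Plans table-level WITH options. `PARTITION BY` is feature-gated and checked
/// against the table's relation description, e.g. (illustrative):
///
/// ```sql
/// CREATE TABLE t (a int, b text) WITH (PARTITION BY (a));
/// ```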
5873fn plan_table_options(
5874 scx: &StatementContext,
5875 desc: &RelationDesc,
5876 with_opts: Vec<TableOption<Aug>>,
5877) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5878 let TableOptionExtracted {
5879 partition_by,
5880 retain_history,
5881 redacted_test,
5882 ..
5883 }: TableOptionExtracted = with_opts.try_into()?;
5884
5885 if let Some(partition_by) = partition_by {
5886 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5887 check_partition_by(desc, partition_by)?;
5888 }
5889
5890 if redacted_test.is_some() {
5891 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5892 }
5893
5894 let mut out = Vec::with_capacity(1);
5895 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5896 out.push(crate::plan::TableOption::RetainHistory(cw));
5897 }
5898 Ok(out)
5899}
5900
5901pub fn plan_alter_index_options(
5902 scx: &mut StatementContext,
5903 AlterIndexStatement {
5904 index_name,
5905 if_exists,
5906 action,
5907 }: AlterIndexStatement<Aug>,
5908) -> Result<Plan, PlanError> {
5909 let object_type = ObjectType::Index;
5910 match action {
5911 AlterIndexAction::ResetOptions(options) => {
5912 let mut options = options.into_iter();
5913 if let Some(opt) = options.next() {
5914 match opt {
5915 IndexOptionName::RetainHistory => {
5916 if options.next().is_some() {
5917 sql_bail!("RETAIN HISTORY must be the only option");
5918 }
5919 return alter_retain_history(
5920 scx,
5921 object_type,
5922 if_exists,
5923 UnresolvedObjectName::Item(index_name),
5924 None,
5925 );
5926 }
5927 }
5928 }
5929 sql_bail!("expected option");
5930 }
5931 AlterIndexAction::SetOptions(options) => {
5932 let mut options = options.into_iter();
5933 if let Some(opt) = options.next() {
5934 match opt.name {
5935 IndexOptionName::RetainHistory => {
5936 if options.next().is_some() {
5937 sql_bail!("RETAIN HISTORY must be the only option");
5938 }
5939 return alter_retain_history(
5940 scx,
5941 object_type,
5942 if_exists,
5943 UnresolvedObjectName::Item(index_name),
5944 opt.value,
5945 );
5946 }
5947 }
5948 }
5949 sql_bail!("expected option");
5950 }
5951 }
5952}
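
// Editor's note (illustrative; syntax hedged): RETAIN HISTORY is the only
// alterable index option, and it must appear alone, e.g.
//
//     ALTER INDEX active_orders_idx SET (RETAIN HISTORY FOR '30d');
//     ALTER INDEX active_orders_idx RESET (RETAIN HISTORY);
//
// Both forms delegate to `alter_retain_history` below; RESET restores the
// default logical compaction window.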

pub fn describe_alter_cluster_set_options(
    _: &StatementContext,
    _: AlterClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() {
                if workload_class.is_some() {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if !alter_strategy.is_none() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                if scx.catalog.is_cluster_size_cc(size) {
                    if disk.is_some() {
                        sql_bail!(
                            "DISK option not supported for modern cluster sizes because disk is always enabled"
                        );
                    } else {
                        options.disk = AlterOptionParameter::Set(true);
                    }
                }
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if let Some(disk) = disk {
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                if disk {
                    scx.require_feature_flag(&vars::ENABLE_DISK_CLUSTER_REPLICAS)?;
                }
                options.disk = AlterOptionParameter::Set(disk);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system() {
                if reset_options.contains(&WorkloadClass) {
                    sql_bail!("WORKLOAD CLASS not supported for non-system users");
                }
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => options.disk = Reset,
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}
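
// Editor's note (illustrative; syntax hedged): typical statements handled by
// `plan_alter_cluster` include
//
//     ALTER CLUSTER prod SET (REPLICATION FACTOR 2);
//     ALTER CLUSTER prod SET (MANAGED = false);
//     ALTER CLUSTER prod RESET (REPLICATION FACTOR, SCHEDULE);
//
// SET records each provided option as `AlterOptionParameter::Set`, RESET maps
// each named option to `AlterOptionParameter::Reset`, and the trailing
// WITH (...) alter strategy applies only to managed clusters.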

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
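
// Editor's note (illustrative): `ALTER ... SET CLUSTER` is currently planned
// only for materialized views, e.g.
//
//     ALTER MATERIALIZED VIEW winning_bids SET CLUSTER quickstart;
//
// Moving an object to the cluster it already lives on yields a no-op plan;
// indexes, sinks, and sources report "unsupported" (they have a cluster, but
// re-homing them is not implemented), while everything else is rejected as
// never supported.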

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the destination name is not already in use.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // System schemas cannot be swapped.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary name we can move schema A to before renaming
    // schema B into place.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}
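
// Editor's note (illustrative): a schema swap exchanges two names through a
// temporary third name, e.g.
//
//     ALTER SCHEMA blue SWAP WITH green;
//
// The `check` closure above is handed to `gen_temp_suffix` so that the chosen
// suffix is one for which `mz_schema_swap_<suffix>` does not already resolve
// in the target database.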

pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error on `ALTER VIEW <materialized-view>`.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Types and items that conflict with types may not collide on a
            // name within the same schema.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    let check = |temp_suffix: &str| {
        // Probe with the same prefix that `name_temp` below will use.
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // No cluster with this name exists, so the name is free.
            Err(CatalogError::UnknownCluster(_)) => true,
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}
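
// Editor's note (illustrative): cluster swap has the same shape as schema
// swap and is the kind of primitive used for blue/green-style cutovers, e.g.
//
//     ALTER CLUSTER serving SWAP WITH staging;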

pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // Pick a random suffix for the temporary swap name, retrying a bounded
    // number of times if the generated name is already in use.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}
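
// Editor's note: an illustrative, catalog-free restatement of the
// `gen_temp_suffix` retry loop above, added for exposition. `check_fn` stands
// in for the closures that probe the catalog for temp-name collisions; after
// ten failed probes we give up, exactly as the planner does. The planner draws
// candidates from `mz_ore::id_gen::temp_id()`; a counter suffices to show the
// control flow.
#[cfg(test)]
mod temp_suffix_sketch {
    fn gen_temp_suffix(check_fn: &dyn Fn(&str) -> bool) -> Result<String, String> {
        let mut attempts = 0;
        loop {
            attempts += 1;
            if attempts > 10 {
                return Err("unable to swap!".into());
            }
            let candidate = format!("candidate_{attempts}");
            // Keep the candidate only if the caller says the name is free.
            if check_fn(&candidate) {
                return Ok(candidate);
            }
        }
    }

    #[test]
    fn gives_up_after_ten_attempts() {
        assert!(gen_temp_suffix(&|_| false).is_err());
        assert_eq!(
            gen_temp_suffix(&|s| s.ends_with("_3")).unwrap(),
            "candidate_3"
        );
    }
}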

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error on `ALTER VIEW <materialized-view>`.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // Keep the original option value so it can be written back into
            // the item's create_sql; a RESET (None) re-plans the default
            // compaction window.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}
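
// Editor's note (illustrative; syntax hedged): `alter_retain_history` backs
// statements such as
//
//     ALTER MATERIALIZED VIEW winning_bids SET (RETAIN HISTORY FOR '1hr');
//     ALTER TABLE measurements RESET (RETAIN HISTORY);
//
// A SET carries the `RetainHistoryFor` value through to the plan so it can be
// written back into the item's create_sql; a RESET passes `None`, which
// re-plans the default logical compaction window.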

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}
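
// Editor's note (illustrative): rotating a secret's contents re-plans only the
// value expression and leaves the catalog item in place, e.g.
//
//     ALTER SECRET kafka_password AS 'new-password';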

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
    };

    // Collect every option named by any action so the set can be validated
    // against the connection type.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition the actions into options to set and options to drop.
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Extracting the options type-checks the SET values; `seen` records which
    // option names were set.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option in a mutually exclusive set is touched, drop the
        // whole set; the SET values are then re-applied on top.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}
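
// Editor's note (illustrative; option names hedged): the two shapes of
// ALTER CONNECTION handled above are key rotation and option surgery, e.g.
//
//     ALTER CONNECTION ssh_bastion ROTATE KEYS;
//     ALTER CONNECTION kafka_conn SET (BROKER 'broker2:9092') WITH (VALIDATE = true);
//
// ROTATE KEYS must appear alone and only applies to SSH connections; SET/DROP
// actions are checked against the per-connection-type option set, and
// touching any option of a mutually exclusive set clears the whole set before
// the SETs are re-applied.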

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always alter the latest version of the sink.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Reconstruct the original CREATE SINK statement from the catalog.
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(mut stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // Remove any existing VERSION option and push the incremented
            // version.
            let cur_version = stmt
                .with_options
                .drain_filter_swapping(|o| o.name == CreateSinkOptionName::Version)
                .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
                .max()
                .unwrap_or(0);
            let new_version = cur_version + 1;
            stmt.with_options.push(CreateSinkOption {
                name: CreateSinkOptionName::Version,
                value: Some(WithOptionValue::Value(Value::Number(
                    new_version.to_string(),
                ))),
            });

            // Re-plan the statement with the updated source relation.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
    }
}
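
// Editor's note (illustrative; syntax hedged): re-pointing a sink at a new
// relation, e.g.
//
//     ALTER SINK avro_sink SET FROM new_upstream_view;
//
// is planned by round-tripping the sink's own CREATE SQL: parse it, bump the
// VERSION option, swap in the new input relation, and re-plan it as if it
// were a fresh CREATE SINK.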

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let option = options.next().unwrap();
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let option = options.next().unwrap();
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "Cannot modify the {} of a SOURCE.",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    };
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}
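
// Editor's note (illustrative; variable name hedged): the ALTER SYSTEM
// planners above are thin wrappers that forward a variable name (and, for
// SET, a value) to the adapter, e.g.
//
//     ALTER SYSTEM SET max_connections = 1000;
//     ALTER SYSTEM RESET max_connections;
//     ALTER SYSTEM RESET ALL;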

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    _scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                // Always add columns to the latest version of the item.
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // Columns added to an existing table are always nullable.
    let column_type = scalar_type.nullable(true);
    // Re-parse the type so its raw SQL form can be stored in the plan.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}
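
// Editor's note (illustrative): behind ENABLE_ALTER_TABLE_ADD_COLUMN, the
// planner above accepts e.g.
//
//     ALTER TABLE measurements ADD COLUMN IF NOT EXISTS unit text;
//
// New columns are always planned as nullable, and a duplicate column name is
// a notice under IF NOT EXISTS but an error otherwise.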

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // Reject arbitrarily long comments.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };

    // The column position must fit in an `i32`, which is how sub-components
    // are represented.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}
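
// Editor's note (illustrative): `plan_comment` covers whole objects as well as
// individual columns, e.g.
//
//     COMMENT ON TABLE measurements IS 'raw sensor readings';
//     COMMENT ON COLUMN measurements.val IS 'kelvin';
//     COMMENT ON TABLE measurements IS NULL; -- clears the comment
//
// Column comments record a 1-based column position as the plan's
// sub-component.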

pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

/// Returns an error if the given cluster is a managed cluster.
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}