1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Write;
17use std::iter;
18use std::time::Duration;
19
20use itertools::{Either, Itertools};
21use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
22use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
23use mz_auth::password::Password;
24use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
25use mz_expr::{CollectionPlan, UnmaterializableFunc};
26use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
27use mz_ore::cast::{CastFrom, TryCastFrom};
28use mz_ore::collections::{CollectionExt, HashSet};
29use mz_ore::num::NonNeg;
30use mz_ore::soft_panic_or_log;
31use mz_ore::str::StrExt;
32use mz_proto::RustType;
33use mz_repr::adt::interval::Interval;
34use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
35use mz_repr::network_policy_id::NetworkPolicyId;
36use mz_repr::optimize::OptimizerFeatureOverrides;
37use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
38use mz_repr::role_id::RoleId;
39use mz_repr::{
40 CatalogItemId, ColumnName, RelationDesc, RelationVersion, RelationVersionSelector,
41 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
42 preserves_order, strconv,
43};
44use mz_sql_parser::ast::{
45 self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
46 AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
47 AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
48 AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
49 AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
50 AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
51 AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
52 AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
53 ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
54 ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
55 ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
56 ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
57 ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
58 CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
59 CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
60 CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
61 CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
62 CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
63 CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
64 CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
65 CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
66 CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
67 CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
68 CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
69 CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
70 DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
71 IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
72 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
73 MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
74 NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
75 PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
76 RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
77 ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
78 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
79 TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
80 TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
81 UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
82};
83use mz_sql_parser::ident;
84use mz_sql_parser::parser::StatementParseResult;
85use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
86use mz_storage_types::connections::{Connection, KafkaTopicOptions};
87use mz_storage_types::sinks::{
88 KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
89 StorageSinkConnection,
90};
91use mz_storage_types::sources::encoding::{
92 AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
93 SourceDataEncoding, included_column_desc,
94};
95use mz_storage_types::sources::envelope::{
96 KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
97};
98use mz_storage_types::sources::kafka::{
99 KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
100};
101use mz_storage_types::sources::load_generator::{
102 KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
103 LoadGeneratorOutput, LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
104};
105use mz_storage_types::sources::mysql::{
106 MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
107};
108use mz_storage_types::sources::postgres::{
109 PostgresSourceConnection, PostgresSourcePublicationDetails,
110 ProtoPostgresSourcePublicationDetails,
111};
112use mz_storage_types::sources::sql_server::{
113 ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
114};
115use mz_storage_types::sources::{
116 GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
117 ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
118 SourceExportDetails, SourceExportStatementDetails, SqlServerSourceConnection,
119 SqlServerSourceExtras, Timeline,
120};
121use prost::Message;
122
123use crate::ast::display::AstDisplay;
124use crate::catalog::{
125 CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
126 CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
127};
128use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
129use crate::names::{
130 Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
131 ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
132 ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
133};
134use crate::normalize::{self, ident};
135use crate::plan::error::PlanError;
136use crate::plan::query::{
137 CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
138 scalar_type_from_sql,
139};
140use crate::plan::scope::Scope;
141use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
142use crate::plan::statement::{StatementContext, StatementDesc, scl};
143use crate::plan::typeconv::CastContext;
144use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
145use crate::plan::{
146 AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
147 AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
148 AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
149 AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
150 AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
151 AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
152 ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
153 CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
154 CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
155 CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
156 CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
157 CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
158 MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan,
159 PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink,
160 Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat,
161 WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query,
162 transform_ast,
163};
164use crate::session::vars::{
165 self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
166 ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
167};
168use crate::{names, parse};
169
170mod connection;
171
172const MAX_NUM_COLUMNS: usize = 256;
177
178static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
179 std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());
180
181fn check_partition_by(desc: &RelationDesc, mut partition_by: Vec<Ident>) -> Result<(), PlanError> {
185 if partition_by.len() > desc.len() {
186 tracing::error!(
187 "PARTITION BY contains more columns than the relation. (expected at most {}, got {})",
188 desc.len(),
189 partition_by.len()
190 );
191 partition_by.truncate(desc.len());
192 }
193
194 let desc_prefix = desc.iter().take(partition_by.len());
195 for (idx, ((desc_name, desc_type), partition_name)) in
196 desc_prefix.zip_eq(partition_by).enumerate()
197 {
198 let partition_name = normalize::column_name(partition_name);
199 if *desc_name != partition_name {
200 sql_bail!(
201 "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
202 );
203 }
204 if !preserves_order(&desc_type.scalar_type) {
205 sql_bail!("PARTITION BY column {partition_name} has unsupported type");
206 }
207 }
208 Ok(())
209}
210
211pub fn describe_create_database(
212 _: &StatementContext,
213 _: CreateDatabaseStatement,
214) -> Result<StatementDesc, PlanError> {
215 Ok(StatementDesc::new(None))
216}
217
218pub fn plan_create_database(
219 _: &StatementContext,
220 CreateDatabaseStatement {
221 name,
222 if_not_exists,
223 }: CreateDatabaseStatement,
224) -> Result<Plan, PlanError> {
225 Ok(Plan::CreateDatabase(CreateDatabasePlan {
226 name: normalize::ident(name.0),
227 if_not_exists,
228 }))
229}
230
231pub fn describe_create_schema(
232 _: &StatementContext,
233 _: CreateSchemaStatement,
234) -> Result<StatementDesc, PlanError> {
235 Ok(StatementDesc::new(None))
236}
237
238pub fn plan_create_schema(
239 scx: &StatementContext,
240 CreateSchemaStatement {
241 mut name,
242 if_not_exists,
243 }: CreateSchemaStatement,
244) -> Result<Plan, PlanError> {
245 if name.0.len() > 2 {
246 sql_bail!("schema name {} has more than two components", name);
247 }
248 let schema_name = normalize::ident(
249 name.0
250 .pop()
251 .expect("names always have at least one component"),
252 );
253 let database_spec = match name.0.pop() {
254 None => match scx.catalog.active_database() {
255 Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
256 None => sql_bail!("no database specified and no active database"),
257 },
258 Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
259 Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
260 Err(_) => sql_bail!("invalid database {}", n.as_str()),
261 },
262 };
263 Ok(Plan::CreateSchema(CreateSchemaPlan {
264 database_spec,
265 schema_name,
266 if_not_exists,
267 }))
268}
269
270pub fn describe_create_table(
271 _: &StatementContext,
272 _: CreateTableStatement<Aug>,
273) -> Result<StatementDesc, PlanError> {
274 Ok(StatementDesc::new(None))
275}
276
277pub fn plan_create_table(
278 scx: &StatementContext,
279 stmt: CreateTableStatement<Aug>,
280) -> Result<Plan, PlanError> {
281 let CreateTableStatement {
282 name,
283 columns,
284 constraints,
285 if_not_exists,
286 temporary,
287 with_options,
288 } = &stmt;
289
290 let names: Vec<_> = columns
291 .iter()
292 .filter(|c| {
293 let is_versioned = c
297 .options
298 .iter()
299 .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
300 !is_versioned
301 })
302 .map(|c| normalize::column_name(c.name.clone()))
303 .collect();
304
305 if let Some(dup) = names.iter().duplicates().next() {
306 sql_bail!("column {} specified more than once", dup.quoted());
307 }
308
309 let mut column_types = Vec::with_capacity(columns.len());
312 let mut defaults = Vec::with_capacity(columns.len());
313 let mut changes = BTreeMap::new();
314 let mut keys = Vec::new();
315
316 for (i, c) in columns.into_iter().enumerate() {
317 let aug_data_type = &c.data_type;
318 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
319 let mut nullable = true;
320 let mut default = Expr::null();
321 let mut versioned = false;
322 for option in &c.options {
323 match &option.option {
324 ColumnOption::NotNull => nullable = false,
325 ColumnOption::Default(expr) => {
326 let mut expr = expr.clone();
329 transform_ast::transform(scx, &mut expr)?;
330 let _ = query::plan_default_expr(scx, &expr, &ty)?;
331 default = expr.clone();
332 }
333 ColumnOption::Unique { is_primary } => {
334 keys.push(vec![i]);
335 if *is_primary {
336 nullable = false;
337 }
338 }
339 ColumnOption::Versioned { action, version } => {
340 let version = RelationVersion::from(*version);
341 versioned = true;
342
343 let name = normalize::column_name(c.name.clone());
344 let typ = ty.clone().nullable(nullable);
345
346 changes.insert(version, (action.clone(), name, typ));
347 }
348 other => {
349 bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
350 }
351 }
352 }
353 if !versioned {
356 column_types.push(ty.nullable(nullable));
357 }
358 defaults.push(default);
359 }
360
361 let mut seen_primary = false;
362 'c: for constraint in constraints {
363 match constraint {
364 TableConstraint::Unique {
365 name: _,
366 columns,
367 is_primary,
368 nulls_not_distinct,
369 } => {
370 if seen_primary && *is_primary {
371 sql_bail!(
372 "multiple primary keys for table {} are not allowed",
373 name.to_ast_string_stable()
374 );
375 }
376 seen_primary = *is_primary || seen_primary;
377
378 let mut key = vec![];
379 for column in columns {
380 let column = normalize::column_name(column.clone());
381 match names.iter().position(|name| *name == column) {
382 None => sql_bail!("unknown column in constraint: {}", column),
383 Some(i) => {
384 let nullable = &mut column_types[i].nullable;
385 if *is_primary {
386 if *nulls_not_distinct {
387 sql_bail!(
388 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
389 );
390 }
391
392 *nullable = false;
393 } else if !(*nulls_not_distinct || !*nullable) {
394 break 'c;
397 }
398
399 key.push(i);
400 }
401 }
402 }
403
404 if *is_primary {
405 keys.insert(0, key);
406 } else {
407 keys.push(key);
408 }
409 }
410 TableConstraint::ForeignKey { .. } => {
411 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
414 }
415 TableConstraint::Check { .. } => {
416 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
419 }
420 }
421 }
422
423 if !keys.is_empty() {
424 scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
427 }
428
429 let typ = SqlRelationType::new(column_types).with_keys(keys);
430
431 let temporary = *temporary;
432 let name = if temporary {
433 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
434 } else {
435 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
436 };
437
438 let full_name = scx.catalog.resolve_full_name(&name);
440 let partial_name = PartialItemName::from(full_name.clone());
441 if let (false, Ok(item)) = (
444 if_not_exists,
445 scx.catalog.resolve_item_or_type(&partial_name),
446 ) {
447 return Err(PlanError::ItemAlreadyExists {
448 name: full_name.to_string(),
449 item_type: item.item_type(),
450 });
451 }
452
453 let desc = RelationDesc::new(typ, names);
454 let mut desc = VersionedRelationDesc::new(desc);
455 for (version, (_action, name, typ)) in changes.into_iter() {
456 let new_version = desc.add_column(name, typ);
457 if version != new_version {
458 return Err(PlanError::InvalidTable {
459 name: full_name.item,
460 });
461 }
462 }
463
464 let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;
465
466 let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
472 let options = plan_table_options(scx, &original_desc, with_options.clone())?;
473
474 let compaction_window = options.iter().find_map(|o| {
475 #[allow(irrefutable_let_patterns)]
476 if let crate::plan::TableOption::RetainHistory(lcw) = o {
477 Some(lcw.clone())
478 } else {
479 None
480 }
481 });
482
483 let table = Table {
484 create_sql,
485 desc,
486 temporary,
487 compaction_window,
488 data_source: TableDataSource::TableWrites { defaults },
489 };
490 Ok(Plan::CreateTable(CreateTablePlan {
491 name,
492 table,
493 if_not_exists: *if_not_exists,
494 }))
495}
496
497pub fn describe_create_table_from_source(
498 _: &StatementContext,
499 _: CreateTableFromSourceStatement<Aug>,
500) -> Result<StatementDesc, PlanError> {
501 Ok(StatementDesc::new(None))
502}
503
504pub fn describe_create_webhook_source(
505 _: &StatementContext,
506 _: CreateWebhookSourceStatement<Aug>,
507) -> Result<StatementDesc, PlanError> {
508 Ok(StatementDesc::new(None))
509}
510
511pub fn describe_create_source(
512 _: &StatementContext,
513 _: CreateSourceStatement<Aug>,
514) -> Result<StatementDesc, PlanError> {
515 Ok(StatementDesc::new(None))
516}
517
518pub fn describe_create_subsource(
519 _: &StatementContext,
520 _: CreateSubsourceStatement<Aug>,
521) -> Result<StatementDesc, PlanError> {
522 Ok(StatementDesc::new(None))
523}
524
525generate_extracted_config!(
526 CreateSourceOption,
527 (TimestampInterval, Duration),
528 (RetainHistory, OptionalDuration)
529);
530
531generate_extracted_config!(
532 PgConfigOption,
533 (Details, String),
534 (Publication, String),
535 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
536);
537
538generate_extracted_config!(
539 MySqlConfigOption,
540 (Details, String),
541 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
542 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
543);
544
545generate_extracted_config!(
546 SqlServerConfigOption,
547 (Details, String),
548 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
549 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
550);
551
552pub fn plan_create_webhook_source(
553 scx: &StatementContext,
554 mut stmt: CreateWebhookSourceStatement<Aug>,
555) -> Result<Plan, PlanError> {
556 if stmt.is_table {
557 scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
558 }
559
560 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
563 let enable_multi_replica_sources =
564 ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
565 if !enable_multi_replica_sources {
566 if in_cluster.replica_ids().len() > 1 {
567 sql_bail!("cannot create webhook source in cluster with more than one replica")
568 }
569 }
570 let create_sql =
571 normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;
572
573 let CreateWebhookSourceStatement {
574 name,
575 if_not_exists,
576 body_format,
577 include_headers,
578 validate_using,
579 is_table,
580 in_cluster: _,
582 } = stmt;
583
584 let validate_using = validate_using
585 .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
586 .transpose()?;
587 if let Some(WebhookValidation { expression, .. }) = &validate_using {
588 if !expression.contains_column() {
591 return Err(PlanError::WebhookValidationDoesNotUseColumns);
592 }
593 if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
597 return Err(PlanError::WebhookValidationNonDeterministic);
598 }
599 }
600
601 let body_format = match body_format {
602 Format::Bytes => WebhookBodyFormat::Bytes,
603 Format::Json { array } => WebhookBodyFormat::Json { array },
604 Format::Text => WebhookBodyFormat::Text,
605 ty => {
607 return Err(PlanError::Unsupported {
608 feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
609 discussion_no: None,
610 });
611 }
612 };
613
614 let mut column_ty = vec![
615 SqlColumnType {
617 scalar_type: SqlScalarType::from(body_format),
618 nullable: false,
619 },
620 ];
621 let mut column_names = vec!["body".to_string()];
622
623 let mut headers = WebhookHeaders::default();
624
625 if let Some(filters) = include_headers.column {
627 column_ty.push(SqlColumnType {
628 scalar_type: SqlScalarType::Map {
629 value_type: Box::new(SqlScalarType::String),
630 custom_id: None,
631 },
632 nullable: false,
633 });
634 column_names.push("headers".to_string());
635
636 let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
637 filters.into_iter().partition_map(|filter| {
638 if filter.block {
639 itertools::Either::Right(filter.header_name)
640 } else {
641 itertools::Either::Left(filter.header_name)
642 }
643 });
644 headers.header_column = Some(WebhookHeaderFilters { allow, block });
645 }
646
647 for header in include_headers.mappings {
649 let scalar_type = header
650 .use_bytes
651 .then_some(SqlScalarType::Bytes)
652 .unwrap_or(SqlScalarType::String);
653 column_ty.push(SqlColumnType {
654 scalar_type,
655 nullable: true,
656 });
657 column_names.push(header.column_name.into_string());
658
659 let column_idx = column_ty.len() - 1;
660 assert_eq!(
662 column_idx,
663 column_names.len() - 1,
664 "header column names and types don't match"
665 );
666 headers
667 .mapped_headers
668 .insert(column_idx, (header.header_name, header.use_bytes));
669 }
670
671 let mut unique_check = HashSet::with_capacity(column_names.len());
673 for name in &column_names {
674 if !unique_check.insert(name) {
675 return Err(PlanError::AmbiguousColumn(name.clone().into()));
676 }
677 }
678 if column_names.len() > MAX_NUM_COLUMNS {
679 return Err(PlanError::TooManyColumns {
680 max_num_columns: MAX_NUM_COLUMNS,
681 req_num_columns: column_names.len(),
682 });
683 }
684
685 let typ = SqlRelationType::new(column_ty);
686 let desc = RelationDesc::new(typ, column_names);
687
688 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
690 let full_name = scx.catalog.resolve_full_name(&name);
691 let partial_name = PartialItemName::from(full_name.clone());
692 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
693 return Err(PlanError::ItemAlreadyExists {
694 name: full_name.to_string(),
695 item_type: item.item_type(),
696 });
697 }
698
699 let timeline = Timeline::EpochMilliseconds;
702
703 let plan = if is_table {
704 let data_source = DataSourceDesc::Webhook {
705 validate_using,
706 body_format,
707 headers,
708 cluster_id: Some(in_cluster.id()),
709 };
710 let data_source = TableDataSource::DataSource {
711 desc: data_source,
712 timeline,
713 };
714 Plan::CreateTable(CreateTablePlan {
715 name,
716 if_not_exists,
717 table: Table {
718 create_sql,
719 desc: VersionedRelationDesc::new(desc),
720 temporary: false,
721 compaction_window: None,
722 data_source,
723 },
724 })
725 } else {
726 let data_source = DataSourceDesc::Webhook {
727 validate_using,
728 body_format,
729 headers,
730 cluster_id: None,
732 };
733 Plan::CreateSource(CreateSourcePlan {
734 name,
735 source: Source {
736 create_sql,
737 data_source,
738 desc,
739 compaction_window: None,
740 },
741 if_not_exists,
742 timeline,
743 in_cluster: Some(in_cluster.id()),
744 })
745 };
746
747 Ok(plan)
748}
749
750pub fn plan_create_source(
751 scx: &StatementContext,
752 mut stmt: CreateSourceStatement<Aug>,
753) -> Result<Plan, PlanError> {
754 let CreateSourceStatement {
755 name,
756 in_cluster: _,
757 col_names,
758 connection: source_connection,
759 envelope,
760 if_not_exists,
761 format,
762 key_constraint,
763 include_metadata,
764 with_options,
765 external_references: referenced_subsources,
766 progress_subsource,
767 } = &stmt;
768
769 mz_ore::soft_assert_or_log!(
770 referenced_subsources.is_none(),
771 "referenced subsources must be cleared in purification"
772 );
773
774 let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
775 && scx.catalog.system_vars().force_source_table_syntax();
776
777 if force_source_table_syntax {
780 if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
781 Err(PlanError::UseTablesForSources(
782 "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
783 ))?;
784 }
785 }
786
787 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
788
789 if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
790 && include_metadata
791 .iter()
792 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
793 {
794 sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
796 }
797 if !matches!(
798 source_connection,
799 CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
800 ) && !include_metadata.is_empty()
801 {
802 bail_unsupported!("INCLUDE metadata with non-Kafka sources");
803 }
804
805 if !include_metadata.is_empty()
806 && !matches!(
807 envelope,
808 ast::SourceEnvelope::Upsert { .. }
809 | ast::SourceEnvelope::None
810 | ast::SourceEnvelope::Debezium
811 )
812 {
813 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
814 }
815
816 let external_connection =
817 plan_generic_source_connection(scx, source_connection, include_metadata)?;
818
819 let CreateSourceOptionExtracted {
820 timestamp_interval,
821 retain_history,
822 seen: _,
823 } = CreateSourceOptionExtracted::try_from(with_options.clone())?;
824
825 let metadata_columns_desc = match external_connection {
826 GenericSourceConnection::Kafka(KafkaSourceConnection {
827 ref metadata_columns,
828 ..
829 }) => kafka_metadata_columns_desc(metadata_columns),
830 _ => vec![],
831 };
832
833 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
835 scx,
836 &envelope,
837 format,
838 Some(external_connection.default_key_desc()),
839 external_connection.default_value_desc(),
840 include_metadata,
841 metadata_columns_desc,
842 &external_connection,
843 )?;
844 plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;
845
846 let names: Vec<_> = desc.iter_names().cloned().collect();
847 if let Some(dup) = names.iter().duplicates().next() {
848 sql_bail!("column {} specified more than once", dup.quoted());
849 }
850
851 if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
853 scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;
856
857 let key_columns = columns
858 .into_iter()
859 .map(normalize::column_name)
860 .collect::<Vec<_>>();
861
862 let mut uniq = BTreeSet::new();
863 for col in key_columns.iter() {
864 if !uniq.insert(col) {
865 sql_bail!("Repeated column name in source key constraint: {}", col);
866 }
867 }
868
869 let key_indices = key_columns
870 .iter()
871 .map(|col| {
872 let name_idx = desc
873 .get_by_name(col)
874 .map(|(idx, _type)| idx)
875 .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
876 if desc.get_unambiguous_name(name_idx).is_none() {
877 sql_bail!("Ambiguous column in source key constraint: {}", col);
878 }
879 Ok(name_idx)
880 })
881 .collect::<Result<Vec<_>, _>>()?;
882
883 if !desc.typ().keys.is_empty() {
884 return Err(key_constraint_err(&desc, &key_columns));
885 } else {
886 desc = desc.with_key(key_indices);
887 }
888 }
889
890 let timestamp_interval = match timestamp_interval {
891 Some(duration) => {
892 let min = scx.catalog.system_vars().min_timestamp_interval();
893 let max = scx.catalog.system_vars().max_timestamp_interval();
894 if duration < min || duration > max {
895 return Err(PlanError::InvalidTimestampInterval {
896 min,
897 max,
898 requested: duration,
899 });
900 }
901 duration
902 }
903 None => scx.catalog.config().timestamp_interval,
904 };
905
906 let source_desc = SourceDesc::<ReferencedConnection> {
907 connection: external_connection,
908 timestamp_interval,
909 };
910
911 let data_source = match progress_subsource {
912 Some(name) => {
913 let DeferredItemName::Named(name) = name else {
914 sql_bail!("[internal error] progress subsource must be named during purification");
915 };
916 let ResolvedItemName::Item { id, .. } = name else {
917 sql_bail!("[internal error] invalid target id");
918 };
919
920 let details = match source_desc.connection {
921 GenericSourceConnection::Kafka(ref c) => {
922 SourceExportDetails::Kafka(KafkaSourceExportDetails {
923 metadata_columns: c.metadata_columns.clone(),
924 })
925 }
926 GenericSourceConnection::LoadGenerator(ref c) => match c.load_generator {
927 LoadGenerator::Auction
928 | LoadGenerator::Marketing
929 | LoadGenerator::Tpch { .. } => SourceExportDetails::None,
930 LoadGenerator::Counter { .. }
931 | LoadGenerator::Clock
932 | LoadGenerator::Datums
933 | LoadGenerator::KeyValue(_) => {
934 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
935 output: LoadGeneratorOutput::Default,
936 })
937 }
938 },
939 GenericSourceConnection::Postgres(_)
940 | GenericSourceConnection::MySql(_)
941 | GenericSourceConnection::SqlServer(_) => SourceExportDetails::None,
942 };
943
944 DataSourceDesc::OldSyntaxIngestion {
945 desc: source_desc,
946 progress_subsource: *id,
947 data_config: SourceExportDataConfig {
948 encoding,
949 envelope: envelope.clone(),
950 },
951 details,
952 }
953 }
954 None => DataSourceDesc::Ingestion(source_desc),
955 };
956
957 let if_not_exists = *if_not_exists;
958 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
959
960 let full_name = scx.catalog.resolve_full_name(&name);
962 let partial_name = PartialItemName::from(full_name.clone());
963 if let (false, Ok(item)) = (
966 if_not_exists,
967 scx.catalog.resolve_item_or_type(&partial_name),
968 ) {
969 return Err(PlanError::ItemAlreadyExists {
970 name: full_name.to_string(),
971 item_type: item.item_type(),
972 });
973 }
974
975 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
979
980 let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;
981
982 let timeline = match envelope {
984 SourceEnvelope::CdcV2 => {
985 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
986 }
987 _ => Timeline::EpochMilliseconds,
988 };
989
990 let compaction_window = plan_retain_history_option(scx, retain_history)?;
991
992 let source = Source {
993 create_sql,
994 data_source,
995 desc,
996 compaction_window,
997 };
998
999 Ok(Plan::CreateSource(CreateSourcePlan {
1000 name,
1001 source,
1002 if_not_exists,
1003 timeline,
1004 in_cluster: Some(in_cluster.id()),
1005 }))
1006}
1007
1008pub fn plan_generic_source_connection(
1009 scx: &StatementContext<'_>,
1010 source_connection: &CreateSourceConnection<Aug>,
1011 include_metadata: &Vec<SourceIncludeMetadata>,
1012) -> Result<GenericSourceConnection<ReferencedConnection>, PlanError> {
1013 Ok(match source_connection {
1014 CreateSourceConnection::Kafka {
1015 connection,
1016 options,
1017 } => GenericSourceConnection::Kafka(plan_kafka_source_connection(
1018 scx,
1019 connection,
1020 options,
1021 include_metadata,
1022 )?),
1023 CreateSourceConnection::Postgres {
1024 connection,
1025 options,
1026 } => GenericSourceConnection::Postgres(plan_postgres_source_connection(
1027 scx, connection, options,
1028 )?),
1029 CreateSourceConnection::SqlServer {
1030 connection,
1031 options,
1032 } => GenericSourceConnection::SqlServer(plan_sqlserver_source_connection(
1033 scx, connection, options,
1034 )?),
1035 CreateSourceConnection::MySql {
1036 connection,
1037 options,
1038 } => {
1039 GenericSourceConnection::MySql(plan_mysql_source_connection(scx, connection, options)?)
1040 }
1041 CreateSourceConnection::LoadGenerator { generator, options } => {
1042 GenericSourceConnection::LoadGenerator(plan_load_generator_source_connection(
1043 scx,
1044 generator,
1045 options,
1046 include_metadata,
1047 )?)
1048 }
1049 })
1050}
1051
1052fn plan_load_generator_source_connection(
1053 scx: &StatementContext<'_>,
1054 generator: &ast::LoadGenerator,
1055 options: &Vec<LoadGeneratorOption<Aug>>,
1056 include_metadata: &Vec<SourceIncludeMetadata>,
1057) -> Result<LoadGeneratorSourceConnection, PlanError> {
1058 let load_generator =
1059 load_generator_ast_to_generator(scx, generator, options, include_metadata)?;
1060 let LoadGeneratorOptionExtracted {
1061 tick_interval,
1062 as_of,
1063 up_to,
1064 ..
1065 } = options.clone().try_into()?;
1066 let tick_micros = match tick_interval {
1067 Some(interval) => Some(interval.as_micros().try_into()?),
1068 None => None,
1069 };
1070 if up_to < as_of {
1071 sql_bail!("UP TO cannot be less than AS OF");
1072 }
1073 Ok(LoadGeneratorSourceConnection {
1074 load_generator,
1075 tick_micros,
1076 as_of,
1077 up_to,
1078 })
1079}
1080
1081fn plan_mysql_source_connection(
1082 scx: &StatementContext<'_>,
1083 connection: &ResolvedItemName,
1084 options: &Vec<MySqlConfigOption<Aug>>,
1085) -> Result<MySqlSourceConnection<ReferencedConnection>, PlanError> {
1086 let connection_item = scx.get_item_by_resolved_name(connection)?;
1087 match connection_item.connection()? {
1088 Connection::MySql(connection) => connection,
1089 _ => sql_bail!(
1090 "{} is not a MySQL connection",
1091 scx.catalog.resolve_full_name(connection_item.name())
1092 ),
1093 };
1094 let MySqlConfigOptionExtracted {
1095 details,
1096 text_columns: _,
1100 exclude_columns: _,
1101 seen: _,
1102 } = options.clone().try_into()?;
1103 let details = details
1104 .as_ref()
1105 .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
1106 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1107 let details = ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1108 let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1109 Ok(MySqlSourceConnection {
1110 connection: connection_item.id(),
1111 connection_id: connection_item.id(),
1112 details,
1113 })
1114}
1115
1116fn plan_sqlserver_source_connection(
1117 scx: &StatementContext<'_>,
1118 connection: &ResolvedItemName,
1119 options: &Vec<SqlServerConfigOption<Aug>>,
1120) -> Result<SqlServerSourceConnection<ReferencedConnection>, PlanError> {
1121 let connection_item = scx.get_item_by_resolved_name(connection)?;
1122 match connection_item.connection()? {
1123 Connection::SqlServer(connection) => connection,
1124 _ => sql_bail!(
1125 "{} is not a SQL Server connection",
1126 scx.catalog.resolve_full_name(connection_item.name())
1127 ),
1128 };
1129 let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
1130 let details = details
1131 .as_ref()
1132 .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
1133 let extras = hex::decode(details)
1134 .map_err(|e| sql_err!("{e}"))
1135 .and_then(|raw| ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}")))
1136 .and_then(|proto| SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}")))?;
1137 Ok(SqlServerSourceConnection {
1138 connection_id: connection_item.id(),
1139 connection: connection_item.id(),
1140 extras,
1141 })
1142}
1143
1144fn plan_postgres_source_connection(
1145 scx: &StatementContext<'_>,
1146 connection: &ResolvedItemName,
1147 options: &Vec<PgConfigOption<Aug>>,
1148) -> Result<PostgresSourceConnection<ReferencedConnection>, PlanError> {
1149 let connection_item = scx.get_item_by_resolved_name(connection)?;
1150 let PgConfigOptionExtracted {
1151 details,
1152 publication,
1153 text_columns: _,
1157 seen: _,
1158 } = options.clone().try_into()?;
1159 let details = details
1160 .as_ref()
1161 .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
1162 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1163 let details =
1164 ProtoPostgresSourcePublicationDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1165 let publication_details =
1166 PostgresSourcePublicationDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1167 Ok(PostgresSourceConnection {
1168 connection: connection_item.id(),
1169 connection_id: connection_item.id(),
1170 publication: publication.expect("validated exists during purification"),
1171 publication_details,
1172 })
1173}
1174
1175fn plan_kafka_source_connection(
1176 scx: &StatementContext<'_>,
1177 connection_name: &ResolvedItemName,
1178 options: &Vec<ast::KafkaSourceConfigOption<Aug>>,
1179 include_metadata: &Vec<SourceIncludeMetadata>,
1180) -> Result<KafkaSourceConnection<ReferencedConnection>, PlanError> {
1181 let connection_item = scx.get_item_by_resolved_name(connection_name)?;
1182 if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
1183 sql_bail!(
1184 "{} is not a kafka connection",
1185 scx.catalog.resolve_full_name(connection_item.name())
1186 )
1187 }
1188 let KafkaSourceConfigOptionExtracted {
1189 group_id_prefix,
1190 topic,
1191 topic_metadata_refresh_interval,
1192 start_timestamp: _, start_offset,
1194 seen: _,
1195 }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;
1196 let topic = topic.expect("validated exists during purification");
1197 let mut start_offsets = BTreeMap::new();
1198 if let Some(offsets) = start_offset {
1199 for (part, offset) in offsets.iter().enumerate() {
1200 if *offset < 0 {
1201 sql_bail!("START OFFSET must be a nonnegative integer");
1202 }
1203 start_offsets.insert(i32::try_from(part)?, *offset);
1204 }
1205 }
1206 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
1207 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
1210 }
1211 let metadata_columns = include_metadata
1212 .into_iter()
1213 .flat_map(|item| match item {
1214 SourceIncludeMetadata::Timestamp { alias } => {
1215 let name = match alias {
1216 Some(name) => name.to_string(),
1217 None => "timestamp".to_owned(),
1218 };
1219 Some((name, KafkaMetadataKind::Timestamp))
1220 }
1221 SourceIncludeMetadata::Partition { alias } => {
1222 let name = match alias {
1223 Some(name) => name.to_string(),
1224 None => "partition".to_owned(),
1225 };
1226 Some((name, KafkaMetadataKind::Partition))
1227 }
1228 SourceIncludeMetadata::Offset { alias } => {
1229 let name = match alias {
1230 Some(name) => name.to_string(),
1231 None => "offset".to_owned(),
1232 };
1233 Some((name, KafkaMetadataKind::Offset))
1234 }
1235 SourceIncludeMetadata::Headers { alias } => {
1236 let name = match alias {
1237 Some(name) => name.to_string(),
1238 None => "headers".to_owned(),
1239 };
1240 Some((name, KafkaMetadataKind::Headers))
1241 }
1242 SourceIncludeMetadata::Header {
1243 alias,
1244 key,
1245 use_bytes,
1246 } => Some((
1247 alias.to_string(),
1248 KafkaMetadataKind::Header {
1249 key: key.clone(),
1250 use_bytes: *use_bytes,
1251 },
1252 )),
1253 SourceIncludeMetadata::Key { .. } => {
1254 None
1256 }
1257 })
1258 .collect();
1259 Ok(KafkaSourceConnection {
1260 connection: connection_item.id(),
1261 connection_id: connection_item.id(),
1262 topic,
1263 start_offsets,
1264 group_id_prefix,
1265 topic_metadata_refresh_interval,
1266 metadata_columns,
1267 })
1268}
1269
1270fn apply_source_envelope_encoding(
1271 scx: &StatementContext,
1272 envelope: &ast::SourceEnvelope,
1273 format: &Option<FormatSpecifier<Aug>>,
1274 key_desc: Option<RelationDesc>,
1275 value_desc: RelationDesc,
1276 include_metadata: &[SourceIncludeMetadata],
1277 metadata_columns_desc: Vec<(&str, SqlColumnType)>,
1278 source_connection: &GenericSourceConnection<ReferencedConnection>,
1279) -> Result<
1280 (
1281 RelationDesc,
1282 SourceEnvelope,
1283 Option<SourceDataEncoding<ReferencedConnection>>,
1284 ),
1285 PlanError,
1286> {
1287 let encoding = match format {
1288 Some(format) => Some(get_encoding(scx, format, envelope)?),
1289 None => None,
1290 };
1291
1292 let (key_desc, value_desc) = match &encoding {
1293 Some(encoding) => {
1294 match value_desc.typ().columns() {
1297 [typ] => match typ.scalar_type {
1298 SqlScalarType::Bytes => {}
1299 _ => sql_bail!(
1300 "The schema produced by the source is incompatible with format decoding"
1301 ),
1302 },
1303 _ => sql_bail!(
1304 "The schema produced by the source is incompatible with format decoding"
1305 ),
1306 }
1307
1308 let (key_desc, value_desc) = encoding.desc()?;
1309
1310 let key_desc = key_desc.map(|desc| {
1317 let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
1318 let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
1319 if is_kafka && is_envelope_none {
1320 RelationDesc::from_names_and_types(
1321 desc.into_iter()
1322 .map(|(name, typ)| (name, typ.nullable(true))),
1323 )
1324 } else {
1325 desc
1326 }
1327 });
1328 (key_desc, value_desc)
1329 }
1330 None => (key_desc, value_desc),
1331 };
1332
1333 let key_envelope_no_encoding = matches!(
1346 source_connection,
1347 GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
1348 load_generator: LoadGenerator::KeyValue(_),
1349 ..
1350 })
1351 );
1352 let mut key_envelope = get_key_envelope(
1353 include_metadata,
1354 encoding.as_ref(),
1355 key_envelope_no_encoding,
1356 )?;
1357
1358 match (&envelope, &key_envelope) {
1359 (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
1360 (ast::SourceEnvelope::Debezium, _) => sql_bail!(
1361 "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
1362 ),
1363 _ => {}
1364 };
1365
1366 let envelope = match &envelope {
1375 ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
1377 ast::SourceEnvelope::Debezium => {
1378 let after_idx = match typecheck_debezium(&value_desc) {
1380 Ok((_before_idx, after_idx)) => Ok(after_idx),
1381 Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
1382 Some(DataEncoding::Avro(_)) => Err(type_err),
1383 _ => Err(sql_err!(
1384 "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
1385 )),
1386 },
1387 }?;
1388
1389 UnplannedSourceEnvelope::Upsert {
1390 style: UpsertStyle::Debezium { after_idx },
1391 }
1392 }
1393 ast::SourceEnvelope::Upsert {
1394 value_decode_err_policy,
1395 } => {
1396 let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
1397 None => {
1398 if !key_envelope_no_encoding {
1399 bail_unsupported!(format!(
1400 "UPSERT requires a key/value format: {:?}",
1401 format
1402 ))
1403 }
1404 None
1405 }
1406 Some(key_encoding) => Some(key_encoding),
1407 };
1408 if key_envelope == KeyEnvelope::None {
1411 key_envelope = get_unnamed_key_envelope(key_encoding)?;
1412 }
1413 let style = match value_decode_err_policy.as_slice() {
1415 [] => UpsertStyle::Default(key_envelope),
1416 [SourceErrorPolicy::Inline { alias }] => {
1417 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
1418 UpsertStyle::ValueErrInline {
1419 key_envelope,
1420 error_column: alias
1421 .as_ref()
1422 .map_or_else(|| "error".to_string(), |a| a.to_string()),
1423 }
1424 }
1425 _ => {
1426 bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
1427 }
1428 };
1429
1430 UnplannedSourceEnvelope::Upsert { style }
1431 }
1432 ast::SourceEnvelope::CdcV2 => {
1433 scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
1434 match format {
1436 Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
1437 _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
1438 }
1439 UnplannedSourceEnvelope::CdcV2
1440 }
1441 };
1442
1443 let metadata_desc = included_column_desc(metadata_columns_desc);
1444 let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
1445
1446 Ok((desc, envelope, encoding))
1447}
1448
1449fn plan_source_export_desc(
1452 scx: &StatementContext,
1453 name: &UnresolvedItemName,
1454 columns: &Vec<ColumnDef<Aug>>,
1455 constraints: &Vec<TableConstraint<Aug>>,
1456) -> Result<RelationDesc, PlanError> {
1457 let names: Vec<_> = columns
1458 .iter()
1459 .map(|c| normalize::column_name(c.name.clone()))
1460 .collect();
1461
1462 if let Some(dup) = names.iter().duplicates().next() {
1463 sql_bail!("column {} specified more than once", dup.quoted());
1464 }
1465
1466 let mut column_types = Vec::with_capacity(columns.len());
1469 let mut keys = Vec::new();
1470
1471 for (i, c) in columns.into_iter().enumerate() {
1472 let aug_data_type = &c.data_type;
1473 let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
1474 let mut nullable = true;
1475 for option in &c.options {
1476 match &option.option {
1477 ColumnOption::NotNull => nullable = false,
1478 ColumnOption::Default(_) => {
1479 bail_unsupported!("Source export with default value")
1480 }
1481 ColumnOption::Unique { is_primary } => {
1482 keys.push(vec![i]);
1483 if *is_primary {
1484 nullable = false;
1485 }
1486 }
1487 other => {
1488 bail_unsupported!(format!("Source export with column constraint: {}", other))
1489 }
1490 }
1491 }
1492 column_types.push(ty.nullable(nullable));
1493 }
1494
1495 let mut seen_primary = false;
1496 'c: for constraint in constraints {
1497 match constraint {
1498 TableConstraint::Unique {
1499 name: _,
1500 columns,
1501 is_primary,
1502 nulls_not_distinct,
1503 } => {
1504 if seen_primary && *is_primary {
1505 sql_bail!(
1506 "multiple primary keys for source export {} are not allowed",
1507 name.to_ast_string_stable()
1508 );
1509 }
1510 seen_primary = *is_primary || seen_primary;
1511
1512 let mut key = vec![];
1513 for column in columns {
1514 let column = normalize::column_name(column.clone());
1515 match names.iter().position(|name| *name == column) {
1516 None => sql_bail!("unknown column in constraint: {}", column),
1517 Some(i) => {
1518 let nullable = &mut column_types[i].nullable;
1519 if *is_primary {
1520 if *nulls_not_distinct {
1521 sql_bail!(
1522 "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
1523 );
1524 }
1525 *nullable = false;
1526 } else if !(*nulls_not_distinct || !*nullable) {
1527 break 'c;
1530 }
1531
1532 key.push(i);
1533 }
1534 }
1535 }
1536
1537 if *is_primary {
1538 keys.insert(0, key);
1539 } else {
1540 keys.push(key);
1541 }
1542 }
1543 TableConstraint::ForeignKey { .. } => {
1544 bail_unsupported!("Source export with a foreign key")
1545 }
1546 TableConstraint::Check { .. } => {
1547 bail_unsupported!("Source export with a check constraint")
1548 }
1549 }
1550 }
1551
1552 let typ = SqlRelationType::new(column_types).with_keys(keys);
1553 let desc = RelationDesc::new(typ, names);
1554 Ok(desc)
1555}
1556
1557generate_extracted_config!(
1558 CreateSubsourceOption,
1559 (Progress, bool, Default(false)),
1560 (ExternalReference, UnresolvedItemName),
1561 (RetainHistory, OptionalDuration),
1562 (TextColumns, Vec::<Ident>, Default(vec![])),
1563 (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1564 (Details, String)
1565);
1566
1567pub fn plan_create_subsource(
1568 scx: &StatementContext,
1569 stmt: CreateSubsourceStatement<Aug>,
1570) -> Result<Plan, PlanError> {
1571 let CreateSubsourceStatement {
1572 name,
1573 columns,
1574 of_source,
1575 constraints,
1576 if_not_exists,
1577 with_options,
1578 } = &stmt;
1579
1580 let CreateSubsourceOptionExtracted {
1581 progress,
1582 retain_history,
1583 external_reference,
1584 text_columns,
1585 exclude_columns,
1586 details,
1587 seen: _,
1588 } = with_options.clone().try_into()?;
1589
1590 assert!(
1595 progress ^ (external_reference.is_some() && of_source.is_some()),
1596 "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
1597 );
1598
1599 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1600
1601 let data_source = if let Some(source_reference) = of_source {
1602 if scx.catalog.system_vars().enable_create_table_from_source()
1605 && scx.catalog.system_vars().force_source_table_syntax()
1606 {
1607 Err(PlanError::UseTablesForSources(
1608 "CREATE SUBSOURCE".to_string(),
1609 ))?;
1610 }
1611
1612 let ingestion_id = *source_reference.item_id();
1615 let external_reference = external_reference.unwrap();
1616
1617 let details = details
1620 .as_ref()
1621 .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
1622 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1623 let details =
1624 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1625 let details =
1626 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1627 let details = match details {
1628 SourceExportStatementDetails::Postgres { table } => {
1629 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1630 column_casts: crate::pure::postgres::generate_column_casts(
1631 scx,
1632 &table,
1633 &text_columns,
1634 )?,
1635 table,
1636 })
1637 }
1638 SourceExportStatementDetails::MySql {
1639 table,
1640 initial_gtid_set,
1641 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1642 table,
1643 initial_gtid_set,
1644 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1645 exclude_columns: exclude_columns
1646 .into_iter()
1647 .map(|c| c.into_string())
1648 .collect(),
1649 }),
1650 SourceExportStatementDetails::SqlServer {
1651 table,
1652 capture_instance,
1653 initial_lsn,
1654 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1655 capture_instance,
1656 table,
1657 initial_lsn,
1658 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1659 exclude_columns: exclude_columns
1660 .into_iter()
1661 .map(|c| c.into_string())
1662 .collect(),
1663 }),
1664 SourceExportStatementDetails::LoadGenerator { output } => {
1665 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1666 }
1667 SourceExportStatementDetails::Kafka {} => {
1668 bail_unsupported!("subsources cannot reference Kafka sources")
1669 }
1670 };
1671 DataSourceDesc::IngestionExport {
1672 ingestion_id,
1673 external_reference,
1674 details,
1675 data_config: SourceExportDataConfig {
1677 envelope: SourceEnvelope::None(NoneEnvelope {
1678 key_envelope: KeyEnvelope::None,
1679 key_arity: 0,
1680 }),
1681 encoding: None,
1682 },
1683 }
1684 } else if progress {
1685 DataSourceDesc::Progress
1686 } else {
1687 panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
1688 };
1689
1690 let if_not_exists = *if_not_exists;
1691 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1692
1693 let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;
1694
1695 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1696 let source = Source {
1697 create_sql,
1698 data_source,
1699 desc,
1700 compaction_window,
1701 };
1702
1703 Ok(Plan::CreateSource(CreateSourcePlan {
1704 name,
1705 source,
1706 if_not_exists,
1707 timeline: Timeline::EpochMilliseconds,
1708 in_cluster: None,
1709 }))
1710}
1711
1712generate_extracted_config!(
1713 TableFromSourceOption,
1714 (TextColumns, Vec::<Ident>, Default(vec![])),
1715 (ExcludeColumns, Vec::<Ident>, Default(vec![])),
1716 (PartitionBy, Vec<Ident>),
1717 (RetainHistory, OptionalDuration),
1718 (Details, String)
1719);
1720
1721pub fn plan_create_table_from_source(
1722 scx: &StatementContext,
1723 stmt: CreateTableFromSourceStatement<Aug>,
1724) -> Result<Plan, PlanError> {
1725 if !scx.catalog.system_vars().enable_create_table_from_source() {
1726 sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
1727 }
1728
1729 let CreateTableFromSourceStatement {
1730 name,
1731 columns,
1732 constraints,
1733 if_not_exists,
1734 source,
1735 external_reference,
1736 envelope,
1737 format,
1738 include_metadata,
1739 with_options,
1740 } = &stmt;
1741
1742 let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);
1743
1744 let TableFromSourceOptionExtracted {
1745 text_columns,
1746 exclude_columns,
1747 retain_history,
1748 partition_by,
1749 details,
1750 seen: _,
1751 } = with_options.clone().try_into()?;
1752
1753 let source_item = scx.get_item_by_resolved_name(source)?;
1754 let ingestion_id = source_item.id();
1755
1756 let details = details
1759 .as_ref()
1760 .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
1761 let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
1762 let details =
1763 ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
1764 let details =
1765 SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
1766
1767 if !matches!(details, SourceExportStatementDetails::Kafka { .. })
1768 && include_metadata
1769 .iter()
1770 .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
1771 {
1772 sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
1774 }
1775 if !matches!(
1776 details,
1777 SourceExportStatementDetails::Kafka { .. }
1778 | SourceExportStatementDetails::LoadGenerator { .. }
1779 ) && !include_metadata.is_empty()
1780 {
1781 bail_unsupported!("INCLUDE metadata with non-Kafka source table");
1782 }
1783
1784 let details = match details {
1785 SourceExportStatementDetails::Postgres { table } => {
1786 SourceExportDetails::Postgres(PostgresSourceExportDetails {
1787 column_casts: crate::pure::postgres::generate_column_casts(
1788 scx,
1789 &table,
1790 &text_columns,
1791 )?,
1792 table,
1793 })
1794 }
1795 SourceExportStatementDetails::MySql {
1796 table,
1797 initial_gtid_set,
1798 } => SourceExportDetails::MySql(MySqlSourceExportDetails {
1799 table,
1800 initial_gtid_set,
1801 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1802 exclude_columns: exclude_columns
1803 .into_iter()
1804 .map(|c| c.into_string())
1805 .collect(),
1806 }),
1807 SourceExportStatementDetails::SqlServer {
1808 table,
1809 capture_instance,
1810 initial_lsn,
1811 } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
1812 table,
1813 capture_instance,
1814 initial_lsn,
1815 text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
1816 exclude_columns: exclude_columns
1817 .into_iter()
1818 .map(|c| c.into_string())
1819 .collect(),
1820 }),
1821 SourceExportStatementDetails::LoadGenerator { output } => {
1822 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
1823 }
1824 SourceExportStatementDetails::Kafka {} => {
1825 if !include_metadata.is_empty()
1826 && !matches!(
1827 envelope,
1828 ast::SourceEnvelope::Upsert { .. }
1829 | ast::SourceEnvelope::None
1830 | ast::SourceEnvelope::Debezium
1831 )
1832 {
1833 sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
1835 }
1836
1837 let metadata_columns = include_metadata
1838 .into_iter()
1839 .flat_map(|item| match item {
1840 SourceIncludeMetadata::Timestamp { alias } => {
1841 let name = match alias {
1842 Some(name) => name.to_string(),
1843 None => "timestamp".to_owned(),
1844 };
1845 Some((name, KafkaMetadataKind::Timestamp))
1846 }
1847 SourceIncludeMetadata::Partition { alias } => {
1848 let name = match alias {
1849 Some(name) => name.to_string(),
1850 None => "partition".to_owned(),
1851 };
1852 Some((name, KafkaMetadataKind::Partition))
1853 }
1854 SourceIncludeMetadata::Offset { alias } => {
1855 let name = match alias {
1856 Some(name) => name.to_string(),
1857 None => "offset".to_owned(),
1858 };
1859 Some((name, KafkaMetadataKind::Offset))
1860 }
1861 SourceIncludeMetadata::Headers { alias } => {
1862 let name = match alias {
1863 Some(name) => name.to_string(),
1864 None => "headers".to_owned(),
1865 };
1866 Some((name, KafkaMetadataKind::Headers))
1867 }
1868 SourceIncludeMetadata::Header {
1869 alias,
1870 key,
1871 use_bytes,
1872 } => Some((
1873 alias.to_string(),
1874 KafkaMetadataKind::Header {
1875 key: key.clone(),
1876 use_bytes: *use_bytes,
1877 },
1878 )),
1879 SourceIncludeMetadata::Key { .. } => {
1880 None
1882 }
1883 })
1884 .collect();
1885
1886 SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
1887 }
1888 };
1889
1890 let source_connection = &source_item.source_desc()?.expect("is source").connection;
1891
1892 let (key_desc, value_desc) =
1897 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1898 let columns = match columns {
1899 TableFromSourceColumns::Defined(columns) => columns,
1900 _ => unreachable!(),
1901 };
1902 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1903 (None, desc)
1904 } else {
1905 let key_desc = source_connection.default_key_desc();
1906 let value_desc = source_connection.default_value_desc();
1907 (Some(key_desc), value_desc)
1908 };
1909
1910 let metadata_columns_desc = match &details {
1911 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1912 metadata_columns, ..
1913 }) => kafka_metadata_columns_desc(metadata_columns),
1914 _ => vec![],
1915 };
1916
1917 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1918 scx,
1919 &envelope,
1920 format,
1921 key_desc,
1922 value_desc,
1923 include_metadata,
1924 metadata_columns_desc,
1925 source_connection,
1926 )?;
1927 if let TableFromSourceColumns::Named(col_names) = columns {
1928 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1929 }
1930
1931 let names: Vec<_> = desc.iter_names().cloned().collect();
1932 if let Some(dup) = names.iter().duplicates().next() {
1933 sql_bail!("column {} specified more than once", dup.quoted());
1934 }
1935
1936 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1937
1938 let timeline = match envelope {
1941 SourceEnvelope::CdcV2 => {
1942 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1943 }
1944 _ => Timeline::EpochMilliseconds,
1945 };
1946
1947 if let Some(partition_by) = partition_by {
1948 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1949 check_partition_by(&desc, partition_by)?;
1950 }
1951
1952 let data_source = DataSourceDesc::IngestionExport {
1953 ingestion_id,
1954 external_reference: external_reference
1955 .as_ref()
1956 .expect("populated in purification")
1957 .clone(),
1958 details,
1959 data_config: SourceExportDataConfig { envelope, encoding },
1960 };
1961
1962 let if_not_exists = *if_not_exists;
1963
1964 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1965
1966 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1967 let table = Table {
1968 create_sql,
1969 desc: VersionedRelationDesc::new(desc),
1970 temporary: false,
1971 compaction_window,
1972 data_source: TableDataSource::DataSource {
1973 desc: data_source,
1974 timeline,
1975 },
1976 };
1977
1978 Ok(Plan::CreateTable(CreateTablePlan {
1979 name,
1980 table,
1981 if_not_exists,
1982 }))
1983}
1984
1985generate_extracted_config!(
1986 LoadGeneratorOption,
1987 (TickInterval, Duration),
1988 (AsOf, u64, Default(0_u64)),
1989 (UpTo, u64, Default(u64::MAX)),
1990 (ScaleFactor, f64),
1991 (MaxCardinality, u64),
1992 (Keys, u64),
1993 (SnapshotRounds, u64),
1994 (TransactionalSnapshot, bool),
1995 (ValueSize, u64),
1996 (Seed, u64),
1997 (Partitions, u64),
1998 (BatchSize, u64)
1999);
2000
2001impl LoadGeneratorOptionExtracted {
2002 pub(super) fn ensure_only_valid_options(
2003 &self,
2004 loadgen: &ast::LoadGenerator,
2005 ) -> Result<(), PlanError> {
2006 use mz_sql_parser::ast::LoadGeneratorOptionName::*;
2007
2008 let mut options = self.seen.clone();
2009
2010 let permitted_options: &[_] = match loadgen {
2011 ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
2012 ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
2013 ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
2014 ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
2015 ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
2016 ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
2017 ast::LoadGenerator::KeyValue => &[
2018 TickInterval,
2019 Keys,
2020 SnapshotRounds,
2021 TransactionalSnapshot,
2022 ValueSize,
2023 Seed,
2024 Partitions,
2025 BatchSize,
2026 ],
2027 };
2028
2029 for o in permitted_options {
2030 options.remove(o);
2031 }
2032
2033 if !options.is_empty() {
2034 sql_bail!(
2035 "{} load generators do not support {} values",
2036 loadgen,
2037 options.iter().join(", ")
2038 )
2039 }
2040
2041 Ok(())
2042 }
2043}
2044
2045pub(crate) fn load_generator_ast_to_generator(
2046 scx: &StatementContext,
2047 loadgen: &ast::LoadGenerator,
2048 options: &[LoadGeneratorOption<Aug>],
2049 include_metadata: &[SourceIncludeMetadata],
2050) -> Result<LoadGenerator, PlanError> {
2051 let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
2052 extracted.ensure_only_valid_options(loadgen)?;
2053
2054 if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
2055 sql_bail!("INCLUDE metadata only supported with `KEY VALUE` load generators");
2056 }
2057
2058 let load_generator = match loadgen {
2059 ast::LoadGenerator::Auction => LoadGenerator::Auction,
2060 ast::LoadGenerator::Clock => {
2061 scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
2062 LoadGenerator::Clock
2063 }
2064 ast::LoadGenerator::Counter => {
2065 let LoadGeneratorOptionExtracted {
2066 max_cardinality, ..
2067 } = extracted;
2068 LoadGenerator::Counter { max_cardinality }
2069 }
2070 ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
2071 ast::LoadGenerator::Datums => LoadGenerator::Datums,
2072 ast::LoadGenerator::Tpch => {
2073 let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;
2074
2075 let sf: f64 = scale_factor.unwrap_or(0.01);
2077 if !sf.is_finite() || sf < 0.0 {
2078 sql_bail!("unsupported scale factor {sf}");
2079 }
2080
2081 let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
2082 let total = (sf * multiplier).floor();
2083 let mut i = i64::try_cast_from(total)
2084 .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
2085 if i < 1 {
2086 i = 1;
2087 }
2088 Ok(i)
2089 };
2090
2091 let count_supplier = f_to_i(10_000f64)?;
2094 let count_part = f_to_i(200_000f64)?;
2095 let count_customer = f_to_i(150_000f64)?;
2096 let count_orders = f_to_i(150_000f64 * 10f64)?;
2097 let count_clerk = f_to_i(1_000f64)?;
2098
2099 LoadGenerator::Tpch {
2100 count_supplier,
2101 count_part,
2102 count_customer,
2103 count_orders,
2104 count_clerk,
2105 }
2106 }
2107 mz_sql_parser::ast::LoadGenerator::KeyValue => {
2108 scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
2109 let LoadGeneratorOptionExtracted {
2110 keys,
2111 snapshot_rounds,
2112 transactional_snapshot,
2113 value_size,
2114 tick_interval,
2115 seed,
2116 partitions,
2117 batch_size,
2118 ..
2119 } = extracted;
2120
2121 let mut include_offset = None;
2122 for im in include_metadata {
2123 match im {
2124 SourceIncludeMetadata::Offset { alias } => {
2125 include_offset = match alias {
2126 Some(alias) => Some(alias.to_string()),
2127 None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
2128 }
2129 }
2130 SourceIncludeMetadata::Key { .. } => continue,
2131
2132 _ => {
2133 sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` is supported");
2134 }
2135 };
2136 }
2137
2138 let lgkv = KeyValueLoadGenerator {
2139 keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
2140 snapshot_rounds: snapshot_rounds
2141 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
2142 transactional_snapshot: transactional_snapshot.unwrap_or(true),
2144 value_size: value_size
2145 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
2146 partitions: partitions
2147 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
2148 tick_interval,
2149 batch_size: batch_size
2150 .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
2151 seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
2152 include_offset,
2153 };
2154
2155 if lgkv.keys == 0
2156 || lgkv.partitions == 0
2157 || lgkv.value_size == 0
2158 || lgkv.batch_size == 0
2159 {
2160 sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
2161 }
2162
2163 if lgkv.keys % lgkv.partitions != 0 {
2164 sql_bail!("KEYS must be a multiple of PARTITIONS")
2165 }
2166
2167 if lgkv.batch_size > lgkv.keys {
2168 sql_bail!("KEYS must be larger than BATCH SIZE")
2169 }
2170
2171 if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
2174 sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
2175 }
2176
2177 if lgkv.snapshot_rounds == 0 {
2178 sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
2179 }
2180
2181 LoadGenerator::KeyValue(lgkv)
2182 }
2183 };
2184
2185 Ok(load_generator)
2186}
2187
2188fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
2189 let before = value_desc.get_by_name(&"before".into());
2190 let (after_idx, after_ty) = value_desc
2191 .get_by_name(&"after".into())
2192 .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
2193 let before_idx = if let Some((before_idx, before_ty)) = before {
2194 if !matches!(before_ty.scalar_type, SqlScalarType::Record { .. }) {
2195 sql_bail!("'before' column must be of type record");
2196 }
2197 if before_ty != after_ty {
2198 sql_bail!("'before' type differs from 'after' column");
2199 }
2200 Some(before_idx)
2201 } else {
2202 None
2203 };
2204 Ok((before_idx, after_idx))
2205}
2206
2207fn get_encoding(
2208 scx: &StatementContext,
2209 format: &FormatSpecifier<Aug>,
2210 envelope: &ast::SourceEnvelope,
2211) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2212 let encoding = match format {
2213 FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
2214 FormatSpecifier::KeyValue { key, value } => {
2215 let key = {
2216 let encoding = get_encoding_inner(scx, key)?;
2217 Some(encoding.key.unwrap_or(encoding.value))
2218 };
2219 let value = get_encoding_inner(scx, value)?.value;
2220 SourceDataEncoding { key, value }
2221 }
2222 };
2223
2224 let requires_keyvalue = matches!(
2225 envelope,
2226 ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
2227 );
2228 let is_keyvalue = encoding.key.is_some();
2229 if requires_keyvalue && !is_keyvalue {
2230 sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
2231 };
2232
2233 Ok(encoding)
2234}
2235
2236fn source_sink_cluster_config<'a, 'ctx>(
2242 scx: &'a StatementContext<'ctx>,
2243 in_cluster: &mut Option<ResolvedClusterName>,
2244) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
2245 let cluster = match in_cluster {
2246 None => {
2247 let cluster = scx.catalog.resolve_cluster(None)?;
2248 *in_cluster = Some(ResolvedClusterName {
2249 id: cluster.id(),
2250 print_name: None,
2251 });
2252 cluster
2253 }
2254 Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
2255 };
2256
2257 Ok(cluster)
2258}
2259
2260generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));
2261
2262#[derive(Debug)]
2263pub struct Schema {
2264 pub key_schema: Option<String>,
2265 pub value_schema: String,
2266 pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
2267 pub confluent_wire_format: bool,
2268}
2269
2270fn get_encoding_inner(
2271 scx: &StatementContext,
2272 format: &Format<Aug>,
2273) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
2274 let value = match format {
2275 Format::Bytes => DataEncoding::Bytes,
2276 Format::Avro(schema) => {
2277 let Schema {
2278 key_schema,
2279 value_schema,
2280 csr_connection,
2281 confluent_wire_format,
2282 } = match schema {
2283 AvroSchema::InlineSchema {
2286 schema: ast::Schema { schema },
2287 with_options,
2288 } => {
2289 let AvroSchemaOptionExtracted {
2290 confluent_wire_format,
2291 ..
2292 } = with_options.clone().try_into()?;
2293
2294 Schema {
2295 key_schema: None,
2296 value_schema: schema.clone(),
2297 csr_connection: None,
2298 confluent_wire_format,
2299 }
2300 }
2301 AvroSchema::Csr {
2302 csr_connection:
2303 CsrConnectionAvro {
2304 connection,
2305 seed,
2306 key_strategy: _,
2307 value_strategy: _,
2308 },
2309 } => {
2310 let item = scx.get_item_by_resolved_name(&connection.connection)?;
2311 let csr_connection = match item.connection()? {
2312 Connection::Csr(_) => item.id(),
2313 _ => {
2314 sql_bail!(
2315 "{} is not a schema registry connection",
2316 scx.catalog
2317 .resolve_full_name(item.name())
2318 .to_string()
2319 .quoted()
2320 )
2321 }
2322 };
2323
2324 if let Some(seed) = seed {
2325 Schema {
2326 key_schema: seed.key_schema.clone(),
2327 value_schema: seed.value_schema.clone(),
2328 csr_connection: Some(csr_connection),
2329 confluent_wire_format: true,
2330 }
2331 } else {
2332 unreachable!("CSR seed resolution should already have been called: Avro")
2333 }
2334 }
2335 };
2336
2337 if let Some(key_schema) = key_schema {
2338 return Ok(SourceDataEncoding {
2339 key: Some(DataEncoding::Avro(AvroEncoding {
2340 schema: key_schema,
2341 csr_connection: csr_connection.clone(),
2342 confluent_wire_format,
2343 })),
2344 value: DataEncoding::Avro(AvroEncoding {
2345 schema: value_schema,
2346 csr_connection,
2347 confluent_wire_format,
2348 }),
2349 });
2350 } else {
2351 DataEncoding::Avro(AvroEncoding {
2352 schema: value_schema,
2353 csr_connection,
2354 confluent_wire_format,
2355 })
2356 }
2357 }
2358 Format::Protobuf(schema) => match schema {
2359 ProtobufSchema::Csr {
2360 csr_connection:
2361 CsrConnectionProtobuf {
2362 connection:
2363 CsrConnection {
2364 connection,
2365 options,
2366 },
2367 seed,
2368 },
2369 } => {
2370 if let Some(CsrSeedProtobuf { key, value }) = seed {
2371 let item = scx.get_item_by_resolved_name(connection)?;
2372 let _ = match item.connection()? {
2373 Connection::Csr(connection) => connection,
2374 _ => {
2375 sql_bail!(
2376 "{} is not a schema registry connection",
2377 scx.catalog
2378 .resolve_full_name(item.name())
2379 .to_string()
2380 .quoted()
2381 )
2382 }
2383 };
2384
2385 if !options.is_empty() {
2386 sql_bail!("Protobuf CSR connections do not support any options");
2387 }
2388
2389 let value = DataEncoding::Protobuf(ProtobufEncoding {
2390 descriptors: strconv::parse_bytes(&value.schema)?,
2391 message_name: value.message_name.clone(),
2392 confluent_wire_format: true,
2393 });
2394 if let Some(key) = key {
2395 return Ok(SourceDataEncoding {
2396 key: Some(DataEncoding::Protobuf(ProtobufEncoding {
2397 descriptors: strconv::parse_bytes(&key.schema)?,
2398 message_name: key.message_name.clone(),
2399 confluent_wire_format: true,
2400 })),
2401 value,
2402 });
2403 }
2404 value
2405 } else {
2406 unreachable!("CSR seed resolution should already have been called: Proto")
2407 }
2408 }
2409 ProtobufSchema::InlineSchema {
2410 message_name,
2411 schema: ast::Schema { schema },
2412 } => {
2413 let descriptors = strconv::parse_bytes(schema)?;
2414
2415 DataEncoding::Protobuf(ProtobufEncoding {
2416 descriptors,
2417 message_name: message_name.to_owned(),
2418 confluent_wire_format: false,
2419 })
2420 }
2421 },
2422 Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
2423 regex: mz_repr::adt::regex::Regex::new(regex, false)
2424 .map_err(|e| sql_err!("parsing regex: {e}"))?,
2425 }),
2426 Format::Csv { columns, delimiter } => {
2427 let columns = match columns {
2428 CsvColumns::Header { names } => {
2429 if names.is_empty() {
2430 sql_bail!("[internal error] column spec should get names in purify")
2431 }
2432 ColumnSpec::Header {
2433 names: names.iter().cloned().map(|n| n.into_string()).collect(),
2434 }
2435 }
2436 CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
2437 };
2438 DataEncoding::Csv(CsvEncoding {
2439 columns,
2440 delimiter: u8::try_from(*delimiter)
2441 .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
2442 })
2443 }
2444 Format::Json { array: false } => DataEncoding::Json,
2445 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
2446 Format::Text => DataEncoding::Text,
2447 };
2448 Ok(SourceDataEncoding { key: None, value })
2449}
2450
2451fn get_key_envelope(
2453 included_items: &[SourceIncludeMetadata],
2454 encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
2455 key_envelope_no_encoding: bool,
2456) -> Result<KeyEnvelope, PlanError> {
2457 let key_definition = included_items
2458 .iter()
2459 .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
2460 if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
2461 match (alias, encoding.and_then(|e| e.key.as_ref())) {
2462 (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
2463 (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
2464 (Some(name), _) if key_envelope_no_encoding => {
2465 Ok(KeyEnvelope::Named(name.as_str().to_string()))
2466 }
2467 (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
2468 (_, None) => {
2469 sql_bail!(
2473 "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
2474 got bare FORMAT"
2475 );
2476 }
2477 }
2478 } else {
2479 Ok(KeyEnvelope::None)
2480 }
2481}
2482
2483fn get_unnamed_key_envelope(
2486 key: Option<&DataEncoding<ReferencedConnection>>,
2487) -> Result<KeyEnvelope, PlanError> {
2488 let is_composite = match key {
2492 Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
2493 Some(
2494 DataEncoding::Avro(_)
2495 | DataEncoding::Csv(_)
2496 | DataEncoding::Protobuf(_)
2497 | DataEncoding::Regex { .. },
2498 ) => true,
2499 None => false,
2500 };
2501
2502 if is_composite {
2503 Ok(KeyEnvelope::Flattened)
2504 } else {
2505 Ok(KeyEnvelope::Named("key".to_string()))
2506 }
2507}
2508
2509pub fn describe_create_view(
2510 _: &StatementContext,
2511 _: CreateViewStatement<Aug>,
2512) -> Result<StatementDesc, PlanError> {
2513 Ok(StatementDesc::new(None))
2514}
2515
2516pub fn plan_view(
2517 scx: &StatementContext,
2518 def: &mut ViewDefinition<Aug>,
2519 temporary: bool,
2520) -> Result<(QualifiedItemName, View), PlanError> {
2521 let create_sql = normalize::create_statement(
2522 scx,
2523 Statement::CreateView(CreateViewStatement {
2524 if_exists: IfExistsBehavior::Error,
2525 temporary,
2526 definition: def.clone(),
2527 }),
2528 )?;
2529
2530 let ViewDefinition {
2531 name,
2532 columns,
2533 query,
2534 } = def;
2535
2536 let query::PlannedRootQuery {
2537 expr,
2538 mut desc,
2539 finishing,
2540 scope: _,
2541 } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
2542 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2548 &finishing,
2549 expr.arity()
2550 ));
2551 if expr.contains_parameters()? {
2552 return Err(PlanError::ParameterNotAllowed("views".to_string()));
2553 }
2554
2555 let dependencies = expr
2556 .depends_on()
2557 .into_iter()
2558 .map(|gid| scx.catalog.resolve_item_id(&gid))
2559 .collect();
2560
2561 let name = if temporary {
2562 scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2563 } else {
2564 scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
2565 };
2566
2567 plan_utils::maybe_rename_columns(
2568 format!("view {}", scx.catalog.resolve_full_name(&name)),
2569 &mut desc,
2570 columns,
2571 )?;
2572 let names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2573
2574 if let Some(dup) = names.iter().duplicates().next() {
2575 sql_bail!("column {} specified more than once", dup.quoted());
2576 }
2577
2578 let view = View {
2579 create_sql,
2580 expr,
2581 dependencies,
2582 column_names: names,
2583 temporary,
2584 };
2585
2586 Ok((name, view))
2587}
2588
2589pub fn plan_create_view(
2590 scx: &StatementContext,
2591 mut stmt: CreateViewStatement<Aug>,
2592) -> Result<Plan, PlanError> {
2593 let CreateViewStatement {
2594 temporary,
2595 if_exists,
2596 definition,
2597 } = &mut stmt;
2598 let (name, view) = plan_view(scx, definition, *temporary)?;
2599
2600 let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);
2603
2604 let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
2605 let if_exists = true;
2606 let cascade = false;
2607 let maybe_item_to_drop = plan_drop_item(
2608 scx,
2609 ObjectType::View,
2610 if_exists,
2611 definition.name.clone(),
2612 cascade,
2613 )?;
2614
2615 if let Some(id) = maybe_item_to_drop {
2617 let dependencies = view.expr.depends_on();
2618 let invalid_drop = scx
2619 .get_item(&id)
2620 .global_ids()
2621 .any(|gid| dependencies.contains(&gid));
2622 if invalid_drop {
2623 let item = scx.catalog.get_item(&id);
2624 sql_bail!(
2625 "cannot replace view {0}: depended upon by new {0} definition",
2626 scx.catalog.resolve_full_name(item.name())
2627 );
2628 }
2629
2630 Some(id)
2631 } else {
2632 None
2633 }
2634 } else {
2635 None
2636 };
2637 let drop_ids = replace
2638 .map(|id| {
2639 scx.catalog
2640 .item_dependents(id)
2641 .into_iter()
2642 .map(|id| id.unwrap_item_id())
2643 .collect()
2644 })
2645 .unwrap_or_default();
2646
2647 let full_name = scx.catalog.resolve_full_name(&name);
2649 let partial_name = PartialItemName::from(full_name.clone());
2650 if let (Ok(item), IfExistsBehavior::Error, false) = (
2653 scx.catalog.resolve_item_or_type(&partial_name),
2654 *if_exists,
2655 ignore_if_exists_errors,
2656 ) {
2657 return Err(PlanError::ItemAlreadyExists {
2658 name: full_name.to_string(),
2659 item_type: item.item_type(),
2660 });
2661 }
2662
2663 Ok(Plan::CreateView(CreateViewPlan {
2664 name,
2665 view,
2666 replace,
2667 drop_ids,
2668 if_not_exists: *if_exists == IfExistsBehavior::Skip,
2669 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2670 }))
2671}
2672
2673pub fn describe_create_materialized_view(
2674 _: &StatementContext,
2675 _: CreateMaterializedViewStatement<Aug>,
2676) -> Result<StatementDesc, PlanError> {
2677 Ok(StatementDesc::new(None))
2678}
2679
2680pub fn describe_create_continual_task(
2681 _: &StatementContext,
2682 _: CreateContinualTaskStatement<Aug>,
2683) -> Result<StatementDesc, PlanError> {
2684 Ok(StatementDesc::new(None))
2685}
2686
2687pub fn describe_create_network_policy(
2688 _: &StatementContext,
2689 _: CreateNetworkPolicyStatement<Aug>,
2690) -> Result<StatementDesc, PlanError> {
2691 Ok(StatementDesc::new(None))
2692}
2693
2694pub fn describe_alter_network_policy(
2695 _: &StatementContext,
2696 _: AlterNetworkPolicyStatement<Aug>,
2697) -> Result<StatementDesc, PlanError> {
2698 Ok(StatementDesc::new(None))
2699}
2700
2701pub fn plan_create_materialized_view(
2702 scx: &StatementContext,
2703 mut stmt: CreateMaterializedViewStatement<Aug>,
2704) -> Result<Plan, PlanError> {
2705 let cluster_id =
2706 crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
2707 stmt.in_cluster = Some(ResolvedClusterName {
2708 id: cluster_id,
2709 print_name: None,
2710 });
2711
2712 let create_sql =
2713 normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;
2714
2715 let partial_name = normalize::unresolved_item_name(stmt.name)?;
2716 let name = scx.allocate_qualified_name(partial_name.clone())?;
2717
2718 let query::PlannedRootQuery {
2719 expr,
2720 mut desc,
2721 finishing,
2722 scope: _,
2723 } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
2724 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
2726 &finishing,
2727 expr.arity()
2728 ));
2729 if expr.contains_parameters()? {
2730 return Err(PlanError::ParameterNotAllowed(
2731 "materialized views".to_string(),
2732 ));
2733 }
2734
2735 plan_utils::maybe_rename_columns(
2736 format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
2737 &mut desc,
2738 &stmt.columns,
2739 )?;
2740 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
2741
2742 let MaterializedViewOptionExtracted {
2743 assert_not_null,
2744 partition_by,
2745 retain_history,
2746 refresh,
2747 seen: _,
2748 }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;
2749
2750 if let Some(partition_by) = partition_by {
2751 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
2752 check_partition_by(&desc, partition_by)?;
2753 }
2754
2755 let refresh_schedule = {
2756 let mut refresh_schedule = RefreshSchedule::default();
2757 let mut on_commits_seen = 0;
2758 for refresh_option_value in refresh {
2759 if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
2760 scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
2761 }
2762 match refresh_option_value {
2763 RefreshOptionValue::OnCommit => {
2764 on_commits_seen += 1;
2765 }
2766 RefreshOptionValue::AtCreation => {
2767 soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
2768 sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
2769 }
2770 RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
2771 transform_ast::transform(scx, &mut time)?; let ecx = &ExprContext {
2773 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2774 name: "REFRESH AT",
2775 scope: &Scope::empty(),
2776 relation_type: &SqlRelationType::empty(),
2777 allow_aggregates: false,
2778 allow_subqueries: false,
2779 allow_parameters: false,
2780 allow_windows: false,
2781 };
2782 let hir = plan_expr(ecx, &time)?.cast_to(
2783 ecx,
2784 CastContext::Assignment,
2785 &SqlScalarType::MzTimestamp,
2786 )?;
2787 let timestamp = hir
2789 .into_literal_mz_timestamp()
2790 .ok_or_else(|| PlanError::InvalidRefreshAt)?;
2791 refresh_schedule.ats.push(timestamp);
2792 }
2793 RefreshOptionValue::Every(RefreshEveryOptionValue {
2794 interval,
2795 aligned_to,
2796 }) => {
2797 let interval = Interval::try_from_value(Value::Interval(interval))?;
2798 if interval.as_microseconds() <= 0 {
2799 sql_bail!("REFRESH interval must be positive; got: {}", interval);
2800 }
2801 if interval.months != 0 {
2802 sql_bail!("REFRESH interval must not involve units larger than days");
2807 }
2808 let interval = interval.duration()?;
2809 if u64::try_from(interval.as_millis()).is_err() {
2810 sql_bail!("REFRESH interval too large");
2811 }
2812
2813 let mut aligned_to = match aligned_to {
2814 Some(aligned_to) => aligned_to,
2815 None => {
2816 soft_panic_or_log!(
2817 "ALIGNED TO should have been filled in by purification"
2818 );
2819 sql_bail!(
2820 "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
2821 )
2822 }
2823 };
2824
2825 transform_ast::transform(scx, &mut aligned_to)?;
2827
2828 let ecx = &ExprContext {
2829 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
2830 name: "REFRESH EVERY ... ALIGNED TO",
2831 scope: &Scope::empty(),
2832 relation_type: &SqlRelationType::empty(),
2833 allow_aggregates: false,
2834 allow_subqueries: false,
2835 allow_parameters: false,
2836 allow_windows: false,
2837 };
2838 let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
2839 ecx,
2840 CastContext::Assignment,
2841 &SqlScalarType::MzTimestamp,
2842 )?;
2843 let aligned_to_const = aligned_to_hir
2845 .into_literal_mz_timestamp()
2846 .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;
2847
2848 refresh_schedule.everies.push(RefreshEvery {
2849 interval,
2850 aligned_to: aligned_to_const,
2851 });
2852 }
2853 }
2854 }
2855
2856 if on_commits_seen > 1 {
2857 sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
2858 }
2859 if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
2860 sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
2861 }
2862
2863 if refresh_schedule == RefreshSchedule::default() {
2864 None
2865 } else {
2866 Some(refresh_schedule)
2867 }
2868 };
2869
2870 let as_of = stmt.as_of.map(Timestamp::from);
2871 let compaction_window = plan_retain_history_option(scx, retain_history)?;
2872 let mut non_null_assertions = assert_not_null
2873 .into_iter()
2874 .map(normalize::column_name)
2875 .map(|assertion_name| {
2876 column_names
2877 .iter()
2878 .position(|col| col == &assertion_name)
2879 .ok_or_else(|| {
2880 sql_err!(
2881 "column {} in ASSERT NOT NULL option not found",
2882 assertion_name.quoted()
2883 )
2884 })
2885 })
2886 .collect::<Result<Vec<_>, _>>()?;
2887 non_null_assertions.sort();
2888 if let Some(dup) = non_null_assertions.iter().duplicates().next() {
2889 let dup = &column_names[*dup];
2890 sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
2891 }
2892
2893 if let Some(dup) = column_names.iter().duplicates().next() {
2894 sql_bail!("column {} specified more than once", dup.quoted());
2895 }
2896
2897 let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
2900 Ok(true) => IfExistsBehavior::Skip,
2901 _ => stmt.if_exists,
2902 };
2903
2904 let mut replace = None;
2905 let mut if_not_exists = false;
2906 match if_exists {
2907 IfExistsBehavior::Replace => {
2908 let if_exists = true;
2909 let cascade = false;
2910 let replace_id = plan_drop_item(
2911 scx,
2912 ObjectType::MaterializedView,
2913 if_exists,
2914 partial_name.into(),
2915 cascade,
2916 )?;
2917
2918 if let Some(id) = replace_id {
2920 let dependencies = expr.depends_on();
2921 let invalid_drop = scx
2922 .get_item(&id)
2923 .global_ids()
2924 .any(|gid| dependencies.contains(&gid));
2925 if invalid_drop {
2926 let item = scx.catalog.get_item(&id);
2927 sql_bail!(
2928 "cannot replace materialized view {0}: depended upon by new {0} definition",
2929 scx.catalog.resolve_full_name(item.name())
2930 );
2931 }
2932 replace = Some(id);
2933 }
2934 }
2935 IfExistsBehavior::Skip => if_not_exists = true,
2936 IfExistsBehavior::Error => (),
2937 }
2938 let drop_ids = replace
2939 .map(|id| {
2940 scx.catalog
2941 .item_dependents(id)
2942 .into_iter()
2943 .map(|id| id.unwrap_item_id())
2944 .collect()
2945 })
2946 .unwrap_or_default();
2947 let dependencies = expr
2948 .depends_on()
2949 .into_iter()
2950 .map(|gid| scx.catalog.resolve_item_id(&gid))
2951 .collect();
2952
2953 let full_name = scx.catalog.resolve_full_name(&name);
2955 let partial_name = PartialItemName::from(full_name.clone());
2956 if let (IfExistsBehavior::Error, Ok(item)) =
2959 (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
2960 {
2961 return Err(PlanError::ItemAlreadyExists {
2962 name: full_name.to_string(),
2963 item_type: item.item_type(),
2964 });
2965 }
2966
2967 Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
2968 name,
2969 materialized_view: MaterializedView {
2970 create_sql,
2971 expr,
2972 dependencies,
2973 column_names,
2974 cluster_id,
2975 non_null_assertions,
2976 compaction_window,
2977 refresh_schedule,
2978 as_of,
2979 },
2980 replace,
2981 drop_ids,
2982 if_not_exists,
2983 ambiguous_columns: *scx.ambiguous_columns.borrow(),
2984 }))
2985}
2986
2987generate_extracted_config!(
2988 MaterializedViewOption,
2989 (AssertNotNull, Ident, AllowMultiple),
2990 (PartitionBy, Vec<Ident>),
2991 (RetainHistory, OptionalDuration),
2992 (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
2993);
2994
2995pub fn plan_create_continual_task(
2996 scx: &StatementContext,
2997 mut stmt: CreateContinualTaskStatement<Aug>,
2998) -> Result<Plan, PlanError> {
2999 match &stmt.sugar {
3000 None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
3001 Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
3002 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
3003 }
3004 Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
3005 scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
3006 }
3007 };
3008 let cluster_id = match &stmt.in_cluster {
3009 None => scx.catalog.resolve_cluster(None)?.id(),
3010 Some(in_cluster) => in_cluster.id,
3011 };
3012 stmt.in_cluster = Some(ResolvedClusterName {
3013 id: cluster_id,
3014 print_name: None,
3015 });
3016
3017 let create_sql =
3018 normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;
3019
3020 let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;
3021
3022 let mut desc = match stmt.columns {
3027 None => None,
3028 Some(columns) => {
3029 let mut desc_columns = Vec::with_capacity(columns.capacity());
3030 for col in columns.iter() {
3031 desc_columns.push((
3032 normalize::column_name(col.name.clone()),
3033 SqlColumnType {
3034 scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
3035 nullable: false,
3036 },
3037 ));
3038 }
3039 Some(RelationDesc::from_names_and_types(desc_columns))
3040 }
3041 };
3042 let input = scx.get_item_by_resolved_name(&stmt.input)?;
3043 match input.item_type() {
3044 CatalogItemType::ContinualTask
3047 | CatalogItemType::Table
3048 | CatalogItemType::MaterializedView
3049 | CatalogItemType::Source => {}
3050 CatalogItemType::Sink
3051 | CatalogItemType::View
3052 | CatalogItemType::Index
3053 | CatalogItemType::Type
3054 | CatalogItemType::Func
3055 | CatalogItemType::Secret
3056 | CatalogItemType::Connection => {
3057 sql_bail!(
3058 "CONTINUAL TASK cannot use {} as an input",
3059 input.item_type()
3060 );
3061 }
3062 }
3063
3064 let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
3065 let ct_name = stmt.name;
3066 let placeholder_id = match &ct_name {
3067 ResolvedItemName::ContinualTask { id, name } => {
3068 let desc = match desc.as_ref().cloned() {
3069 Some(x) => x,
3070 None => {
3071 let input_name = scx.catalog.resolve_full_name(input.name());
3076 input.desc(&input_name)?.into_owned()
3077 }
3078 };
3079 qcx.ctes.insert(
3080 *id,
3081 CteDesc {
3082 name: name.item.clone(),
3083 desc,
3084 },
3085 );
3086 Some(*id)
3087 }
3088 _ => None,
3089 };
3090
3091 let mut exprs = Vec::new();
3092 for (idx, stmt) in stmt.stmts.iter().enumerate() {
3093 let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
3094 let query::PlannedRootQuery {
3095 expr,
3096 desc: desc_query,
3097 finishing,
3098 scope: _,
3099 } = query::plan_ct_query(&mut qcx, query)?;
3100 assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
3103 &finishing,
3104 expr.arity()
3105 ));
3106 if expr.contains_parameters()? {
3107 if expr.contains_parameters()? {
3108 return Err(PlanError::ParameterNotAllowed(
3109 "continual tasks".to_string(),
3110 ));
3111 }
3112 }
3113 let expr = match desc.as_mut() {
3114 None => {
3115 desc = Some(desc_query);
3116 expr
3117 }
3118 Some(desc) => {
3119 if desc_query.arity() > desc.arity() {
3122 sql_bail!(
3123 "statement {}: INSERT has more expressions than target columns",
3124 idx
3125 );
3126 }
3127 if desc_query.arity() < desc.arity() {
3128 sql_bail!(
3129 "statement {}: INSERT has more target columns than expressions",
3130 idx
3131 );
3132 }
3133 let target_types = desc.iter_types().map(|x| &x.scalar_type);
3136 let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
3137 let expr = expr.map_err(|e| {
3138 sql_err!(
3139 "statement {}: column {} is of type {} but expression is of type {}",
3140 idx,
3141 desc.get_name(e.column).quoted(),
3142 qcx.humanize_scalar_type(&e.target_type, false),
3143 qcx.humanize_scalar_type(&e.source_type, false),
3144 )
3145 })?;
3146
3147 let zip_types = || desc.iter_types().zip_eq(desc_query.iter_types());
3150 let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
3151 if updated {
3152 let new_types = zip_types().map(|(ct, q)| {
3153 let mut ct = ct.clone();
3154 if q.nullable {
3155 ct.nullable = true;
3156 }
3157 ct
3158 });
3159 *desc = RelationDesc::from_names_and_types(
3160 desc.iter_names().cloned().zip_eq(new_types),
3161 );
3162 }
3163
3164 expr
3165 }
3166 };
3167 match stmt {
3168 ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
3169 ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
3170 }
3171 }
3172 let expr = exprs
3175 .into_iter()
3176 .reduce(|acc, expr| acc.union(expr))
3177 .ok_or_else(|| sql_err!("TODO(ct3)"))?;
3178 let dependencies = expr
3179 .depends_on()
3180 .into_iter()
3181 .map(|gid| scx.catalog.resolve_item_id(&gid))
3182 .collect();
3183
3184 let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
3185 let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
3186 if let Some(dup) = column_names.iter().duplicates().next() {
3187 sql_bail!("column {} specified more than once", dup.quoted());
3188 }
3189
3190 let name = match &ct_name {
3192 ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
3193 ResolvedItemName::ContinualTask { name, .. } => {
3194 let name = scx.allocate_qualified_name(name.clone())?;
3195 let full_name = scx.catalog.resolve_full_name(&name);
3196 let partial_name = PartialItemName::from(full_name.clone());
3197 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
3200 return Err(PlanError::ItemAlreadyExists {
3201 name: full_name.to_string(),
3202 item_type: item.item_type(),
3203 });
3204 }
3205 name
3206 }
3207 ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
3208 ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
3209 };
3210
3211 let as_of = stmt.as_of.map(Timestamp::from);
3212 Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
3213 name,
3214 placeholder_id,
3215 desc,
3216 input_id: input.global_id(),
3217 with_snapshot: snapshot.unwrap_or(true),
3218 continual_task: MaterializedView {
3219 create_sql,
3220 expr,
3221 dependencies,
3222 column_names,
3223 cluster_id,
3224 non_null_assertions: Vec::new(),
3225 compaction_window: None,
3226 refresh_schedule: None,
3227 as_of,
3228 },
3229 }))
3230}
3231
3232fn continual_task_query<'a>(
3233 ct_name: &ResolvedItemName,
3234 stmt: &'a ast::ContinualTaskStmt<Aug>,
3235) -> Option<ast::Query<Aug>> {
3236 match stmt {
3237 ast::ContinualTaskStmt::Insert(ast::InsertStatement {
3238 table_name: _,
3239 columns,
3240 source,
3241 returning,
3242 }) => {
3243 if !columns.is_empty() || !returning.is_empty() {
3244 return None;
3245 }
3246 match source {
3247 ast::InsertSource::Query(query) => Some(query.clone()),
3248 ast::InsertSource::DefaultValues => None,
3249 }
3250 }
3251 ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
3252 table_name: _,
3253 alias,
3254 using,
3255 selection,
3256 }) => {
3257 if !using.is_empty() {
3258 return None;
3259 }
3260 let from = ast::TableWithJoins {
3262 relation: ast::TableFactor::Table {
3263 name: ct_name.clone(),
3264 alias: alias.clone(),
3265 },
3266 joins: Vec::new(),
3267 };
3268 let select = ast::Select {
3269 from: vec![from],
3270 selection: selection.clone(),
3271 distinct: None,
3272 projection: vec![ast::SelectItem::Wildcard],
3273 group_by: Vec::new(),
3274 having: None,
3275 qualify: None,
3276 options: Vec::new(),
3277 };
3278 let query = ast::Query {
3279 ctes: ast::CteBlock::Simple(Vec::new()),
3280 body: ast::SetExpr::Select(Box::new(select)),
3281 order_by: Vec::new(),
3282 limit: None,
3283 offset: None,
3284 };
3285 Some(query)
3287 }
3288 }
3289}
3290
3291generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));
3292
3293pub fn describe_create_sink(
3294 _: &StatementContext,
3295 _: CreateSinkStatement<Aug>,
3296) -> Result<StatementDesc, PlanError> {
3297 Ok(StatementDesc::new(None))
3298}
3299
3300generate_extracted_config!(
3301 CreateSinkOption,
3302 (Snapshot, bool),
3303 (PartitionStrategy, String),
3304 (Version, u64)
3305);
3306
3307pub fn plan_create_sink(
3308 scx: &StatementContext,
3309 stmt: CreateSinkStatement<Aug>,
3310) -> Result<Plan, PlanError> {
3311 let Some(name) = stmt.name.clone() else {
3313 return Err(PlanError::MissingName(CatalogItemType::Sink));
3314 };
3315 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3316 let full_name = scx.catalog.resolve_full_name(&name);
3317 let partial_name = PartialItemName::from(full_name.clone());
3318 if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
3319 return Err(PlanError::ItemAlreadyExists {
3320 name: full_name.to_string(),
3321 item_type: item.item_type(),
3322 });
3323 }
3324
3325 plan_sink(scx, stmt)
3326}
3327
3328fn plan_sink(
3333 scx: &StatementContext,
3334 mut stmt: CreateSinkStatement<Aug>,
3335) -> Result<Plan, PlanError> {
3336 let CreateSinkStatement {
3337 name,
3338 in_cluster: _,
3339 from,
3340 connection,
3341 format,
3342 envelope,
3343 if_not_exists,
3344 with_options,
3345 } = stmt.clone();
3346
3347 let Some(name) = name else {
3348 return Err(PlanError::MissingName(CatalogItemType::Sink));
3349 };
3350 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
3351
3352 let envelope = match envelope {
3353 Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
3354 Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
3355 None => sql_bail!("ENVELOPE clause is required"),
3356 };
3357
3358 let from_name = &from;
3359 let from = scx.get_item_by_resolved_name(&from)?;
3360 if from.id().is_system() {
3361 bail_unsupported!("creating a sink directly on a catalog object");
3362 }
3363 let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
3364 let key_indices = match &connection {
3365 CreateSinkConnection::Kafka { key, .. } => {
3366 if let Some(key) = key.clone() {
3367 let key_columns = key
3368 .key_columns
3369 .into_iter()
3370 .map(normalize::column_name)
3371 .collect::<Vec<_>>();
3372 let mut uniq = BTreeSet::new();
3373 for col in key_columns.iter() {
3374 if !uniq.insert(col) {
3375 sql_bail!("duplicate column referenced in KEY: {}", col);
3376 }
3377 }
3378 let indices = key_columns
3379 .iter()
3380 .map(|col| -> anyhow::Result<usize> {
3381 let name_idx =
3382 desc.get_by_name(col)
3383 .map(|(idx, _type)| idx)
3384 .ok_or_else(|| {
3385 sql_err!("column referenced in KEY does not exist: {}", col)
3386 })?;
3387 if desc.get_unambiguous_name(name_idx).is_none() {
3388 sql_err!("column referenced in KEY is ambiguous: {}", col);
3389 }
3390 Ok(name_idx)
3391 })
3392 .collect::<Result<Vec<_>, _>>()?;
3393 let is_valid_key =
3394 desc.typ().keys.iter().any(|key_columns| {
3395 key_columns.iter().all(|column| indices.contains(column))
3396 });
3397
3398 if !is_valid_key && envelope == SinkEnvelope::Upsert {
3399 if key.not_enforced {
3400 scx.catalog
3401 .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
3402 key: key_columns.clone(),
3403 name: name.item.clone(),
3404 })
3405 } else {
3406 return Err(PlanError::UpsertSinkWithInvalidKey {
3407 name: from_name.full_name_str(),
3408 desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
3409 valid_keys: desc
3410 .typ()
3411 .keys
3412 .iter()
3413 .map(|key| {
3414 key.iter()
3415 .map(|col| desc.get_name(*col).as_str().into())
3416 .collect()
3417 })
3418 .collect(),
3419 });
3420 }
3421 }
3422 Some(indices)
3423 } else {
3424 None
3425 }
3426 }
3427 };
3428
3429 let headers_index = match &connection {
3430 CreateSinkConnection::Kafka {
3431 headers: Some(headers),
3432 ..
3433 } => {
3434 scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;
3435
3436 match envelope {
3437 SinkEnvelope::Upsert => (),
3438 SinkEnvelope::Debezium => {
3439 sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
3440 }
3441 };
3442
3443 let headers = normalize::column_name(headers.clone());
3444 let (idx, ty) = desc
3445 .get_by_name(&headers)
3446 .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;
3447
3448 if desc.get_unambiguous_name(idx).is_none() {
3449 sql_bail!("HEADERS column ({}) is ambiguous", headers);
3450 }
3451
3452 match &ty.scalar_type {
3453 SqlScalarType::Map { value_type, .. }
3454 if matches!(&**value_type, SqlScalarType::String | SqlScalarType::Bytes) => {}
3455 _ => sql_bail!(
3456 "HEADERS column must have type map[text => text] or map[text => bytea]"
3457 ),
3458 }
3459
3460 Some(idx)
3461 }
3462 _ => None,
3463 };
3464
3465 let relation_key_indices = desc.typ().keys.get(0).cloned();
3467
3468 let key_desc_and_indices = key_indices.map(|key_indices| {
3469 let cols = desc
3470 .iter()
3471 .map(|(name, ty)| (name.clone(), ty.clone()))
3472 .collect::<Vec<_>>();
3473 let (names, types): (Vec<_>, Vec<_>) =
3474 key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
3475 let typ = SqlRelationType::new(types);
3476 (RelationDesc::new(typ, names), key_indices)
3477 });
3478
3479 if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
3480 return Err(PlanError::UpsertSinkWithoutKey);
3481 }
3482
3483 let connection_builder = match connection {
3484 CreateSinkConnection::Kafka {
3485 connection,
3486 options,
3487 ..
3488 } => kafka_sink_builder(
3489 scx,
3490 connection,
3491 options,
3492 format,
3493 relation_key_indices,
3494 key_desc_and_indices,
3495 headers_index,
3496 desc.into_owned(),
3497 envelope,
3498 from.id(),
3499 )?,
3500 };
3501
3502 let CreateSinkOptionExtracted {
3503 snapshot,
3504 version,
3505 partition_strategy: _,
3506 seen: _,
3507 } = with_options.try_into()?;
3508
3509 let with_snapshot = snapshot.unwrap_or(true);
3511 let version = version.unwrap_or(0);
3513
3514 let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
3518 let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;
3519
3520 Ok(Plan::CreateSink(CreateSinkPlan {
3521 name,
3522 sink: Sink {
3523 create_sql,
3524 from: from.global_id(),
3525 connection: connection_builder,
3526 envelope,
3527 version,
3528 },
3529 with_snapshot,
3530 if_not_exists,
3531 in_cluster: in_cluster.id(),
3532 }))
3533}
3534
3535fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
3536 let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");
3537
3538 let existing_keys = desc
3539 .typ()
3540 .keys
3541 .iter()
3542 .map(|key_columns| {
3543 key_columns
3544 .iter()
3545 .map(|col| desc.get_name(*col).as_str())
3546 .join(", ")
3547 })
3548 .join(", ");
3549
3550 sql_err!(
3551 "Key constraint ({}) conflicts with existing key ({})",
3552 user_keys,
3553 existing_keys
3554 )
3555}
3556
3557#[derive(Debug, Default, PartialEq, Clone)]
3560pub struct CsrConfigOptionExtracted {
3561 seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
3562 pub(crate) avro_key_fullname: Option<String>,
3563 pub(crate) avro_value_fullname: Option<String>,
3564 pub(crate) null_defaults: bool,
3565 pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
3566 pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
3567 pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3568 pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
3569}
3570
3571impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
3572 type Error = crate::plan::PlanError;
3573 fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
3574 let mut extracted = CsrConfigOptionExtracted::default();
3575 let mut common_doc_comments = BTreeMap::new();
3576 for option in v {
3577 if !extracted.seen.insert(option.name.clone()) {
3578 return Err(PlanError::Unstructured({
3579 format!("{} specified more than once", option.name)
3580 }));
3581 }
3582 let option_name = option.name.clone();
3583 let option_name_str = option_name.to_ast_string_simple();
3584 let better_error = |e: PlanError| PlanError::InvalidOptionValue {
3585 option_name: option_name.to_ast_string_simple(),
3586 err: e.into(),
3587 };
3588 let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
3589 val.map(|s| match s {
3590 WithOptionValue::Value(Value::String(s)) => {
3591 mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
3592 }
3593 _ => Err("must be a string".to_string()),
3594 })
3595 .transpose()
3596 .map_err(PlanError::Unstructured)
3597 .map_err(better_error)
3598 };
3599 match option.name {
3600 CsrConfigOptionName::AvroKeyFullname => {
3601 extracted.avro_key_fullname =
3602 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3603 }
3604 CsrConfigOptionName::AvroValueFullname => {
3605 extracted.avro_value_fullname =
3606 <Option<String>>::try_from_value(option.value).map_err(better_error)?;
3607 }
3608 CsrConfigOptionName::NullDefaults => {
3609 extracted.null_defaults =
3610 <bool>::try_from_value(option.value).map_err(better_error)?;
3611 }
3612 CsrConfigOptionName::AvroDocOn(doc_on) => {
3613 let value = String::try_from_value(option.value.ok_or_else(|| {
3614 PlanError::InvalidOptionValue {
3615 option_name: option_name_str,
3616 err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
3617 }
3618 })?)
3619 .map_err(better_error)?;
3620 let key = match doc_on.identifier {
3621 DocOnIdentifier::Column(ast::ColumnName {
3622 relation: ResolvedItemName::Item { id, .. },
3623 column: ResolvedColumnReference::Column { name, index: _ },
3624 }) => DocTarget::Field {
3625 object_id: id,
3626 column_name: name,
3627 },
3628 DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
3629 DocTarget::Type(id)
3630 }
3631 _ => unreachable!(),
3632 };
3633
3634 match doc_on.for_schema {
3635 DocOnSchema::KeyOnly => {
3636 extracted.key_doc_options.insert(key, value);
3637 }
3638 DocOnSchema::ValueOnly => {
3639 extracted.value_doc_options.insert(key, value);
3640 }
3641 DocOnSchema::All => {
3642 common_doc_comments.insert(key, value);
3643 }
3644 }
3645 }
3646 CsrConfigOptionName::KeyCompatibilityLevel => {
3647 extracted.key_compatibility_level = to_compatibility_level(option.value)?;
3648 }
3649 CsrConfigOptionName::ValueCompatibilityLevel => {
3650 extracted.value_compatibility_level = to_compatibility_level(option.value)?;
3651 }
3652 }
3653 }
3654
3655 for (key, value) in common_doc_comments {
3656 if !extracted.key_doc_options.contains_key(&key) {
3657 extracted.key_doc_options.insert(key.clone(), value.clone());
3658 }
3659 if !extracted.value_doc_options.contains_key(&key) {
3660 extracted.value_doc_options.insert(key, value);
3661 }
3662 }
3663 Ok(extracted)
3664 }
3665}
3666
3667fn kafka_sink_builder(
3668 scx: &StatementContext,
3669 connection: ResolvedItemName,
3670 options: Vec<KafkaSinkConfigOption<Aug>>,
3671 format: Option<FormatSpecifier<Aug>>,
3672 relation_key_indices: Option<Vec<usize>>,
3673 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3674 headers_index: Option<usize>,
3675 value_desc: RelationDesc,
3676 envelope: SinkEnvelope,
3677 sink_from: CatalogItemId,
3678) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3679 let connection_item = scx.get_item_by_resolved_name(&connection)?;
3681 let connection_id = connection_item.id();
3682 match connection_item.connection()? {
3683 Connection::Kafka(_) => (),
3684 _ => sql_bail!(
3685 "{} is not a kafka connection",
3686 scx.catalog.resolve_full_name(connection_item.name())
3687 ),
3688 };
3689
3690 let KafkaSinkConfigOptionExtracted {
3691 topic,
3692 compression_type,
3693 partition_by,
3694 progress_group_id_prefix,
3695 transactional_id_prefix,
3696 legacy_ids,
3697 topic_config,
3698 topic_metadata_refresh_interval,
3699 topic_partition_count,
3700 topic_replication_factor,
3701 seen: _,
3702 }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3703
3704 let transactional_id = match (transactional_id_prefix, legacy_ids) {
3705 (Some(_), Some(true)) => {
3706 sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3707 }
3708 (None, Some(true)) => KafkaIdStyle::Legacy,
3709 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3710 };
3711
3712 let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3713 (Some(_), Some(true)) => {
3714 sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3715 }
3716 (None, Some(true)) => KafkaIdStyle::Legacy,
3717 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3718 };
3719
3720 let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3721
3722 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3723 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3726 }
3727
3728 let assert_positive = |val: Option<i32>, name: &str| {
3729 if let Some(val) = val {
3730 if val <= 0 {
3731 sql_bail!("{} must be a positive integer", name);
3732 }
3733 }
3734 val.map(NonNeg::try_from)
3735 .transpose()
3736 .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3737 };
3738 let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3739 let topic_replication_factor =
3740 assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3741
3742 let gen_avro_schema_options = |conn| {
3745 let CsrConnectionAvro {
3746 connection:
3747 CsrConnection {
3748 connection,
3749 options,
3750 },
3751 seed,
3752 key_strategy,
3753 value_strategy,
3754 } = conn;
3755 if seed.is_some() {
3756 sql_bail!("SEED option does not make sense with sinks");
3757 }
3758 if key_strategy.is_some() {
3759 sql_bail!("KEY STRATEGY option does not make sense with sinks");
3760 }
3761 if value_strategy.is_some() {
3762 sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3763 }
3764
3765 let item = scx.get_item_by_resolved_name(&connection)?;
3766 let csr_connection = match item.connection()? {
3767 Connection::Csr(_) => item.id(),
3768 _ => {
3769 sql_bail!(
3770 "{} is not a schema registry connection",
3771 scx.catalog
3772 .resolve_full_name(item.name())
3773 .to_string()
3774 .quoted()
3775 )
3776 }
3777 };
3778 let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3779
3780 if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3781 sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3782 }
3783
3784 if key_desc_and_indices.is_some()
3785 && (extracted_options.avro_key_fullname.is_some()
3786 ^ extracted_options.avro_value_fullname.is_some())
3787 {
3788 sql_bail!(
3789 "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3790 );
3791 }
3792
3793 Ok((csr_connection, extracted_options))
3794 };
3795
3796 let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3797 Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3798 Format::Bytes if desc.arity() == 1 => {
3799 let col_type = &desc.typ().column_types[0].scalar_type;
3800 if !mz_pgrepr::Value::can_encode_binary(col_type) {
3801 bail_unsupported!(format!(
3802 "BYTES format with non-encodable type: {:?}",
3803 col_type
3804 ));
3805 }
3806
3807 Ok(KafkaSinkFormatType::Bytes)
3808 }
3809 Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3810 Format::Bytes | Format::Text => {
3811 bail_unsupported!("BYTES or TEXT format with multiple columns")
3812 }
3813 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3814 Format::Avro(AvroSchema::Csr { csr_connection }) => {
3815 let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3816 let schema = if is_key {
3817 AvroSchemaGenerator::new(
3818 desc.clone(),
3819 false,
3820 options.key_doc_options,
3821 options.avro_key_fullname.as_deref().unwrap_or("row"),
3822 options.null_defaults,
3823 Some(sink_from),
3824 false,
3825 )?
3826 .schema()
3827 .to_string()
3828 } else {
3829 AvroSchemaGenerator::new(
3830 desc.clone(),
3831 matches!(envelope, SinkEnvelope::Debezium),
3832 options.value_doc_options,
3833 options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3834 options.null_defaults,
3835 Some(sink_from),
3836 true,
3837 )?
3838 .schema()
3839 .to_string()
3840 };
3841 Ok(KafkaSinkFormatType::Avro {
3842 schema,
3843 compatibility_level: if is_key {
3844 options.key_compatibility_level
3845 } else {
3846 options.value_compatibility_level
3847 },
3848 csr_connection,
3849 })
3850 }
3851 format => bail_unsupported!(format!("sink format {:?}", format)),
3852 };
3853
3854 let partition_by = match &partition_by {
3855 Some(partition_by) => {
3856 let mut scope = Scope::from_source(None, value_desc.iter_names());
3857
3858 match envelope {
3859 SinkEnvelope::Upsert => (),
3860 SinkEnvelope::Debezium => {
3861 let key_indices: HashSet<_> = key_desc_and_indices
3862 .as_ref()
3863 .map(|(_desc, indices)| indices.as_slice())
3864 .unwrap_or_default()
3865 .into_iter()
3866 .collect();
3867 for (i, item) in scope.items.iter_mut().enumerate() {
3868 if !key_indices.contains(&i) {
3869 item.error_if_referenced = Some(|_table, column| {
3870 PlanError::InvalidPartitionByEnvelopeDebezium {
3871 column_name: column.to_string(),
3872 }
3873 });
3874 }
3875 }
3876 }
3877 };
3878
3879 let ecx = &ExprContext {
3880 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3881 name: "PARTITION BY",
3882 scope: &scope,
3883 relation_type: value_desc.typ(),
3884 allow_aggregates: false,
3885 allow_subqueries: false,
3886 allow_parameters: false,
3887 allow_windows: false,
3888 };
3889 let expr = plan_expr(ecx, partition_by)?.cast_to(
3890 ecx,
3891 CastContext::Assignment,
3892 &SqlScalarType::UInt64,
3893 )?;
3894 let expr = expr.lower_uncorrelated()?;
3895
3896 Some(expr)
3897 }
3898 _ => None,
3899 };
3900
3901 let format = match format {
3903 Some(FormatSpecifier::KeyValue { key, value }) => {
3904 let key_format = match key_desc_and_indices.as_ref() {
3905 Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3906 None => None,
3907 };
3908 KafkaSinkFormat {
3909 value_format: map_format(value, &value_desc, false)?,
3910 key_format,
3911 }
3912 }
3913 Some(FormatSpecifier::Bare(format)) => {
3914 let key_format = match key_desc_and_indices.as_ref() {
3915 Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3916 None => None,
3917 };
3918 KafkaSinkFormat {
3919 value_format: map_format(format, &value_desc, false)?,
3920 key_format,
3921 }
3922 }
3923 None => bail_unsupported!("sink without format"),
3924 };
3925
3926 Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3927 connection_id,
3928 connection: connection_id,
3929 format,
3930 topic: topic_name,
3931 relation_key_indices,
3932 key_desc_and_indices,
3933 headers_index,
3934 value_desc,
3935 partition_by,
3936 compression_type,
3937 progress_group_id,
3938 transactional_id,
3939 topic_options: KafkaTopicOptions {
3940 partition_count: topic_partition_count,
3941 replication_factor: topic_replication_factor,
3942 topic_config: topic_config.unwrap_or_default(),
3943 },
3944 topic_metadata_refresh_interval,
3945 }))
3946}
3947
3948pub fn describe_create_index(
3949 _: &StatementContext,
3950 _: CreateIndexStatement<Aug>,
3951) -> Result<StatementDesc, PlanError> {
3952 Ok(StatementDesc::new(None))
3953}
3954
3955pub fn plan_create_index(
3956 scx: &StatementContext,
3957 mut stmt: CreateIndexStatement<Aug>,
3958) -> Result<Plan, PlanError> {
3959 let CreateIndexStatement {
3960 name,
3961 on_name,
3962 in_cluster,
3963 key_parts,
3964 with_options,
3965 if_not_exists,
3966 } = &mut stmt;
3967 let on = scx.get_item_by_resolved_name(on_name)?;
3968 let on_item_type = on.item_type();
3969
3970 if !matches!(
3971 on_item_type,
3972 CatalogItemType::View
3973 | CatalogItemType::MaterializedView
3974 | CatalogItemType::Source
3975 | CatalogItemType::Table
3976 ) {
3977 sql_bail!(
3978 "index cannot be created on {} because it is a {}",
3979 on_name.full_name_str(),
3980 on.item_type()
3981 )
3982 }
3983
3984 let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3985
3986 let filled_key_parts = match key_parts {
3987 Some(kp) => kp.to_vec(),
3988 None => {
3989 on.desc(&scx.catalog.resolve_full_name(on.name()))?
3991 .typ()
3992 .default_key()
3993 .iter()
3994 .map(|i| match on_desc.get_unambiguous_name(*i) {
3995 Some(n) => Expr::Identifier(vec![n.clone().into()]),
3996 _ => Expr::Value(Value::Number((i + 1).to_string())),
3997 })
3998 .collect()
3999 }
4000 };
4001 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
4002
4003 let index_name = if let Some(name) = name {
4004 QualifiedItemName {
4005 qualifiers: on.name().qualifiers.clone(),
4006 item: normalize::ident(name.clone()),
4007 }
4008 } else {
4009 let mut idx_name = QualifiedItemName {
4010 qualifiers: on.name().qualifiers.clone(),
4011 item: on.name().item.clone(),
4012 };
4013 if key_parts.is_none() {
4014 idx_name.item += "_primary_idx";
4016 } else {
4017 let index_name_col_suffix = keys
4020 .iter()
4021 .map(|k| match k {
4022 mz_expr::MirScalarExpr::Column(i, name) => {
4023 match (on_desc.get_unambiguous_name(*i), &name.0) {
4024 (Some(col_name), _) => col_name.to_string(),
4025 (None, Some(name)) => name.to_string(),
4026 (None, None) => format!("{}", i + 1),
4027 }
4028 }
4029 _ => "expr".to_string(),
4030 })
4031 .join("_");
4032 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4033 .expect("write on strings cannot fail");
4034 idx_name.item = normalize::ident(Ident::new(&idx_name.item)?)
4035 }
4036
4037 if !*if_not_exists {
4038 scx.catalog.find_available_name(idx_name)
4039 } else {
4040 idx_name
4041 }
4042 };
4043
4044 let full_name = scx.catalog.resolve_full_name(&index_name);
4046 let partial_name = PartialItemName::from(full_name.clone());
4047 if let (Ok(item), false, false) = (
4055 scx.catalog.resolve_item_or_type(&partial_name),
4056 *if_not_exists,
4057 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4058 ) {
4059 return Err(PlanError::ItemAlreadyExists {
4060 name: full_name.to_string(),
4061 item_type: item.item_type(),
4062 });
4063 }
4064
4065 let options = plan_index_options(scx, with_options.clone())?;
4066 let cluster_id = match in_cluster {
4067 None => scx.resolve_cluster(None)?.id(),
4068 Some(in_cluster) => in_cluster.id,
4069 };
4070
4071 *in_cluster = Some(ResolvedClusterName {
4072 id: cluster_id,
4073 print_name: None,
4074 });
4075
4076 *name = Some(Ident::new(index_name.item.clone())?);
4078 *key_parts = Some(filled_key_parts);
4079 let if_not_exists = *if_not_exists;
4080
4081 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4082 let compaction_window = options.iter().find_map(|o| {
4083 #[allow(irrefutable_let_patterns)]
4084 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4085 Some(lcw.clone())
4086 } else {
4087 None
4088 }
4089 });
4090
4091 Ok(Plan::CreateIndex(CreateIndexPlan {
4092 name: index_name,
4093 index: Index {
4094 create_sql,
4095 on: on.global_id(),
4096 keys,
4097 cluster_id,
4098 compaction_window,
4099 },
4100 if_not_exists,
4101 }))
4102}
4103
4104pub fn describe_create_type(
4105 _: &StatementContext,
4106 _: CreateTypeStatement<Aug>,
4107) -> Result<StatementDesc, PlanError> {
4108 Ok(StatementDesc::new(None))
4109}
4110
4111pub fn plan_create_type(
4112 scx: &StatementContext,
4113 stmt: CreateTypeStatement<Aug>,
4114) -> Result<Plan, PlanError> {
4115 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4116 let CreateTypeStatement { name, as_type, .. } = stmt;
4117
4118 fn validate_data_type(
4119 scx: &StatementContext,
4120 data_type: ResolvedDataType,
4121 as_type: &str,
4122 key: &str,
4123 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4124 let (id, modifiers) = match data_type {
4125 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4126 _ => sql_bail!(
4127 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4128 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4129 as_type,
4130 key,
4131 data_type.human_readable_name(),
4132 ),
4133 };
4134
4135 let item = scx.catalog.get_item(&id);
4136 match item.type_details() {
4137 None => sql_bail!(
4138 "{} must be of class type, but received {} which is of class {}",
4139 key,
4140 scx.catalog.resolve_full_name(item.name()),
4141 item.item_type()
4142 ),
4143 Some(CatalogTypeDetails {
4144 typ: CatalogType::Char,
4145 ..
4146 }) => {
4147 bail_unsupported!("embedding char type in a list or map")
4148 }
4149 _ => {
4150 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4152
4153 Ok((id, modifiers))
4154 }
4155 }
4156 }
4157
4158 let inner = match as_type {
4159 CreateTypeAs::List { options } => {
4160 let CreateTypeListOptionExtracted {
4161 element_type,
4162 seen: _,
4163 } = CreateTypeListOptionExtracted::try_from(options)?;
4164 let element_type =
4165 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4166 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4167 CatalogType::List {
4168 element_reference: id,
4169 element_modifiers: modifiers,
4170 }
4171 }
4172 CreateTypeAs::Map { options } => {
4173 let CreateTypeMapOptionExtracted {
4174 key_type,
4175 value_type,
4176 seen: _,
4177 } = CreateTypeMapOptionExtracted::try_from(options)?;
4178 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4179 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4180 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4181 let (value_id, value_modifiers) =
4182 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4183 CatalogType::Map {
4184 key_reference: key_id,
4185 key_modifiers,
4186 value_reference: value_id,
4187 value_modifiers,
4188 }
4189 }
4190 CreateTypeAs::Record { column_defs } => {
4191 let mut fields = vec![];
4192 for column_def in column_defs {
4193 let data_type = column_def.data_type;
4194 let key = ident(column_def.name.clone());
4195 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4196 fields.push(CatalogRecordField {
4197 name: ColumnName::from(key.clone()),
4198 type_reference: id,
4199 type_modifiers: modifiers,
4200 });
4201 }
4202 CatalogType::Record { fields }
4203 }
4204 };
4205
4206 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4207
4208 let full_name = scx.catalog.resolve_full_name(&name);
4210 let partial_name = PartialItemName::from(full_name.clone());
4211 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4214 if item.item_type().conflicts_with_type() {
4215 return Err(PlanError::ItemAlreadyExists {
4216 name: full_name.to_string(),
4217 item_type: item.item_type(),
4218 });
4219 }
4220 }
4221
4222 Ok(Plan::CreateType(CreateTypePlan {
4223 name,
4224 typ: Type { create_sql, inner },
4225 }))
4226}
4227
4228generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4229
4230generate_extracted_config!(
4231 CreateTypeMapOption,
4232 (KeyType, ResolvedDataType),
4233 (ValueType, ResolvedDataType)
4234);
4235
4236#[derive(Debug)]
4237pub enum PlannedAlterRoleOption {
4238 Attributes(PlannedRoleAttributes),
4239 Variable(PlannedRoleVariable),
4240}
4241
4242#[derive(Debug, Clone)]
4243pub struct PlannedRoleAttributes {
4244 pub inherit: Option<bool>,
4245 pub password: Option<Password>,
4246 pub nopassword: Option<bool>,
4250 pub superuser: Option<bool>,
4251 pub login: Option<bool>,
4252}
4253
4254fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4255 let mut planned_attributes = PlannedRoleAttributes {
4256 inherit: None,
4257 password: None,
4258 superuser: None,
4259 login: None,
4260 nopassword: None,
4261 };
4262
4263 for option in options {
4264 match option {
4265 RoleAttribute::Inherit | RoleAttribute::NoInherit
4266 if planned_attributes.inherit.is_some() =>
4267 {
4268 sql_bail!("conflicting or redundant options");
4269 }
4270 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4271 bail_never_supported!(
4272 "CREATECLUSTER attribute",
4273 "sql/create-role/#details",
4274 "Use system privileges instead."
4275 );
4276 }
4277 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4278 bail_never_supported!(
4279 "CREATEDB attribute",
4280 "sql/create-role/#details",
4281 "Use system privileges instead."
4282 );
4283 }
4284 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4285 bail_never_supported!(
4286 "CREATEROLE attribute",
4287 "sql/create-role/#details",
4288 "Use system privileges instead."
4289 );
4290 }
4291 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4292 sql_bail!("conflicting or redundant options");
4293 }
4294
4295 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4296 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4297 RoleAttribute::Password(password) => {
4298 if let Some(password) = password {
4299 planned_attributes.password = Some(password.into());
4300 } else {
4301 planned_attributes.nopassword = Some(true);
4302 }
4303 }
4304 RoleAttribute::SuperUser => {
4305 if planned_attributes.superuser == Some(false) {
4306 sql_bail!("conflicting or redundant options");
4307 }
4308 planned_attributes.superuser = Some(true);
4309 }
4310 RoleAttribute::NoSuperUser => {
4311 if planned_attributes.superuser == Some(true) {
4312 sql_bail!("conflicting or redundant options");
4313 }
4314 planned_attributes.superuser = Some(false);
4315 }
4316 RoleAttribute::Login => {
4317 if planned_attributes.login == Some(false) {
4318 sql_bail!("conflicting or redundant options");
4319 }
4320 planned_attributes.login = Some(true);
4321 }
4322 RoleAttribute::NoLogin => {
4323 if planned_attributes.login == Some(true) {
4324 sql_bail!("conflicting or redundant options");
4325 }
4326 planned_attributes.login = Some(false);
4327 }
4328 }
4329 }
4330 if planned_attributes.inherit == Some(false) {
4331 bail_unsupported!("non inherit roles");
4332 }
4333
4334 Ok(planned_attributes)
4335}
4336
4337#[derive(Debug)]
4338pub enum PlannedRoleVariable {
4339 Set { name: String, value: VariableValue },
4340 Reset { name: String },
4341}
4342
4343impl PlannedRoleVariable {
4344 pub fn name(&self) -> &str {
4345 match self {
4346 PlannedRoleVariable::Set { name, .. } => name,
4347 PlannedRoleVariable::Reset { name } => name,
4348 }
4349 }
4350}
4351
4352fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4353 let plan = match variable {
4354 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4355 name: name.to_string(),
4356 value: scl::plan_set_variable_to(value)?,
4357 },
4358 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4359 name: name.to_string(),
4360 },
4361 };
4362 Ok(plan)
4363}
4364
4365pub fn describe_create_role(
4366 _: &StatementContext,
4367 _: CreateRoleStatement,
4368) -> Result<StatementDesc, PlanError> {
4369 Ok(StatementDesc::new(None))
4370}
4371
4372pub fn plan_create_role(
4373 _: &StatementContext,
4374 CreateRoleStatement { name, options }: CreateRoleStatement,
4375) -> Result<Plan, PlanError> {
4376 let attributes = plan_role_attributes(options)?;
4377 Ok(Plan::CreateRole(CreateRolePlan {
4378 name: normalize::ident(name),
4379 attributes: attributes.into(),
4380 }))
4381}
4382
4383pub fn plan_create_network_policy(
4384 ctx: &StatementContext,
4385 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4386) -> Result<Plan, PlanError> {
4387 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4388 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4389
4390 let Some(rule_defs) = policy_options.rules else {
4391 sql_bail!("RULES must be specified when creating network policies.");
4392 };
4393
4394 let mut rules = vec![];
4395 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4396 let NetworkPolicyRuleOptionExtracted {
4397 seen: _,
4398 direction,
4399 action,
4400 address,
4401 } = options.try_into()?;
4402 let (direction, action, address) = match (direction, action, address) {
4403 (Some(direction), Some(action), Some(address)) => (
4404 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4405 NetworkPolicyRuleAction::try_from(action.as_str())?,
4406 PolicyAddress::try_from(address.as_str())?,
4407 ),
4408 (_, _, _) => {
4409 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4410 }
4411 };
4412 rules.push(NetworkPolicyRule {
4413 name: normalize::ident(name),
4414 direction,
4415 action,
4416 address,
4417 });
4418 }
4419
4420 if rules.len()
4421 > ctx
4422 .catalog
4423 .system_vars()
4424 .max_rules_per_network_policy()
4425 .try_into()?
4426 {
4427 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4428 }
4429
4430 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4431 name: normalize::ident(name),
4432 rules,
4433 }))
4434}
4435
4436pub fn plan_alter_network_policy(
4437 ctx: &StatementContext,
4438 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4439) -> Result<Plan, PlanError> {
4440 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4441
4442 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4443 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4444
4445 let Some(rule_defs) = policy_options.rules else {
4446 sql_bail!("RULES must be specified when creating network policies.");
4447 };
4448
4449 let mut rules = vec![];
4450 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4451 let NetworkPolicyRuleOptionExtracted {
4452 seen: _,
4453 direction,
4454 action,
4455 address,
4456 } = options.try_into()?;
4457
4458 let (direction, action, address) = match (direction, action, address) {
4459 (Some(direction), Some(action), Some(address)) => (
4460 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4461 NetworkPolicyRuleAction::try_from(action.as_str())?,
4462 PolicyAddress::try_from(address.as_str())?,
4463 ),
4464 (_, _, _) => {
4465 sql_bail!("Direction, Address, and Action must specified when creating a rule")
4466 }
4467 };
4468 rules.push(NetworkPolicyRule {
4469 name: normalize::ident(name),
4470 direction,
4471 action,
4472 address,
4473 });
4474 }
4475 if rules.len()
4476 > ctx
4477 .catalog
4478 .system_vars()
4479 .max_rules_per_network_policy()
4480 .try_into()?
4481 {
4482 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4483 }
4484
4485 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4486 id: policy.id(),
4487 name: normalize::ident(name),
4488 rules,
4489 }))
4490}
4491
4492pub fn describe_create_cluster(
4493 _: &StatementContext,
4494 _: CreateClusterStatement<Aug>,
4495) -> Result<StatementDesc, PlanError> {
4496 Ok(StatementDesc::new(None))
4497}
4498
4499generate_extracted_config!(
4505 ClusterOption,
4506 (AvailabilityZones, Vec<String>),
4507 (Disk, bool),
4508 (IntrospectionDebugging, bool),
4509 (IntrospectionInterval, OptionalDuration),
4510 (Managed, bool),
4511 (Replicas, Vec<ReplicaDefinition<Aug>>),
4512 (ReplicationFactor, u32),
4513 (Size, String),
4514 (Schedule, ClusterScheduleOptionValue),
4515 (WorkloadClass, OptionalString)
4516);
4517
4518generate_extracted_config!(
4519 NetworkPolicyOption,
4520 (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4521);
4522
4523generate_extracted_config!(
4524 NetworkPolicyRuleOption,
4525 (Direction, String),
4526 (Action, String),
4527 (Address, String)
4528);
4529
4530generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4531
4532generate_extracted_config!(
4533 ClusterAlterUntilReadyOption,
4534 (Timeout, Duration),
4535 (OnTimeout, String)
4536);
4537
4538generate_extracted_config!(
4539 ClusterFeature,
4540 (ReoptimizeImportedViews, Option<bool>, Default(None)),
4541 (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4542 (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4543 (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4544 (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4545 (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4546 (
4547 EnableProjectionPushdownAfterRelationCse,
4548 Option<bool>,
4549 Default(None)
4550 )
4551);
4552
4553pub fn plan_create_cluster(
4557 scx: &StatementContext,
4558 stmt: CreateClusterStatement<Aug>,
4559) -> Result<Plan, PlanError> {
4560 let plan = plan_create_cluster_inner(scx, stmt)?;
4561
4562 if let CreateClusterVariant::Managed(_) = &plan.variant {
4564 let stmt = unplan_create_cluster(scx, plan.clone())
4565 .map_err(|e| PlanError::Replan(e.to_string()))?;
4566 let create_sql = stmt.to_ast_string_stable();
4567 let stmt = parse::parse(&create_sql)
4568 .map_err(|e| PlanError::Replan(e.to_string()))?
4569 .into_element()
4570 .ast;
4571 let (stmt, _resolved_ids) =
4572 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4573 let stmt = match stmt {
4574 Statement::CreateCluster(stmt) => stmt,
4575 stmt => {
4576 return Err(PlanError::Replan(format!(
4577 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4578 )));
4579 }
4580 };
4581 let replan =
4582 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4583 if plan != replan {
4584 return Err(PlanError::Replan(format!(
4585 "replan does not match: plan={plan:?}, replan={replan:?}"
4586 )));
4587 }
4588 }
4589
4590 Ok(Plan::CreateCluster(plan))
4591}
4592
4593pub fn plan_create_cluster_inner(
4594 scx: &StatementContext,
4595 CreateClusterStatement {
4596 name,
4597 options,
4598 features,
4599 }: CreateClusterStatement<Aug>,
4600) -> Result<CreateClusterPlan, PlanError> {
4601 let ClusterOptionExtracted {
4602 availability_zones,
4603 introspection_debugging,
4604 introspection_interval,
4605 managed,
4606 replicas,
4607 replication_factor,
4608 seen: _,
4609 size,
4610 disk,
4611 schedule,
4612 workload_class,
4613 }: ClusterOptionExtracted = options.try_into()?;
4614
4615 let managed = managed.unwrap_or_else(|| replicas.is_none());
4616
4617 if !scx.catalog.active_role_id().is_system() {
4618 if !features.is_empty() {
4619 sql_bail!("FEATURES not supported for non-system users");
4620 }
4621 if workload_class.is_some() {
4622 sql_bail!("WORKLOAD CLASS not supported for non-system users");
4623 }
4624 }
4625
4626 let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4627 let workload_class = workload_class.and_then(|v| v.0);
4628
4629 if managed {
4630 if replicas.is_some() {
4631 sql_bail!("REPLICAS not supported for managed clusters");
4632 }
4633 let Some(size) = size else {
4634 sql_bail!("SIZE must be specified for managed clusters");
4635 };
4636
4637 if disk.is_some() {
4638 if scx.catalog.is_cluster_size_cc(&size) {
4642 sql_bail!(
4643 "DISK option not supported for modern cluster sizes because disk is always enabled"
4644 );
4645 }
4646
4647 scx.catalog
4648 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4649 }
4650
4651 let compute = plan_compute_replica_config(
4652 introspection_interval,
4653 introspection_debugging.unwrap_or(false),
4654 )?;
4655
4656 let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4657 replication_factor.unwrap_or_else(|| {
4658 scx.catalog
4659 .system_vars()
4660 .default_cluster_replication_factor()
4661 })
4662 } else {
4663 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4664 if replication_factor.is_some() {
4665 sql_bail!(
4666 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4667 );
4668 }
4669 0
4673 };
4674 let availability_zones = availability_zones.unwrap_or_default();
4675
4676 if !availability_zones.is_empty() {
4677 scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4678 }
4679
4680 let ClusterFeatureExtracted {
4682 reoptimize_imported_views,
4683 enable_eager_delta_joins,
4684 enable_new_outer_join_lowering,
4685 enable_variadic_left_join_lowering,
4686 enable_letrec_fixpoint_analysis,
4687 enable_join_prioritize_arranged,
4688 enable_projection_pushdown_after_relation_cse,
4689 seen: _,
4690 } = ClusterFeatureExtracted::try_from(features)?;
4691 let optimizer_feature_overrides = OptimizerFeatureOverrides {
4692 reoptimize_imported_views,
4693 enable_eager_delta_joins,
4694 enable_new_outer_join_lowering,
4695 enable_variadic_left_join_lowering,
4696 enable_letrec_fixpoint_analysis,
4697 enable_join_prioritize_arranged,
4698 enable_projection_pushdown_after_relation_cse,
4699 ..Default::default()
4700 };
4701
4702 let schedule = plan_cluster_schedule(schedule)?;
4703
4704 Ok(CreateClusterPlan {
4705 name: normalize::ident(name),
4706 variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4707 replication_factor,
4708 size,
4709 availability_zones,
4710 compute,
4711 optimizer_feature_overrides,
4712 schedule,
4713 }),
4714 workload_class,
4715 })
4716 } else {
4717 let Some(replica_defs) = replicas else {
4718 sql_bail!("REPLICAS must be specified for unmanaged clusters");
4719 };
4720 if availability_zones.is_some() {
4721 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4722 }
4723 if replication_factor.is_some() {
4724 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4725 }
4726 if introspection_debugging.is_some() {
4727 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4728 }
4729 if introspection_interval.is_some() {
4730 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4731 }
4732 if size.is_some() {
4733 sql_bail!("SIZE not supported for unmanaged clusters");
4734 }
4735 if disk.is_some() {
4736 sql_bail!("DISK not supported for unmanaged clusters");
4737 }
4738 if !features.is_empty() {
4739 sql_bail!("FEATURES not supported for unmanaged clusters");
4740 }
4741 if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4742 sql_bail!(
4743 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4744 );
4745 }
4746
4747 let mut replicas = vec![];
4748 for ReplicaDefinition { name, options } in replica_defs {
4749 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4750 }
4751
4752 Ok(CreateClusterPlan {
4753 name: normalize::ident(name),
4754 variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4755 workload_class,
4756 })
4757 }
4758}
4759
4760pub fn unplan_create_cluster(
4764 scx: &StatementContext,
4765 CreateClusterPlan {
4766 name,
4767 variant,
4768 workload_class,
4769 }: CreateClusterPlan,
4770) -> Result<CreateClusterStatement<Aug>, PlanError> {
4771 match variant {
4772 CreateClusterVariant::Managed(CreateClusterManagedPlan {
4773 replication_factor,
4774 size,
4775 availability_zones,
4776 compute,
4777 optimizer_feature_overrides,
4778 schedule,
4779 }) => {
4780 let schedule = unplan_cluster_schedule(schedule);
4781 let OptimizerFeatureOverrides {
4782 enable_guard_subquery_tablefunc: _,
4783 enable_consolidate_after_union_negate: _,
4784 enable_reduce_mfp_fusion: _,
4785 enable_cardinality_estimates: _,
4786 persist_fast_path_limit: _,
4787 reoptimize_imported_views,
4788 enable_eager_delta_joins,
4789 enable_new_outer_join_lowering,
4790 enable_variadic_left_join_lowering,
4791 enable_letrec_fixpoint_analysis,
4792 enable_reduce_reduction: _,
4793 enable_join_prioritize_arranged,
4794 enable_projection_pushdown_after_relation_cse,
4795 enable_less_reduce_in_eqprop: _,
4796 enable_dequadratic_eqprop_map: _,
4797 enable_eq_classes_withholding_errors: _,
4798 enable_fast_path_plan_insights: _,
4799 } = optimizer_feature_overrides;
4800 let features_extracted = ClusterFeatureExtracted {
4802 seen: Default::default(),
4804 reoptimize_imported_views,
4805 enable_eager_delta_joins,
4806 enable_new_outer_join_lowering,
4807 enable_variadic_left_join_lowering,
4808 enable_letrec_fixpoint_analysis,
4809 enable_join_prioritize_arranged,
4810 enable_projection_pushdown_after_relation_cse,
4811 };
4812 let features = features_extracted.into_values(scx.catalog);
4813 let availability_zones = if availability_zones.is_empty() {
4814 None
4815 } else {
4816 Some(availability_zones)
4817 };
4818 let (introspection_interval, introspection_debugging) =
4819 unplan_compute_replica_config(compute);
4820 let replication_factor = match &schedule {
4823 ClusterScheduleOptionValue::Manual => Some(replication_factor),
4824 ClusterScheduleOptionValue::Refresh { .. } => {
4825 assert!(
4826 replication_factor <= 1,
4827 "replication factor, {replication_factor:?}, must be <= 1"
4828 );
4829 None
4830 }
4831 };
4832 let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4833 let options_extracted = ClusterOptionExtracted {
4834 seen: Default::default(),
4836 availability_zones,
4837 disk: None,
4838 introspection_debugging: Some(introspection_debugging),
4839 introspection_interval,
4840 managed: Some(true),
4841 replicas: None,
4842 replication_factor,
4843 size: Some(size),
4844 schedule: Some(schedule),
4845 workload_class,
4846 };
4847 let options = options_extracted.into_values(scx.catalog);
4848 let name = Ident::new_unchecked(name);
4849 Ok(CreateClusterStatement {
4850 name,
4851 options,
4852 features,
4853 })
4854 }
4855 CreateClusterVariant::Unmanaged(_) => {
4856 bail_unsupported!("SHOW CREATE for unmanaged clusters")
4857 }
4858 }
4859}
4860
4861generate_extracted_config!(
4862 ReplicaOption,
4863 (AvailabilityZone, String),
4864 (BilledAs, String),
4865 (ComputeAddresses, Vec<String>),
4866 (ComputectlAddresses, Vec<String>),
4867 (Disk, bool),
4868 (Internal, bool, Default(false)),
4869 (IntrospectionDebugging, bool, Default(false)),
4870 (IntrospectionInterval, OptionalDuration),
4871 (Size, String),
4872 (StorageAddresses, Vec<String>),
4873 (StoragectlAddresses, Vec<String>),
4874 (Workers, u16)
4875);
4876
4877fn plan_replica_config(
4878 scx: &StatementContext,
4879 options: Vec<ReplicaOption<Aug>>,
4880) -> Result<ReplicaConfig, PlanError> {
4881 let ReplicaOptionExtracted {
4882 availability_zone,
4883 billed_as,
4884 computectl_addresses,
4885 disk,
4886 internal,
4887 introspection_debugging,
4888 introspection_interval,
4889 size,
4890 storagectl_addresses,
4891 ..
4892 }: ReplicaOptionExtracted = options.try_into()?;
4893
4894 let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4895
4896 match (
4897 size,
4898 availability_zone,
4899 billed_as,
4900 storagectl_addresses,
4901 computectl_addresses,
4902 ) {
4903 (None, _, None, None, None) => {
4905 sql_bail!("SIZE option must be specified");
4908 }
4909 (Some(size), availability_zone, billed_as, None, None) => {
4910 if disk.is_some() {
4911 if scx.catalog.is_cluster_size_cc(&size) {
4915 sql_bail!(
4916 "DISK option not supported for modern cluster sizes because disk is always enabled"
4917 );
4918 }
4919
4920 scx.catalog
4921 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4922 }
4923
4924 Ok(ReplicaConfig::Orchestrated {
4925 size,
4926 availability_zone,
4927 compute,
4928 billed_as,
4929 internal,
4930 })
4931 }
4932
4933 (None, None, None, storagectl_addresses, computectl_addresses) => {
4934 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4935
4936 let Some(storagectl_addrs) = storagectl_addresses else {
4940 sql_bail!("missing STORAGECTL ADDRESSES option");
4941 };
4942 let Some(computectl_addrs) = computectl_addresses else {
4943 sql_bail!("missing COMPUTECTL ADDRESSES option");
4944 };
4945
4946 if storagectl_addrs.len() != computectl_addrs.len() {
4947 sql_bail!(
4948 "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4949 );
4950 }
4951
4952 if disk.is_some() {
4953 sql_bail!("DISK can't be specified for unorchestrated clusters");
4954 }
4955
4956 Ok(ReplicaConfig::Unorchestrated {
4957 storagectl_addrs,
4958 computectl_addrs,
4959 compute,
4960 })
4961 }
4962 _ => {
4963 sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4966 }
4967 }
4968}
4969
4970fn plan_compute_replica_config(
4974 introspection_interval: Option<OptionalDuration>,
4975 introspection_debugging: bool,
4976) -> Result<ComputeReplicaConfig, PlanError> {
4977 let introspection_interval = introspection_interval
4978 .map(|OptionalDuration(i)| i)
4979 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
4980 let introspection = match introspection_interval {
4981 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
4982 interval,
4983 debugging: introspection_debugging,
4984 }),
4985 None if introspection_debugging => {
4986 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
4987 }
4988 None => None,
4989 };
4990 let compute = ComputeReplicaConfig { introspection };
4991 Ok(compute)
4992}
4993
4994fn unplan_compute_replica_config(
4998 compute_replica_config: ComputeReplicaConfig,
4999) -> (Option<OptionalDuration>, bool) {
5000 match compute_replica_config.introspection {
5001 Some(ComputeReplicaIntrospectionConfig {
5002 debugging,
5003 interval,
5004 }) => (Some(OptionalDuration(Some(interval))), debugging),
5005 None => (Some(OptionalDuration(None)), false),
5006 }
5007}
5008
5009fn plan_cluster_schedule(
5013 schedule: ClusterScheduleOptionValue,
5014) -> Result<ClusterSchedule, PlanError> {
5015 Ok(match schedule {
5016 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
5017 ClusterScheduleOptionValue::Refresh {
5019 hydration_time_estimate: None,
5020 } => ClusterSchedule::Refresh {
5021 hydration_time_estimate: Duration::from_millis(0),
5022 },
5023 ClusterScheduleOptionValue::Refresh {
5025 hydration_time_estimate: Some(interval_value),
5026 } => {
5027 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5028 if interval.as_microseconds() < 0 {
5029 sql_bail!(
5030 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5031 interval
5032 );
5033 }
5034 if interval.months != 0 {
5035 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5039 }
5040 let duration = interval.duration()?;
5041 if u64::try_from(duration.as_millis()).is_err()
5042 || Interval::from_duration(&duration).is_err()
5043 {
5044 sql_bail!("HYDRATION TIME ESTIMATE too large");
5045 }
5046 ClusterSchedule::Refresh {
5047 hydration_time_estimate: duration,
5048 }
5049 }
5050 })
5051}
5052
5053fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5057 match schedule {
5058 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5059 ClusterSchedule::Refresh {
5060 hydration_time_estimate,
5061 } => {
5062 let interval = Interval::from_duration(&hydration_time_estimate)
5063 .expect("planning ensured that this is convertible back to Interval");
5064 let interval_value = literal::unplan_interval(&interval);
5065 ClusterScheduleOptionValue::Refresh {
5066 hydration_time_estimate: Some(interval_value),
5067 }
5068 }
5069 }
5070}
5071
5072pub fn describe_create_cluster_replica(
5073 _: &StatementContext,
5074 _: CreateClusterReplicaStatement<Aug>,
5075) -> Result<StatementDesc, PlanError> {
5076 Ok(StatementDesc::new(None))
5077}
5078
5079pub fn plan_create_cluster_replica(
5080 scx: &StatementContext,
5081 CreateClusterReplicaStatement {
5082 definition: ReplicaDefinition { name, options },
5083 of_cluster,
5084 }: CreateClusterReplicaStatement<Aug>,
5085) -> Result<Plan, PlanError> {
5086 let cluster = scx
5087 .catalog
5088 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5089 let current_replica_count = cluster.replica_ids().iter().count();
5090 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5091 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5092 return Err(PlanError::CreateReplicaFailStorageObjects {
5093 current_replica_count,
5094 internal_replica_count,
5095 hypothetical_replica_count: current_replica_count + 1,
5096 });
5097 }
5098
5099 let config = plan_replica_config(scx, options)?;
5100
5101 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5102 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5103 return Err(PlanError::MangedReplicaName(name.into_string()));
5104 }
5105 } else {
5106 ensure_cluster_is_not_managed(scx, cluster.id())?;
5107 }
5108
5109 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5110 name: normalize::ident(name),
5111 cluster_id: cluster.id(),
5112 config,
5113 }))
5114}
5115
5116pub fn describe_create_secret(
5117 _: &StatementContext,
5118 _: CreateSecretStatement<Aug>,
5119) -> Result<StatementDesc, PlanError> {
5120 Ok(StatementDesc::new(None))
5121}
5122
5123pub fn plan_create_secret(
5124 scx: &StatementContext,
5125 stmt: CreateSecretStatement<Aug>,
5126) -> Result<Plan, PlanError> {
5127 let CreateSecretStatement {
5128 name,
5129 if_not_exists,
5130 value,
5131 } = &stmt;
5132
5133 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5134 let mut create_sql_statement = stmt.clone();
5135 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5136 let create_sql =
5137 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5138 let secret_as = query::plan_secret_as(scx, value.clone())?;
5139
5140 let secret = Secret {
5141 create_sql,
5142 secret_as,
5143 };
5144
5145 Ok(Plan::CreateSecret(CreateSecretPlan {
5146 name,
5147 secret,
5148 if_not_exists: *if_not_exists,
5149 }))
5150}
5151
5152pub fn describe_create_connection(
5153 _: &StatementContext,
5154 _: CreateConnectionStatement<Aug>,
5155) -> Result<StatementDesc, PlanError> {
5156 Ok(StatementDesc::new(None))
5157}
5158
5159generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5160
5161pub fn plan_create_connection(
5162 scx: &StatementContext,
5163 mut stmt: CreateConnectionStatement<Aug>,
5164) -> Result<Plan, PlanError> {
5165 let CreateConnectionStatement {
5166 name,
5167 connection_type,
5168 values,
5169 if_not_exists,
5170 with_options,
5171 } = stmt.clone();
5172 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5173 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5174 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5175
5176 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5177 if options.validate.is_some() {
5178 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5179 }
5180 let validate = match options.validate {
5181 Some(val) => val,
5182 None => {
5183 scx.catalog
5184 .system_vars()
5185 .enable_default_connection_validation()
5186 && details.to_connection().validate_by_default()
5187 }
5188 };
5189
5190 let full_name = scx.catalog.resolve_full_name(&name);
5192 let partial_name = PartialItemName::from(full_name.clone());
5193 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5194 return Err(PlanError::ItemAlreadyExists {
5195 name: full_name.to_string(),
5196 item_type: item.item_type(),
5197 });
5198 }
5199
5200 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5203 stmt.values.retain(|v| {
5204 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5205 });
5206 stmt.values.push(ConnectionOption {
5207 name: ConnectionOptionName::PublicKey1,
5208 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5209 });
5210 stmt.values.push(ConnectionOption {
5211 name: ConnectionOptionName::PublicKey2,
5212 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5213 });
5214 }
5215 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5216
5217 let plan = CreateConnectionPlan {
5218 name,
5219 if_not_exists,
5220 connection: crate::plan::Connection {
5221 create_sql,
5222 details,
5223 },
5224 validate,
5225 };
5226 Ok(Plan::CreateConnection(plan))
5227}
5228
5229fn plan_drop_database(
5230 scx: &StatementContext,
5231 if_exists: bool,
5232 name: &UnresolvedDatabaseName,
5233 cascade: bool,
5234) -> Result<Option<DatabaseId>, PlanError> {
5235 Ok(match resolve_database(scx, name, if_exists)? {
5236 Some(database) => {
5237 if !cascade && database.has_schemas() {
5238 sql_bail!(
5239 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5240 name,
5241 );
5242 }
5243 Some(database.id())
5244 }
5245 None => None,
5246 })
5247}
5248
5249pub fn describe_drop_objects(
5250 _: &StatementContext,
5251 _: DropObjectsStatement,
5252) -> Result<StatementDesc, PlanError> {
5253 Ok(StatementDesc::new(None))
5254}
5255
5256pub fn plan_drop_objects(
5257 scx: &mut StatementContext,
5258 DropObjectsStatement {
5259 object_type,
5260 if_exists,
5261 names,
5262 cascade,
5263 }: DropObjectsStatement,
5264) -> Result<Plan, PlanError> {
5265 assert_ne!(
5266 object_type,
5267 mz_sql_parser::ast::ObjectType::Func,
5268 "rejected in parser"
5269 );
5270 let object_type = object_type.into();
5271
5272 let mut referenced_ids = Vec::new();
5273 for name in names {
5274 let id = match &name {
5275 UnresolvedObjectName::Cluster(name) => {
5276 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5277 }
5278 UnresolvedObjectName::ClusterReplica(name) => {
5279 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5280 }
5281 UnresolvedObjectName::Database(name) => {
5282 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5283 }
5284 UnresolvedObjectName::Schema(name) => {
5285 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5286 }
5287 UnresolvedObjectName::Role(name) => {
5288 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5289 }
5290 UnresolvedObjectName::Item(name) => {
5291 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5292 .map(ObjectId::Item)
5293 }
5294 UnresolvedObjectName::NetworkPolicy(name) => {
5295 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5296 }
5297 };
5298 match id {
5299 Some(id) => referenced_ids.push(id),
5300 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5301 name: name.to_ast_string_simple(),
5302 object_type,
5303 }),
5304 }
5305 }
5306 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5307
5308 Ok(Plan::DropObjects(DropObjectsPlan {
5309 referenced_ids,
5310 drop_ids,
5311 object_type,
5312 }))
5313}
5314
5315fn plan_drop_schema(
5316 scx: &StatementContext,
5317 if_exists: bool,
5318 name: &UnresolvedSchemaName,
5319 cascade: bool,
5320) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5321 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5322 Some((database_spec, schema_spec)) => {
5323 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5324 sql_bail!(
5325 "cannot drop schema {name} because it is required by the database system",
5326 );
5327 }
5328 if let SchemaSpecifier::Temporary = schema_spec {
5329 sql_bail!("cannot drop schema {name} because it is a temporary schema",)
5330 }
5331 let schema = scx.get_schema(&database_spec, &schema_spec);
5332 if !cascade && schema.has_items() {
5333 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5334 sql_bail!(
5335 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5336 full_schema_name
5337 );
5338 }
5339 Some((database_spec, schema_spec))
5340 }
5341 None => None,
5342 })
5343}
5344
5345fn plan_drop_role(
5346 scx: &StatementContext,
5347 if_exists: bool,
5348 name: &Ident,
5349) -> Result<Option<RoleId>, PlanError> {
5350 match scx.catalog.resolve_role(name.as_str()) {
5351 Ok(role) => {
5352 let id = role.id();
5353 if &id == scx.catalog.active_role_id() {
5354 sql_bail!("current role cannot be dropped");
5355 }
5356 for role in scx.catalog.get_roles() {
5357 for (member_id, grantor_id) in role.membership() {
5358 if &id == grantor_id {
5359 let member_role = scx.catalog.get_role(member_id);
5360 sql_bail!(
5361 "cannot drop role {}: still depended up by membership of role {} in role {}",
5362 name.as_str(),
5363 role.name(),
5364 member_role.name()
5365 );
5366 }
5367 }
5368 }
5369 Ok(Some(role.id()))
5370 }
5371 Err(_) if if_exists => Ok(None),
5372 Err(e) => Err(e.into()),
5373 }
5374}
5375
5376fn plan_drop_cluster(
5377 scx: &StatementContext,
5378 if_exists: bool,
5379 name: &Ident,
5380 cascade: bool,
5381) -> Result<Option<ClusterId>, PlanError> {
5382 Ok(match resolve_cluster(scx, name, if_exists)? {
5383 Some(cluster) => {
5384 if !cascade && !cluster.bound_objects().is_empty() {
5385 return Err(PlanError::DependentObjectsStillExist {
5386 object_type: "cluster".to_string(),
5387 object_name: cluster.name().to_string(),
5388 dependents: Vec::new(),
5389 });
5390 }
5391 Some(cluster.id())
5392 }
5393 None => None,
5394 })
5395}
5396
5397fn plan_drop_network_policy(
5398 scx: &StatementContext,
5399 if_exists: bool,
5400 name: &Ident,
5401) -> Result<Option<NetworkPolicyId>, PlanError> {
5402 match scx.catalog.resolve_network_policy(name.as_str()) {
5403 Ok(policy) => {
5404 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5407 Err(PlanError::NetworkPolicyInUse)
5408 } else {
5409 Ok(Some(policy.id()))
5410 }
5411 }
5412 Err(_) if if_exists => Ok(None),
5413 Err(e) => Err(e.into()),
5414 }
5415}
5416
5417fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5420 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5422 false
5423 } else {
5424 cluster.bound_objects().iter().any(|id| {
5426 let item = scx.catalog.get_item(id);
5427 matches!(
5428 item.item_type(),
5429 CatalogItemType::Sink | CatalogItemType::Source
5430 )
5431 })
5432 }
5433}
5434
5435fn plan_drop_cluster_replica(
5436 scx: &StatementContext,
5437 if_exists: bool,
5438 name: &QualifiedReplica,
5439) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5440 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5441 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5442}
5443
5444fn plan_drop_item(
5446 scx: &StatementContext,
5447 object_type: ObjectType,
5448 if_exists: bool,
5449 name: UnresolvedItemName,
5450 cascade: bool,
5451) -> Result<Option<CatalogItemId>, PlanError> {
5452 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5453 Ok(r) => r,
5454 Err(PlanError::MismatchedObjectType {
5456 name,
5457 is_type: ObjectType::MaterializedView,
5458 expected_type: ObjectType::View,
5459 }) => {
5460 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5461 }
5462 e => e?,
5463 };
5464
5465 Ok(match resolved {
5466 Some(catalog_item) => {
5467 if catalog_item.id().is_system() {
5468 sql_bail!(
5469 "cannot drop {} {} because it is required by the database system",
5470 catalog_item.item_type(),
5471 scx.catalog.minimal_qualification(catalog_item.name()),
5472 );
5473 }
5474
5475 if !cascade {
5476 for id in catalog_item.used_by() {
5477 let dep = scx.catalog.get_item(id);
5478 if dependency_prevents_drop(object_type, dep) {
5479 return Err(PlanError::DependentObjectsStillExist {
5480 object_type: catalog_item.item_type().to_string(),
5481 object_name: scx
5482 .catalog
5483 .minimal_qualification(catalog_item.name())
5484 .to_string(),
5485 dependents: vec![(
5486 dep.item_type().to_string(),
5487 scx.catalog.minimal_qualification(dep.name()).to_string(),
5488 )],
5489 });
5490 }
5491 }
5492 }
5495 Some(catalog_item.id())
5496 }
5497 None => None,
5498 })
5499}
5500
5501fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5503 match object_type {
5504 ObjectType::Type => true,
5505 _ => match dep.item_type() {
5506 CatalogItemType::Func
5507 | CatalogItemType::Table
5508 | CatalogItemType::Source
5509 | CatalogItemType::View
5510 | CatalogItemType::MaterializedView
5511 | CatalogItemType::Sink
5512 | CatalogItemType::Type
5513 | CatalogItemType::Secret
5514 | CatalogItemType::Connection
5515 | CatalogItemType::ContinualTask => true,
5516 CatalogItemType::Index => false,
5517 },
5518 }
5519}
5520
5521pub fn describe_alter_index_options(
5522 _: &StatementContext,
5523 _: AlterIndexStatement<Aug>,
5524) -> Result<StatementDesc, PlanError> {
5525 Ok(StatementDesc::new(None))
5526}
5527
5528pub fn describe_drop_owned(
5529 _: &StatementContext,
5530 _: DropOwnedStatement<Aug>,
5531) -> Result<StatementDesc, PlanError> {
5532 Ok(StatementDesc::new(None))
5533}
5534
5535pub fn plan_drop_owned(
5536 scx: &StatementContext,
5537 drop: DropOwnedStatement<Aug>,
5538) -> Result<Plan, PlanError> {
5539 let cascade = drop.cascade();
5540 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5541 let mut drop_ids = Vec::new();
5542 let mut privilege_revokes = Vec::new();
5543 let mut default_privilege_revokes = Vec::new();
5544
5545 fn update_privilege_revokes(
5546 object_id: SystemObjectId,
5547 privileges: &PrivilegeMap,
5548 role_ids: &BTreeSet<RoleId>,
5549 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5550 ) {
5551 privilege_revokes.extend(iter::zip(
5552 iter::repeat(object_id),
5553 privileges
5554 .all_values()
5555 .filter(|privilege| role_ids.contains(&privilege.grantee))
5556 .cloned(),
5557 ));
5558 }
5559
5560 for replica in scx.catalog.get_cluster_replicas() {
5562 if role_ids.contains(&replica.owner_id()) {
5563 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5564 }
5565 }
5566
5567 for cluster in scx.catalog.get_clusters() {
5569 if role_ids.contains(&cluster.owner_id()) {
5570 if !cascade {
5572 let non_owned_bound_objects: Vec<_> = cluster
5573 .bound_objects()
5574 .into_iter()
5575 .map(|item_id| scx.catalog.get_item(item_id))
5576 .filter(|item| !role_ids.contains(&item.owner_id()))
5577 .collect();
5578 if !non_owned_bound_objects.is_empty() {
5579 let names: Vec<_> = non_owned_bound_objects
5580 .into_iter()
5581 .map(|item| {
5582 (
5583 item.item_type().to_string(),
5584 scx.catalog.resolve_full_name(item.name()).to_string(),
5585 )
5586 })
5587 .collect();
5588 return Err(PlanError::DependentObjectsStillExist {
5589 object_type: "cluster".to_string(),
5590 object_name: cluster.name().to_string(),
5591 dependents: names,
5592 });
5593 }
5594 }
5595 drop_ids.push(cluster.id().into());
5596 }
5597 update_privilege_revokes(
5598 SystemObjectId::Object(cluster.id().into()),
5599 cluster.privileges(),
5600 &role_ids,
5601 &mut privilege_revokes,
5602 );
5603 }
5604
5605 for item in scx.catalog.get_items() {
5607 if role_ids.contains(&item.owner_id()) {
5608 if !cascade {
5609 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5611 let non_owned_dependencies: Vec<_> = used_by
5612 .into_iter()
5613 .map(|item_id| scx.catalog.get_item(item_id))
5614 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5615 .filter(|item| !role_ids.contains(&item.owner_id()))
5616 .collect();
5617 if !non_owned_dependencies.is_empty() {
5618 let names: Vec<_> = non_owned_dependencies
5619 .into_iter()
5620 .map(|item| {
5621 let item_typ = item.item_type().to_string();
5622 let item_name =
5623 scx.catalog.resolve_full_name(item.name()).to_string();
5624 (item_typ, item_name)
5625 })
5626 .collect();
5627 Err(PlanError::DependentObjectsStillExist {
5628 object_type: item.item_type().to_string(),
5629 object_name: scx
5630 .catalog
5631 .resolve_full_name(item.name())
5632 .to_string()
5633 .to_string(),
5634 dependents: names,
5635 })
5636 } else {
5637 Ok(())
5638 }
5639 };
5640
5641 if let Some(id) = item.progress_id() {
5644 let progress_item = scx.catalog.get_item(&id);
5645 check_if_dependents_exist(progress_item.used_by())?;
5646 }
5647 check_if_dependents_exist(item.used_by())?;
5648 }
5649 drop_ids.push(item.id().into());
5650 }
5651 update_privilege_revokes(
5652 SystemObjectId::Object(item.id().into()),
5653 item.privileges(),
5654 &role_ids,
5655 &mut privilege_revokes,
5656 );
5657 }
5658
5659 for schema in scx.catalog.get_schemas() {
5661 if !schema.id().is_temporary() {
5662 if role_ids.contains(&schema.owner_id()) {
5663 if !cascade {
5664 let non_owned_dependencies: Vec<_> = schema
5665 .item_ids()
5666 .map(|item_id| scx.catalog.get_item(&item_id))
5667 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5668 .filter(|item| !role_ids.contains(&item.owner_id()))
5669 .collect();
5670 if !non_owned_dependencies.is_empty() {
5671 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5672 sql_bail!(
5673 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5674 full_schema_name.to_string().quoted()
5675 );
5676 }
5677 }
5678 drop_ids.push((*schema.database(), *schema.id()).into())
5679 }
5680 update_privilege_revokes(
5681 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5682 schema.privileges(),
5683 &role_ids,
5684 &mut privilege_revokes,
5685 );
5686 }
5687 }
5688
5689 for database in scx.catalog.get_databases() {
5691 if role_ids.contains(&database.owner_id()) {
5692 if !cascade {
5693 let non_owned_schemas: Vec<_> = database
5694 .schemas()
5695 .into_iter()
5696 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5697 .collect();
5698 if !non_owned_schemas.is_empty() {
5699 sql_bail!(
5700 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5701 database.name().quoted(),
5702 );
5703 }
5704 }
5705 drop_ids.push(database.id().into());
5706 }
5707 update_privilege_revokes(
5708 SystemObjectId::Object(database.id().into()),
5709 database.privileges(),
5710 &role_ids,
5711 &mut privilege_revokes,
5712 );
5713 }
5714
5715 update_privilege_revokes(
5717 SystemObjectId::System,
5718 scx.catalog.get_system_privileges(),
5719 &role_ids,
5720 &mut privilege_revokes,
5721 );
5722
5723 for (default_privilege_object, default_privilege_acl_items) in
5724 scx.catalog.get_default_privileges()
5725 {
5726 for default_privilege_acl_item in default_privilege_acl_items {
5727 if role_ids.contains(&default_privilege_object.role_id)
5728 || role_ids.contains(&default_privilege_acl_item.grantee)
5729 {
5730 default_privilege_revokes.push((
5731 default_privilege_object.clone(),
5732 default_privilege_acl_item.clone(),
5733 ));
5734 }
5735 }
5736 }
5737
5738 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5739
5740 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5741 if !system_ids.is_empty() {
5742 let mut owners = system_ids
5743 .into_iter()
5744 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5745 .collect::<BTreeSet<_>>()
5746 .into_iter()
5747 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5748 sql_bail!(
5749 "cannot drop objects owned by role {} because they are required by the database system",
5750 owners.join(", "),
5751 );
5752 }
5753
5754 Ok(Plan::DropOwned(DropOwnedPlan {
5755 role_ids: role_ids.into_iter().collect(),
5756 drop_ids,
5757 privilege_revokes,
5758 default_privilege_revokes,
5759 }))
5760}
5761
5762fn plan_retain_history_option(
5763 scx: &StatementContext,
5764 retain_history: Option<OptionalDuration>,
5765) -> Result<Option<CompactionWindow>, PlanError> {
5766 if let Some(OptionalDuration(lcw)) = retain_history {
5767 Ok(Some(plan_retain_history(scx, lcw)?))
5768 } else {
5769 Ok(None)
5770 }
5771}
5772
5773fn plan_retain_history(
5779 scx: &StatementContext,
5780 lcw: Option<Duration>,
5781) -> Result<CompactionWindow, PlanError> {
5782 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5783 match lcw {
5784 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5789 option_name: "RETAIN HISTORY".to_string(),
5790 err: Box::new(PlanError::Unstructured(
5791 "internal error: unexpectedly zero".to_string(),
5792 )),
5793 }),
5794 Some(duration) => {
5795 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5798 && scx
5799 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5800 .is_err()
5801 {
5802 return Err(PlanError::RetainHistoryLow {
5803 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5804 });
5805 }
5806 Ok(duration.try_into()?)
5807 }
5808 None => {
5811 if scx
5812 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5813 .is_err()
5814 {
5815 Err(PlanError::RetainHistoryRequired)
5816 } else {
5817 Ok(CompactionWindow::DisableCompaction)
5818 }
5819 }
5820 }
5821}
5822
5823generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5824
5825fn plan_index_options(
5826 scx: &StatementContext,
5827 with_opts: Vec<IndexOption<Aug>>,
5828) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5829 if !with_opts.is_empty() {
5830 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5832 }
5833
5834 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5835
5836 let mut out = Vec::with_capacity(1);
5837 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5838 out.push(crate::plan::IndexOption::RetainHistory(cw));
5839 }
5840 Ok(out)
5841}
5842
5843generate_extracted_config!(
5844 TableOption,
5845 (PartitionBy, Vec<Ident>),
5846 (RetainHistory, OptionalDuration),
5847 (RedactedTest, String)
5848);
5849
5850fn plan_table_options(
5851 scx: &StatementContext,
5852 desc: &RelationDesc,
5853 with_opts: Vec<TableOption<Aug>>,
5854) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5855 let TableOptionExtracted {
5856 partition_by,
5857 retain_history,
5858 redacted_test,
5859 ..
5860 }: TableOptionExtracted = with_opts.try_into()?;
5861
5862 if let Some(partition_by) = partition_by {
5863 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5864 check_partition_by(desc, partition_by)?;
5865 }
5866
5867 if redacted_test.is_some() {
5868 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5869 }
5870
5871 let mut out = Vec::with_capacity(1);
5872 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5873 out.push(crate::plan::TableOption::RetainHistory(cw));
5874 }
5875 Ok(out)
5876}
5877
5878pub fn plan_alter_index_options(
5879 scx: &mut StatementContext,
5880 AlterIndexStatement {
5881 index_name,
5882 if_exists,
5883 action,
5884 }: AlterIndexStatement<Aug>,
5885) -> Result<Plan, PlanError> {
5886 let object_type = ObjectType::Index;
5887 match action {
5888 AlterIndexAction::ResetOptions(options) => {
5889 let mut options = options.into_iter();
5890 if let Some(opt) = options.next() {
5891 match opt {
5892 IndexOptionName::RetainHistory => {
5893 if options.next().is_some() {
5894 sql_bail!("RETAIN HISTORY must be only option");
5895 }
5896 return alter_retain_history(
5897 scx,
5898 object_type,
5899 if_exists,
5900 UnresolvedObjectName::Item(index_name),
5901 None,
5902 );
5903 }
5904 }
5905 }
5906 sql_bail!("expected option");
5907 }
5908 AlterIndexAction::SetOptions(options) => {
5909 let mut options = options.into_iter();
5910 if let Some(opt) = options.next() {
5911 match opt.name {
5912 IndexOptionName::RetainHistory => {
5913 if options.next().is_some() {
5914 sql_bail!("RETAIN HISTORY must be only option");
5915 }
5916 return alter_retain_history(
5917 scx,
5918 object_type,
5919 if_exists,
5920 UnresolvedObjectName::Item(index_name),
5921 opt.value,
5922 );
5923 }
5924 }
5925 }
5926 sql_bail!("expected option");
5927 }
5928 }
5929}
5930
5931pub fn describe_alter_cluster_set_options(
5932 _: &StatementContext,
5933 _: AlterClusterStatement<Aug>,
5934) -> Result<StatementDesc, PlanError> {
5935 Ok(StatementDesc::new(None))
5936}
5937
5938pub fn plan_alter_cluster(
5939 scx: &mut StatementContext,
5940 AlterClusterStatement {
5941 name,
5942 action,
5943 if_exists,
5944 }: AlterClusterStatement<Aug>,
5945) -> Result<Plan, PlanError> {
5946 let cluster = match resolve_cluster(scx, &name, if_exists)? {
5947 Some(entry) => entry,
5948 None => {
5949 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5950 name: name.to_ast_string_simple(),
5951 object_type: ObjectType::Cluster,
5952 });
5953
5954 return Ok(Plan::AlterNoop(AlterNoopPlan {
5955 object_type: ObjectType::Cluster,
5956 }));
5957 }
5958 };
5959
5960 let mut options: PlanClusterOption = Default::default();
5961 let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;
5962
5963 match action {
5964 AlterClusterAction::SetOptions {
5965 options: set_options,
5966 with_options,
5967 } => {
5968 let ClusterOptionExtracted {
5969 availability_zones,
5970 introspection_debugging,
5971 introspection_interval,
5972 managed,
5973 replicas: replica_defs,
5974 replication_factor,
5975 seen: _,
5976 size,
5977 disk,
5978 schedule,
5979 workload_class,
5980 }: ClusterOptionExtracted = set_options.try_into()?;
5981
5982 if !scx.catalog.active_role_id().is_system() {
5983 if workload_class.is_some() {
5984 sql_bail!("WORKLOAD CLASS not supported for non-system users");
5985 }
5986 }
5987
5988 match managed.unwrap_or_else(|| cluster.is_managed()) {
5989 true => {
5990 let alter_strategy_extracted =
5991 ClusterAlterOptionExtracted::try_from(with_options)?;
5992 alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;
5993
5994 match alter_strategy {
5995 AlterClusterPlanStrategy::None => {}
5996 _ => {
5997 scx.require_feature_flag(
5998 &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
5999 )?;
6000 }
6001 }
6002
6003 if replica_defs.is_some() {
6004 sql_bail!("REPLICAS not supported for managed clusters");
6005 }
6006 if schedule.is_some()
6007 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6008 {
6009 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
6010 }
6011
6012 if let Some(replication_factor) = replication_factor {
6013 if schedule.is_some()
6014 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6015 {
6016 sql_bail!(
6017 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
6018 );
6019 }
6020 if let Some(current_schedule) = cluster.schedule() {
6021 if !matches!(current_schedule, ClusterSchedule::Manual) {
6022 sql_bail!(
6023 "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
6024 );
6025 }
6026 }
6027
6028 let internal_replica_count =
6029 cluster.replicas().iter().filter(|r| r.internal()).count();
6030 let hypothetical_replica_count =
6031 internal_replica_count + usize::cast_from(replication_factor);
6032
6033 if contains_single_replica_objects(scx, cluster)
6036 && hypothetical_replica_count > 1
6037 {
6038 return Err(PlanError::CreateReplicaFailStorageObjects {
6039 current_replica_count: cluster.replica_ids().iter().count(),
6040 internal_replica_count,
6041 hypothetical_replica_count,
6042 });
6043 }
6044 } else if alter_strategy.is_some() {
6045 let internal_replica_count =
6049 cluster.replicas().iter().filter(|r| r.internal()).count();
6050 let hypothetical_replica_count = internal_replica_count * 2;
6051 if contains_single_replica_objects(scx, cluster) {
6052 return Err(PlanError::CreateReplicaFailStorageObjects {
6053 current_replica_count: cluster.replica_ids().iter().count(),
6054 internal_replica_count,
6055 hypothetical_replica_count,
6056 });
6057 }
6058 }
6059 }
6060 false => {
6061 if !alter_strategy.is_none() {
6062 sql_bail!("ALTER... WITH not supported for unmanaged clusters");
6063 }
6064 if availability_zones.is_some() {
6065 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
6066 }
6067 if replication_factor.is_some() {
6068 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
6069 }
6070 if introspection_debugging.is_some() {
6071 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
6072 }
6073 if introspection_interval.is_some() {
6074 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
6075 }
6076 if size.is_some() {
6077 sql_bail!("SIZE not supported for unmanaged clusters");
6078 }
6079 if disk.is_some() {
6080 sql_bail!("DISK not supported for unmanaged clusters");
6081 }
6082 if schedule.is_some()
6083 && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
6084 {
6085 sql_bail!(
6086 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
6087 );
6088 }
6089 if let Some(current_schedule) = cluster.schedule() {
6090 if !matches!(current_schedule, ClusterSchedule::Manual)
6091 && schedule.is_none()
6092 {
6093 sql_bail!(
6094 "when switching a cluster to unmanaged, if the managed \
6095 cluster's SCHEDULE is anything other than MANUAL, you have to \
6096 explicitly set the SCHEDULE to MANUAL"
6097 );
6098 }
6099 }
6100 }
6101 }
6102
6103 let mut replicas = vec![];
6104 for ReplicaDefinition { name, options } in
6105 replica_defs.into_iter().flat_map(Vec::into_iter)
6106 {
6107 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
6108 }
6109
6110 if let Some(managed) = managed {
6111 options.managed = AlterOptionParameter::Set(managed);
6112 }
6113 if let Some(replication_factor) = replication_factor {
6114 options.replication_factor = AlterOptionParameter::Set(replication_factor);
6115 }
6116 if let Some(size) = &size {
6117 options.size = AlterOptionParameter::Set(size.clone());
6118 }
6119 if let Some(availability_zones) = availability_zones {
6120 options.availability_zones = AlterOptionParameter::Set(availability_zones);
6121 }
6122 if let Some(introspection_debugging) = introspection_debugging {
6123 options.introspection_debugging =
6124 AlterOptionParameter::Set(introspection_debugging);
6125 }
6126 if let Some(introspection_interval) = introspection_interval {
6127 options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
6128 }
6129 if disk.is_some() {
6130 let size = size.as_deref().unwrap_or_else(|| {
6134 cluster.managed_size().expect("cluster known to be managed")
6135 });
6136 if scx.catalog.is_cluster_size_cc(size) {
6137 sql_bail!(
6138 "DISK option not supported for modern cluster sizes because disk is always enabled"
6139 );
6140 }
6141
6142 scx.catalog
6143 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
6144 }
6145 if !replicas.is_empty() {
6146 options.replicas = AlterOptionParameter::Set(replicas);
6147 }
6148 if let Some(schedule) = schedule {
6149 options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
6150 }
6151 if let Some(workload_class) = workload_class {
6152 options.workload_class = AlterOptionParameter::Set(workload_class.0);
6153 }
6154 }
6155 AlterClusterAction::ResetOptions(reset_options) => {
6156 use AlterOptionParameter::Reset;
6157 use ClusterOptionName::*;
6158
6159 if !scx.catalog.active_role_id().is_system() {
6160 if reset_options.contains(&WorkloadClass) {
6161 sql_bail!("WORKLOAD CLASS not supported for non-system users");
6162 }
6163 }
6164
6165 for option in reset_options {
6166 match option {
6167 AvailabilityZones => options.availability_zones = Reset,
6168 Disk => scx
6169 .catalog
6170 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
6171 IntrospectionInterval => options.introspection_interval = Reset,
6172 IntrospectionDebugging => options.introspection_debugging = Reset,
6173 Managed => options.managed = Reset,
6174 Replicas => options.replicas = Reset,
6175 ReplicationFactor => options.replication_factor = Reset,
6176 Size => options.size = Reset,
6177 Schedule => options.schedule = Reset,
6178 WorkloadClass => options.workload_class = Reset,
6179 }
6180 }
6181 }
6182 }
6183 Ok(Plan::AlterCluster(AlterClusterPlan {
6184 id: cluster.id(),
6185 name: cluster.name().to_string(),
6186 options,
6187 strategy: alter_strategy,
6188 }))
6189}
6190
6191pub fn describe_alter_set_cluster(
6192 _: &StatementContext,
6193 _: AlterSetClusterStatement<Aug>,
6194) -> Result<StatementDesc, PlanError> {
6195 Ok(StatementDesc::new(None))
6196}
6197
6198pub fn plan_alter_item_set_cluster(
6199 scx: &StatementContext,
6200 AlterSetClusterStatement {
6201 if_exists,
6202 set_cluster: in_cluster_name,
6203 name,
6204 object_type,
6205 }: AlterSetClusterStatement<Aug>,
6206) -> Result<Plan, PlanError> {
6207 scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;
6208
6209 let object_type = object_type.into();
6210
6211 match object_type {
6213 ObjectType::MaterializedView => {}
6214 ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
6215 bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
6216 }
6217 _ => {
6218 bail_never_supported!(
6219 format!("ALTER {object_type} SET CLUSTER"),
6220 "sql/alter-set-cluster/",
6221 format!("{object_type} has no associated cluster")
6222 )
6223 }
6224 }
6225
6226 let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);
6227
6228 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6229 Some(entry) => {
6230 let current_cluster = entry.cluster_id();
6231 let Some(current_cluster) = current_cluster else {
6232 sql_bail!("No cluster associated with {name}");
6233 };
6234
6235 if current_cluster == in_cluster.id() {
6236 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6237 } else {
6238 Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
6239 id: entry.id(),
6240 set_cluster: in_cluster.id(),
6241 }))
6242 }
6243 }
6244 None => {
6245 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6246 name: name.to_ast_string_simple(),
6247 object_type,
6248 });
6249
6250 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6251 }
6252 }
6253}
6254
6255pub fn describe_alter_object_rename(
6256 _: &StatementContext,
6257 _: AlterObjectRenameStatement,
6258) -> Result<StatementDesc, PlanError> {
6259 Ok(StatementDesc::new(None))
6260}
6261
6262pub fn plan_alter_object_rename(
6263 scx: &mut StatementContext,
6264 AlterObjectRenameStatement {
6265 name,
6266 object_type,
6267 to_item_name,
6268 if_exists,
6269 }: AlterObjectRenameStatement,
6270) -> Result<Plan, PlanError> {
6271 let object_type = object_type.into();
6272 match (object_type, name) {
6273 (
6274 ObjectType::View
6275 | ObjectType::MaterializedView
6276 | ObjectType::Table
6277 | ObjectType::Source
6278 | ObjectType::Index
6279 | ObjectType::Sink
6280 | ObjectType::Secret
6281 | ObjectType::Connection,
6282 UnresolvedObjectName::Item(name),
6283 ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
6284 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
6285 plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
6286 }
6287 (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
6288 plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
6289 }
6290 (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
6291 plan_alter_schema_rename(scx, name, to_item_name, if_exists)
6292 }
6293 (object_type, name) => {
6294 unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
6295 }
6296 }
6297}
6298
6299pub fn plan_alter_schema_rename(
6300 scx: &mut StatementContext,
6301 name: UnresolvedSchemaName,
6302 to_schema_name: Ident,
6303 if_exists: bool,
6304) -> Result<Plan, PlanError> {
6305 let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
6306 let object_type = ObjectType::Schema;
6307 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6308 name: name.to_ast_string_simple(),
6309 object_type,
6310 });
6311 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6312 };
6313
6314 if scx
6316 .resolve_schema_in_database(&db_spec, &to_schema_name)
6317 .is_ok()
6318 {
6319 return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
6320 to_schema_name.clone().into_string(),
6321 )));
6322 }
6323
6324 let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
6326 if schema.id().is_system() {
6327 bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
6328 }
6329
6330 Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
6331 cur_schema_spec: (db_spec, schema_spec),
6332 new_schema_name: to_schema_name.into_string(),
6333 }))
6334}
6335
6336pub fn plan_alter_schema_swap<F>(
6337 scx: &mut StatementContext,
6338 name_a: UnresolvedSchemaName,
6339 name_b: Ident,
6340 gen_temp_suffix: F,
6341) -> Result<Plan, PlanError>
6342where
6343 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6344{
6345 let schema_a = scx.resolve_schema(name_a.clone())?;
6346
6347 let db_spec = schema_a.database().clone();
6348 if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
6349 sql_bail!("cannot swap schemas that are in the ambient database");
6350 };
6351 let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;
6352
6353 if schema_a.id().is_system() || schema_b.id().is_system() {
6355 bail_never_supported!("swapping a system schema".to_string())
6356 }
6357
6358 let check = |temp_suffix: &str| {
6362 let mut temp_name = ident!("mz_schema_swap_");
6363 temp_name.append_lossy(temp_suffix);
6364 scx.resolve_schema_in_database(&db_spec, &temp_name)
6365 .is_err()
6366 };
6367 let temp_suffix = gen_temp_suffix(&check)?;
6368 let name_temp = format!("mz_schema_swap_{temp_suffix}");
6369
6370 Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
6371 schema_a_spec: (*schema_a.database(), *schema_a.id()),
6372 schema_a_name: schema_a.name().schema.to_string(),
6373 schema_b_spec: (*schema_b.database(), *schema_b.id()),
6374 schema_b_name: schema_b.name().schema.to_string(),
6375 name_temp,
6376 }))
6377}
6378
6379pub fn plan_alter_item_rename(
6380 scx: &mut StatementContext,
6381 object_type: ObjectType,
6382 name: UnresolvedItemName,
6383 to_item_name: Ident,
6384 if_exists: bool,
6385) -> Result<Plan, PlanError> {
6386 let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
6387 Ok(r) => r,
6388 Err(PlanError::MismatchedObjectType {
6390 name,
6391 is_type: ObjectType::MaterializedView,
6392 expected_type: ObjectType::View,
6393 }) => {
6394 return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
6395 }
6396 e => e?,
6397 };
6398
6399 match resolved {
6400 Some(entry) => {
6401 let full_name = scx.catalog.resolve_full_name(entry.name());
6402 let item_type = entry.item_type();
6403
6404 let proposed_name = QualifiedItemName {
6405 qualifiers: entry.name().qualifiers.clone(),
6406 item: to_item_name.clone().into_string(),
6407 };
6408
6409 let conflicting_type_exists;
6413 let conflicting_item_exists;
6414 if item_type == CatalogItemType::Type {
6415 conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
6416 conflicting_item_exists = scx
6417 .catalog
6418 .get_item_by_name(&proposed_name)
6419 .map(|item| item.item_type().conflicts_with_type())
6420 .unwrap_or(false);
6421 } else {
6422 conflicting_type_exists = item_type.conflicts_with_type()
6423 && scx.catalog.get_type_by_name(&proposed_name).is_some();
6424 conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
6425 };
6426 if conflicting_type_exists || conflicting_item_exists {
6427 sql_bail!("catalog item '{}' already exists", to_item_name);
6428 }
6429
6430 Ok(Plan::AlterItemRename(AlterItemRenamePlan {
6431 id: entry.id(),
6432 current_full_name: full_name,
6433 to_name: normalize::ident(to_item_name),
6434 object_type,
6435 }))
6436 }
6437 None => {
6438 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6439 name: name.to_ast_string_simple(),
6440 object_type,
6441 });
6442
6443 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6444 }
6445 }
6446}
6447
6448pub fn plan_alter_cluster_rename(
6449 scx: &mut StatementContext,
6450 object_type: ObjectType,
6451 name: Ident,
6452 to_name: Ident,
6453 if_exists: bool,
6454) -> Result<Plan, PlanError> {
6455 match resolve_cluster(scx, &name, if_exists)? {
6456 Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
6457 id: entry.id(),
6458 name: entry.name().to_string(),
6459 to_name: ident(to_name),
6460 })),
6461 None => {
6462 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6463 name: name.to_ast_string_simple(),
6464 object_type,
6465 });
6466
6467 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6468 }
6469 }
6470}
6471
6472pub fn plan_alter_cluster_swap<F>(
6473 scx: &mut StatementContext,
6474 name_a: Ident,
6475 name_b: Ident,
6476 gen_temp_suffix: F,
6477) -> Result<Plan, PlanError>
6478where
6479 F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
6480{
6481 let cluster_a = scx.resolve_cluster(Some(&name_a))?;
6482 let cluster_b = scx.resolve_cluster(Some(&name_b))?;
6483
6484 let check = |temp_suffix: &str| {
6485 let mut temp_name = ident!("mz_schema_swap_");
6486 temp_name.append_lossy(temp_suffix);
6487 match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
6488 Err(CatalogError::UnknownCluster(_)) => true,
6490 Ok(_) | Err(_) => false,
6492 }
6493 };
6494 let temp_suffix = gen_temp_suffix(&check)?;
6495 let name_temp = format!("mz_cluster_swap_{temp_suffix}");
6496
6497 Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
6498 id_a: cluster_a.id(),
6499 id_b: cluster_b.id(),
6500 name_a: name_a.into_string(),
6501 name_b: name_b.into_string(),
6502 name_temp,
6503 }))
6504}
6505
6506pub fn plan_alter_cluster_replica_rename(
6507 scx: &mut StatementContext,
6508 object_type: ObjectType,
6509 name: QualifiedReplica,
6510 to_item_name: Ident,
6511 if_exists: bool,
6512) -> Result<Plan, PlanError> {
6513 match resolve_cluster_replica(scx, &name, if_exists)? {
6514 Some((cluster, replica)) => {
6515 ensure_cluster_is_not_managed(scx, cluster.id())?;
6516 Ok(Plan::AlterClusterReplicaRename(
6517 AlterClusterReplicaRenamePlan {
6518 cluster_id: cluster.id(),
6519 replica_id: replica,
6520 name: QualifiedReplica {
6521 cluster: Ident::new(cluster.name())?,
6522 replica: name.replica,
6523 },
6524 to_name: normalize::ident(to_item_name),
6525 },
6526 ))
6527 }
6528 None => {
6529 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6530 name: name.to_ast_string_simple(),
6531 object_type,
6532 });
6533
6534 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6535 }
6536 }
6537}
6538
6539pub fn describe_alter_object_swap(
6540 _: &StatementContext,
6541 _: AlterObjectSwapStatement,
6542) -> Result<StatementDesc, PlanError> {
6543 Ok(StatementDesc::new(None))
6544}
6545
6546pub fn plan_alter_object_swap(
6547 scx: &mut StatementContext,
6548 stmt: AlterObjectSwapStatement,
6549) -> Result<Plan, PlanError> {
6550 scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;
6551
6552 let AlterObjectSwapStatement {
6553 object_type,
6554 name_a,
6555 name_b,
6556 } = stmt;
6557 let object_type = object_type.into();
6558
6559 let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
6561 let mut attempts = 0;
6562 let name_temp = loop {
6563 attempts += 1;
6564 if attempts > 10 {
6565 tracing::warn!("Unable to generate temp id for swapping");
6566 sql_bail!("unable to swap!");
6567 }
6568
6569 let short_id = mz_ore::id_gen::temp_id();
6571 if check_fn(&short_id) {
6572 break short_id;
6573 }
6574 };
6575
6576 Ok(name_temp)
6577 };
6578
6579 match (object_type, name_a, name_b) {
6580 (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
6581 plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
6582 }
6583 (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
6584 plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
6585 }
6586 (object_type, _, _) => Err(PlanError::Unsupported {
6587 feature: format!("ALTER {object_type} .. SWAP WITH ..."),
6588 discussion_no: None,
6589 }),
6590 }
6591}
6592
6593pub fn describe_alter_retain_history(
6594 _: &StatementContext,
6595 _: AlterRetainHistoryStatement<Aug>,
6596) -> Result<StatementDesc, PlanError> {
6597 Ok(StatementDesc::new(None))
6598}
6599
6600pub fn plan_alter_retain_history(
6601 scx: &StatementContext,
6602 AlterRetainHistoryStatement {
6603 object_type,
6604 if_exists,
6605 name,
6606 history,
6607 }: AlterRetainHistoryStatement<Aug>,
6608) -> Result<Plan, PlanError> {
6609 alter_retain_history(scx, object_type.into(), if_exists, name, history)
6610}
6611
6612fn alter_retain_history(
6613 scx: &StatementContext,
6614 object_type: ObjectType,
6615 if_exists: bool,
6616 name: UnresolvedObjectName,
6617 history: Option<WithOptionValue<Aug>>,
6618) -> Result<Plan, PlanError> {
6619 let name = match (object_type, name) {
6620 (
6621 ObjectType::View
6623 | ObjectType::MaterializedView
6624 | ObjectType::Table
6625 | ObjectType::Source
6626 | ObjectType::Index,
6627 UnresolvedObjectName::Item(name),
6628 ) => name,
6629 (object_type, _) => {
6630 sql_bail!("{object_type} does not support RETAIN HISTORY")
6631 }
6632 };
6633 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6634 Some(entry) => {
6635 let full_name = scx.catalog.resolve_full_name(entry.name());
6636 let item_type = entry.item_type();
6637
6638 if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
6640 return Err(PlanError::AlterViewOnMaterializedView(
6641 full_name.to_string(),
6642 ));
6643 } else if object_type == ObjectType::View {
6644 sql_bail!("{object_type} does not support RETAIN HISTORY")
6645 } else if object_type != item_type {
6646 sql_bail!(
6647 "\"{}\" is a {} not a {}",
6648 full_name,
6649 entry.item_type(),
6650 format!("{object_type}").to_lowercase()
6651 )
6652 }
6653
6654 let (value, lcw) = match &history {
6656 Some(WithOptionValue::RetainHistoryFor(value)) => {
6657 let window = OptionalDuration::try_from_value(value.clone())?;
6658 (Some(value.clone()), window.0)
6659 }
6660 None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
6662 _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
6663 };
6664 let window = plan_retain_history(scx, lcw)?;
6665
6666 Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
6667 id: entry.id(),
6668 value,
6669 window,
6670 object_type,
6671 }))
6672 }
6673 None => {
6674 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6675 name: name.to_ast_string_simple(),
6676 object_type,
6677 });
6678
6679 Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
6680 }
6681 }
6682}
6683
6684pub fn describe_alter_secret_options(
6685 _: &StatementContext,
6686 _: AlterSecretStatement<Aug>,
6687) -> Result<StatementDesc, PlanError> {
6688 Ok(StatementDesc::new(None))
6689}
6690
6691pub fn plan_alter_secret(
6692 scx: &mut StatementContext,
6693 stmt: AlterSecretStatement<Aug>,
6694) -> Result<Plan, PlanError> {
6695 let AlterSecretStatement {
6696 name,
6697 if_exists,
6698 value,
6699 } = stmt;
6700 let object_type = ObjectType::Secret;
6701 let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
6702 Some(entry) => entry.id(),
6703 None => {
6704 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6705 name: name.to_string(),
6706 object_type,
6707 });
6708
6709 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6710 }
6711 };
6712
6713 let secret_as = query::plan_secret_as(scx, value)?;
6714
6715 Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
6716}
6717
6718pub fn describe_alter_connection(
6719 _: &StatementContext,
6720 _: AlterConnectionStatement<Aug>,
6721) -> Result<StatementDesc, PlanError> {
6722 Ok(StatementDesc::new(None))
6723}
6724
6725generate_extracted_config!(AlterConnectionOption, (Validate, bool));
6726
6727pub fn plan_alter_connection(
6728 scx: &StatementContext,
6729 stmt: AlterConnectionStatement<Aug>,
6730) -> Result<Plan, PlanError> {
6731 let AlterConnectionStatement {
6732 name,
6733 if_exists,
6734 actions,
6735 with_options,
6736 } = stmt;
6737 let conn_name = normalize::unresolved_item_name(name)?;
6738 let entry = match scx.catalog.resolve_item(&conn_name) {
6739 Ok(entry) => entry,
6740 Err(_) if if_exists => {
6741 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6742 name: conn_name.to_string(),
6743 object_type: ObjectType::Sink,
6744 });
6745
6746 return Ok(Plan::AlterNoop(AlterNoopPlan {
6747 object_type: ObjectType::Connection,
6748 }));
6749 }
6750 Err(e) => return Err(e.into()),
6751 };
6752
6753 let connection = entry.connection()?;
6754
6755 if actions
6756 .iter()
6757 .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
6758 {
6759 if actions.len() > 1 {
6760 sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
6761 }
6762
6763 if !with_options.is_empty() {
6764 sql_bail!(
6765 "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
6766 with_options
6767 .iter()
6768 .map(|o| o.to_ast_string_simple())
6769 .join(", ")
6770 );
6771 }
6772
6773 if !matches!(connection, Connection::Ssh(_)) {
6774 sql_bail!(
6775 "{} is not an SSH connection",
6776 scx.catalog.resolve_full_name(entry.name())
6777 )
6778 }
6779
6780 return Ok(Plan::AlterConnection(AlterConnectionPlan {
6781 id: entry.id(),
6782 action: crate::plan::AlterConnectionAction::RotateKeys,
6783 }));
6784 }
6785
6786 let options = AlterConnectionOptionExtracted::try_from(with_options)?;
6787 if options.validate.is_some() {
6788 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
6789 }
6790
6791 let validate = match options.validate {
6792 Some(val) => val,
6793 None => {
6794 scx.catalog
6795 .system_vars()
6796 .enable_default_connection_validation()
6797 && connection.validate_by_default()
6798 }
6799 };
6800
6801 let connection_type = match connection {
6802 Connection::Aws(_) => CreateConnectionType::Aws,
6803 Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
6804 Connection::Kafka(_) => CreateConnectionType::Kafka,
6805 Connection::Csr(_) => CreateConnectionType::Csr,
6806 Connection::Postgres(_) => CreateConnectionType::Postgres,
6807 Connection::Ssh(_) => CreateConnectionType::Ssh,
6808 Connection::MySql(_) => CreateConnectionType::MySql,
6809 Connection::SqlServer(_) => CreateConnectionType::SqlServer,
6810 Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
6811 };
6812
6813 let specified_options: BTreeSet<_> = actions
6815 .iter()
6816 .map(|action: &AlterConnectionAction<Aug>| match action {
6817 AlterConnectionAction::SetOption(option) => option.name.clone(),
6818 AlterConnectionAction::DropOption(name) => name.clone(),
6819 AlterConnectionAction::RotateKeys => unreachable!(),
6820 })
6821 .collect();
6822
6823 for invalid in INALTERABLE_OPTIONS {
6824 if specified_options.contains(invalid) {
6825 sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
6826 }
6827 }
6828
6829 connection::validate_options_per_connection_type(connection_type, specified_options)?;
6830
6831 let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
6833 actions.into_iter().partition_map(|action| match action {
6834 AlterConnectionAction::SetOption(option) => Either::Left(option),
6835 AlterConnectionAction::DropOption(name) => Either::Right(name),
6836 AlterConnectionAction::RotateKeys => unreachable!(),
6837 });
6838
6839 let set_options: BTreeMap<_, _> = set_options_vec
6840 .clone()
6841 .into_iter()
6842 .map(|option| (option.name, option.value))
6843 .collect();
6844
6845 let connection_options_extracted =
6849 connection::ConnectionOptionExtracted::try_from(set_options_vec)?;
6850
6851 let duplicates: Vec<_> = connection_options_extracted
6852 .seen
6853 .intersection(&drop_options)
6854 .collect();
6855
6856 if !duplicates.is_empty() {
6857 sql_bail!(
6858 "cannot both SET and DROP/RESET options {}",
6859 duplicates
6860 .iter()
6861 .map(|option| option.to_string())
6862 .join(", ")
6863 )
6864 }
6865
6866 for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
6867 let set_options_count = mutually_exclusive_options
6868 .iter()
6869 .filter(|o| set_options.contains_key(o))
6870 .count();
6871 let drop_options_count = mutually_exclusive_options
6872 .iter()
6873 .filter(|o| drop_options.contains(o))
6874 .count();
6875
6876 if set_options_count > 0 && drop_options_count > 0 {
6878 sql_bail!(
6879 "cannot both SET and DROP/RESET mutually exclusive {} options {}",
6880 connection_type,
6881 mutually_exclusive_options
6882 .iter()
6883 .map(|option| option.to_string())
6884 .join(", ")
6885 )
6886 }
6887
6888 if set_options_count > 0 || drop_options_count > 0 {
6893 drop_options.extend(mutually_exclusive_options.iter().cloned());
6894 }
6895
6896 }
6899
6900 Ok(Plan::AlterConnection(AlterConnectionPlan {
6901 id: entry.id(),
6902 action: crate::plan::AlterConnectionAction::AlterOptions {
6903 set_options,
6904 drop_options,
6905 validate,
6906 },
6907 }))
6908}
6909
6910pub fn describe_alter_sink(
6911 _: &StatementContext,
6912 _: AlterSinkStatement<Aug>,
6913) -> Result<StatementDesc, PlanError> {
6914 Ok(StatementDesc::new(None))
6915}
6916
6917pub fn plan_alter_sink(
6918 scx: &mut StatementContext,
6919 stmt: AlterSinkStatement<Aug>,
6920) -> Result<Plan, PlanError> {
6921 let AlterSinkStatement {
6922 sink_name,
6923 if_exists,
6924 action,
6925 } = stmt;
6926
6927 let object_type = ObjectType::Sink;
6928 let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;
6929
6930 let Some(item) = item else {
6931 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
6932 name: sink_name.to_string(),
6933 object_type,
6934 });
6935
6936 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
6937 };
6938 let item = item.at_version(RelationVersionSelector::Latest);
6940
6941 match action {
6942 AlterSinkAction::ChangeRelation(new_from) => {
6943 let create_sql = item.create_sql();
6945 let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
6946 let [stmt]: [StatementParseResult; 1] = stmts
6947 .try_into()
6948 .expect("create sql of sink was not exactly one statement");
6949 let Statement::CreateSink(mut stmt) = stmt.ast else {
6950 unreachable!("invalid create SQL for sink item");
6951 };
6952
6953 let cur_version = stmt
6955 .with_options
6956 .extract_if(.., |o| o.name == CreateSinkOptionName::Version)
6957 .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
6958 .max()
6959 .unwrap_or(0);
6960 let new_version = cur_version + 1;
6961 stmt.with_options.push(CreateSinkOption {
6962 name: CreateSinkOptionName::Version,
6963 value: Some(WithOptionValue::Value(Value::Number(
6964 new_version.to_string(),
6965 ))),
6966 });
6967
6968 let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
6970 stmt.from = new_from;
6971
6972 let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
6974 unreachable!("invalid plan for CREATE SINK statement");
6975 };
6976
6977 Ok(Plan::AlterSink(AlterSinkPlan {
6978 item_id: item.id(),
6979 global_id: item.global_id(),
6980 sink: plan.sink,
6981 with_snapshot: plan.with_snapshot,
6982 in_cluster: plan.in_cluster,
6983 }))
6984 }
6985 AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
6986 AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET option"),
6987 }
6988}
6989
6990pub fn describe_alter_source(
6991 _: &StatementContext,
6992 _: AlterSourceStatement<Aug>,
6993) -> Result<StatementDesc, PlanError> {
6994 Ok(StatementDesc::new(None))
6996}
6997
6998generate_extracted_config!(
6999 AlterSourceAddSubsourceOption,
7000 (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7001 (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
7002 (Details, String)
7003);
7004
7005pub fn plan_alter_source(
7006 scx: &mut StatementContext,
7007 stmt: AlterSourceStatement<Aug>,
7008) -> Result<Plan, PlanError> {
7009 let AlterSourceStatement {
7010 source_name,
7011 if_exists,
7012 action,
7013 } = stmt;
7014 let object_type = ObjectType::Source;
7015
7016 if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
7017 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7018 name: source_name.to_string(),
7019 object_type,
7020 });
7021
7022 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7023 }
7024
7025 match action {
7026 AlterSourceAction::SetOptions(options) => {
7027 let mut options = options.into_iter();
7028 let option = options.next().unwrap();
7029 if option.name == CreateSourceOptionName::RetainHistory {
7030 if options.next().is_some() {
7031 sql_bail!("RETAIN HISTORY must be only option");
7032 }
7033 return alter_retain_history(
7034 scx,
7035 object_type,
7036 if_exists,
7037 UnresolvedObjectName::Item(source_name),
7038 option.value,
7039 );
7040 }
7041 sql_bail!(
7044 "Cannot modify the {} of a SOURCE.",
7045 option.name.to_ast_string_simple()
7046 );
7047 }
7048 AlterSourceAction::ResetOptions(reset) => {
7049 let mut options = reset.into_iter();
7050 let option = options.next().unwrap();
7051 if option == CreateSourceOptionName::RetainHistory {
7052 if options.next().is_some() {
7053 sql_bail!("RETAIN HISTORY must be only option");
7054 }
7055 return alter_retain_history(
7056 scx,
7057 object_type,
7058 if_exists,
7059 UnresolvedObjectName::Item(source_name),
7060 None,
7061 );
7062 }
7063 sql_bail!(
7064 "Cannot modify the {} of a SOURCE.",
7065 option.to_ast_string_simple()
7066 );
7067 }
7068 AlterSourceAction::DropSubsources { .. } => {
7069 sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
7070 }
7071 AlterSourceAction::AddSubsources { .. } => {
7072 unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
7073 }
7074 AlterSourceAction::RefreshReferences => {
7075 unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
7076 }
7077 };
7078}
7079
7080pub fn describe_alter_system_set(
7081 _: &StatementContext,
7082 _: AlterSystemSetStatement,
7083) -> Result<StatementDesc, PlanError> {
7084 Ok(StatementDesc::new(None))
7085}
7086
7087pub fn plan_alter_system_set(
7088 _: &StatementContext,
7089 AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
7090) -> Result<Plan, PlanError> {
7091 let name = name.to_string();
7092 Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
7093 name,
7094 value: scl::plan_set_variable_to(to)?,
7095 }))
7096}
7097
7098pub fn describe_alter_system_reset(
7099 _: &StatementContext,
7100 _: AlterSystemResetStatement,
7101) -> Result<StatementDesc, PlanError> {
7102 Ok(StatementDesc::new(None))
7103}
7104
7105pub fn plan_alter_system_reset(
7106 _: &StatementContext,
7107 AlterSystemResetStatement { name }: AlterSystemResetStatement,
7108) -> Result<Plan, PlanError> {
7109 let name = name.to_string();
7110 Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
7111}
7112
7113pub fn describe_alter_system_reset_all(
7114 _: &StatementContext,
7115 _: AlterSystemResetAllStatement,
7116) -> Result<StatementDesc, PlanError> {
7117 Ok(StatementDesc::new(None))
7118}
7119
7120pub fn plan_alter_system_reset_all(
7121 _: &StatementContext,
7122 _: AlterSystemResetAllStatement,
7123) -> Result<Plan, PlanError> {
7124 Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
7125}
7126
7127pub fn describe_alter_role(
7128 _: &StatementContext,
7129 _: AlterRoleStatement<Aug>,
7130) -> Result<StatementDesc, PlanError> {
7131 Ok(StatementDesc::new(None))
7132}
7133
7134pub fn plan_alter_role(
7135 _scx: &StatementContext,
7136 AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
7137) -> Result<Plan, PlanError> {
7138 let option = match option {
7139 AlterRoleOption::Attributes(attrs) => {
7140 let attrs = plan_role_attributes(attrs)?;
7141 PlannedAlterRoleOption::Attributes(attrs)
7142 }
7143 AlterRoleOption::Variable(variable) => {
7144 let var = plan_role_variable(variable)?;
7145 PlannedAlterRoleOption::Variable(var)
7146 }
7147 };
7148
7149 Ok(Plan::AlterRole(AlterRolePlan {
7150 id: name.id,
7151 name: name.name,
7152 option,
7153 }))
7154}
7155
7156pub fn describe_alter_table_add_column(
7157 _: &StatementContext,
7158 _: AlterTableAddColumnStatement<Aug>,
7159) -> Result<StatementDesc, PlanError> {
7160 Ok(StatementDesc::new(None))
7161}
7162
7163pub fn plan_alter_table_add_column(
7164 scx: &StatementContext,
7165 stmt: AlterTableAddColumnStatement<Aug>,
7166) -> Result<Plan, PlanError> {
7167 let AlterTableAddColumnStatement {
7168 if_exists,
7169 name,
7170 if_col_not_exist,
7171 column_name,
7172 data_type,
7173 } = stmt;
7174 let object_type = ObjectType::Table;
7175
7176 scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;
7177
7178 let (relation_id, item_name, desc) =
7179 match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
7180 Some(item) => {
7181 let item_name = scx.catalog.resolve_full_name(item.name());
7183 let item = item.at_version(RelationVersionSelector::Latest);
7184 let desc = item.desc(&item_name)?.into_owned();
7185 (item.id(), item_name, desc)
7186 }
7187 None => {
7188 scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
7189 name: name.to_ast_string_simple(),
7190 object_type,
7191 });
7192 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7193 }
7194 };
7195
7196 let column_name = ColumnName::from(column_name.as_str());
7197 if desc.get_by_name(&column_name).is_some() {
7198 if if_col_not_exist {
7199 scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
7200 column_name: column_name.to_string(),
7201 object_name: item_name.item,
7202 });
7203 return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
7204 } else {
7205 return Err(PlanError::ColumnAlreadyExists {
7206 column_name,
7207 object_name: item_name.item,
7208 });
7209 }
7210 }
7211
7212 let scalar_type = scalar_type_from_sql(scx, &data_type)?;
7213 let column_type = scalar_type.nullable(true);
7215 let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;
7217
7218 Ok(Plan::AlterTableAddColumn(AlterTablePlan {
7219 relation_id,
7220 column_name,
7221 column_type,
7222 raw_sql_type,
7223 }))
7224}
7225
7226pub fn describe_comment(
7227 _: &StatementContext,
7228 _: CommentStatement<Aug>,
7229) -> Result<StatementDesc, PlanError> {
7230 Ok(StatementDesc::new(None))
7231}
7232
7233pub fn plan_comment(
7234 scx: &mut StatementContext,
7235 stmt: CommentStatement<Aug>,
7236) -> Result<Plan, PlanError> {
7237 const MAX_COMMENT_LENGTH: usize = 1024;
7238
7239 let CommentStatement { object, comment } = stmt;
7240
7241 if let Some(c) = &comment {
7243 if c.len() > 1024 {
7244 return Err(PlanError::CommentTooLong {
7245 length: c.len(),
7246 max_size: MAX_COMMENT_LENGTH,
7247 });
7248 }
7249 }
7250
7251 let (object_id, column_pos) = match &object {
7252 com_ty @ CommentObjectType::Table { name }
7253 | com_ty @ CommentObjectType::View { name }
7254 | com_ty @ CommentObjectType::MaterializedView { name }
7255 | com_ty @ CommentObjectType::Index { name }
7256 | com_ty @ CommentObjectType::Func { name }
7257 | com_ty @ CommentObjectType::Connection { name }
7258 | com_ty @ CommentObjectType::Source { name }
7259 | com_ty @ CommentObjectType::Sink { name }
7260 | com_ty @ CommentObjectType::Secret { name }
7261 | com_ty @ CommentObjectType::ContinualTask { name } => {
7262 let item = scx.get_item_by_resolved_name(name)?;
7263 match (com_ty, item.item_type()) {
7264 (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
7265 (CommentObjectId::Table(item.id()), None)
7266 }
7267 (CommentObjectType::View { .. }, CatalogItemType::View) => {
7268 (CommentObjectId::View(item.id()), None)
7269 }
7270 (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
7271 (CommentObjectId::MaterializedView(item.id()), None)
7272 }
7273 (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
7274 (CommentObjectId::Index(item.id()), None)
7275 }
7276 (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
7277 (CommentObjectId::Func(item.id()), None)
7278 }
7279 (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
7280 (CommentObjectId::Connection(item.id()), None)
7281 }
7282 (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
7283 (CommentObjectId::Source(item.id()), None)
7284 }
7285 (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
7286 (CommentObjectId::Sink(item.id()), None)
7287 }
7288 (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
7289 (CommentObjectId::Secret(item.id()), None)
7290 }
7291 (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
7292 (CommentObjectId::ContinualTask(item.id()), None)
7293 }
7294 (com_ty, cat_ty) => {
7295 let expected_type = match com_ty {
7296 CommentObjectType::Table { .. } => ObjectType::Table,
7297 CommentObjectType::View { .. } => ObjectType::View,
7298 CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
7299 CommentObjectType::Index { .. } => ObjectType::Index,
7300 CommentObjectType::Func { .. } => ObjectType::Func,
7301 CommentObjectType::Connection { .. } => ObjectType::Connection,
7302 CommentObjectType::Source { .. } => ObjectType::Source,
7303 CommentObjectType::Sink { .. } => ObjectType::Sink,
7304 CommentObjectType::Secret { .. } => ObjectType::Secret,
7305 _ => unreachable!("these are the only types we match on"),
7306 };
7307
7308 return Err(PlanError::InvalidObjectType {
7309 expected_type: SystemObjectType::Object(expected_type),
7310 actual_type: SystemObjectType::Object(cat_ty.into()),
7311 object_name: item.name().item.clone(),
7312 });
7313 }
7314 }
7315 }
7316 CommentObjectType::Type { ty } => match ty {
7317 ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
7318 sql_bail!("cannot comment on anonymous list or map type");
7319 }
7320 ResolvedDataType::Named { id, modifiers, .. } => {
7321 if !modifiers.is_empty() {
7322 sql_bail!("cannot comment on type with modifiers");
7323 }
7324 (CommentObjectId::Type(*id), None)
7325 }
7326 ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
7327 },
7328 CommentObjectType::Column { name } => {
7329 let (item, pos) = scx.get_column_by_resolved_name(name)?;
7330 match item.item_type() {
7331 CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
7332 CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
7333 CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
7334 CatalogItemType::MaterializedView => {
7335 (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
7336 }
7337 CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
7338 r => {
7339 return Err(PlanError::Unsupported {
7340 feature: format!("Specifying comments on a column of {r}"),
7341 discussion_no: None,
7342 });
7343 }
7344 }
7345 }
7346 CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
7347 CommentObjectType::Database { name } => {
7348 (CommentObjectId::Database(*name.database_id()), None)
7349 }
7350 CommentObjectType::Schema { name } => (
7351 CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
7352 None,
7353 ),
7354 CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
7355 CommentObjectType::ClusterReplica { name } => {
7356 let replica = scx.catalog.resolve_cluster_replica(name)?;
7357 (
7358 CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
7359 None,
7360 )
7361 }
7362 CommentObjectType::NetworkPolicy { name } => {
7363 (CommentObjectId::NetworkPolicy(name.id), None)
7364 }
7365 };
7366
7367 if let Some(p) = column_pos {
7373 i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
7374 max_num_columns: MAX_NUM_COLUMNS,
7375 req_num_columns: p,
7376 })?;
7377 }
7378
7379 Ok(Plan::Comment(CommentPlan {
7380 object_id,
7381 sub_component: column_pos,
7382 comment,
7383 }))
7384}
7385
7386pub(crate) fn resolve_cluster<'a>(
7387 scx: &'a StatementContext,
7388 name: &'a Ident,
7389 if_exists: bool,
7390) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
7391 match scx.resolve_cluster(Some(name)) {
7392 Ok(cluster) => Ok(Some(cluster)),
7393 Err(_) if if_exists => Ok(None),
7394 Err(e) => Err(e),
7395 }
7396}
7397
7398pub(crate) fn resolve_cluster_replica<'a>(
7399 scx: &'a StatementContext,
7400 name: &QualifiedReplica,
7401 if_exists: bool,
7402) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
7403 match scx.resolve_cluster(Some(&name.cluster)) {
7404 Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
7405 Some(replica_id) => Ok(Some((cluster, *replica_id))),
7406 None if if_exists => Ok(None),
7407 None => Err(sql_err!(
7408 "CLUSTER {} has no CLUSTER REPLICA named {}",
7409 cluster.name(),
7410 name.replica.as_str().quoted(),
7411 )),
7412 },
7413 Err(_) if if_exists => Ok(None),
7414 Err(e) => Err(e),
7415 }
7416}
7417
7418pub(crate) fn resolve_database<'a>(
7419 scx: &'a StatementContext,
7420 name: &'a UnresolvedDatabaseName,
7421 if_exists: bool,
7422) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
7423 match scx.resolve_database(name) {
7424 Ok(database) => Ok(Some(database)),
7425 Err(_) if if_exists => Ok(None),
7426 Err(e) => Err(e),
7427 }
7428}
7429
7430pub(crate) fn resolve_schema<'a>(
7431 scx: &'a StatementContext,
7432 name: UnresolvedSchemaName,
7433 if_exists: bool,
7434) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
7435 match scx.resolve_schema(name) {
7436 Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
7437 Err(_) if if_exists => Ok(None),
7438 Err(e) => Err(e),
7439 }
7440}
7441
7442pub(crate) fn resolve_network_policy<'a>(
7443 scx: &'a StatementContext,
7444 name: Ident,
7445 if_exists: bool,
7446) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
7447 match scx.catalog.resolve_network_policy(&name.to_string()) {
7448 Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
7449 id: policy.id(),
7450 name: policy.name().to_string(),
7451 })),
7452 Err(_) if if_exists => Ok(None),
7453 Err(e) => Err(e.into()),
7454 }
7455}
7456
7457pub(crate) fn resolve_item_or_type<'a>(
7458 scx: &'a StatementContext,
7459 object_type: ObjectType,
7460 name: UnresolvedItemName,
7461 if_exists: bool,
7462) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
7463 let name = normalize::unresolved_item_name(name)?;
7464 let catalog_item = match object_type {
7465 ObjectType::Type => scx.catalog.resolve_type(&name),
7466 _ => scx.catalog.resolve_item(&name),
7467 };
7468
7469 match catalog_item {
7470 Ok(item) => {
7471 let is_type = ObjectType::from(item.item_type());
7472 if object_type == is_type {
7473 Ok(Some(item))
7474 } else {
7475 Err(PlanError::MismatchedObjectType {
7476 name: scx.catalog.minimal_qualification(item.name()),
7477 is_type,
7478 expected_type: object_type,
7479 })
7480 }
7481 }
7482 Err(_) if if_exists => Ok(None),
7483 Err(e) => Err(e.into()),
7484 }
7485}
7486
7487fn ensure_cluster_is_not_managed(
7489 scx: &StatementContext,
7490 cluster_id: ClusterId,
7491) -> Result<(), PlanError> {
7492 let cluster = scx.catalog.get_cluster(cluster_id);
7493 if cluster.is_managed() {
7494 Err(PlanError::ManagedCluster {
7495 cluster_name: cluster.name().to_string(),
7496 })
7497 } else {
7498 Ok(())
7499 }
7500}