use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Write;
use std::iter;
use std::time::Duration;

use itertools::{Either, Itertools};
use mz_adapter_types::compaction::{CompactionWindow, DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION};
use mz_adapter_types::dyncfgs::ENABLE_MULTI_REPLICA_SOURCES;
use mz_auth::password::Password;
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_expr::{CollectionPlan, UnmaterializableFunc};
use mz_interchange::avro::{AvroSchemaGenerator, DocTarget};
use mz_ore::cast::{CastFrom, TryCastFrom};
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::num::NonNeg;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::optimize::OptimizerFeatureOverrides;
use mz_repr::refresh_schedule::{RefreshEvery, RefreshSchedule};
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, ColumnName, ColumnType, RelationDesc, RelationType, RelationVersion,
    RelationVersionSelector, ScalarType, Timestamp, VersionedRelationDesc, preserves_order,
    strconv,
};
use mz_sql_parser::ast::{
    self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption,
    AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement,
    AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement,
    AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement,
    AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction,
    AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement,
    AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement,
    AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName,
    ClusterAlterOption, ClusterAlterOptionName, ClusterAlterOptionValue,
    ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature,
    ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef,
    ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName,
    ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement,
    CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName,
    CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement,
    CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement,
    CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement,
    CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName,
    CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs,
    CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName,
    CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption,
    CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf,
    CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement,
    DropOwnedStatement, Expr, Format, FormatSpecifier, Ident, IfExistsBehavior, IndexOption,
    IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption,
    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
    MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName,
    NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName,
    PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue,
    RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption,
    ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint,
    TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption,
    TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName,
    UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue,
};
use mz_sql_parser::ident;
use mz_sql_parser::parser::StatementParseResult;
use mz_storage_types::connections::inline::{ConnectionAccess, ReferencedConnection};
use mz_storage_types::connections::{Connection, KafkaTopicOptions};
use mz_storage_types::sinks::{
    KafkaIdStyle, KafkaSinkConnection, KafkaSinkFormat, KafkaSinkFormatType, SinkEnvelope,
    StorageSinkConnection,
};
use mz_storage_types::sources::encoding::{
    AvroEncoding, ColumnSpec, CsvEncoding, DataEncoding, ProtobufEncoding, RegexEncoding,
    SourceDataEncoding, included_column_desc,
};
use mz_storage_types::sources::envelope::{
    KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc,
};
use mz_storage_types::sources::load_generator::{
    KeyValueLoadGenerator, LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT, LoadGenerator,
    LoadGeneratorSourceConnection, LoadGeneratorSourceExportDetails,
};
use mz_storage_types::sources::mysql::{
    MySqlSourceConnection, MySqlSourceDetails, ProtoMySqlSourceDetails,
};
use mz_storage_types::sources::postgres::{
    PostgresSourceConnection, PostgresSourcePublicationDetails,
    ProtoPostgresSourcePublicationDetails,
};
use mz_storage_types::sources::sql_server::{
    ProtoSqlServerSourceExtras, SqlServerSourceExportDetails,
};
use mz_storage_types::sources::{
    GenericSourceConnection, MySqlSourceExportDetails, PostgresSourceExportDetails,
    ProtoSourceExportStatementDetails, SourceConnection, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, SourceExportStatementDetails, SqlServerSource, SqlServerSourceExtras,
    Timeline,
};
use prost::Message;

use crate::ast::display::AstDisplay;
use crate::catalog::{
    CatalogCluster, CatalogDatabase, CatalogError, CatalogItem, CatalogItemType,
    CatalogRecordField, CatalogType, CatalogTypeDetails, ObjectType, SystemObjectType,
};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
    ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
    ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
};
use crate::normalize::{self, ident};
use crate::plan::error::PlanError;
use crate::plan::query::{
    CteDesc, ExprContext, QueryLifetime, cast_relation, plan_expr, scalar_type_from_catalog,
    scalar_type_from_sql,
};
use crate::plan::scope::Scope;
use crate::plan::statement::ddl::connection::{INALTERABLE_OPTIONS, MUTUALLY_EXCLUSIVE_SETS};
use crate::plan::statement::{StatementContext, StatementDesc, scl};
use crate::plan::typeconv::CastContext;
use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue};
use crate::plan::{
    AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan,
    AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan,
    AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan,
    AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan,
    AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan,
    AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig,
    ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan,
    CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant,
    CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan,
    CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan,
    CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan,
    CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index,
    Ingestion, MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction,
    NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice, PolicyAddress, QueryContext,
    ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, VariableValue, View,
    WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal,
    plan_utils, query, transform_ast,
};
use crate::session::vars::{
    self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY,
    ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_REFRESH_EVERY_MVS,
};
use crate::{names, parse};

mod connection;

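/// The maximum number of columns a relation planned here may have; enforced
/// below when planning webhook sources.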
const MAX_NUM_COLUMNS: usize = 256;

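/// Matches the replica names that Materialize generates for managed clusters:
/// `r` followed by one or more digits (`r1`, `r2`, ...).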
static MANAGED_REPLICA_PATTERN: std::sync::LazyLock<regex::Regex> =
    std::sync::LazyLock::new(|| regex::Regex::new(r"^r(\d)+$").unwrap());

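/// Checks that the `PARTITION BY` columns are a prefix of the relation's
/// columns and that each one has an order-preserving type.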
fn check_partition_by(desc: &RelationDesc, partition_by: Vec<Ident>) -> Result<(), PlanError> {
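    // Walk the relation's columns and the PARTITION BY columns in lockstep;
    // each PARTITION BY column must match the relation column at that index.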
    for (idx, ((desc_name, desc_type), partition_name)) in
        desc.iter().zip(partition_by.into_iter()).enumerate()
    {
        let partition_name = normalize::column_name(partition_name);
        if *desc_name != partition_name {
            sql_bail!(
                "PARTITION BY columns should be a prefix of the relation's columns (expected {desc_name} at index {idx}, got {partition_name})"
            );
        }
        if !preserves_order(&desc_type.scalar_type) {
            sql_bail!("PARTITION BY column {partition_name} has unsupported type");
        }
    }
    Ok(())
}

pub fn describe_create_database(
    _: &StatementContext,
    _: CreateDatabaseStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_database(
    _: &StatementContext,
    CreateDatabaseStatement {
        name,
        if_not_exists,
    }: CreateDatabaseStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::CreateDatabase(CreateDatabasePlan {
        name: normalize::ident(name.0),
        if_not_exists,
    }))
}

pub fn describe_create_schema(
    _: &StatementContext,
    _: CreateSchemaStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_create_schema(
    scx: &StatementContext,
    CreateSchemaStatement {
        mut name,
        if_not_exists,
    }: CreateSchemaStatement,
) -> Result<Plan, PlanError> {
    if name.0.len() > 2 {
        sql_bail!("schema name {} has more than two components", name);
    }
    let schema_name = normalize::ident(
        name.0
            .pop()
            .expect("names always have at least one component"),
    );
    let database_spec = match name.0.pop() {
        None => match scx.catalog.active_database() {
            Some(id) => ResolvedDatabaseSpecifier::Id(id.clone()),
            None => sql_bail!("no database specified and no active database"),
        },
        Some(n) => match scx.resolve_database(&UnresolvedDatabaseName(n.clone())) {
            Ok(database) => ResolvedDatabaseSpecifier::Id(database.id()),
            Err(_) => sql_bail!("invalid database {}", n.as_str()),
        },
    };
    Ok(Plan::CreateSchema(CreateSchemaPlan {
        database_spec,
        schema_name,
        if_not_exists,
    }))
}

pub fn describe_create_table(
    _: &StatementContext,
    _: CreateTableStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

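/// Plans a `CREATE TABLE` statement, deriving the table's relation description
/// from its column and constraint definitions. Columns carrying a
/// `ColumnOption::Versioned` option belong to later relation versions and are
/// folded into the [`VersionedRelationDesc`] rather than the base relation.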
pub fn plan_create_table(
    scx: &StatementContext,
    stmt: CreateTableStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateTableStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        temporary,
        with_options,
    } = &stmt;

    let names: Vec<_> = columns
        .iter()
        .filter(|c| {
            let is_versioned = c
                .options
                .iter()
                .any(|o| matches!(o.option, ColumnOption::Versioned { .. }));
            !is_versioned
        })
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut defaults = Vec::with_capacity(columns.len());
    let mut changes = BTreeMap::new();
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        let mut default = Expr::null();
        let mut versioned = false;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(expr) => {
                    let mut expr = expr.clone();
                    transform_ast::transform(scx, &mut expr)?;
                    let _ = query::plan_default_expr(scx, &expr, &ty)?;
                    default = expr.clone();
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                ColumnOption::Versioned { action, version } => {
                    let version = RelationVersion::from(*version);
                    versioned = true;

                    let name = normalize::column_name(c.name.clone());
                    let typ = ty.clone().nullable(nullable);

                    changes.insert(version, (action.clone(), name, typ));
                }
                other => {
                    bail_unsupported!(format!("CREATE TABLE with column constraint: {}", other))
                }
            }
        }
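        // Versioned columns are not part of the base relation; they are added
        // to the `VersionedRelationDesc` below via `changes`.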
        if !versioned {
            column_types.push(ty.nullable(nullable));
        }
        defaults.push(default);
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for table {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }

                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_FOREIGN_KEY)?
            }
            TableConstraint::Check { .. } => {
                scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_CHECK_CONSTRAINT)?
            }
        }
    }

    if !keys.is_empty() {
        scx.require_feature_flag(&vars::UNSAFE_ENABLE_TABLE_KEYS)?
    }

    let typ = RelationType::new(column_types).with_keys(keys);

    let temporary = *temporary;
    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        *if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let desc = RelationDesc::new(typ, names);
    let mut desc = VersionedRelationDesc::new(desc);
    for (version, (_action, name, typ)) in changes.into_iter() {
        let new_version = desc.add_column(name, typ);
        if version != new_version {
            return Err(PlanError::InvalidTable {
                name: full_name.item,
            });
        }
    }

    let create_sql = normalize::create_statement(scx, Statement::CreateTable(stmt.clone()))?;

    let original_desc = desc.at_version(RelationVersionSelector::Specific(RelationVersion::root()));
    let options = plan_table_options(scx, &original_desc, with_options.clone())?;

    let compaction_window = options.iter().find_map(|o| {
        #[allow(irrefutable_let_patterns)]
        if let crate::plan::TableOption::RetainHistory(lcw) = o {
            Some(lcw.clone())
        } else {
            None
        }
    });

    let table = Table {
        create_sql,
        desc,
        temporary,
        compaction_window,
        data_source: TableDataSource::TableWrites { defaults },
    };
    Ok(Plan::CreateTable(CreateTablePlan {
        name,
        table,
        if_not_exists: *if_not_exists,
    }))
}

pub fn describe_create_table_from_source(
    _: &StatementContext,
    _: CreateTableFromSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_webhook_source(
    _: &StatementContext,
    _: CreateWebhookSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_source(
    _: &StatementContext,
    _: CreateSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_subsource(
    _: &StatementContext,
    _: CreateSubsourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSourceOption,
    (TimestampInterval, Duration),
    (RetainHistory, OptionalDuration)
);

generate_extracted_config!(
    PgConfigOption,
    (Details, String),
    (Publication, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    MySqlConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

generate_extracted_config!(
    SqlServerConfigOption,
    (Details, String),
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![]))
);

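/// Plans `CREATE SOURCE ... FROM WEBHOOK` (or, when `is_table` is set, the
/// equivalent table form). Schematically, the statement looks like:
///
/// ```sql
/// CREATE SOURCE my_webhook IN CLUSTER my_cluster FROM WEBHOOK
///     BODY FORMAT JSON;
/// ```
///
/// The resulting relation always has a `body` column, plus any header columns
/// requested via `INCLUDE HEADERS`.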
pub fn plan_create_webhook_source(
    scx: &StatementContext,
    mut stmt: CreateWebhookSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if stmt.is_table {
        scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?;
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let enable_multi_replica_sources =
        ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs());
    if !enable_multi_replica_sources {
        if in_cluster.replica_ids().len() > 1 {
            sql_bail!("cannot create webhook source in cluster with more than one replica")
        }
    }
    let create_sql =
        normalize::create_statement(scx, Statement::CreateWebhookSource(stmt.clone()))?;

    let CreateWebhookSourceStatement {
        name,
        if_not_exists,
        body_format,
        include_headers,
        validate_using,
        is_table,
        in_cluster: _,
    } = stmt;

    let validate_using = validate_using
        .map(|stmt| query::plan_webhook_validate_using(scx, stmt))
        .transpose()?;
    if let Some(WebhookValidation { expression, .. }) = &validate_using {
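        // The validation expression must reference at least one column of the
        // request (headers or body); a constant expression cannot meaningfully
        // validate anything.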
        if !expression.contains_column() {
            return Err(PlanError::WebhookValidationDoesNotUseColumns);
        }
        if expression.contains_unmaterializable_except(&[UnmaterializableFunc::CurrentTimestamp]) {
            return Err(PlanError::WebhookValidationNonDeterministic);
        }
    }

    let body_format = match body_format {
        Format::Bytes => WebhookBodyFormat::Bytes,
        Format::Json { array } => WebhookBodyFormat::Json { array },
        Format::Text => WebhookBodyFormat::Text,
        ty => {
            return Err(PlanError::Unsupported {
                feature: format!("{ty} is not a valid BODY FORMAT for a WEBHOOK source"),
                discussion_no: None,
            });
        }
    };

    let mut column_ty = vec![
        ColumnType {
            scalar_type: ScalarType::from(body_format),
            nullable: false,
        },
    ];
    let mut column_names = vec!["body".to_string()];

    let mut headers = WebhookHeaders::default();

    if let Some(filters) = include_headers.column {
        column_ty.push(ColumnType {
            scalar_type: ScalarType::Map {
                value_type: Box::new(ScalarType::String),
                custom_id: None,
            },
            nullable: false,
        });
        column_names.push("headers".to_string());

        let (allow, block): (BTreeSet<_>, BTreeSet<_>) =
            filters.into_iter().partition_map(|filter| {
                if filter.block {
                    itertools::Either::Right(filter.header_name)
                } else {
                    itertools::Either::Left(filter.header_name)
                }
            });
        headers.header_column = Some(WebhookHeaderFilters { allow, block });
    }

    for header in include_headers.mappings {
        let scalar_type = header
            .use_bytes
            .then_some(ScalarType::Bytes)
            .unwrap_or(ScalarType::String);
        column_ty.push(ColumnType {
            scalar_type,
            nullable: true,
        });
        column_names.push(header.column_name.into_string());

        let column_idx = column_ty.len() - 1;
        assert_eq!(
            column_idx,
            column_names.len() - 1,
            "header column names and types don't match"
        );
        headers
            .mapped_headers
            .insert(column_idx, (header.header_name, header.use_bytes));
    }

    let mut unique_check = HashSet::with_capacity(column_names.len());
    for name in &column_names {
        if !unique_check.insert(name) {
            return Err(PlanError::AmbiguousColumn(name.clone().into()));
        }
    }
    if column_names.len() > MAX_NUM_COLUMNS {
        return Err(PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: column_names.len(),
        });
    }

    let typ = RelationType::new(column_ty);
    let desc = RelationDesc::new(typ, column_names);

    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let timeline = Timeline::EpochMilliseconds;

    let plan = if is_table {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: Some(in_cluster.id()),
        };
        let data_source = TableDataSource::DataSource {
            desc: data_source,
            timeline,
        };
        Plan::CreateTable(CreateTablePlan {
            name,
            if_not_exists,
            table: Table {
                create_sql,
                desc: VersionedRelationDesc::new(desc),
                temporary: false,
                compaction_window: None,
                data_source,
            },
        })
    } else {
        let data_source = DataSourceDesc::Webhook {
            validate_using,
            body_format,
            headers,
            cluster_id: None,
        };
        Plan::CreateSource(CreateSourcePlan {
            name,
            source: Source {
                create_sql,
                data_source,
                desc,
                compaction_window: None,
            },
            if_not_exists,
            timeline,
            in_cluster: Some(in_cluster.id()),
        })
    };

    Ok(plan)
}

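/// Plans a `CREATE SOURCE` statement, resolving the external connection
/// (Kafka, PostgreSQL/Yugabyte, MySQL, SQL Server, or a load generator), the
/// envelope and encoding, and the progress subsource named during
/// purification.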
pub fn plan_create_source(
    scx: &StatementContext,
    mut stmt: CreateSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSourceStatement {
        name,
        in_cluster: _,
        col_names,
        connection: source_connection,
        envelope,
        if_not_exists,
        format,
        key_constraint,
        include_metadata,
        with_options,
        external_references: referenced_subsources,
        progress_subsource,
    } = &stmt;

    mz_ore::soft_assert_or_log!(
        referenced_subsources.is_none(),
        "referenced subsources must be cleared in purification"
    );

    let force_source_table_syntax = scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax();

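    // If the new source-table syntax is forced, all options related to the
    // primary source output must be unset; they belong on
    // `CREATE TABLE ... FROM SOURCE` instead.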
    if force_source_table_syntax {
        if envelope.is_some() || format.is_some() || !include_metadata.is_empty() {
            Err(PlanError::UseTablesForSources(
                "CREATE SOURCE (ENVELOPE|FORMAT|INCLUDE)".to_string(),
            ))?;
        }
    }

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    if !matches!(source_connection, CreateSourceConnection::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka sources not supported");
    }
    if !matches!(
        source_connection,
        CreateSourceConnection::Kafka { .. } | CreateSourceConnection::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka sources");
    }

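    // Plan the connection-specific details for each supported source kind.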
    let external_connection = match source_connection {
        CreateSourceConnection::Kafka {
            connection: connection_name,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection_name)?;
            if !matches!(connection_item.connection()?, Connection::Kafka(_)) {
                sql_bail!(
                    "{} is not a kafka connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let KafkaSourceConfigOptionExtracted {
                group_id_prefix,
                topic,
                topic_metadata_refresh_interval,
                start_timestamp: _,
                start_offset,
                seen: _,
            }: KafkaSourceConfigOptionExtracted = options.clone().try_into()?;

            let topic = topic.expect("validated exists during purification");

            let mut start_offsets = BTreeMap::new();
            if let Some(offsets) = start_offset {
                for (part, offset) in offsets.iter().enumerate() {
                    if *offset < 0 {
                        sql_bail!("START OFFSET must be a nonnegative integer");
                    }
                    start_offsets.insert(i32::try_from(part)?, *offset);
                }
            }

            if !start_offsets.is_empty() && envelope.requires_all_input() {
                sql_bail!("START OFFSET is not supported with ENVELOPE {}", envelope)
            }

            if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
                sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
            }

            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        None
                    }
                })
                .collect();

            let connection = KafkaSourceConnection::<ReferencedConnection> {
                connection: connection_item.id(),
                connection_id: connection_item.id(),
                topic,
                start_offsets,
                group_id_prefix,
                topic_metadata_refresh_interval,
                metadata_columns,
            };

            GenericSourceConnection::Kafka(connection)
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        }
        | CreateSourceConnection::Yugabyte {
            connection,
            options,
        } => {
            let (source_flavor, flavor_name) = match source_connection {
                CreateSourceConnection::Postgres { .. } => (PostgresFlavor::Vanilla, "PostgreSQL"),
                CreateSourceConnection::Yugabyte { .. } => (PostgresFlavor::Yugabyte, "Yugabyte"),
                _ => unreachable!(),
            };

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection_mismatch = match connection_item.connection()? {
                Connection::Postgres(conn) => source_flavor != conn.flavor,
                _ => true,
            };
            if connection_mismatch {
                sql_bail!(
                    "{} is not a {flavor_name} connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                )
            }

            let PgConfigOptionExtracted {
                details,
                publication,
                text_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: Postgres source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details = ProtoPostgresSourcePublicationDetails::decode(&*details)
                .map_err(|e| sql_err!("{}", e))?;

            let publication_details = PostgresSourcePublicationDetails::from_proto(details)
                .map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(PostgresSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    publication: publication.expect("validated exists during purification"),
                    publication_details,
                });

            connection
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::SqlServer(connection) => connection,
                _ => sql_bail!(
                    "{} is not a SQL Server connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };

            let SqlServerConfigOptionExtracted { details, .. } = options.clone().try_into()?;
            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: SQL Server source missing details"))?;
            let extras = hex::decode(details)
                .map_err(|e| sql_err!("{e}"))
                .and_then(|raw| {
                    ProtoSqlServerSourceExtras::decode(&*raw).map_err(|e| sql_err!("{e}"))
                })
                .and_then(|proto| {
                    SqlServerSourceExtras::from_proto(proto).map_err(|e| sql_err!("{e}"))
                })?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(SqlServerSource {
                    catalog_id: connection_item.id(),
                    connection: connection_item.id(),
                    extras,
                });

            connection
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            match connection_item.connection()? {
                Connection::MySql(connection) => connection,
                _ => sql_bail!(
                    "{} is not a MySQL connection",
                    scx.catalog.resolve_full_name(connection_item.name())
                ),
            };
            let MySqlConfigOptionExtracted {
                details,
                text_columns: _,
                exclude_columns: _,
                seen: _,
            } = options.clone().try_into()?;

            let details = details
                .as_ref()
                .ok_or_else(|| sql_err!("internal error: MySQL source missing details"))?;
            let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
            let details =
                ProtoMySqlSourceDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
            let details = MySqlSourceDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

            let connection =
                GenericSourceConnection::<ReferencedConnection>::from(MySqlSourceConnection {
                    connection: connection_item.id(),
                    connection_id: connection_item.id(),
                    details,
                });

            connection
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(scx, generator, options, include_metadata)?;

            let LoadGeneratorOptionExtracted {
                tick_interval,
                as_of,
                up_to,
                ..
            } = options.clone().try_into()?;
            let tick_micros = match tick_interval {
                Some(interval) => Some(interval.as_micros().try_into()?),
                None => None,
            };

            if up_to < as_of {
                sql_bail!("UP TO cannot be less than AS OF");
            }

            let connection = GenericSourceConnection::from(LoadGeneratorSourceConnection {
                load_generator,
                tick_micros,
                as_of,
                up_to,
            });

            connection
        }
    };

    let CreateSourceOptionExtracted {
        timestamp_interval,
        retain_history,
        seen: _,
    } = CreateSourceOptionExtracted::try_from(with_options.clone())?;

    let metadata_columns_desc = match external_connection {
        GenericSourceConnection::Kafka(KafkaSourceConnection {
            ref metadata_columns,
            ..
        }) => kafka_metadata_columns_desc(metadata_columns),
        _ => vec![],
    };

    let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
        scx,
        &envelope,
        format,
        Some(external_connection.default_key_desc()),
        external_connection.default_value_desc(),
        include_metadata,
        metadata_columns_desc,
        &external_connection,
    )?;
    plan_utils::maybe_rename_columns(format!("source {}", name), &mut desc, col_names)?;

    let names: Vec<_> = desc.iter_names().cloned().collect();
    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

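    // Apply a PRIMARY KEY NOT ENFORCED constraint, if present: the declared
    // key is trusted rather than checked, so it sits behind a feature flag.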
    if let Some(KeyConstraint::PrimaryKeyNotEnforced { columns }) = key_constraint.clone() {
        scx.require_feature_flag(&vars::ENABLE_PRIMARY_KEY_NOT_ENFORCED)?;

        let key_columns = columns
            .into_iter()
            .map(normalize::column_name)
            .collect::<Vec<_>>();

        let mut uniq = BTreeSet::new();
        for col in key_columns.iter() {
            if !uniq.insert(col) {
                sql_bail!("Repeated column name in source key constraint: {}", col);
            }
        }

        let key_indices = key_columns
            .iter()
            .map(|col| {
                let name_idx = desc
                    .get_by_name(col)
                    .map(|(idx, _type)| idx)
                    .ok_or_else(|| sql_err!("No such column in source key constraint: {}", col))?;
                if desc.get_unambiguous_name(name_idx).is_none() {
                    sql_bail!("Ambiguous column in source key constraint: {}", col);
                }
                Ok(name_idx)
            })
            .collect::<Result<Vec<_>, _>>()?;

        if !desc.typ().keys.is_empty() {
            return Err(key_constraint_err(&desc, &key_columns));
        } else {
            desc = desc.with_key(key_indices);
        }
    }

    let timestamp_interval = match timestamp_interval {
        Some(duration) => {
            let min = scx.catalog.system_vars().min_timestamp_interval();
            let max = scx.catalog.system_vars().max_timestamp_interval();
            if duration < min || duration > max {
                return Err(PlanError::InvalidTimestampInterval {
                    min,
                    max,
                    requested: duration,
                });
            }
            duration
        }
        None => scx.catalog.config().timestamp_interval,
    };

    let (primary_export, primary_export_details) = if force_source_table_syntax {
        (
            SourceExportDataConfig {
                encoding: None,
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
            },
            SourceExportDetails::None,
        )
    } else {
        (
            SourceExportDataConfig {
                encoding,
                envelope: envelope.clone(),
            },
            external_connection.primary_export_details(),
        )
    };
    let source_desc = SourceDesc::<ReferencedConnection> {
        connection: external_connection,
        primary_export,
        primary_export_details,
        timestamp_interval,
    };

    let progress_subsource = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Named(name) => match name {
                ResolvedItemName::Item { id, .. } => *id,
                ResolvedItemName::Cte { .. }
                | ResolvedItemName::ContinualTask { .. }
                | ResolvedItemName::Error => {
                    sql_bail!("[internal error] invalid target id")
                }
            },
            DeferredItemName::Deferred(_) => {
                sql_bail!("[internal error] progress subsource must be named during purification")
            }
        },
        _ => sql_bail!("[internal error] progress subsource must be named during purification"),
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (
        if_not_exists,
        scx.catalog.resolve_item_or_type(&partial_name),
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSource(stmt))?;

    let timeline = match envelope {
        SourceEnvelope::CdcV2 => {
            Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
        }
        _ => Timeline::EpochMilliseconds,
    };

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source: DataSourceDesc::Ingestion(Ingestion {
            desc: source_desc,
            progress_subsource,
        }),
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline,
        in_cluster: Some(in_cluster.id()),
    }))
}

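/// Resolves the envelope and encoding for a source or source-fed table,
/// returning the resulting relation description, the planned
/// [`SourceEnvelope`], and the optional [`SourceDataEncoding`].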
fn apply_source_envelope_encoding(
    scx: &StatementContext,
    envelope: &ast::SourceEnvelope,
    format: &Option<FormatSpecifier<Aug>>,
    key_desc: Option<RelationDesc>,
    value_desc: RelationDesc,
    include_metadata: &[SourceIncludeMetadata],
    metadata_columns_desc: Vec<(&str, ColumnType)>,
    source_connection: &GenericSourceConnection<ReferencedConnection>,
) -> Result<
    (
        RelationDesc,
        SourceEnvelope,
        Option<SourceDataEncoding<ReferencedConnection>>,
    ),
    PlanError,
> {
    let encoding = match format {
        Some(format) => Some(get_encoding(scx, format, envelope)?),
        None => None,
    };

    let (key_desc, value_desc) = match &encoding {
        Some(encoding) => {
            match value_desc.typ().columns() {
                [typ] => match typ.scalar_type {
                    ScalarType::Bytes => {}
                    _ => sql_bail!(
                        "The schema produced by the source is incompatible with format decoding"
                    ),
                },
                _ => sql_bail!(
                    "The schema produced by the source is incompatible with format decoding"
                ),
            }

            let (key_desc, value_desc) = encoding.desc()?;

            let key_desc = key_desc.map(|desc| {
                let is_kafka = matches!(source_connection, GenericSourceConnection::Kafka(_));
                let is_envelope_none = matches!(envelope, ast::SourceEnvelope::None);
                if is_kafka && is_envelope_none {
                    RelationDesc::from_names_and_types(
                        desc.into_iter()
                            .map(|(name, typ)| (name, typ.nullable(true))),
                    )
                } else {
                    desc
                }
            });
            (key_desc, value_desc)
        }
        None => (key_desc, value_desc),
    };

    let key_envelope_no_encoding = matches!(
        source_connection,
        GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::KeyValue(_),
            ..
        })
    );
    let mut key_envelope = get_key_envelope(
        include_metadata,
        encoding.as_ref(),
        key_envelope_no_encoding,
    )?;

    match (&envelope, &key_envelope) {
        (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {}
        (ast::SourceEnvelope::Debezium, _) => sql_bail!(
            "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys."
        ),
        _ => {}
    };

    let envelope = match &envelope {
        ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope),
        ast::SourceEnvelope::Debezium => {
            let after_idx = match typecheck_debezium(&value_desc) {
                Ok((_before_idx, after_idx)) => Ok(after_idx),
                Err(type_err) => match encoding.as_ref().map(|e| &e.value) {
                    Some(DataEncoding::Avro(_)) => Err(type_err),
                    _ => Err(sql_err!(
                        "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO"
                    )),
                },
            }?;

            UnplannedSourceEnvelope::Upsert {
                style: UpsertStyle::Debezium { after_idx },
            }
        }
        ast::SourceEnvelope::Upsert {
            value_decode_err_policy,
        } => {
            let key_encoding = match encoding.as_ref().and_then(|e| e.key.as_ref()) {
                None => {
                    if !key_envelope_no_encoding {
                        bail_unsupported!(format!(
                            "UPSERT requires a key/value format: {:?}",
                            format
                        ))
                    }
                    None
                }
                Some(key_encoding) => Some(key_encoding),
            };
            if key_envelope == KeyEnvelope::None {
                key_envelope = get_unnamed_key_envelope(key_encoding)?;
            }
            let style = match value_decode_err_policy.as_slice() {
                [] => UpsertStyle::Default(key_envelope),
                [SourceErrorPolicy::Inline { alias }] => {
                    scx.require_feature_flag(&vars::ENABLE_ENVELOPE_UPSERT_INLINE_ERRORS)?;
                    UpsertStyle::ValueErrInline {
                        key_envelope,
                        error_column: alias
                            .as_ref()
                            .map_or_else(|| "error".to_string(), |a| a.to_string()),
                    }
                }
                _ => {
                    bail_unsupported!("ENVELOPE UPSERT with unsupported value decode error policy")
                }
            };

            UnplannedSourceEnvelope::Upsert { style }
        }
        ast::SourceEnvelope::CdcV2 => {
            scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;
            match format {
                Some(FormatSpecifier::Bare(Format::Avro(_))) => {}
                _ => bail_unsupported!("non-Avro-encoded ENVELOPE MATERIALIZE"),
            }
            UnplannedSourceEnvelope::CdcV2
        }
    };

    let metadata_desc = included_column_desc(metadata_columns_desc);
    let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;

    Ok((desc, envelope, encoding))
}

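/// Plans the relation description for a source export (a subsource or a
/// source-fed table) from user-specified columns and constraints.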
fn plan_source_export_desc(
    scx: &StatementContext,
    name: &UnresolvedItemName,
    columns: &Vec<ColumnDef<Aug>>,
    constraints: &Vec<TableConstraint<Aug>>,
) -> Result<RelationDesc, PlanError> {
    let names: Vec<_> = columns
        .iter()
        .map(|c| normalize::column_name(c.name.clone()))
        .collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let mut column_types = Vec::with_capacity(columns.len());
    let mut keys = Vec::new();

    for (i, c) in columns.into_iter().enumerate() {
        let aug_data_type = &c.data_type;
        let ty = query::scalar_type_from_sql(scx, aug_data_type)?;
        let mut nullable = true;
        for option in &c.options {
            match &option.option {
                ColumnOption::NotNull => nullable = false,
                ColumnOption::Default(_) => {
                    bail_unsupported!("Source export with default value")
                }
                ColumnOption::Unique { is_primary } => {
                    keys.push(vec![i]);
                    if *is_primary {
                        nullable = false;
                    }
                }
                other => {
                    bail_unsupported!(format!("Source export with column constraint: {}", other))
                }
            }
        }
        column_types.push(ty.nullable(nullable));
    }

    let mut seen_primary = false;
    'c: for constraint in constraints {
        match constraint {
            TableConstraint::Unique {
                name: _,
                columns,
                is_primary,
                nulls_not_distinct,
            } => {
                if seen_primary && *is_primary {
                    sql_bail!(
                        "multiple primary keys for source export {} are not allowed",
                        name.to_ast_string_stable()
                    );
                }
                seen_primary = *is_primary || seen_primary;

                let mut key = vec![];
                for column in columns {
                    let column = normalize::column_name(column.clone());
                    match names.iter().position(|name| *name == column) {
                        None => sql_bail!("unknown column in constraint: {}", column),
                        Some(i) => {
                            let nullable = &mut column_types[i].nullable;
                            if *is_primary {
                                if *nulls_not_distinct {
                                    sql_bail!(
                                        "[internal error] PRIMARY KEY does not support NULLS NOT DISTINCT"
                                    );
                                }
                                *nullable = false;
                            } else if !(*nulls_not_distinct || !*nullable) {
                                break 'c;
                            }

                            key.push(i);
                        }
                    }
                }

                if *is_primary {
                    keys.insert(0, key);
                } else {
                    keys.push(key);
                }
            }
            TableConstraint::ForeignKey { .. } => {
                bail_unsupported!("Source export with a foreign key")
            }
            TableConstraint::Check { .. } => {
                bail_unsupported!("Source export with a check constraint")
            }
        }
    }

    let typ = RelationType::new(column_types).with_keys(keys);
    let desc = RelationDesc::new(typ, names);
    Ok(desc)
}

generate_extracted_config!(
    CreateSubsourceOption,
    (Progress, bool, Default(false)),
    (ExternalReference, UnresolvedItemName),
    (RetainHistory, OptionalDuration),
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (Details, String)
);

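/// Plans a `CREATE SUBSOURCE` statement. Subsources are synthesized during
/// purification, either as a source's progress collection or as an ingestion
/// export of a specific external reference, so the options here have already
/// been validated.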
pub fn plan_create_subsource(
    scx: &StatementContext,
    stmt: CreateSubsourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSubsourceStatement {
        name,
        columns,
        of_source,
        constraints,
        if_not_exists,
        with_options,
    } = &stmt;

    let CreateSubsourceOptionExtracted {
        progress,
        retain_history,
        external_reference,
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

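    // Purification guarantees exactly one of the two shapes: a progress
    // subsource, or an export of a specific external reference of a source.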
    assert!(
        progress ^ (external_reference.is_some() && of_source.is_some()),
        "CREATE SUBSOURCE statement must specify either PROGRESS or REFERENCES option"
    );

    let desc = plan_source_export_desc(scx, name, columns, constraints)?;

    let data_source = if let Some(source_reference) = of_source {
        if scx.catalog.system_vars().enable_create_table_from_source()
            && scx.catalog.system_vars().force_source_table_syntax()
        {
            Err(PlanError::UseTablesForSources(
                "CREATE SUBSOURCE".to_string(),
            ))?;
        }

        let ingestion_id = *source_reference.item_id();
        let external_reference = external_reference.unwrap();

        let details = details
            .as_ref()
            .ok_or_else(|| sql_err!("internal error: source-export subsource missing details"))?;
        let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
        let details =
            ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
        let details =
            SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;
        let details = match details {
            SourceExportStatementDetails::Postgres { table } => {
                SourceExportDetails::Postgres(PostgresSourceExportDetails {
                    column_casts: crate::pure::postgres::generate_column_casts(
                        scx,
                        &table,
                        &text_columns,
                    )?,
                    table,
                })
            }
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => SourceExportDetails::MySql(MySqlSourceExportDetails {
                table,
                initial_gtid_set,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
                capture_instance,
                table,
                initial_lsn,
                text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
                exclude_columns: exclude_columns
                    .into_iter()
                    .map(|c| c.into_string())
                    .collect(),
            }),
            SourceExportStatementDetails::LoadGenerator { output } => {
                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
            }
            SourceExportStatementDetails::Kafka {} => {
                bail_unsupported!("subsources cannot reference Kafka sources")
            }
        };
        DataSourceDesc::IngestionExport {
            ingestion_id,
            external_reference,
            details,
            data_config: SourceExportDataConfig {
                envelope: SourceEnvelope::None(NoneEnvelope {
                    key_envelope: KeyEnvelope::None,
                    key_arity: 0,
                }),
                encoding: None,
            },
        }
    } else if progress {
        DataSourceDesc::Progress
    } else {
        panic!("subsources must specify one of `external_reference`, `progress`, or `references`")
    };

    let if_not_exists = *if_not_exists;
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;

    let create_sql = normalize::create_statement(scx, Statement::CreateSubsource(stmt))?;

    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let source = Source {
        create_sql,
        data_source,
        desc,
        compaction_window,
    };

    Ok(Plan::CreateSource(CreateSourcePlan {
        name,
        source,
        if_not_exists,
        timeline: Timeline::EpochMilliseconds,
        in_cluster: None,
    }))
}

generate_extracted_config!(
    TableFromSourceOption,
    (TextColumns, Vec::<Ident>, Default(vec![])),
    (ExcludeColumns, Vec::<Ident>, Default(vec![])),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Details, String)
);

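/// Plans a `CREATE TABLE ... FROM SOURCE` statement, which exports a single
/// external reference of an existing source into a table. Schematically:
///
/// ```sql
/// CREATE TABLE my_table FROM SOURCE my_source (REFERENCE my_upstream_table);
/// ```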
pub fn plan_create_table_from_source(
    scx: &StatementContext,
    stmt: CreateTableFromSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    if !scx.catalog.system_vars().enable_create_table_from_source() {
        sql_bail!("CREATE TABLE ... FROM SOURCE is not supported");
    }

    let CreateTableFromSourceStatement {
        name,
        columns,
        constraints,
        if_not_exists,
        source,
        external_reference,
        envelope,
        format,
        include_metadata,
        with_options,
    } = &stmt;

    let envelope = envelope.clone().unwrap_or(ast::SourceEnvelope::None);

    let TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history,
        partition_by,
        details,
        seen: _,
    } = with_options.clone().try_into()?;

    let source_item = scx.get_item_by_resolved_name(source)?;
    let ingestion_id = source_item.id();

    let details = details
        .as_ref()
        .ok_or_else(|| sql_err!("internal error: source-export missing details"))?;
    let details = hex::decode(details).map_err(|e| sql_err!("{}", e))?;
    let details =
        ProtoSourceExportStatementDetails::decode(&*details).map_err(|e| sql_err!("{}", e))?;
    let details =
        SourceExportStatementDetails::from_proto(details).map_err(|e| sql_err!("{}", e))?;

    if !matches!(details, SourceExportStatementDetails::Kafka { .. })
        && include_metadata
            .iter()
            .any(|sic| matches!(sic, SourceIncludeMetadata::Headers { .. }))
    {
        sql_bail!("INCLUDE HEADERS with non-Kafka source table not supported");
    }
    if !matches!(
        details,
        SourceExportStatementDetails::Kafka { .. }
            | SourceExportStatementDetails::LoadGenerator { .. }
    ) && !include_metadata.is_empty()
    {
        bail_unsupported!("INCLUDE metadata with non-Kafka source table");
    }

    let details = match details {
        SourceExportStatementDetails::Postgres { table } => {
            SourceExportDetails::Postgres(PostgresSourceExportDetails {
                column_casts: crate::pure::postgres::generate_column_casts(
                    scx,
                    &table,
                    &text_columns,
                )?,
                table,
            })
        }
        SourceExportStatementDetails::MySql {
            table,
            initial_gtid_set,
        } => SourceExportDetails::MySql(MySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::SqlServer {
            table,
            capture_instance,
            initial_lsn,
        } => SourceExportDetails::SqlServer(SqlServerSourceExportDetails {
            table,
            capture_instance,
            initial_lsn,
            text_columns: text_columns.into_iter().map(|c| c.into_string()).collect(),
            exclude_columns: exclude_columns
                .into_iter()
                .map(|c| c.into_string())
                .collect(),
        }),
        SourceExportStatementDetails::LoadGenerator { output } => {
            SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails { output })
        }
        SourceExportStatementDetails::Kafka {} => {
            if !include_metadata.is_empty()
                && !matches!(
                    envelope,
                    ast::SourceEnvelope::Upsert { .. }
                        | ast::SourceEnvelope::None
                        | ast::SourceEnvelope::Debezium
                )
            {
                sql_bail!("INCLUDE <metadata> requires ENVELOPE (NONE|UPSERT|DEBEZIUM)");
            }

            let metadata_columns = include_metadata
                .into_iter()
                .flat_map(|item| match item {
                    SourceIncludeMetadata::Timestamp { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "timestamp".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Timestamp))
                    }
                    SourceIncludeMetadata::Partition { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "partition".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Partition))
                    }
                    SourceIncludeMetadata::Offset { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "offset".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Offset))
                    }
                    SourceIncludeMetadata::Headers { alias } => {
                        let name = match alias {
                            Some(name) => name.to_string(),
                            None => "headers".to_owned(),
                        };
                        Some((name, KafkaMetadataKind::Headers))
                    }
                    SourceIncludeMetadata::Header {
                        alias,
                        key,
                        use_bytes,
                    } => Some((
                        alias.to_string(),
                        KafkaMetadataKind::Header {
                            key: key.clone(),
                            use_bytes: *use_bytes,
                        },
                    )),
                    SourceIncludeMetadata::Key { .. } => {
                        None
                    }
                })
                .collect();

            SourceExportDetails::Kafka(KafkaSourceExportDetails { metadata_columns })
        }
    };

    let source_connection = &source_item.source_desc()?.expect("is source").connection;

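    // If the statement defines columns or constraints, they fully determine
    // the relation; otherwise fall back to the connection's default key and
    // value descriptions.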
1885 let (key_desc, value_desc) =
1890 if matches!(columns, TableFromSourceColumns::Defined(_)) || !constraints.is_empty() {
1891 let columns = match columns {
1892 TableFromSourceColumns::Defined(columns) => columns,
1893 _ => unreachable!(),
1894 };
1895 let desc = plan_source_export_desc(scx, name, columns, constraints)?;
1896 (None, desc)
1897 } else {
1898 let key_desc = source_connection.default_key_desc();
1899 let value_desc = source_connection.default_value_desc();
1900 (Some(key_desc), value_desc)
1901 };
1902
1903 let metadata_columns_desc = match &details {
1904 SourceExportDetails::Kafka(KafkaSourceExportDetails {
1905 metadata_columns, ..
1906 }) => kafka_metadata_columns_desc(metadata_columns),
1907 _ => vec![],
1908 };
1909
1910 let (mut desc, envelope, encoding) = apply_source_envelope_encoding(
1911 scx,
1912 &envelope,
1913 format,
1914 key_desc,
1915 value_desc,
1916 include_metadata,
1917 metadata_columns_desc,
1918 source_connection,
1919 )?;
1920 if let TableFromSourceColumns::Named(col_names) = columns {
1921 plan_utils::maybe_rename_columns(format!("source table {}", name), &mut desc, col_names)?;
1922 }
1923
1924 let names: Vec<_> = desc.iter_names().cloned().collect();
1925 if let Some(dup) = names.iter().duplicates().next() {
1926 sql_bail!("column {} specified more than once", dup.quoted());
1927 }
1928
1929 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.clone())?)?;
1930
1931 let timeline = match envelope {
1934 SourceEnvelope::CdcV2 => {
1935 Timeline::External(scx.catalog.resolve_full_name(&name).to_string())
1936 }
1937 _ => Timeline::EpochMilliseconds,
1938 };
1939
1940 if let Some(partition_by) = partition_by {
1941 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
1942 check_partition_by(&desc, partition_by)?;
1943 }
1944
1945 let data_source = DataSourceDesc::IngestionExport {
1946 ingestion_id,
1947 external_reference: external_reference
1948 .as_ref()
1949 .expect("populated in purification")
1950 .clone(),
1951 details,
1952 data_config: SourceExportDataConfig { envelope, encoding },
1953 };
1954
1955 let if_not_exists = *if_not_exists;
1956
1957 let create_sql = normalize::create_statement(scx, Statement::CreateTableFromSource(stmt))?;
1958
1959 let compaction_window = plan_retain_history_option(scx, retain_history)?;
1960 let table = Table {
1961 create_sql,
1962 desc: VersionedRelationDesc::new(desc),
1963 temporary: false,
1964 compaction_window,
1965 data_source: TableDataSource::DataSource {
1966 desc: data_source,
1967 timeline,
1968 },
1969 };
1970
1971 Ok(Plan::CreateTable(CreateTablePlan {
1972 name,
1973 table,
1974 if_not_exists,
1975 }))
1976}
1977
1978generate_extracted_config!(
1979 LoadGeneratorOption,
1980 (TickInterval, Duration),
1981 (AsOf, u64, Default(0_u64)),
1982 (UpTo, u64, Default(u64::MAX)),
1983 (ScaleFactor, f64),
1984 (MaxCardinality, u64),
1985 (Keys, u64),
1986 (SnapshotRounds, u64),
1987 (TransactionalSnapshot, bool),
1988 (ValueSize, u64),
1989 (Seed, u64),
1990 (Partitions, u64),
1991 (BatchSize, u64)
1992);
1993
1994impl LoadGeneratorOptionExtracted {
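    /// Checks that every option the user supplied is valid for the given load
    /// generator variant, and errors out on any that are not.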
    pub(super) fn ensure_only_valid_options(
        &self,
        loadgen: &ast::LoadGenerator,
    ) -> Result<(), PlanError> {
        use mz_sql_parser::ast::LoadGeneratorOptionName::*;

        let mut options = self.seen.clone();

        let permitted_options: &[_] = match loadgen {
            ast::LoadGenerator::Auction => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Clock => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Counter => &[TickInterval, AsOf, UpTo, MaxCardinality],
            ast::LoadGenerator::Marketing => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Datums => &[TickInterval, AsOf, UpTo],
            ast::LoadGenerator::Tpch => &[TickInterval, AsOf, UpTo, ScaleFactor],
            ast::LoadGenerator::KeyValue => &[
                TickInterval,
                Keys,
                SnapshotRounds,
                TransactionalSnapshot,
                ValueSize,
                Seed,
                Partitions,
                BatchSize,
            ],
        };

        for o in permitted_options {
            options.remove(o);
        }

        if !options.is_empty() {
            sql_bail!(
                "{} load generators do not support {} values",
                loadgen,
                options.iter().join(", ")
            )
        }

        Ok(())
    }
}

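/// Converts a parsed `LOAD GENERATOR` definition into a `LoadGenerator`
/// configuration, validating the supplied options and any `INCLUDE <metadata>`
/// items along the way.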
pub(crate) fn load_generator_ast_to_generator(
    scx: &StatementContext,
    loadgen: &ast::LoadGenerator,
    options: &[LoadGeneratorOption<Aug>],
    include_metadata: &[SourceIncludeMetadata],
) -> Result<LoadGenerator, PlanError> {
    let extracted: LoadGeneratorOptionExtracted = options.to_vec().try_into()?;
    extracted.ensure_only_valid_options(loadgen)?;

    if loadgen != &ast::LoadGenerator::KeyValue && !include_metadata.is_empty() {
        sql_bail!("INCLUDE metadata is only supported with `KEY VALUE` load generators");
    }

    let load_generator = match loadgen {
        ast::LoadGenerator::Auction => LoadGenerator::Auction,
        ast::LoadGenerator::Clock => {
            scx.require_feature_flag(&crate::session::vars::ENABLE_CLOCK_LOAD_GENERATOR)?;
            LoadGenerator::Clock
        }
        ast::LoadGenerator::Counter => {
            let LoadGeneratorOptionExtracted {
                max_cardinality, ..
            } = extracted;
            LoadGenerator::Counter { max_cardinality }
        }
        ast::LoadGenerator::Marketing => LoadGenerator::Marketing,
        ast::LoadGenerator::Datums => LoadGenerator::Datums,
        ast::LoadGenerator::Tpch => {
            let LoadGeneratorOptionExtracted { scale_factor, .. } = extracted;

            let sf: f64 = scale_factor.unwrap_or(0.01);
            if !sf.is_finite() || sf < 0.0 {
                sql_bail!("unsupported scale factor {sf}");
            }

            let f_to_i = |multiplier: f64| -> Result<i64, PlanError> {
                let total = (sf * multiplier).floor();
                let mut i = i64::try_cast_from(total)
                    .ok_or_else(|| sql_err!("unsupported scale factor {sf}"))?;
                if i < 1 {
                    i = 1;
                }
                Ok(i)
            };

            let count_supplier = f_to_i(10_000f64)?;
            let count_part = f_to_i(200_000f64)?;
            let count_customer = f_to_i(150_000f64)?;
            let count_orders = f_to_i(150_000f64 * 10f64)?;
            let count_clerk = f_to_i(1_000f64)?;

            LoadGenerator::Tpch {
                count_supplier,
                count_part,
                count_customer,
                count_orders,
                count_clerk,
            }
        }
        mz_sql_parser::ast::LoadGenerator::KeyValue => {
            scx.require_feature_flag(&vars::ENABLE_LOAD_GENERATOR_KEY_VALUE)?;
            let LoadGeneratorOptionExtracted {
                keys,
                snapshot_rounds,
                transactional_snapshot,
                value_size,
                tick_interval,
                seed,
                partitions,
                batch_size,
                ..
            } = extracted;

            let mut include_offset = None;
            for im in include_metadata {
                match im {
                    SourceIncludeMetadata::Offset { alias } => {
                        include_offset = match alias {
                            Some(alias) => Some(alias.to_string()),
                            None => Some(LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT.to_string()),
                        }
                    }
                    SourceIncludeMetadata::Key { .. } => continue,

                    _ => {
                        sql_bail!("only `INCLUDE OFFSET` and `INCLUDE KEY` are supported");
                    }
                };
            }

            let lgkv = KeyValueLoadGenerator {
                keys: keys.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires KEYS"))?,
                snapshot_rounds: snapshot_rounds
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SNAPSHOT ROUNDS"))?,
                transactional_snapshot: transactional_snapshot.unwrap_or(true),
                value_size: value_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires VALUE SIZE"))?,
                partitions: partitions
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires PARTITIONS"))?,
                tick_interval,
                batch_size: batch_size
                    .ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires BATCH SIZE"))?,
                seed: seed.ok_or_else(|| sql_err!("LOAD GENERATOR KEY VALUE requires SEED"))?,
                include_offset,
            };

            if lgkv.keys == 0
                || lgkv.partitions == 0
                || lgkv.value_size == 0
                || lgkv.batch_size == 0
            {
                sql_bail!("LOAD GENERATOR KEY VALUE options must be non-zero")
            }

            if lgkv.keys % lgkv.partitions != 0 {
                sql_bail!("KEYS must be a multiple of PARTITIONS")
            }

            if lgkv.batch_size > lgkv.keys {
                sql_bail!("BATCH SIZE cannot be larger than KEYS")
            }

            if (lgkv.keys / lgkv.partitions) % lgkv.batch_size != 0 {
                sql_bail!("PARTITIONS * BATCH SIZE must be a divisor of KEYS")
            }

            if lgkv.snapshot_rounds == 0 {
                sql_bail!("SNAPSHOT ROUNDS must be larger than 0")
            }

            LoadGenerator::KeyValue(lgkv)
        }
    };

    Ok(load_generator)
}

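/// Checks that a Debezium-formatted value relation has the expected `before`
/// and `after` record columns, returning their indices (`before` is optional).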
fn typecheck_debezium(value_desc: &RelationDesc) -> Result<(Option<usize>, usize), PlanError> {
    let before = value_desc.get_by_name(&"before".into());
    let (after_idx, after_ty) = value_desc
        .get_by_name(&"after".into())
        .ok_or_else(|| sql_err!("'after' column missing from debezium input"))?;
    let before_idx = if let Some((before_idx, before_ty)) = before {
        if !matches!(before_ty.scalar_type, ScalarType::Record { .. }) {
            sql_bail!("'before' column must be of type record");
        }
        if before_ty != after_ty {
            sql_bail!("'before' type differs from 'after' column");
        }
        Some(before_idx)
    } else {
        None
    };
    Ok((before_idx, after_idx))
}

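/// Determines the key and value encodings implied by a `FORMAT` / `KEY FORMAT
/// .. VALUE FORMAT` specifier, and verifies that envelopes which need a key
/// (`UPSERT`, `DEBEZIUM`) actually have one.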
fn get_encoding(
    scx: &StatementContext,
    format: &FormatSpecifier<Aug>,
    envelope: &ast::SourceEnvelope,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let encoding = match format {
        FormatSpecifier::Bare(format) => get_encoding_inner(scx, format)?,
        FormatSpecifier::KeyValue { key, value } => {
            let key = {
                let encoding = get_encoding_inner(scx, key)?;
                Some(encoding.key.unwrap_or(encoding.value))
            };
            let value = get_encoding_inner(scx, value)?.value;
            SourceDataEncoding { key, value }
        }
    };

    let requires_keyvalue = matches!(
        envelope,
        ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. }
    );
    let is_keyvalue = encoding.key.is_some();
    if requires_keyvalue && !is_keyvalue {
        sql_bail!("ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified");
    };

    Ok(encoding)
}

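/// Resolves the cluster a source or sink should run in. If `in_cluster` is
/// unset, the default cluster is resolved and written back into the statement
/// so the recorded SQL names an explicit cluster.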
fn source_sink_cluster_config<'a, 'ctx>(
    scx: &'a StatementContext<'ctx>,
    in_cluster: &mut Option<ResolvedClusterName>,
) -> Result<&'a dyn CatalogCluster<'ctx>, PlanError> {
    let cluster = match in_cluster {
        None => {
            let cluster = scx.catalog.resolve_cluster(None)?;
            *in_cluster = Some(ResolvedClusterName {
                id: cluster.id(),
                print_name: None,
            });
            cluster
        }
        Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
    };

    Ok(cluster)
}

generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default(true)));

#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
    pub confluent_wire_format: bool,
}

fn get_encoding_inner(
    scx: &StatementContext,
    format: &Format<Aug>,
) -> Result<SourceDataEncoding<ReferencedConnection>, PlanError> {
    let value = match format {
        Format::Bytes => DataEncoding::Bytes,
        Format::Avro(schema) => {
            let Schema {
                key_schema,
                value_schema,
                csr_connection,
                confluent_wire_format,
            } = match schema {
                AvroSchema::InlineSchema {
                    schema: ast::Schema { schema },
                    with_options,
                } => {
                    let AvroSchemaOptionExtracted {
                        confluent_wire_format,
                        ..
                    } = with_options.clone().try_into()?;

                    Schema {
                        key_schema: None,
                        value_schema: schema.clone(),
                        csr_connection: None,
                        confluent_wire_format,
                    }
                }
                AvroSchema::Csr {
                    csr_connection:
                        CsrConnectionAvro {
                            connection,
                            seed,
                            key_strategy: _,
                            value_strategy: _,
                        },
                } => {
                    let item = scx.get_item_by_resolved_name(&connection.connection)?;
                    let csr_connection = match item.connection()? {
                        Connection::Csr(_) => item.id(),
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if let Some(seed) = seed {
                        Schema {
                            key_schema: seed.key_schema.clone(),
                            value_schema: seed.value_schema.clone(),
                            csr_connection: Some(csr_connection),
                            confluent_wire_format: true,
                        }
                    } else {
                        unreachable!("CSR seed resolution should already have been called: Avro")
                    }
                }
            };

            if let Some(key_schema) = key_schema {
                return Ok(SourceDataEncoding {
                    key: Some(DataEncoding::Avro(AvroEncoding {
                        schema: key_schema,
                        csr_connection: csr_connection.clone(),
                        confluent_wire_format,
                    })),
                    value: DataEncoding::Avro(AvroEncoding {
                        schema: value_schema,
                        csr_connection,
                        confluent_wire_format,
                    }),
                });
            } else {
                DataEncoding::Avro(AvroEncoding {
                    schema: value_schema,
                    csr_connection,
                    confluent_wire_format,
                })
            }
        }
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr {
                csr_connection:
                    CsrConnectionProtobuf {
                        connection:
                            CsrConnection {
                                connection,
                                options,
                            },
                        seed,
                    },
            } => {
                if let Some(CsrSeedProtobuf { key, value }) = seed {
                    let item = scx.get_item_by_resolved_name(connection)?;
                    let _ = match item.connection()? {
                        Connection::Csr(connection) => connection,
                        _ => {
                            sql_bail!(
                                "{} is not a schema registry connection",
                                scx.catalog
                                    .resolve_full_name(item.name())
                                    .to_string()
                                    .quoted()
                            )
                        }
                    };

                    if !options.is_empty() {
                        sql_bail!("Protobuf CSR connections do not support any options");
                    }

                    let value = DataEncoding::Protobuf(ProtobufEncoding {
                        descriptors: strconv::parse_bytes(&value.schema)?,
                        message_name: value.message_name.clone(),
                        confluent_wire_format: true,
                    });
                    if let Some(key) = key {
                        return Ok(SourceDataEncoding {
                            key: Some(DataEncoding::Protobuf(ProtobufEncoding {
                                descriptors: strconv::parse_bytes(&key.schema)?,
                                message_name: key.message_name.clone(),
                                confluent_wire_format: true,
                            })),
                            value,
                        });
                    }
                    value
                } else {
                    unreachable!("CSR seed resolution should already have been called: Proto")
                }
            }
            ProtobufSchema::InlineSchema {
                message_name,
                schema: ast::Schema { schema },
            } => {
                let descriptors = strconv::parse_bytes(schema)?;

                DataEncoding::Protobuf(ProtobufEncoding {
                    descriptors,
                    message_name: message_name.to_owned(),
                    confluent_wire_format: false,
                })
            }
        },
        Format::Regex(regex) => DataEncoding::Regex(RegexEncoding {
            regex: mz_repr::adt::regex::Regex::new(regex, false)
                .map_err(|e| sql_err!("parsing regex: {e}"))?,
        }),
        Format::Csv { columns, delimiter } => {
            let columns = match columns {
                CsvColumns::Header { names } => {
                    if names.is_empty() {
                        sql_bail!("[internal error] column spec should get names in purify")
                    }
                    ColumnSpec::Header {
                        names: names.iter().cloned().map(|n| n.into_string()).collect(),
                    }
                }
                CsvColumns::Count(n) => ColumnSpec::Count(usize::cast_from(*n)),
            };
            DataEncoding::Csv(CsvEncoding {
                columns,
                delimiter: u8::try_from(*delimiter)
                    .map_err(|_| sql_err!("CSV delimiter must be an ASCII character"))?,
            })
        }
        Format::Json { array: false } => DataEncoding::Json,
        Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sources"),
        Format::Text => DataEncoding::Text,
    };
    Ok(SourceDataEncoding { key: None, value })
}

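/// Extracts the key envelope requested via `INCLUDE KEY`, if any, bailing if a
/// key was requested but the format specifier provides no key encoding.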
fn get_key_envelope(
    included_items: &[SourceIncludeMetadata],
    encoding: Option<&SourceDataEncoding<ReferencedConnection>>,
    key_envelope_no_encoding: bool,
) -> Result<KeyEnvelope, PlanError> {
    let key_definition = included_items
        .iter()
        .find(|i| matches!(i, SourceIncludeMetadata::Key { .. }));
    if let Some(SourceIncludeMetadata::Key { alias }) = key_definition {
        match (alias, encoding.and_then(|e| e.key.as_ref())) {
            (Some(name), Some(_)) => Ok(KeyEnvelope::Named(name.as_str().to_string())),
            (None, Some(key)) => get_unnamed_key_envelope(Some(key)),
            (Some(name), _) if key_envelope_no_encoding => {
                Ok(KeyEnvelope::Named(name.as_str().to_string()))
            }
            (None, _) if key_envelope_no_encoding => get_unnamed_key_envelope(None),
            (_, None) => {
                sql_bail!(
                    "INCLUDE KEY requires specifying KEY FORMAT .. VALUE FORMAT, \
                        got bare FORMAT"
                );
            }
        }
    } else {
        Ok(KeyEnvelope::None)
    }
}

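/// Picks the key envelope for an `INCLUDE KEY` without an alias: composite
/// encodings are flattened into the value row, while scalar encodings get a
/// single column named `key`.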
fn get_unnamed_key_envelope(
    key: Option<&DataEncoding<ReferencedConnection>>,
) -> Result<KeyEnvelope, PlanError> {
    let is_composite = match key {
        Some(DataEncoding::Bytes | DataEncoding::Json | DataEncoding::Text) => false,
        Some(
            DataEncoding::Avro(_)
            | DataEncoding::Csv(_)
            | DataEncoding::Protobuf(_)
            | DataEncoding::Regex { .. },
        ) => true,
        None => false,
    };

    if is_composite {
        Ok(KeyEnvelope::Flattened)
    } else {
        Ok(KeyEnvelope::Named("key".to_string()))
    }
}

pub fn describe_create_view(
    _: &StatementContext,
    _: CreateViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

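/// Plans the parts shared by `CREATE VIEW` and `CREATE TEMPORARY VIEW`, for
/// example (illustrative):
///
/// ```sql
/// CREATE VIEW active_users (id, name) AS
///     SELECT id, name FROM users WHERE active;
/// ```
///
/// Returns the qualified name together with the planned `View`.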
pub fn plan_view(
    scx: &StatementContext,
    def: &mut ViewDefinition<Aug>,
    temporary: bool,
) -> Result<(QualifiedItemName, View), PlanError> {
    let create_sql = normalize::create_statement(
        scx,
        Statement::CreateView(CreateViewStatement {
            if_exists: IfExistsBehavior::Error,
            temporary,
            definition: def.clone(),
        }),
    )?;

    let ViewDefinition {
        name,
        columns,
        query,
    } = def;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, query.clone(), QueryLifetime::View)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed("views".to_string()));
    }

    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let name = if temporary {
        scx.allocate_temporary_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    } else {
        scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?
    };

    plan_utils::maybe_rename_columns(
        format!("view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        columns,
    )?;
    let names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    if let Some(dup) = names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let view = View {
        create_sql,
        expr,
        dependencies,
        column_names: names,
        temporary,
    };

    Ok((name, view))
}

pub fn plan_create_view(
    scx: &StatementContext,
    mut stmt: CreateViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateViewStatement {
        temporary,
        if_exists,
        definition,
    } = &mut stmt;
    let (name, view) = plan_view(scx, definition, *temporary)?;

    let ignore_if_exists_errors = scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors);

    let replace = if *if_exists == IfExistsBehavior::Replace && !ignore_if_exists_errors {
        let if_exists = true;
        let cascade = false;
        let maybe_item_to_drop = plan_drop_item(
            scx,
            ObjectType::View,
            if_exists,
            definition.name.clone(),
            cascade,
        )?;

        if let Some(id) = maybe_item_to_drop {
            let dependencies = view.expr.depends_on();
            let invalid_drop = scx
                .get_item(&id)
                .global_ids()
                .any(|gid| dependencies.contains(&gid));
            if invalid_drop {
                let item = scx.catalog.get_item(&id);
                sql_bail!(
                    "cannot replace view {0}: depended upon by new {0} definition",
                    scx.catalog.resolve_full_name(item.name())
                );
            }

            Some(id)
        } else {
            None
        }
    } else {
        None
    };
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (Ok(item), IfExistsBehavior::Error, false) = (
        scx.catalog.resolve_item_or_type(&partial_name),
        *if_exists,
        ignore_if_exists_errors,
    ) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateView(CreateViewPlan {
        name,
        view,
        replace,
        drop_ids,
        if_not_exists: *if_exists == IfExistsBehavior::Skip,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

pub fn describe_create_materialized_view(
    _: &StatementContext,
    _: CreateMaterializedViewStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_continual_task(
    _: &StatementContext,
    _: CreateContinualTaskStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_create_network_policy(
    _: &StatementContext,
    _: CreateNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn describe_alter_network_policy(
    _: &StatementContext,
    _: AlterNetworkPolicyStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

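/// Plans `CREATE MATERIALIZED VIEW`, including the `ASSERT NOT NULL`,
/// `PARTITION BY`, `RETAIN HISTORY`, and `REFRESH` options.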
pub fn plan_create_materialized_view(
    scx: &StatementContext,
    mut stmt: CreateMaterializedViewStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster_id =
        crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateMaterializedView(stmt.clone()))?;

    let partial_name = normalize::unresolved_item_name(stmt.name)?;
    let name = scx.allocate_qualified_name(partial_name.clone())?;

    let query::PlannedRootQuery {
        expr,
        mut desc,
        finishing,
        scope: _,
    } = query::plan_root_query(scx, stmt.query, QueryLifetime::MaterializedView)?;
    assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
        &finishing,
        expr.arity()
    ));
    if expr.contains_parameters()? {
        return Err(PlanError::ParameterNotAllowed(
            "materialized views".to_string(),
        ));
    }

    plan_utils::maybe_rename_columns(
        format!("materialized view {}", scx.catalog.resolve_full_name(&name)),
        &mut desc,
        &stmt.columns,
    )?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();

    let MaterializedViewOptionExtracted {
        assert_not_null,
        partition_by,
        retain_history,
        refresh,
        seen: _,
    }: MaterializedViewOptionExtracted = stmt.with_options.try_into()?;

    if let Some(partition_by) = partition_by {
        scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
        check_partition_by(&desc, partition_by)?;
    }

    let refresh_schedule = {
        let mut refresh_schedule = RefreshSchedule::default();
        let mut on_commits_seen = 0;
        for refresh_option_value in refresh {
            if !matches!(refresh_option_value, RefreshOptionValue::OnCommit) {
                scx.require_feature_flag(&ENABLE_REFRESH_EVERY_MVS)?;
            }
            match refresh_option_value {
                RefreshOptionValue::OnCommit => {
                    on_commits_seen += 1;
                }
                RefreshOptionValue::AtCreation => {
                    soft_panic_or_log!("REFRESH AT CREATION should have been purified away");
                    sql_bail!("INTERNAL ERROR: REFRESH AT CREATION should have been purified away")
                }
                RefreshOptionValue::At(RefreshAtOptionValue { mut time }) => {
                    transform_ast::transform(scx, &mut time)?;
                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH AT",
                        scope: &Scope::empty(),
                        relation_type: &RelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let hir = plan_expr(ecx, &time)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &ScalarType::MzTimestamp,
                    )?;
                    let timestamp = hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshAt)?;
                    refresh_schedule.ats.push(timestamp);
                }
                RefreshOptionValue::Every(RefreshEveryOptionValue {
                    interval,
                    aligned_to,
                }) => {
                    let interval = Interval::try_from_value(Value::Interval(interval))?;
                    if interval.as_microseconds() <= 0 {
                        sql_bail!("REFRESH interval must be positive; got: {}", interval);
                    }
                    if interval.months != 0 {
                        sql_bail!("REFRESH interval must not involve units larger than days");
                    }
                    let interval = interval.duration()?;
                    if u64::try_from(interval.as_millis()).is_err() {
                        sql_bail!("REFRESH interval too large");
                    }

                    let mut aligned_to = match aligned_to {
                        Some(aligned_to) => aligned_to,
                        None => {
                            soft_panic_or_log!(
                                "ALIGNED TO should have been filled in by purification"
                            );
                            sql_bail!(
                                "INTERNAL ERROR: ALIGNED TO should have been filled in by purification"
                            )
                        }
                    };

                    transform_ast::transform(scx, &mut aligned_to)?;

                    let ecx = &ExprContext {
                        qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
                        name: "REFRESH EVERY ... ALIGNED TO",
                        scope: &Scope::empty(),
                        relation_type: &RelationType::empty(),
                        allow_aggregates: false,
                        allow_subqueries: false,
                        allow_parameters: false,
                        allow_windows: false,
                    };
                    let aligned_to_hir = plan_expr(ecx, &aligned_to)?.cast_to(
                        ecx,
                        CastContext::Assignment,
                        &ScalarType::MzTimestamp,
                    )?;
                    let aligned_to_const = aligned_to_hir
                        .into_literal_mz_timestamp()
                        .ok_or_else(|| PlanError::InvalidRefreshEveryAlignedTo)?;

                    refresh_schedule.everies.push(RefreshEvery {
                        interval,
                        aligned_to: aligned_to_const,
                    });
                }
            }
        }

        if on_commits_seen > 1 {
            sql_bail!("REFRESH ON COMMIT cannot be specified multiple times");
        }
        if on_commits_seen > 0 && refresh_schedule != RefreshSchedule::default() {
            sql_bail!("REFRESH ON COMMIT is not compatible with any of the other REFRESH options");
        }

        if refresh_schedule == RefreshSchedule::default() {
            None
        } else {
            Some(refresh_schedule)
        }
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    let compaction_window = plan_retain_history_option(scx, retain_history)?;
    let mut non_null_assertions = assert_not_null
        .into_iter()
        .map(normalize::column_name)
        .map(|assertion_name| {
            column_names
                .iter()
                .position(|col| col == &assertion_name)
                .ok_or_else(|| {
                    sql_err!(
                        "column {} in ASSERT NOT NULL option not found",
                        assertion_name.quoted()
                    )
                })
        })
        .collect::<Result<Vec<_>, _>>()?;
    non_null_assertions.sort();
    if let Some(dup) = non_null_assertions.iter().duplicates().next() {
        let dup = &column_names[*dup];
        sql_bail!("duplicate column {} in non-null assertions", dup.quoted());
    }

    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let if_exists = match scx.pcx().map(|pcx| pcx.ignore_if_exists_errors) {
        Ok(true) => IfExistsBehavior::Skip,
        _ => stmt.if_exists,
    };

    let mut replace = None;
    let mut if_not_exists = false;
    match if_exists {
        IfExistsBehavior::Replace => {
            let if_exists = true;
            let cascade = false;
            let replace_id = plan_drop_item(
                scx,
                ObjectType::MaterializedView,
                if_exists,
                partial_name.into(),
                cascade,
            )?;

            if let Some(id) = replace_id {
                let dependencies = expr.depends_on();
                let invalid_drop = scx
                    .get_item(&id)
                    .global_ids()
                    .any(|gid| dependencies.contains(&gid));
                if invalid_drop {
                    let item = scx.catalog.get_item(&id);
                    sql_bail!(
                        "cannot replace materialized view {0}: depended upon by new {0} definition",
                        scx.catalog.resolve_full_name(item.name())
                    );
                }
                replace = Some(id);
            }
        }
        IfExistsBehavior::Skip => if_not_exists = true,
        IfExistsBehavior::Error => (),
    }
    let drop_ids = replace
        .map(|id| {
            scx.catalog
                .item_dependents(id)
                .into_iter()
                .map(|id| id.unwrap_item_id())
                .collect()
        })
        .unwrap_or_default();
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (IfExistsBehavior::Error, Ok(item)) =
        (if_exists, scx.catalog.resolve_item_or_type(&partial_name))
    {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    Ok(Plan::CreateMaterializedView(CreateMaterializedViewPlan {
        name,
        materialized_view: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions,
            compaction_window,
            refresh_schedule,
            as_of,
        },
        replace,
        drop_ids,
        if_not_exists,
        ambiguous_columns: *scx.ambiguous_columns.borrow(),
    }))
}

generate_extracted_config!(
    MaterializedViewOption,
    (AssertNotNull, Ident, AllowMultiple),
    (PartitionBy, Vec<Ident>),
    (RetainHistory, OptionalDuration),
    (Refresh, RefreshOptionValue<Aug>, AllowMultiple)
);

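/// Plans `CREATE CONTINUAL TASK`. Each statement in the task body is desugared
/// into a query (see `continual_task_query`) and the results are unioned, with
/// `DELETE` statements contributing negated rows.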
pub fn plan_create_continual_task(
    scx: &StatementContext,
    mut stmt: CreateContinualTaskStatement<Aug>,
) -> Result<Plan, PlanError> {
    match &stmt.sugar {
        None => scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_CREATE)?,
        Some(ast::CreateContinualTaskSugar::Transform { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_TRANSFORM)?
        }
        Some(ast::CreateContinualTaskSugar::Retain { .. }) => {
            scx.require_feature_flag(&vars::ENABLE_CONTINUAL_TASK_RETAIN)?
        }
    };
    let cluster_id = match &stmt.in_cluster {
        None => scx.catalog.resolve_cluster(None)?.id(),
        Some(in_cluster) => in_cluster.id,
    };
    stmt.in_cluster = Some(ResolvedClusterName {
        id: cluster_id,
        print_name: None,
    });

    let create_sql =
        normalize::create_statement(scx, Statement::CreateContinualTask(stmt.clone()))?;

    let ContinualTaskOptionExtracted { snapshot, seen: _ } = stmt.with_options.try_into()?;

    let mut desc = match stmt.columns {
        None => None,
        Some(columns) => {
            let mut desc_columns = Vec::with_capacity(columns.len());
            for col in columns.iter() {
                desc_columns.push((
                    normalize::column_name(col.name.clone()),
                    ColumnType {
                        scalar_type: scalar_type_from_sql(scx, &col.data_type)?,
                        nullable: false,
                    },
                ));
            }
            Some(RelationDesc::from_names_and_types(desc_columns))
        }
    };
    let input = scx.get_item_by_resolved_name(&stmt.input)?;
    match input.item_type() {
        CatalogItemType::ContinualTask
        | CatalogItemType::Table
        | CatalogItemType::MaterializedView
        | CatalogItemType::Source => {}
        CatalogItemType::Sink
        | CatalogItemType::View
        | CatalogItemType::Index
        | CatalogItemType::Type
        | CatalogItemType::Func
        | CatalogItemType::Secret
        | CatalogItemType::Connection => {
            sql_bail!(
                "CONTINUAL TASK cannot use {} as an input",
                input.item_type()
            );
        }
    }

    let mut qcx = QueryContext::root(scx, QueryLifetime::MaterializedView);
    let ct_name = stmt.name;
    let placeholder_id = match &ct_name {
        ResolvedItemName::ContinualTask { id, name } => {
            let desc = match desc.as_ref().cloned() {
                Some(x) => x,
                None => {
                    let input_name = scx.catalog.resolve_full_name(input.name());
                    input.desc(&input_name)?.into_owned()
                }
            };
            qcx.ctes.insert(
                *id,
                CteDesc {
                    name: name.item.clone(),
                    desc,
                },
            );
            Some(*id)
        }
        _ => None,
    };

    let mut exprs = Vec::new();
    for (idx, stmt) in stmt.stmts.iter().enumerate() {
        let query = continual_task_query(&ct_name, stmt).ok_or_else(|| sql_err!("TODO(ct3)"))?;
        let query::PlannedRootQuery {
            expr,
            desc: desc_query,
            finishing,
            scope: _,
        } = query::plan_ct_query(&mut qcx, query)?;
        assert!(HirRelationExpr::is_trivial_row_set_finishing_hir(
            &finishing,
            expr.arity()
        ));
        if expr.contains_parameters()? {
            return Err(PlanError::ParameterNotAllowed(
                "continual tasks".to_string(),
            ));
        }
        let expr = match desc.as_mut() {
            None => {
                desc = Some(desc_query);
                expr
            }
            Some(desc) => {
                if desc_query.arity() > desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more expressions than target columns",
                        idx
                    );
                }
                if desc_query.arity() < desc.arity() {
                    sql_bail!(
                        "statement {}: INSERT has more target columns than expressions",
                        idx
                    );
                }
                let target_types = desc.iter_types().map(|x| &x.scalar_type);
                let expr = cast_relation(&qcx, CastContext::Assignment, expr, target_types);
                let expr = expr.map_err(|e| {
                    sql_err!(
                        "statement {}: column {} is of type {} but expression is of type {}",
                        idx,
                        desc.get_name(e.column).quoted(),
                        qcx.humanize_scalar_type(&e.target_type, false),
                        qcx.humanize_scalar_type(&e.source_type, false),
                    )
                })?;

                let zip_types = || desc.iter_types().zip(desc_query.iter_types());
                let updated = zip_types().any(|(ct, q)| q.nullable && !ct.nullable);
                if updated {
                    let new_types = zip_types().map(|(ct, q)| {
                        let mut ct = ct.clone();
                        if q.nullable {
                            ct.nullable = true;
                        }
                        ct
                    });
                    *desc = RelationDesc::from_names_and_types(
                        desc.iter_names().cloned().zip(new_types),
                    );
                }

                expr
            }
        };
        match stmt {
            ast::ContinualTaskStmt::Insert(_) => exprs.push(expr),
            ast::ContinualTaskStmt::Delete(_) => exprs.push(expr.negate()),
        }
    }
    let expr = exprs
        .into_iter()
        .reduce(|acc, expr| acc.union(expr))
        .ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let dependencies = expr
        .depends_on()
        .into_iter()
        .map(|gid| scx.catalog.resolve_item_id(&gid))
        .collect();

    let desc = desc.ok_or_else(|| sql_err!("TODO(ct3)"))?;
    let column_names: Vec<ColumnName> = desc.iter_names().cloned().collect();
    if let Some(dup) = column_names.iter().duplicates().next() {
        sql_bail!("column {} specified more than once", dup.quoted());
    }

    let name = match &ct_name {
        ResolvedItemName::Item { id, .. } => scx.catalog.get_item(id).name().clone(),
        ResolvedItemName::ContinualTask { name, .. } => {
            let name = scx.allocate_qualified_name(name.clone())?;
            let full_name = scx.catalog.resolve_full_name(&name);
            let partial_name = PartialItemName::from(full_name.clone());
            if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
                return Err(PlanError::ItemAlreadyExists {
                    name: full_name.to_string(),
                    item_type: item.item_type(),
                });
            }
            name
        }
        ResolvedItemName::Cte { .. } => unreachable!("name should not resolve to a CTE"),
        ResolvedItemName::Error => unreachable!("error should be returned in name resolution"),
    };

    let as_of = stmt.as_of.map(Timestamp::from);
    Ok(Plan::CreateContinualTask(CreateContinualTaskPlan {
        name,
        placeholder_id,
        desc,
        input_id: input.global_id(),
        with_snapshot: snapshot.unwrap_or(true),
        continual_task: MaterializedView {
            create_sql,
            expr,
            dependencies,
            column_names,
            cluster_id,
            non_null_assertions: Vec::new(),
            compaction_window: None,
            refresh_schedule: None,
            as_of,
        },
    }))
}

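/// Desugars a single continual task statement into a plain query: an `INSERT`
/// becomes its source query, and a `DELETE` becomes `SELECT * FROM <task>
/// WHERE <selection>`. Returns `None` for forms that cannot be desugared.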
fn continual_task_query<'a>(
    ct_name: &ResolvedItemName,
    stmt: &'a ast::ContinualTaskStmt<Aug>,
) -> Option<ast::Query<Aug>> {
    match stmt {
        ast::ContinualTaskStmt::Insert(ast::InsertStatement {
            table_name: _,
            columns,
            source,
            returning,
        }) => {
            if !columns.is_empty() || !returning.is_empty() {
                return None;
            }
            match source {
                ast::InsertSource::Query(query) => Some(query.clone()),
                ast::InsertSource::DefaultValues => None,
            }
        }
        ast::ContinualTaskStmt::Delete(ast::DeleteStatement {
            table_name: _,
            alias,
            using,
            selection,
        }) => {
            if !using.is_empty() {
                return None;
            }
            let from = ast::TableWithJoins {
                relation: ast::TableFactor::Table {
                    name: ct_name.clone(),
                    alias: alias.clone(),
                },
                joins: Vec::new(),
            };
            let select = ast::Select {
                from: vec![from],
                selection: selection.clone(),
                distinct: None,
                projection: vec![ast::SelectItem::Wildcard],
                group_by: Vec::new(),
                having: None,
                qualify: None,
                options: Vec::new(),
            };
            let query = ast::Query {
                ctes: ast::CteBlock::Simple(Vec::new()),
                body: ast::SetExpr::Select(Box::new(select)),
                order_by: Vec::new(),
                limit: None,
                offset: None,
            };
            Some(query)
        }
    }
}

generate_extracted_config!(ContinualTaskOption, (Snapshot, bool));

pub fn describe_create_sink(
    _: &StatementContext,
    _: CreateSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    CreateSinkOption,
    (Snapshot, bool),
    (PartitionStrategy, String),
    (Version, u64)
);

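/// Plans `CREATE SINK`, for example (illustrative):
///
/// ```sql
/// CREATE SINK s FROM t
///     INTO KAFKA CONNECTION kafka_conn (TOPIC 'events')
///     KEY (id) FORMAT JSON ENVELOPE UPSERT;
/// ```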
pub fn plan_create_sink(
    scx: &StatementContext,
    stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let Some(name) = stmt.name.clone() else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
    let full_name = scx.catalog.resolve_full_name(&name);
    let partial_name = PartialItemName::from(full_name.clone());
    if let (false, Ok(item)) = (stmt.if_not_exists, scx.catalog.resolve_item(&partial_name)) {
        return Err(PlanError::ItemAlreadyExists {
            name: full_name.to_string(),
            item_type: item.item_type(),
        });
    }

    plan_sink(scx, stmt)
}

fn plan_sink(
    scx: &StatementContext,
    mut stmt: CreateSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let CreateSinkStatement {
        name,
        in_cluster: _,
        from,
        connection,
        format,
        envelope,
        if_not_exists,
        with_options,
    } = stmt.clone();

    let Some(name) = name else {
        return Err(PlanError::MissingName(CatalogItemType::Sink));
    };
    let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;

    let envelope = match envelope {
        Some(ast::SinkEnvelope::Upsert) => SinkEnvelope::Upsert,
        Some(ast::SinkEnvelope::Debezium) => SinkEnvelope::Debezium,
        None => sql_bail!("ENVELOPE clause is required"),
    };

    let from_name = &from;
    let from = scx.get_item_by_resolved_name(&from)?;
    if from.id().is_system() {
        bail_unsupported!("creating a sink directly on a catalog object");
    }
    let desc = from.desc(&scx.catalog.resolve_full_name(from.name()))?;
    let key_indices = match &connection {
        CreateSinkConnection::Kafka { key, .. } => {
            if let Some(key) = key.clone() {
                let key_columns = key
                    .key_columns
                    .into_iter()
                    .map(normalize::column_name)
                    .collect::<Vec<_>>();
                let mut uniq = BTreeSet::new();
                for col in key_columns.iter() {
                    if !uniq.insert(col) {
                        sql_bail!("duplicate column referenced in KEY: {}", col);
                    }
                }
                let indices = key_columns
                    .iter()
                    .map(|col| -> anyhow::Result<usize> {
                        let name_idx = desc
                            .get_by_name(col)
                            .map(|(idx, _type)| idx)
                            .ok_or_else(|| {
                                sql_err!("column referenced in KEY does not exist: {}", col)
                            })?;
                        if desc.get_unambiguous_name(name_idx).is_none() {
                            return Err(
                                sql_err!("column referenced in KEY is ambiguous: {}", col).into(),
                            );
                        }
                        Ok(name_idx)
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                let is_valid_key =
                    desc.typ().keys.iter().any(|key_columns| {
                        key_columns.iter().all(|column| indices.contains(column))
                    });

                if !is_valid_key && envelope == SinkEnvelope::Upsert {
                    if key.not_enforced {
                        scx.catalog
                            .add_notice(PlanNotice::UpsertSinkKeyNotEnforced {
                                key: key_columns.clone(),
                                name: name.item.clone(),
                            })
                    } else {
                        return Err(PlanError::UpsertSinkWithInvalidKey {
                            name: from_name.full_name_str(),
                            desired_key: key_columns.iter().map(|c| c.to_string()).collect(),
                            valid_keys: desc
                                .typ()
                                .keys
                                .iter()
                                .map(|key| {
                                    key.iter()
                                        .map(|col| desc.get_name(*col).as_str().into())
                                        .collect()
                                })
                                .collect(),
                        });
                    }
                }
                Some(indices)
            } else {
                None
            }
        }
    };

    let headers_index = match &connection {
        CreateSinkConnection::Kafka {
            headers: Some(headers),
            ..
        } => {
            scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

            match envelope {
                SinkEnvelope::Upsert => (),
                SinkEnvelope::Debezium => {
                    sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
                }
            };

            let headers = normalize::column_name(headers.clone());
            let (idx, ty) = desc
                .get_by_name(&headers)
                .ok_or_else(|| sql_err!("HEADERS column ({}) is unknown", headers))?;

            if desc.get_unambiguous_name(idx).is_none() {
                sql_bail!("HEADERS column ({}) is ambiguous", headers);
            }

            match &ty.scalar_type {
                ScalarType::Map { value_type, .. }
                    if matches!(&**value_type, ScalarType::String | ScalarType::Bytes) => {}
                _ => sql_bail!(
                    "HEADERS column must have type map[text => text] or map[text => bytea]"
                ),
            }

            Some(idx)
        }
        _ => None,
    };

    let relation_key_indices = desc.typ().keys.get(0).cloned();

    let key_desc_and_indices = key_indices.map(|key_indices| {
        let cols = desc
            .iter()
            .map(|(name, ty)| (name.clone(), ty.clone()))
            .collect::<Vec<_>>();
        let (names, types): (Vec<_>, Vec<_>) =
            key_indices.iter().map(|&idx| cols[idx].clone()).unzip();
        let typ = RelationType::new(types);
        (RelationDesc::new(typ, names), key_indices)
    });

    if key_desc_and_indices.is_none() && envelope == SinkEnvelope::Upsert {
        return Err(PlanError::UpsertSinkWithoutKey);
    }

    let connection_builder = match connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            ..
        } => kafka_sink_builder(
            scx,
            connection,
            options,
            format,
            relation_key_indices,
            key_desc_and_indices,
            headers_index,
            desc.into_owned(),
            envelope,
            from.id(),
        )?,
    };

    let CreateSinkOptionExtracted {
        snapshot,
        version,
        partition_strategy: _,
        seen: _,
    } = with_options.try_into()?;

    let with_snapshot = snapshot.unwrap_or(true);
    let version = version.unwrap_or(0);

    let in_cluster = source_sink_cluster_config(scx, &mut stmt.in_cluster)?;
    let create_sql = normalize::create_statement(scx, Statement::CreateSink(stmt))?;

    Ok(Plan::CreateSink(CreateSinkPlan {
        name,
        sink: Sink {
            create_sql,
            from: from.global_id(),
            connection: connection_builder,
            envelope,
            version,
        },
        with_snapshot,
        if_not_exists,
        in_cluster: in_cluster.id(),
    }))
}

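/// Builds the error reported when a user-declared key constraint conflicts
/// with the keys already known for the relation.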
fn key_constraint_err(desc: &RelationDesc, user_keys: &[ColumnName]) -> PlanError {
    let user_keys = user_keys.iter().map(|column| column.as_str()).join(", ");

    let existing_keys = desc
        .typ()
        .keys
        .iter()
        .map(|key_columns| {
            key_columns
                .iter()
                .map(|col| desc.get_name(*col).as_str())
                .join(", ")
        })
        .join(", ");

    sql_err!(
        "Key constraint ({}) conflicts with existing key ({})",
        user_keys,
        existing_keys
    )
}

#[derive(Debug, Default, PartialEq, Clone)]
pub struct CsrConfigOptionExtracted {
    seen: ::std::collections::BTreeSet<CsrConfigOptionName<Aug>>,
    pub(crate) avro_key_fullname: Option<String>,
    pub(crate) avro_value_fullname: Option<String>,
    pub(crate) null_defaults: bool,
    pub(crate) value_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_doc_options: BTreeMap<DocTarget, String>,
    pub(crate) key_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
    pub(crate) value_compatibility_level: Option<mz_ccsr::CompatibilityLevel>,
}

impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtracted {
    type Error = crate::plan::PlanError;
    fn try_from(v: Vec<CsrConfigOption<Aug>>) -> Result<CsrConfigOptionExtracted, Self::Error> {
        let mut extracted = CsrConfigOptionExtracted::default();
        let mut common_doc_comments = BTreeMap::new();
        for option in v {
            if !extracted.seen.insert(option.name.clone()) {
                return Err(PlanError::Unstructured({
                    format!("{} specified more than once", option.name)
                }));
            }
            let option_name = option.name.clone();
            let option_name_str = option_name.to_ast_string_simple();
            let better_error = |e: PlanError| PlanError::InvalidOptionValue {
                option_name: option_name.to_ast_string_simple(),
                err: e.into(),
            };
            let to_compatibility_level = |val: Option<WithOptionValue<Aug>>| {
                val.map(|s| match s {
                    WithOptionValue::Value(Value::String(s)) => {
                        mz_ccsr::CompatibilityLevel::try_from(s.to_uppercase().as_str())
                    }
                    _ => Err("must be a string".to_string()),
                })
                .transpose()
                .map_err(PlanError::Unstructured)
                .map_err(better_error)
            };
            match option.name {
                CsrConfigOptionName::AvroKeyFullname => {
                    extracted.avro_key_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroValueFullname => {
                    extracted.avro_value_fullname =
                        <Option<String>>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::NullDefaults => {
                    extracted.null_defaults =
                        <bool>::try_from_value(option.value).map_err(better_error)?;
                }
                CsrConfigOptionName::AvroDocOn(doc_on) => {
                    let value = String::try_from_value(option.value.ok_or_else(|| {
                        PlanError::InvalidOptionValue {
                            option_name: option_name_str,
                            err: Box::new(PlanError::Unstructured("cannot be empty".to_string())),
                        }
                    })?)
                    .map_err(better_error)?;
                    let key = match doc_on.identifier {
                        DocOnIdentifier::Column(ast::ColumnName {
                            relation: ResolvedItemName::Item { id, .. },
                            column: ResolvedColumnReference::Column { name, index: _ },
                        }) => DocTarget::Field {
                            object_id: id,
                            column_name: name,
                        },
                        DocOnIdentifier::Type(ResolvedItemName::Item { id, .. }) => {
                            DocTarget::Type(id)
                        }
                        _ => unreachable!(),
                    };

                    match doc_on.for_schema {
                        DocOnSchema::KeyOnly => {
                            extracted.key_doc_options.insert(key, value);
                        }
                        DocOnSchema::ValueOnly => {
                            extracted.value_doc_options.insert(key, value);
                        }
                        DocOnSchema::All => {
                            common_doc_comments.insert(key, value);
                        }
                    }
                }
                CsrConfigOptionName::KeyCompatibilityLevel => {
                    extracted.key_compatibility_level = to_compatibility_level(option.value)?;
                }
                CsrConfigOptionName::ValueCompatibilityLevel => {
                    extracted.value_compatibility_level = to_compatibility_level(option.value)?;
                }
            }
        }

        for (key, value) in common_doc_comments {
            if !extracted.key_doc_options.contains_key(&key) {
                extracted.key_doc_options.insert(key.clone(), value.clone());
            }
            if !extracted.value_doc_options.contains_key(&key) {
                extracted.value_doc_options.insert(key, value);
            }
        }
        Ok(extracted)
    }
}

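/// Builds the storage-layer connection description for a Kafka sink from the
/// parsed `INTO KAFKA CONNECTION` options, formats, and key/value shapes.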
3660fn kafka_sink_builder(
3661 scx: &StatementContext,
3662 connection: ResolvedItemName,
3663 options: Vec<KafkaSinkConfigOption<Aug>>,
3664 format: Option<FormatSpecifier<Aug>>,
3665 relation_key_indices: Option<Vec<usize>>,
3666 key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
3667 headers_index: Option<usize>,
3668 value_desc: RelationDesc,
3669 envelope: SinkEnvelope,
3670 sink_from: CatalogItemId,
3671) -> Result<StorageSinkConnection<ReferencedConnection>, PlanError> {
3672 let connection_item = scx.get_item_by_resolved_name(&connection)?;
3674 let connection_id = connection_item.id();
3675 match connection_item.connection()? {
3676 Connection::Kafka(_) => (),
3677 _ => sql_bail!(
3678 "{} is not a kafka connection",
3679 scx.catalog.resolve_full_name(connection_item.name())
3680 ),
3681 };
3682
3683 let KafkaSinkConfigOptionExtracted {
3684 topic,
3685 compression_type,
3686 partition_by,
3687 progress_group_id_prefix,
3688 transactional_id_prefix,
3689 legacy_ids,
3690 topic_config,
3691 topic_metadata_refresh_interval,
3692 topic_partition_count,
3693 topic_replication_factor,
3694 seen: _,
3695 }: KafkaSinkConfigOptionExtracted = options.try_into()?;
3696
3697 let transactional_id = match (transactional_id_prefix, legacy_ids) {
3698 (Some(_), Some(true)) => {
3699 sql_bail!("LEGACY IDS cannot be used at the same time as TRANSACTIONAL ID PREFIX")
3700 }
3701 (None, Some(true)) => KafkaIdStyle::Legacy,
3702 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3703 };
3704
3705 let progress_group_id = match (progress_group_id_prefix, legacy_ids) {
3706 (Some(_), Some(true)) => {
3707 sql_bail!("LEGACY IDS cannot be used at the same time as PROGRESS GROUP ID PREFIX")
3708 }
3709 (None, Some(true)) => KafkaIdStyle::Legacy,
3710 (prefix, _) => KafkaIdStyle::Prefix(prefix),
3711 };
3712
3713 let topic_name = topic.ok_or_else(|| sql_err!("KAFKA CONNECTION must specify TOPIC"))?;
3714
3715 if topic_metadata_refresh_interval > Duration::from_secs(60 * 60) {
3716 sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
3719 }
3720
3721 let assert_positive = |val: Option<i32>, name: &str| {
3722 if let Some(val) = val {
3723 if val <= 0 {
3724 sql_bail!("{} must be a positive integer", name);
3725 }
3726 }
3727 val.map(NonNeg::try_from)
3728 .transpose()
3729 .map_err(|_| PlanError::Unstructured(format!("{} must be a positive integer", name)))
3730 };
3731 let topic_partition_count = assert_positive(topic_partition_count, "TOPIC PARTITION COUNT")?;
3732 let topic_replication_factor =
3733 assert_positive(topic_replication_factor, "TOPIC REPLICATION FACTOR")?;
3734
3735 let gen_avro_schema_options = |conn| {
3738 let CsrConnectionAvro {
3739 connection:
3740 CsrConnection {
3741 connection,
3742 options,
3743 },
3744 seed,
3745 key_strategy,
3746 value_strategy,
3747 } = conn;
3748 if seed.is_some() {
3749 sql_bail!("SEED option does not make sense with sinks");
3750 }
3751 if key_strategy.is_some() {
3752 sql_bail!("KEY STRATEGY option does not make sense with sinks");
3753 }
3754 if value_strategy.is_some() {
3755 sql_bail!("VALUE STRATEGY option does not make sense with sinks");
3756 }
3757
3758 let item = scx.get_item_by_resolved_name(&connection)?;
3759 let csr_connection = match item.connection()? {
3760 Connection::Csr(_) => item.id(),
3761 _ => {
3762 sql_bail!(
3763 "{} is not a schema registry connection",
3764 scx.catalog
3765 .resolve_full_name(item.name())
3766 .to_string()
3767 .quoted()
3768 )
3769 }
3770 };
3771 let extracted_options: CsrConfigOptionExtracted = options.try_into()?;
3772
3773 if key_desc_and_indices.is_none() && extracted_options.avro_key_fullname.is_some() {
3774 sql_bail!("Cannot specify AVRO KEY FULLNAME without a corresponding KEY field");
3775 }
3776
3777 if key_desc_and_indices.is_some()
3778 && (extracted_options.avro_key_fullname.is_some()
3779 ^ extracted_options.avro_value_fullname.is_some())
3780 {
3781 sql_bail!(
3782 "Must specify both AVRO KEY FULLNAME and AVRO VALUE FULLNAME when specifying generated schema names"
3783 );
3784 }
3785
3786 Ok((csr_connection, extracted_options))
3787 };
3788
3789 let map_format = |format: Format<Aug>, desc: &RelationDesc, is_key: bool| match format {
3790 Format::Json { array: false } => Ok::<_, PlanError>(KafkaSinkFormatType::Json),
3791 Format::Bytes if desc.arity() == 1 => {
3792 let col_type = &desc.typ().column_types[0].scalar_type;
3793 if !mz_pgrepr::Value::can_encode_binary(col_type) {
3794 bail_unsupported!(format!(
3795 "BYTES format with non-encodable type: {:?}",
3796 col_type
3797 ));
3798 }
3799
3800 Ok(KafkaSinkFormatType::Bytes)
3801 }
3802 Format::Text if desc.arity() == 1 => Ok(KafkaSinkFormatType::Text),
3803 Format::Bytes | Format::Text => {
3804 bail_unsupported!("BYTES or TEXT format with multiple columns")
3805 }
3806 Format::Json { array: true } => bail_unsupported!("JSON ARRAY format in sinks"),
3807 Format::Avro(AvroSchema::Csr { csr_connection }) => {
3808 let (csr_connection, options) = gen_avro_schema_options(csr_connection)?;
3809 let schema = if is_key {
3810 AvroSchemaGenerator::new(
3811 desc.clone(),
3812 false,
3813 options.key_doc_options,
3814 options.avro_key_fullname.as_deref().unwrap_or("row"),
3815 options.null_defaults,
3816 Some(sink_from),
3817 false,
3818 )?
3819 .schema()
3820 .to_string()
3821 } else {
3822 AvroSchemaGenerator::new(
3823 desc.clone(),
3824 matches!(envelope, SinkEnvelope::Debezium),
3825 options.value_doc_options,
3826 options.avro_value_fullname.as_deref().unwrap_or("envelope"),
3827 options.null_defaults,
3828 Some(sink_from),
3829 true,
3830 )?
3831 .schema()
3832 .to_string()
3833 };
3834 Ok(KafkaSinkFormatType::Avro {
3835 schema,
3836 compatibility_level: if is_key {
3837 options.key_compatibility_level
3838 } else {
3839 options.value_compatibility_level
3840 },
3841 csr_connection,
3842 })
3843 }
3844 format => bail_unsupported!(format!("sink format {:?}", format)),
3845 };
3846
3847 let partition_by = match &partition_by {
3848 Some(partition_by) => {
3849 let mut scope = Scope::from_source(None, value_desc.iter_names());
3850
3851 match envelope {
3852 SinkEnvelope::Upsert => (),
3853 SinkEnvelope::Debezium => {
3854 let key_indices: HashSet<_> = key_desc_and_indices
3855 .as_ref()
3856 .map(|(_desc, indices)| indices.as_slice())
3857 .unwrap_or_default()
3858 .into_iter()
3859 .collect();
3860 for (i, item) in scope.items.iter_mut().enumerate() {
3861 if !key_indices.contains(&i) {
3862 item.error_if_referenced = Some(|_table, column| {
3863 PlanError::InvalidPartitionByEnvelopeDebezium {
3864 column_name: column.to_string(),
3865 }
3866 });
3867 }
3868 }
3869 }
3870 };
3871
3872 let ecx = &ExprContext {
3873 qcx: &QueryContext::root(scx, QueryLifetime::OneShot),
3874 name: "PARTITION BY",
3875 scope: &scope,
3876 relation_type: value_desc.typ(),
3877 allow_aggregates: false,
3878 allow_subqueries: false,
3879 allow_parameters: false,
3880 allow_windows: false,
3881 };
3882 let expr = plan_expr(ecx, partition_by)?.cast_to(
3883 ecx,
3884 CastContext::Assignment,
3885 &ScalarType::UInt64,
3886 )?;
3887 let expr = expr.lower_uncorrelated()?;
3888
3889 Some(expr)
3890 }
3891 _ => None,
3892 };
3893
3894 let format = match format {
3896 Some(FormatSpecifier::KeyValue { key, value }) => {
3897 let key_format = match key_desc_and_indices.as_ref() {
3898 Some((desc, _indices)) => Some(map_format(key, desc, true)?),
3899 None => None,
3900 };
3901 KafkaSinkFormat {
3902 value_format: map_format(value, &value_desc, false)?,
3903 key_format,
3904 }
3905 }
3906 Some(FormatSpecifier::Bare(format)) => {
3907 let key_format = match key_desc_and_indices.as_ref() {
3908 Some((desc, _indices)) => Some(map_format(format.clone(), desc, true)?),
3909 None => None,
3910 };
3911 KafkaSinkFormat {
3912 value_format: map_format(format, &value_desc, false)?,
3913 key_format,
3914 }
3915 }
3916 None => bail_unsupported!("sink without format"),
3917 };
3918
3919 Ok(StorageSinkConnection::Kafka(KafkaSinkConnection {
3920 connection_id,
3921 connection: connection_id,
3922 format,
3923 topic: topic_name,
3924 relation_key_indices,
3925 key_desc_and_indices,
3926 headers_index,
3927 value_desc,
3928 partition_by,
3929 compression_type,
3930 progress_group_id,
3931 transactional_id,
3932 topic_options: KafkaTopicOptions {
3933 partition_count: topic_partition_count,
3934 replication_factor: topic_replication_factor,
3935 topic_config: topic_config.unwrap_or_default(),
3936 },
3937 topic_metadata_refresh_interval,
3938 }))
3939}
3940
3941pub fn describe_create_index(
3942 _: &StatementContext,
3943 _: CreateIndexStatement<Aug>,
3944) -> Result<StatementDesc, PlanError> {
3945 Ok(StatementDesc::new(None))
3946}
3947
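/// Plans a `CREATE INDEX` statement, filling in defaulted parts (index name,
/// key expressions, cluster) so that the recorded `create_sql` is fully
/// specified.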
3948pub fn plan_create_index(
3949 scx: &StatementContext,
3950 mut stmt: CreateIndexStatement<Aug>,
3951) -> Result<Plan, PlanError> {
3952 let CreateIndexStatement {
3953 name,
3954 on_name,
3955 in_cluster,
3956 key_parts,
3957 with_options,
3958 if_not_exists,
3959 } = &mut stmt;
3960 let on = scx.get_item_by_resolved_name(on_name)?;
3961 let on_item_type = on.item_type();
3962
3963 if !matches!(
3964 on_item_type,
3965 CatalogItemType::View
3966 | CatalogItemType::MaterializedView
3967 | CatalogItemType::Source
3968 | CatalogItemType::Table
3969 ) {
3970 sql_bail!(
3971 "index cannot be created on {} because it is a {}",
3972 on_name.full_name_str(),
3973 on.item_type()
3974 )
3975 }
3976
3977 let on_desc = on.desc(&scx.catalog.resolve_full_name(on.name()))?;
3978
3979 let filled_key_parts = match key_parts {
3980 Some(kp) => kp.to_vec(),
3981 None => {
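            // No explicit key was given: default to the relation's key,
            // referring to any unnamed columns by position.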
3982 on.desc(&scx.catalog.resolve_full_name(on.name()))?
3984 .typ()
3985 .default_key()
3986 .iter()
3987 .map(|i| match on_desc.get_unambiguous_name(*i) {
3988 Some(n) => Expr::Identifier(vec![n.clone().into()]),
3989 _ => Expr::Value(Value::Number((i + 1).to_string())),
3990 })
3991 .collect()
3992 }
3993 };
3994 let keys = query::plan_index_exprs(scx, &on_desc, filled_key_parts.clone())?;
3995
3996 let index_name = if let Some(name) = name {
3997 QualifiedItemName {
3998 qualifiers: on.name().qualifiers.clone(),
3999 item: normalize::ident(name.clone()),
4000 }
4001 } else {
4002 let mut idx_name = QualifiedItemName {
4003 qualifiers: on.name().qualifiers.clone(),
4004 item: on.name().item.clone(),
4005 };
4006 if key_parts.is_none() {
4007 idx_name.item += "_primary_idx";
4009 } else {
4010 let index_name_col_suffix = keys
4013 .iter()
4014 .map(|k| match k {
4015 mz_expr::MirScalarExpr::Column(i, name) => {
4016 match (on_desc.get_unambiguous_name(*i), &name.0) {
4017 (Some(col_name), _) => col_name.to_string(),
4018 (None, Some(name)) => name.to_string(),
4019 (None, None) => format!("{}", i + 1),
4020 }
4021 }
4022 _ => "expr".to_string(),
4023 })
4024 .join("_");
4025 write!(idx_name.item, "_{index_name_col_suffix}_idx")
4026 .expect("write on strings cannot fail");
            idx_name.item = normalize::ident(Ident::new(&idx_name.item)?);
4028 }
4029
4030 if !*if_not_exists {
4031 scx.catalog.find_available_name(idx_name)
4032 } else {
4033 idx_name
4034 }
4035 };
4036
4037 let full_name = scx.catalog.resolve_full_name(&index_name);
4039 let partial_name = PartialItemName::from(full_name.clone());
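    // Bail if an object with this name already exists, unless IF NOT EXISTS
    // was given or the planning context asks us to ignore such errors.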
4040 if let (Ok(item), false, false) = (
4048 scx.catalog.resolve_item_or_type(&partial_name),
4049 *if_not_exists,
4050 scx.pcx().map_or(false, |pcx| pcx.ignore_if_exists_errors),
4051 ) {
4052 return Err(PlanError::ItemAlreadyExists {
4053 name: full_name.to_string(),
4054 item_type: item.item_type(),
4055 });
4056 }
4057
4058 let options = plan_index_options(scx, with_options.clone())?;
4059 let cluster_id = match in_cluster {
4060 None => scx.resolve_cluster(None)?.id(),
4061 Some(in_cluster) => in_cluster.id,
4062 };
4063
4064 *in_cluster = Some(ResolvedClusterName {
4065 id: cluster_id,
4066 print_name: None,
4067 });
4068
4069 *name = Some(Ident::new(index_name.item.clone())?);
4071 *key_parts = Some(filled_key_parts);
4072 let if_not_exists = *if_not_exists;
4073
4074 let create_sql = normalize::create_statement(scx, Statement::CreateIndex(stmt))?;
4075 let compaction_window = options.iter().find_map(|o| {
4076 #[allow(irrefutable_let_patterns)]
4077 if let crate::plan::IndexOption::RetainHistory(lcw) = o {
4078 Some(lcw.clone())
4079 } else {
4080 None
4081 }
4082 });
4083
4084 Ok(Plan::CreateIndex(CreateIndexPlan {
4085 name: index_name,
4086 index: Index {
4087 create_sql,
4088 on: on.global_id(),
4089 keys,
4090 cluster_id,
4091 compaction_window,
4092 },
4093 if_not_exists,
4094 }))
4095}
4096
4097pub fn describe_create_type(
4098 _: &StatementContext,
4099 _: CreateTypeStatement<Aug>,
4100) -> Result<StatementDesc, PlanError> {
4101 Ok(StatementDesc::new(None))
4102}
4103
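/// Plans a `CREATE TYPE` statement for list, map, and record types, resolving
/// every referenced element type to a named catalog type.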
4104pub fn plan_create_type(
4105 scx: &StatementContext,
4106 stmt: CreateTypeStatement<Aug>,
4107) -> Result<Plan, PlanError> {
4108 let create_sql = normalize::create_statement(scx, Statement::CreateType(stmt.clone()))?;
4109 let CreateTypeStatement { name, as_type, .. } = stmt;
4110
4111 fn validate_data_type(
4112 scx: &StatementContext,
4113 data_type: ResolvedDataType,
4114 as_type: &str,
4115 key: &str,
4116 ) -> Result<(CatalogItemId, Vec<i64>), PlanError> {
4117 let (id, modifiers) = match data_type {
4118 ResolvedDataType::Named { id, modifiers, .. } => (id, modifiers),
4119 _ => sql_bail!(
4120 "CREATE TYPE ... AS {}option {} can only use named data types, but \
4121 found unnamed data type {}. Use CREATE TYPE to create a named type first",
4122 as_type,
4123 key,
4124 data_type.human_readable_name(),
4125 ),
4126 };
4127
4128 let item = scx.catalog.get_item(&id);
4129 match item.type_details() {
4130 None => sql_bail!(
4131 "{} must be of class type, but received {} which is of class {}",
4132 key,
4133 scx.catalog.resolve_full_name(item.name()),
4134 item.item_type()
4135 ),
4136 Some(CatalogTypeDetails {
4137 typ: CatalogType::Char,
4138 ..
4139 }) => {
4140 bail_unsupported!("embedding char type in a list or map")
4141 }
4142 _ => {
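                // Resolve the scalar type only to validate that the modifiers
                // are legal for this type; the result itself is unused.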
4143 scalar_type_from_catalog(scx.catalog, id, &modifiers)?;
4145
4146 Ok((id, modifiers))
4147 }
4148 }
4149 }
4150
4151 let inner = match as_type {
4152 CreateTypeAs::List { options } => {
4153 let CreateTypeListOptionExtracted {
4154 element_type,
4155 seen: _,
4156 } = CreateTypeListOptionExtracted::try_from(options)?;
4157 let element_type =
4158 element_type.ok_or_else(|| sql_err!("ELEMENT TYPE option is required"))?;
4159 let (id, modifiers) = validate_data_type(scx, element_type, "LIST ", "ELEMENT TYPE")?;
4160 CatalogType::List {
4161 element_reference: id,
4162 element_modifiers: modifiers,
4163 }
4164 }
4165 CreateTypeAs::Map { options } => {
4166 let CreateTypeMapOptionExtracted {
4167 key_type,
4168 value_type,
4169 seen: _,
4170 } = CreateTypeMapOptionExtracted::try_from(options)?;
4171 let key_type = key_type.ok_or_else(|| sql_err!("KEY TYPE option is required"))?;
4172 let value_type = value_type.ok_or_else(|| sql_err!("VALUE TYPE option is required"))?;
4173 let (key_id, key_modifiers) = validate_data_type(scx, key_type, "MAP ", "KEY TYPE")?;
4174 let (value_id, value_modifiers) =
4175 validate_data_type(scx, value_type, "MAP ", "VALUE TYPE")?;
4176 CatalogType::Map {
4177 key_reference: key_id,
4178 key_modifiers,
4179 value_reference: value_id,
4180 value_modifiers,
4181 }
4182 }
4183 CreateTypeAs::Record { column_defs } => {
4184 let mut fields = vec![];
4185 for column_def in column_defs {
4186 let data_type = column_def.data_type;
4187 let key = ident(column_def.name.clone());
4188 let (id, modifiers) = validate_data_type(scx, data_type, "", &key)?;
4189 fields.push(CatalogRecordField {
4190 name: ColumnName::from(key.clone()),
4191 type_reference: id,
4192 type_modifiers: modifiers,
4193 });
4194 }
4195 CatalogType::Record { fields }
4196 }
4197 };
4198
4199 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
4200
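    // Bail if an item whose kind conflicts with types already has this name.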
4201 let full_name = scx.catalog.resolve_full_name(&name);
4203 let partial_name = PartialItemName::from(full_name.clone());
4204 if let Ok(item) = scx.catalog.resolve_item_or_type(&partial_name) {
4207 if item.item_type().conflicts_with_type() {
4208 return Err(PlanError::ItemAlreadyExists {
4209 name: full_name.to_string(),
4210 item_type: item.item_type(),
4211 });
4212 }
4213 }
4214
4215 Ok(Plan::CreateType(CreateTypePlan {
4216 name,
4217 typ: Type { create_sql, inner },
4218 }))
4219}
4220
4221generate_extracted_config!(CreateTypeListOption, (ElementType, ResolvedDataType));
4222
4223generate_extracted_config!(
4224 CreateTypeMapOption,
4225 (KeyType, ResolvedDataType),
4226 (ValueType, ResolvedDataType)
4227);
4228
4229#[derive(Debug)]
4230pub enum PlannedAlterRoleOption {
4231 Attributes(PlannedRoleAttributes),
4232 Variable(PlannedRoleVariable),
4233}
4234
4235#[derive(Debug, Clone)]
4236pub struct PlannedRoleAttributes {
4237 pub inherit: Option<bool>,
4238 pub password: Option<Password>,
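    /// Set when a null password was supplied, which is distinct from
    /// supplying no password at all (see `plan_role_attributes`).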
4239 pub nopassword: Option<bool>,
4243 pub superuser: Option<bool>,
4244 pub login: Option<bool>,
4245}
4246
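/// Plans the role attributes of `CREATE ROLE`/`ALTER ROLE`, rejecting
/// conflicting, redundant, and never-supported options.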
4247fn plan_role_attributes(options: Vec<RoleAttribute>) -> Result<PlannedRoleAttributes, PlanError> {
4248 let mut planned_attributes = PlannedRoleAttributes {
4249 inherit: None,
4250 password: None,
4251 superuser: None,
4252 login: None,
4253 nopassword: None,
4254 };
4255
4256 for option in options {
4257 match option {
4258 RoleAttribute::Inherit | RoleAttribute::NoInherit
4259 if planned_attributes.inherit.is_some() =>
4260 {
4261 sql_bail!("conflicting or redundant options");
4262 }
4263 RoleAttribute::CreateCluster | RoleAttribute::NoCreateCluster => {
4264 bail_never_supported!(
4265 "CREATECLUSTER attribute",
4266 "sql/create-role/#details",
4267 "Use system privileges instead."
4268 );
4269 }
4270 RoleAttribute::CreateDB | RoleAttribute::NoCreateDB => {
4271 bail_never_supported!(
4272 "CREATEDB attribute",
4273 "sql/create-role/#details",
4274 "Use system privileges instead."
4275 );
4276 }
4277 RoleAttribute::CreateRole | RoleAttribute::NoCreateRole => {
4278 bail_never_supported!(
4279 "CREATEROLE attribute",
4280 "sql/create-role/#details",
4281 "Use system privileges instead."
4282 );
4283 }
4284 RoleAttribute::Password(_) if planned_attributes.password.is_some() => {
4285 sql_bail!("conflicting or redundant options");
4286 }
4287
4288 RoleAttribute::Inherit => planned_attributes.inherit = Some(true),
4289 RoleAttribute::NoInherit => planned_attributes.inherit = Some(false),
4290 RoleAttribute::Password(password) => {
4291 if let Some(password) = password {
4292 planned_attributes.password = Some(password.into());
4293 } else {
4294 planned_attributes.nopassword = Some(true);
4295 }
4296 }
4297 RoleAttribute::SuperUser => {
4298 if planned_attributes.superuser == Some(false) {
4299 sql_bail!("conflicting or redundant options");
4300 }
4301 planned_attributes.superuser = Some(true);
4302 }
4303 RoleAttribute::NoSuperUser => {
4304 if planned_attributes.superuser == Some(true) {
4305 sql_bail!("conflicting or redundant options");
4306 }
4307 planned_attributes.superuser = Some(false);
4308 }
4309 RoleAttribute::Login => {
4310 if planned_attributes.login == Some(false) {
4311 sql_bail!("conflicting or redundant options");
4312 }
4313 planned_attributes.login = Some(true);
4314 }
4315 RoleAttribute::NoLogin => {
4316 if planned_attributes.login == Some(true) {
4317 sql_bail!("conflicting or redundant options");
4318 }
4319 planned_attributes.login = Some(false);
4320 }
4321 }
4322 }
4323 if planned_attributes.inherit == Some(false) {
        bail_unsupported!("non-inherit roles");
4325 }
4326
4327 Ok(planned_attributes)
4328}
4329
4330#[derive(Debug)]
4331pub enum PlannedRoleVariable {
4332 Set { name: String, value: VariableValue },
4333 Reset { name: String },
4334}
4335
4336impl PlannedRoleVariable {
4337 pub fn name(&self) -> &str {
4338 match self {
4339 PlannedRoleVariable::Set { name, .. } => name,
4340 PlannedRoleVariable::Reset { name } => name,
4341 }
4342 }
4343}
4344
4345fn plan_role_variable(variable: SetRoleVar) -> Result<PlannedRoleVariable, PlanError> {
4346 let plan = match variable {
4347 SetRoleVar::Set { name, value } => PlannedRoleVariable::Set {
4348 name: name.to_string(),
4349 value: scl::plan_set_variable_to(value)?,
4350 },
4351 SetRoleVar::Reset { name } => PlannedRoleVariable::Reset {
4352 name: name.to_string(),
4353 },
4354 };
4355 Ok(plan)
4356}
4357
4358pub fn describe_create_role(
4359 _: &StatementContext,
4360 _: CreateRoleStatement,
4361) -> Result<StatementDesc, PlanError> {
4362 Ok(StatementDesc::new(None))
4363}
4364
4365pub fn plan_create_role(
4366 _: &StatementContext,
4367 CreateRoleStatement { name, options }: CreateRoleStatement,
4368) -> Result<Plan, PlanError> {
4369 let attributes = plan_role_attributes(options)?;
4370 Ok(Plan::CreateRole(CreateRolePlan {
4371 name: normalize::ident(name),
4372 attributes: attributes.into(),
4373 }))
4374}
4375
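/// Plans a `CREATE NETWORK POLICY` statement. Network policies are
/// feature-gated; each rule requires a direction, an action, and an address.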
4376pub fn plan_create_network_policy(
4377 ctx: &StatementContext,
4378 CreateNetworkPolicyStatement { name, options }: CreateNetworkPolicyStatement<Aug>,
4379) -> Result<Plan, PlanError> {
4380 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4381 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4382
4383 let Some(rule_defs) = policy_options.rules else {
4384 sql_bail!("RULES must be specified when creating network policies.");
4385 };
4386
4387 let mut rules = vec![];
4388 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4389 let NetworkPolicyRuleOptionExtracted {
4390 seen: _,
4391 direction,
4392 action,
4393 address,
4394 } = options.try_into()?;
4395 let (direction, action, address) = match (direction, action, address) {
4396 (Some(direction), Some(action), Some(address)) => (
4397 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4398 NetworkPolicyRuleAction::try_from(action.as_str())?,
4399 PolicyAddress::try_from(address.as_str())?,
4400 ),
4401 (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4403 }
4404 };
4405 rules.push(NetworkPolicyRule {
4406 name: normalize::ident(name),
4407 direction,
4408 action,
4409 address,
4410 });
4411 }
4412
4413 if rules.len()
4414 > ctx
4415 .catalog
4416 .system_vars()
4417 .max_rules_per_network_policy()
4418 .try_into()?
4419 {
4420 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4421 }
4422
4423 Ok(Plan::CreateNetworkPolicy(CreateNetworkPolicyPlan {
4424 name: normalize::ident(name),
4425 rules,
4426 }))
4427}
4428
4429pub fn plan_alter_network_policy(
4430 ctx: &StatementContext,
4431 AlterNetworkPolicyStatement { name, options }: AlterNetworkPolicyStatement<Aug>,
4432) -> Result<Plan, PlanError> {
4433 ctx.require_feature_flag(&vars::ENABLE_NETWORK_POLICIES)?;
4434
4435 let policy_options: NetworkPolicyOptionExtracted = options.try_into()?;
4436 let policy = ctx.catalog.resolve_network_policy(&name.to_string())?;
4437
4438 let Some(rule_defs) = policy_options.rules else {
        sql_bail!("RULES must be specified when altering network policies.");
4440 };
4441
4442 let mut rules = vec![];
4443 for NetworkPolicyRuleDefinition { name, options } in rule_defs {
4444 let NetworkPolicyRuleOptionExtracted {
4445 seen: _,
4446 direction,
4447 action,
4448 address,
4449 } = options.try_into()?;
4450
4451 let (direction, action, address) = match (direction, action, address) {
4452 (Some(direction), Some(action), Some(address)) => (
4453 NetworkPolicyRuleDirection::try_from(direction.as_str())?,
4454 NetworkPolicyRuleAction::try_from(action.as_str())?,
4455 PolicyAddress::try_from(address.as_str())?,
4456 ),
4457 (_, _, _) => {
                sql_bail!("Direction, Address, and Action must be specified when creating a rule")
4459 }
4460 };
4461 rules.push(NetworkPolicyRule {
4462 name: normalize::ident(name),
4463 direction,
4464 action,
4465 address,
4466 });
4467 }
4468 if rules.len()
4469 > ctx
4470 .catalog
4471 .system_vars()
4472 .max_rules_per_network_policy()
4473 .try_into()?
4474 {
4475 sql_bail!("RULES count exceeds max_rules_per_network_policy.")
4476 }
4477
4478 Ok(Plan::AlterNetworkPolicy(AlterNetworkPolicyPlan {
4479 id: policy.id(),
4480 name: normalize::ident(name),
4481 rules,
4482 }))
4483}
4484
4485pub fn describe_create_cluster(
4486 _: &StatementContext,
4487 _: CreateClusterStatement<Aug>,
4488) -> Result<StatementDesc, PlanError> {
4489 Ok(StatementDesc::new(None))
4490}
4491
4492generate_extracted_config!(
4498 ClusterOption,
4499 (AvailabilityZones, Vec<String>),
4500 (Disk, bool),
4501 (IntrospectionDebugging, bool),
4502 (IntrospectionInterval, OptionalDuration),
4503 (Managed, bool),
4504 (Replicas, Vec<ReplicaDefinition<Aug>>),
4505 (ReplicationFactor, u32),
4506 (Size, String),
4507 (Schedule, ClusterScheduleOptionValue),
4508 (WorkloadClass, OptionalString)
4509);
4510
4511generate_extracted_config!(
4512 NetworkPolicyOption,
4513 (Rules, Vec<NetworkPolicyRuleDefinition<Aug>>)
4514);
4515
4516generate_extracted_config!(
4517 NetworkPolicyRuleOption,
4518 (Direction, String),
4519 (Action, String),
4520 (Address, String)
4521);
4522
4523generate_extracted_config!(ClusterAlterOption, (Wait, ClusterAlterOptionValue<Aug>));
4524
4525generate_extracted_config!(
4526 ClusterAlterUntilReadyOption,
4527 (Timeout, Duration),
4528 (OnTimeout, String)
4529);
4530
4531generate_extracted_config!(
4532 ClusterFeature,
4533 (ReoptimizeImportedViews, Option<bool>, Default(None)),
4534 (EnableEagerDeltaJoins, Option<bool>, Default(None)),
4535 (EnableNewOuterJoinLowering, Option<bool>, Default(None)),
4536 (EnableVariadicLeftJoinLowering, Option<bool>, Default(None)),
4537 (EnableLetrecFixpointAnalysis, Option<bool>, Default(None)),
4538 (EnableJoinPrioritizeArranged, Option<bool>, Default(None)),
4539 (
4540 EnableProjectionPushdownAfterRelationCse,
4541 Option<bool>,
4542 Default(None)
4543 )
4544);
4545
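/// Plans a `CREATE CLUSTER` statement. For managed clusters, the resulting
/// plan is additionally round-tripped through [`unplan_create_cluster`] and
/// replanned to verify that planning is stable.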
4546pub fn plan_create_cluster(
4550 scx: &StatementContext,
4551 stmt: CreateClusterStatement<Aug>,
4552) -> Result<Plan, PlanError> {
4553 let plan = plan_create_cluster_inner(scx, stmt)?;
4554
4555 if let CreateClusterVariant::Managed(_) = &plan.variant {
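        // Round-trip the plan through unplan and replan to ensure that the
        // plan reproduces itself from its own generated SQL.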
4557 let stmt = unplan_create_cluster(scx, plan.clone())
4558 .map_err(|e| PlanError::Replan(e.to_string()))?;
4559 let create_sql = stmt.to_ast_string_stable();
4560 let stmt = parse::parse(&create_sql)
4561 .map_err(|e| PlanError::Replan(e.to_string()))?
4562 .into_element()
4563 .ast;
4564 let (stmt, _resolved_ids) =
4565 names::resolve(scx.catalog, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4566 let stmt = match stmt {
4567 Statement::CreateCluster(stmt) => stmt,
4568 stmt => {
4569 return Err(PlanError::Replan(format!(
4570 "replan does not match: plan={plan:?}, create_sql={create_sql:?}, stmt={stmt:?}"
4571 )));
4572 }
4573 };
4574 let replan =
4575 plan_create_cluster_inner(scx, stmt).map_err(|e| PlanError::Replan(e.to_string()))?;
4576 if plan != replan {
4577 return Err(PlanError::Replan(format!(
4578 "replan does not match: plan={plan:?}, replan={replan:?}"
4579 )));
4580 }
4581 }
4582
4583 Ok(Plan::CreateCluster(plan))
4584}
4585
4586pub fn plan_create_cluster_inner(
4587 scx: &StatementContext,
4588 CreateClusterStatement {
4589 name,
4590 options,
4591 features,
4592 }: CreateClusterStatement<Aug>,
4593) -> Result<CreateClusterPlan, PlanError> {
4594 let ClusterOptionExtracted {
4595 availability_zones,
4596 introspection_debugging,
4597 introspection_interval,
4598 managed,
4599 replicas,
4600 replication_factor,
4601 seen: _,
4602 size,
4603 disk,
4604 schedule,
4605 workload_class,
4606 }: ClusterOptionExtracted = options.try_into()?;
4607
4608 let managed = managed.unwrap_or_else(|| replicas.is_none());
4609
4610 if !scx.catalog.active_role_id().is_system() {
4611 if !features.is_empty() {
4612 sql_bail!("FEATURES not supported for non-system users");
4613 }
4614 if workload_class.is_some() {
4615 sql_bail!("WORKLOAD CLASS not supported for non-system users");
4616 }
4617 }
4618
4619 let schedule = schedule.unwrap_or(ClusterScheduleOptionValue::Manual);
4620 let workload_class = workload_class.and_then(|v| v.0);
4621
4622 if managed {
4623 if replicas.is_some() {
4624 sql_bail!("REPLICAS not supported for managed clusters");
4625 }
4626 let Some(size) = size else {
4627 sql_bail!("SIZE must be specified for managed clusters");
4628 };
4629
4630 if disk.is_some() {
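            // The DISK option is deprecated: it is rejected outright for
            // cluster sizes where disk is always enabled, and otherwise only
            // emits a deprecation notice.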
4631 if scx.catalog.is_cluster_size_cc(&size) {
4635 sql_bail!(
4636 "DISK option not supported for modern cluster sizes because disk is always enabled"
4637 );
4638 }
4639
4640 scx.catalog
4641 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4642 }
4643
4644 let compute = plan_compute_replica_config(
4645 introspection_interval,
4646 introspection_debugging.unwrap_or(false),
4647 )?;
4648
4649 let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
4650 replication_factor.unwrap_or_else(|| {
4651 scx.catalog
4652 .system_vars()
4653 .default_cluster_replication_factor()
4654 })
4655 } else {
4656 scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
4657 if replication_factor.is_some() {
4658 sql_bail!(
4659 "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
4660 );
4661 }
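            // Clusters with a refresh schedule manage their own replicas, so
            // the initial replication factor is zero.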
4662 0
4666 };
4667 let availability_zones = availability_zones.unwrap_or_default();
4668
4669 if !availability_zones.is_empty() {
4670 scx.require_feature_flag(&vars::ENABLE_MANAGED_CLUSTER_AVAILABILITY_ZONES)?;
4671 }
4672
4673 let ClusterFeatureExtracted {
4675 reoptimize_imported_views,
4676 enable_eager_delta_joins,
4677 enable_new_outer_join_lowering,
4678 enable_variadic_left_join_lowering,
4679 enable_letrec_fixpoint_analysis,
4680 enable_join_prioritize_arranged,
4681 enable_projection_pushdown_after_relation_cse,
4682 seen: _,
4683 } = ClusterFeatureExtracted::try_from(features)?;
4684 let optimizer_feature_overrides = OptimizerFeatureOverrides {
4685 reoptimize_imported_views,
4686 enable_eager_delta_joins,
4687 enable_new_outer_join_lowering,
4688 enable_variadic_left_join_lowering,
4689 enable_letrec_fixpoint_analysis,
4690 enable_join_prioritize_arranged,
4691 enable_projection_pushdown_after_relation_cse,
4692 ..Default::default()
4693 };
4694
4695 let schedule = plan_cluster_schedule(schedule)?;
4696
4697 Ok(CreateClusterPlan {
4698 name: normalize::ident(name),
4699 variant: CreateClusterVariant::Managed(CreateClusterManagedPlan {
4700 replication_factor,
4701 size,
4702 availability_zones,
4703 compute,
4704 optimizer_feature_overrides,
4705 schedule,
4706 }),
4707 workload_class,
4708 })
4709 } else {
4710 let Some(replica_defs) = replicas else {
4711 sql_bail!("REPLICAS must be specified for unmanaged clusters");
4712 };
4713 if availability_zones.is_some() {
4714 sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
4715 }
4716 if replication_factor.is_some() {
4717 sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
4718 }
4719 if introspection_debugging.is_some() {
4720 sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
4721 }
4722 if introspection_interval.is_some() {
4723 sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
4724 }
4725 if size.is_some() {
4726 sql_bail!("SIZE not supported for unmanaged clusters");
4727 }
4728 if disk.is_some() {
4729 sql_bail!("DISK not supported for unmanaged clusters");
4730 }
4731 if !features.is_empty() {
4732 sql_bail!("FEATURES not supported for unmanaged clusters");
4733 }
4734 if !matches!(schedule, ClusterScheduleOptionValue::Manual) {
4735 sql_bail!(
4736 "cluster schedules other than MANUAL are not supported for unmanaged clusters"
4737 );
4738 }
4739
4740 let mut replicas = vec![];
4741 for ReplicaDefinition { name, options } in replica_defs {
4742 replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
4743 }
4744
4745 Ok(CreateClusterPlan {
4746 name: normalize::ident(name),
4747 variant: CreateClusterVariant::Unmanaged(CreateClusterUnmanagedPlan { replicas }),
4748 workload_class,
4749 })
4750 }
4751}
4752
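/// Converts a [`CreateClusterPlan`] back into a [`CreateClusterStatement`];
/// the inverse of [`plan_create_cluster_inner`]. Only managed clusters are
/// supported.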
4753pub fn unplan_create_cluster(
4757 scx: &StatementContext,
4758 CreateClusterPlan {
4759 name,
4760 variant,
4761 workload_class,
4762 }: CreateClusterPlan,
4763) -> Result<CreateClusterStatement<Aug>, PlanError> {
4764 match variant {
4765 CreateClusterVariant::Managed(CreateClusterManagedPlan {
4766 replication_factor,
4767 size,
4768 availability_zones,
4769 compute,
4770 optimizer_feature_overrides,
4771 schedule,
4772 }) => {
4773 let schedule = unplan_cluster_schedule(schedule);
4774 let OptimizerFeatureOverrides {
4775 enable_guard_subquery_tablefunc: _,
4776 enable_consolidate_after_union_negate: _,
4777 enable_reduce_mfp_fusion: _,
4778 enable_cardinality_estimates: _,
4779 persist_fast_path_limit: _,
4780 reoptimize_imported_views,
4781 enable_eager_delta_joins,
4782 enable_new_outer_join_lowering,
4783 enable_variadic_left_join_lowering,
4784 enable_letrec_fixpoint_analysis,
4785 enable_reduce_reduction: _,
4786 enable_join_prioritize_arranged,
4787 enable_projection_pushdown_after_relation_cse,
4788 enable_less_reduce_in_eqprop: _,
4789 enable_dequadratic_eqprop_map: _,
4790 enable_eq_classes_withholding_errors: _,
4791 enable_fast_path_plan_insights: _,
4792 } = optimizer_feature_overrides;
4793 let features_extracted = ClusterFeatureExtracted {
4795 seen: Default::default(),
4797 reoptimize_imported_views,
4798 enable_eager_delta_joins,
4799 enable_new_outer_join_lowering,
4800 enable_variadic_left_join_lowering,
4801 enable_letrec_fixpoint_analysis,
4802 enable_join_prioritize_arranged,
4803 enable_projection_pushdown_after_relation_cse,
4804 };
4805 let features = features_extracted.into_values(scx.catalog);
4806 let availability_zones = if availability_zones.is_empty() {
4807 None
4808 } else {
4809 Some(availability_zones)
4810 };
4811 let (introspection_interval, introspection_debugging) =
4812 unplan_compute_replica_config(compute);
4813 let replication_factor = match &schedule {
4816 ClusterScheduleOptionValue::Manual => Some(replication_factor),
4817 ClusterScheduleOptionValue::Refresh { .. } => {
4818 assert!(
4819 replication_factor <= 1,
4820 "replication factor, {replication_factor:?}, must be <= 1"
4821 );
4822 None
4823 }
4824 };
4825 let workload_class = workload_class.map(|s| OptionalString(Some(s)));
4826 let options_extracted = ClusterOptionExtracted {
4827 seen: Default::default(),
4829 availability_zones,
4830 disk: None,
4831 introspection_debugging: Some(introspection_debugging),
4832 introspection_interval,
4833 managed: Some(true),
4834 replicas: None,
4835 replication_factor,
4836 size: Some(size),
4837 schedule: Some(schedule),
4838 workload_class,
4839 };
4840 let options = options_extracted.into_values(scx.catalog);
4841 let name = Ident::new_unchecked(name);
4842 Ok(CreateClusterStatement {
4843 name,
4844 options,
4845 features,
4846 })
4847 }
4848 CreateClusterVariant::Unmanaged(_) => {
4849 bail_unsupported!("SHOW CREATE for unmanaged clusters")
4850 }
4851 }
4852}
4853
4854generate_extracted_config!(
4855 ReplicaOption,
4856 (AvailabilityZone, String),
4857 (BilledAs, String),
4858 (ComputeAddresses, Vec<String>),
4859 (ComputectlAddresses, Vec<String>),
4860 (Disk, bool),
4861 (Internal, bool, Default(false)),
4862 (IntrospectionDebugging, bool, Default(false)),
4863 (IntrospectionInterval, OptionalDuration),
4864 (Size, String),
4865 (StorageAddresses, Vec<String>),
4866 (StoragectlAddresses, Vec<String>),
4867 (Workers, u16)
4868);
4869
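/// Plans the options of a single replica definition. A replica is either
/// orchestrated (SIZE-based) or unorchestrated (explicit addresses); mixing
/// the two kinds of options is an error.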
4870fn plan_replica_config(
4871 scx: &StatementContext,
4872 options: Vec<ReplicaOption<Aug>>,
4873) -> Result<ReplicaConfig, PlanError> {
4874 let ReplicaOptionExtracted {
4875 availability_zone,
4876 billed_as,
4877 computectl_addresses,
4878 disk,
4879 internal,
4880 introspection_debugging,
4881 introspection_interval,
4882 size,
4883 storagectl_addresses,
4884 ..
4885 }: ReplicaOptionExtracted = options.try_into()?;
4886
4887 let compute = plan_compute_replica_config(introspection_interval, introspection_debugging)?;
4888
4889 match (
4890 size,
4891 availability_zone,
4892 billed_as,
4893 storagectl_addresses,
4894 computectl_addresses,
4895 ) {
4896 (None, _, None, None, None) => {
4898 sql_bail!("SIZE option must be specified");
4901 }
4902 (Some(size), availability_zone, billed_as, None, None) => {
4903 if disk.is_some() {
4904 if scx.catalog.is_cluster_size_cc(&size) {
4908 sql_bail!(
4909 "DISK option not supported for modern cluster sizes because disk is always enabled"
4910 );
4911 }
4912
4913 scx.catalog
4914 .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
4915 }
4916
4917 Ok(ReplicaConfig::Orchestrated {
4918 size,
4919 availability_zone,
4920 compute,
4921 billed_as,
4922 internal,
4923 })
4924 }
4925
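        // Unorchestrated replicas, specified via explicit controller
        // addresses. Gated behind an unsafe feature flag.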
4926 (None, None, None, storagectl_addresses, computectl_addresses) => {
4927 scx.require_feature_flag(&vars::UNSAFE_ENABLE_UNORCHESTRATED_CLUSTER_REPLICAS)?;
4928
4929 let Some(storagectl_addrs) = storagectl_addresses else {
4933 sql_bail!("missing STORAGECTL ADDRESSES option");
4934 };
4935 let Some(computectl_addrs) = computectl_addresses else {
4936 sql_bail!("missing COMPUTECTL ADDRESSES option");
4937 };
4938
4939 if storagectl_addrs.len() != computectl_addrs.len() {
4940 sql_bail!(
4941 "COMPUTECTL ADDRESSES and STORAGECTL ADDRESSES must have the same length"
4942 );
4943 }
4944
4945 if disk.is_some() {
4946 sql_bail!("DISK can't be specified for unorchestrated clusters");
4947 }
4948
4949 Ok(ReplicaConfig::Unorchestrated {
4950 storagectl_addrs,
4951 computectl_addrs,
4952 compute,
4953 })
4954 }
4955 _ => {
4956 sql_bail!("invalid mixture of orchestrated and unorchestrated replica options");
4959 }
4960 }
4961}
4962
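/// Plans a replica's introspection configuration, applying the default
/// logging interval when none is specified.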
4963fn plan_compute_replica_config(
4967 introspection_interval: Option<OptionalDuration>,
4968 introspection_debugging: bool,
4969) -> Result<ComputeReplicaConfig, PlanError> {
4970 let introspection_interval = introspection_interval
4971 .map(|OptionalDuration(i)| i)
4972 .unwrap_or(Some(DEFAULT_REPLICA_LOGGING_INTERVAL));
4973 let introspection = match introspection_interval {
4974 Some(interval) => Some(ComputeReplicaIntrospectionConfig {
4975 interval,
4976 debugging: introspection_debugging,
4977 }),
4978 None if introspection_debugging => {
4979 sql_bail!("INTROSPECTION DEBUGGING cannot be specified without INTROSPECTION INTERVAL")
4980 }
4981 None => None,
4982 };
4983 let compute = ComputeReplicaConfig { introspection };
4984 Ok(compute)
4985}
4986
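/// The inverse of [`plan_compute_replica_config`].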
4987fn unplan_compute_replica_config(
4991 compute_replica_config: ComputeReplicaConfig,
4992) -> (Option<OptionalDuration>, bool) {
4993 match compute_replica_config.introspection {
4994 Some(ComputeReplicaIntrospectionConfig {
4995 debugging,
4996 interval,
4997 }) => (Some(OptionalDuration(Some(interval))), debugging),
4998 None => (Some(OptionalDuration(None)), false),
4999 }
5000}
5001
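/// Plans a cluster SCHEDULE option, validating any HYDRATION TIME ESTIMATE
/// interval.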
5002fn plan_cluster_schedule(
5006 schedule: ClusterScheduleOptionValue,
5007) -> Result<ClusterSchedule, PlanError> {
5008 Ok(match schedule {
5009 ClusterScheduleOptionValue::Manual => ClusterSchedule::Manual,
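        // An absent HYDRATION TIME ESTIMATE defaults to a zero duration.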
5010 ClusterScheduleOptionValue::Refresh {
5012 hydration_time_estimate: None,
5013 } => ClusterSchedule::Refresh {
5014 hydration_time_estimate: Duration::from_millis(0),
5015 },
5016 ClusterScheduleOptionValue::Refresh {
5018 hydration_time_estimate: Some(interval_value),
5019 } => {
5020 let interval = Interval::try_from_value(Value::Interval(interval_value))?;
5021 if interval.as_microseconds() < 0 {
5022 sql_bail!(
5023 "HYDRATION TIME ESTIMATE must be non-negative; got: {}",
5024 interval
5025 );
5026 }
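            // Months do not denote a fixed duration, so they cannot be
            // converted to one; reject them.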
5027 if interval.months != 0 {
5028 sql_bail!("HYDRATION TIME ESTIMATE must not involve units larger than days");
5032 }
5033 let duration = interval.duration()?;
5034 if u64::try_from(duration.as_millis()).is_err()
5035 || Interval::from_duration(&duration).is_err()
5036 {
5037 sql_bail!("HYDRATION TIME ESTIMATE too large");
5038 }
5039 ClusterSchedule::Refresh {
5040 hydration_time_estimate: duration,
5041 }
5042 }
5043 })
5044}
5045
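/// The inverse of [`plan_cluster_schedule`].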
5046fn unplan_cluster_schedule(schedule: ClusterSchedule) -> ClusterScheduleOptionValue {
5050 match schedule {
5051 ClusterSchedule::Manual => ClusterScheduleOptionValue::Manual,
5052 ClusterSchedule::Refresh {
5053 hydration_time_estimate,
5054 } => {
5055 let interval = Interval::from_duration(&hydration_time_estimate)
5056 .expect("planning ensured that this is convertible back to Interval");
5057 let interval_value = literal::unplan_interval(&interval);
5058 ClusterScheduleOptionValue::Refresh {
5059 hydration_time_estimate: Some(interval_value),
5060 }
5061 }
5062 }
5063}
5064
5065pub fn describe_create_cluster_replica(
5066 _: &StatementContext,
5067 _: CreateClusterReplicaStatement<Aug>,
5068) -> Result<StatementDesc, PlanError> {
5069 Ok(StatementDesc::new(None))
5070}
5071
5072pub fn plan_create_cluster_replica(
5073 scx: &StatementContext,
5074 CreateClusterReplicaStatement {
5075 definition: ReplicaDefinition { name, options },
5076 of_cluster,
5077 }: CreateClusterReplicaStatement<Aug>,
5078) -> Result<Plan, PlanError> {
5079 let cluster = scx
5080 .catalog
5081 .resolve_cluster(Some(&normalize::ident(of_cluster)))?;
5082 let current_replica_count = cluster.replica_ids().iter().count();
5083 if contains_single_replica_objects(scx, cluster) && current_replica_count > 0 {
5084 let internal_replica_count = cluster.replicas().iter().filter(|r| r.internal()).count();
5085 return Err(PlanError::CreateReplicaFailStorageObjects {
5086 current_replica_count,
5087 internal_replica_count,
5088 hypothetical_replica_count: current_replica_count + 1,
5089 });
5090 }
5091
5092 let config = plan_replica_config(scx, options)?;
5093
5094 if let ReplicaConfig::Orchestrated { internal: true, .. } = &config {
5095 if MANAGED_REPLICA_PATTERN.is_match(name.as_str()) {
5096 return Err(PlanError::MangedReplicaName(name.into_string()));
5097 }
5098 } else {
5099 ensure_cluster_is_not_managed(scx, cluster.id())?;
5100 }
5101
5102 Ok(Plan::CreateClusterReplica(CreateClusterReplicaPlan {
5103 name: normalize::ident(name),
5104 cluster_id: cluster.id(),
5105 config,
5106 }))
5107}
5108
5109pub fn describe_create_secret(
5110 _: &StatementContext,
5111 _: CreateSecretStatement<Aug>,
5112) -> Result<StatementDesc, PlanError> {
5113 Ok(StatementDesc::new(None))
5114}
5115
5116pub fn plan_create_secret(
5117 scx: &StatementContext,
5118 stmt: CreateSecretStatement<Aug>,
5119) -> Result<Plan, PlanError> {
5120 let CreateSecretStatement {
5121 name,
5122 if_not_exists,
5123 value,
5124 } = &stmt;
5125
5126 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name.to_owned())?)?;
5127 let mut create_sql_statement = stmt.clone();
5128 create_sql_statement.value = Expr::Value(Value::String("********".to_string()));
5129 let create_sql =
5130 normalize::create_statement(scx, Statement::CreateSecret(create_sql_statement))?;
5131 let secret_as = query::plan_secret_as(scx, value.clone())?;
5132
5133 let secret = Secret {
5134 create_sql,
5135 secret_as,
5136 };
5137
5138 Ok(Plan::CreateSecret(CreateSecretPlan {
5139 name,
5140 secret,
5141 if_not_exists: *if_not_exists,
5142 }))
5143}
5144
5145pub fn describe_create_connection(
5146 _: &StatementContext,
5147 _: CreateConnectionStatement<Aug>,
5148) -> Result<StatementDesc, PlanError> {
5149 Ok(StatementDesc::new(None))
5150}
5151
5152generate_extracted_config!(CreateConnectionOption, (Validate, bool));
5153
5154pub fn plan_create_connection(
5155 scx: &StatementContext,
5156 mut stmt: CreateConnectionStatement<Aug>,
5157) -> Result<Plan, PlanError> {
5158 let CreateConnectionStatement {
5159 name,
5160 connection_type,
5161 values,
5162 if_not_exists,
5163 with_options,
5164 } = stmt.clone();
5165 let connection_options_extracted = connection::ConnectionOptionExtracted::try_from(values)?;
5166 let details = connection_options_extracted.try_into_connection_details(scx, connection_type)?;
5167 let name = scx.allocate_qualified_name(normalize::unresolved_item_name(name)?)?;
5168
5169 let options = CreateConnectionOptionExtracted::try_from(with_options)?;
5170 if options.validate.is_some() {
5171 scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
5172 }
5173 let validate = match options.validate {
5174 Some(val) => val,
5175 None => {
5176 scx.catalog
5177 .system_vars()
5178 .enable_default_connection_validation()
5179 && details.to_connection().validate_by_default()
5180 }
5181 };
5182
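    // Bail if an object with this name already exists, unless IF NOT EXISTS
    // was specified.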
5183 let full_name = scx.catalog.resolve_full_name(&name);
5185 let partial_name = PartialItemName::from(full_name.clone());
5186 if let (false, Ok(item)) = (if_not_exists, scx.catalog.resolve_item(&partial_name)) {
5187 return Err(PlanError::ItemAlreadyExists {
5188 name: full_name.to_string(),
5189 item_type: item.item_type(),
5190 });
5191 }
5192
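    // For SSH connections, rewrite the statement's options to record the
    // generated public keys in the catalog's SQL.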
5193 if let ConnectionDetails::Ssh { key_1, key_2, .. } = &details {
5196 stmt.values.retain(|v| {
5197 v.name != ConnectionOptionName::PublicKey1 && v.name != ConnectionOptionName::PublicKey2
5198 });
5199 stmt.values.push(ConnectionOption {
5200 name: ConnectionOptionName::PublicKey1,
5201 value: Some(WithOptionValue::Value(Value::String(key_1.public_key()))),
5202 });
5203 stmt.values.push(ConnectionOption {
5204 name: ConnectionOptionName::PublicKey2,
5205 value: Some(WithOptionValue::Value(Value::String(key_2.public_key()))),
5206 });
5207 }
5208 let create_sql = normalize::create_statement(scx, Statement::CreateConnection(stmt))?;
5209
5210 let plan = CreateConnectionPlan {
5211 name,
5212 if_not_exists,
5213 connection: crate::plan::Connection {
5214 create_sql,
5215 details,
5216 },
5217 validate,
5218 };
5219 Ok(Plan::CreateConnection(plan))
5220}
5221
5222fn plan_drop_database(
5223 scx: &StatementContext,
5224 if_exists: bool,
5225 name: &UnresolvedDatabaseName,
5226 cascade: bool,
5227) -> Result<Option<DatabaseId>, PlanError> {
5228 Ok(match resolve_database(scx, name, if_exists)? {
5229 Some(database) => {
5230 if !cascade && database.has_schemas() {
5231 sql_bail!(
5232 "database '{}' cannot be dropped with RESTRICT while it contains schemas",
5233 name,
5234 );
5235 }
5236 Some(database.id())
5237 }
5238 None => None,
5239 })
5240}
5241
5242pub fn describe_drop_objects(
5243 _: &StatementContext,
5244 _: DropObjectsStatement,
5245) -> Result<StatementDesc, PlanError> {
5246 Ok(StatementDesc::new(None))
5247}
5248
5249pub fn plan_drop_objects(
5250 scx: &mut StatementContext,
5251 DropObjectsStatement {
5252 object_type,
5253 if_exists,
5254 names,
5255 cascade,
5256 }: DropObjectsStatement,
5257) -> Result<Plan, PlanError> {
5258 assert_ne!(
5259 object_type,
5260 mz_sql_parser::ast::ObjectType::Func,
5261 "rejected in parser"
5262 );
5263 let object_type = object_type.into();
5264
5265 let mut referenced_ids = Vec::new();
5266 for name in names {
5267 let id = match &name {
5268 UnresolvedObjectName::Cluster(name) => {
5269 plan_drop_cluster(scx, if_exists, name, cascade)?.map(ObjectId::Cluster)
5270 }
5271 UnresolvedObjectName::ClusterReplica(name) => {
5272 plan_drop_cluster_replica(scx, if_exists, name)?.map(ObjectId::ClusterReplica)
5273 }
5274 UnresolvedObjectName::Database(name) => {
5275 plan_drop_database(scx, if_exists, name, cascade)?.map(ObjectId::Database)
5276 }
5277 UnresolvedObjectName::Schema(name) => {
5278 plan_drop_schema(scx, if_exists, name, cascade)?.map(ObjectId::Schema)
5279 }
5280 UnresolvedObjectName::Role(name) => {
5281 plan_drop_role(scx, if_exists, name)?.map(ObjectId::Role)
5282 }
5283 UnresolvedObjectName::Item(name) => {
5284 plan_drop_item(scx, object_type, if_exists, name.clone(), cascade)?
5285 .map(ObjectId::Item)
5286 }
5287 UnresolvedObjectName::NetworkPolicy(name) => {
5288 plan_drop_network_policy(scx, if_exists, name)?.map(ObjectId::NetworkPolicy)
5289 }
5290 };
5291 match id {
5292 Some(id) => referenced_ids.push(id),
5293 None => scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
5294 name: name.to_ast_string_simple(),
5295 object_type,
5296 }),
5297 }
5298 }
5299 let drop_ids = scx.catalog.object_dependents(&referenced_ids);
5300
5301 Ok(Plan::DropObjects(DropObjectsPlan {
5302 referenced_ids,
5303 drop_ids,
5304 object_type,
5305 }))
5306}
5307
5308fn plan_drop_schema(
5309 scx: &StatementContext,
5310 if_exists: bool,
5311 name: &UnresolvedSchemaName,
5312 cascade: bool,
5313) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
5314 Ok(match resolve_schema(scx, name.clone(), if_exists)? {
5315 Some((database_spec, schema_spec)) => {
5316 if let ResolvedDatabaseSpecifier::Ambient = database_spec {
5317 sql_bail!(
5318 "cannot drop schema {name} because it is required by the database system",
5319 );
5320 }
5321 if let SchemaSpecifier::Temporary = schema_spec {
            sql_bail!("cannot drop schema {name} because it is a temporary schema");
5323 }
5324 let schema = scx.get_schema(&database_spec, &schema_spec);
5325 if !cascade && schema.has_items() {
5326 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5327 sql_bail!(
5328 "schema '{}' cannot be dropped without CASCADE while it contains objects",
5329 full_schema_name
5330 );
5331 }
5332 Some((database_spec, schema_spec))
5333 }
5334 None => None,
5335 })
5336}
5337
5338fn plan_drop_role(
5339 scx: &StatementContext,
5340 if_exists: bool,
5341 name: &Ident,
5342) -> Result<Option<RoleId>, PlanError> {
5343 match scx.catalog.resolve_role(name.as_str()) {
5344 Ok(role) => {
5345 let id = role.id();
5346 if &id == scx.catalog.active_role_id() {
5347 sql_bail!("current role cannot be dropped");
5348 }
5349 for role in scx.catalog.get_roles() {
5350 for (member_id, grantor_id) in role.membership() {
5351 if &id == grantor_id {
5352 let member_role = scx.catalog.get_role(member_id);
5353 sql_bail!(
                            "cannot drop role {}: still depended upon by membership of role {} in role {}",
5355 name.as_str(),
5356 role.name(),
5357 member_role.name()
5358 );
5359 }
5360 }
5361 }
5362 Ok(Some(role.id()))
5363 }
5364 Err(_) if if_exists => Ok(None),
5365 Err(e) => Err(e.into()),
5366 }
5367}
5368
5369fn plan_drop_cluster(
5370 scx: &StatementContext,
5371 if_exists: bool,
5372 name: &Ident,
5373 cascade: bool,
5374) -> Result<Option<ClusterId>, PlanError> {
5375 Ok(match resolve_cluster(scx, name, if_exists)? {
5376 Some(cluster) => {
5377 if !cascade && !cluster.bound_objects().is_empty() {
5378 return Err(PlanError::DependentObjectsStillExist {
5379 object_type: "cluster".to_string(),
5380 object_name: cluster.name().to_string(),
5381 dependents: Vec::new(),
5382 });
5383 }
5384 Some(cluster.id())
5385 }
5386 None => None,
5387 })
5388}
5389
5390fn plan_drop_network_policy(
5391 scx: &StatementContext,
5392 if_exists: bool,
5393 name: &Ident,
5394) -> Result<Option<NetworkPolicyId>, PlanError> {
5395 match scx.catalog.resolve_network_policy(name.as_str()) {
5396 Ok(policy) => {
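            // Prevent dropping the policy that is currently configured as the
            // default network policy.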
5397 if scx.catalog.system_vars().default_network_policy_name() == policy.name() {
5400 Err(PlanError::NetworkPolicyInUse)
5401 } else {
5402 Ok(Some(policy.id()))
5403 }
5404 }
5405 Err(_) if if_exists => Ok(None),
5406 Err(e) => Err(e.into()),
5407 }
5408}
5409
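/// Returns whether the given cluster contains sources or sinks, which (unless
/// multi-replica sources are enabled via dyncfg) restrict the cluster to at
/// most one replica.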
5410fn contains_single_replica_objects(scx: &StatementContext, cluster: &dyn CatalogCluster) -> bool {
5413 if ENABLE_MULTI_REPLICA_SOURCES.get(scx.catalog.system_vars().dyncfgs()) {
5415 false
5416 } else {
5417 cluster.bound_objects().iter().any(|id| {
5419 let item = scx.catalog.get_item(id);
5420 matches!(
5421 item.item_type(),
5422 CatalogItemType::Sink | CatalogItemType::Source
5423 )
5424 })
5425 }
5426}
5427
5428fn plan_drop_cluster_replica(
5429 scx: &StatementContext,
5430 if_exists: bool,
5431 name: &QualifiedReplica,
5432) -> Result<Option<(ClusterId, ReplicaId)>, PlanError> {
5433 let cluster = resolve_cluster_replica(scx, name, if_exists)?;
5434 Ok(cluster.map(|(cluster, replica_id)| (cluster.id(), replica_id)))
5435}
5436
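/// Plans dropping a single item, returning its [`CatalogItemId`], or `None`
/// if the item does not exist.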
5437fn plan_drop_item(
5439 scx: &StatementContext,
5440 object_type: ObjectType,
5441 if_exists: bool,
5442 name: UnresolvedItemName,
5443 cascade: bool,
5444) -> Result<Option<CatalogItemId>, PlanError> {
5445 let resolved = match resolve_item_or_type(scx, object_type, name, if_exists) {
5446 Ok(r) => r,
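        // Give a more helpful error for `DROP VIEW` on a materialized view.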
5447 Err(PlanError::MismatchedObjectType {
5449 name,
5450 is_type: ObjectType::MaterializedView,
5451 expected_type: ObjectType::View,
5452 }) => {
5453 return Err(PlanError::DropViewOnMaterializedView(name.to_string()));
5454 }
5455 e => e?,
5456 };
5457
5458 Ok(match resolved {
5459 Some(catalog_item) => {
5460 if catalog_item.id().is_system() {
5461 sql_bail!(
5462 "cannot drop {} {} because it is required by the database system",
5463 catalog_item.item_type(),
5464 scx.catalog.minimal_qualification(catalog_item.name()),
5465 );
5466 }
5467
5468 if !cascade {
5469 for id in catalog_item.used_by() {
5470 let dep = scx.catalog.get_item(id);
5471 if dependency_prevents_drop(object_type, dep) {
5472 return Err(PlanError::DependentObjectsStillExist {
5473 object_type: catalog_item.item_type().to_string(),
5474 object_name: scx
5475 .catalog
5476 .minimal_qualification(catalog_item.name())
5477 .to_string(),
5478 dependents: vec![(
5479 dep.item_type().to_string(),
5480 scx.catalog.minimal_qualification(dep.name()).to_string(),
5481 )],
5482 });
5483 }
5484 }
5485 }
5488 Some(catalog_item.id())
5489 }
5490 None => None,
5491 })
5492}
5493
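/// Returns whether the dependency `dep` prevents a non-CASCADE drop of an
/// object of type `object_type`.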
5494fn dependency_prevents_drop(object_type: ObjectType, dep: &dyn CatalogItem) -> bool {
5496 match object_type {
5497 ObjectType::Type => true,
5498 _ => match dep.item_type() {
5499 CatalogItemType::Func
5500 | CatalogItemType::Table
5501 | CatalogItemType::Source
5502 | CatalogItemType::View
5503 | CatalogItemType::MaterializedView
5504 | CatalogItemType::Sink
5505 | CatalogItemType::Type
5506 | CatalogItemType::Secret
5507 | CatalogItemType::Connection
5508 | CatalogItemType::ContinualTask => true,
5509 CatalogItemType::Index => false,
5510 },
5511 }
5512}
5513
5514pub fn describe_alter_index_options(
5515 _: &StatementContext,
5516 _: AlterIndexStatement<Aug>,
5517) -> Result<StatementDesc, PlanError> {
5518 Ok(StatementDesc::new(None))
5519}
5520
5521pub fn describe_drop_owned(
5522 _: &StatementContext,
5523 _: DropOwnedStatement<Aug>,
5524) -> Result<StatementDesc, PlanError> {
5525 Ok(StatementDesc::new(None))
5526}
5527
5528pub fn plan_drop_owned(
5529 scx: &StatementContext,
5530 drop: DropOwnedStatement<Aug>,
5531) -> Result<Plan, PlanError> {
5532 let cascade = drop.cascade();
5533 let role_ids: BTreeSet<_> = drop.role_names.into_iter().map(|role| role.id).collect();
5534 let mut drop_ids = Vec::new();
5535 let mut privilege_revokes = Vec::new();
5536 let mut default_privilege_revokes = Vec::new();
5537
5538 fn update_privilege_revokes(
5539 object_id: SystemObjectId,
5540 privileges: &PrivilegeMap,
5541 role_ids: &BTreeSet<RoleId>,
5542 privilege_revokes: &mut Vec<(SystemObjectId, MzAclItem)>,
5543 ) {
5544 privilege_revokes.extend(iter::zip(
5545 iter::repeat(object_id),
5546 privileges
5547 .all_values()
5548 .filter(|privilege| role_ids.contains(&privilege.grantee))
5549 .cloned(),
5550 ));
5551 }
5552
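    // Cluster replicas.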
5553 for replica in scx.catalog.get_cluster_replicas() {
5555 if role_ids.contains(&replica.owner_id()) {
5556 drop_ids.push((replica.cluster_id(), replica.replica_id()).into());
5557 }
5558 }
5559
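    // Clusters.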
5560 for cluster in scx.catalog.get_clusters() {
5562 if role_ids.contains(&cluster.owner_id()) {
5563 if !cascade {
5565 let non_owned_bound_objects: Vec<_> = cluster
5566 .bound_objects()
5567 .into_iter()
5568 .map(|item_id| scx.catalog.get_item(item_id))
5569 .filter(|item| !role_ids.contains(&item.owner_id()))
5570 .collect();
5571 if !non_owned_bound_objects.is_empty() {
5572 let names: Vec<_> = non_owned_bound_objects
5573 .into_iter()
5574 .map(|item| {
5575 (
5576 item.item_type().to_string(),
5577 scx.catalog.resolve_full_name(item.name()).to_string(),
5578 )
5579 })
5580 .collect();
5581 return Err(PlanError::DependentObjectsStillExist {
5582 object_type: "cluster".to_string(),
5583 object_name: cluster.name().to_string(),
5584 dependents: names,
5585 });
5586 }
5587 }
5588 drop_ids.push(cluster.id().into());
5589 }
5590 update_privilege_revokes(
5591 SystemObjectId::Object(cluster.id().into()),
5592 cluster.privileges(),
5593 &role_ids,
5594 &mut privilege_revokes,
5595 );
5596 }
5597
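    // Items.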
5598 for item in scx.catalog.get_items() {
5600 if role_ids.contains(&item.owner_id()) {
5601 if !cascade {
5602 let check_if_dependents_exist = |used_by: &[CatalogItemId]| {
5604 let non_owned_dependencies: Vec<_> = used_by
5605 .into_iter()
5606 .map(|item_id| scx.catalog.get_item(item_id))
5607 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5608 .filter(|item| !role_ids.contains(&item.owner_id()))
5609 .collect();
5610 if !non_owned_dependencies.is_empty() {
5611 let names: Vec<_> = non_owned_dependencies
5612 .into_iter()
5613 .map(|item| {
5614 let item_typ = item.item_type().to_string();
5615 let item_name =
5616 scx.catalog.resolve_full_name(item.name()).to_string();
5617 (item_typ, item_name)
5618 })
5619 .collect();
5620 Err(PlanError::DependentObjectsStillExist {
5621 object_type: item.item_type().to_string(),
5622 object_name: scx
5623 .catalog
5624 .resolve_full_name(item.name())
                                    .to_string(),
5627 dependents: names,
5628 })
5629 } else {
5630 Ok(())
5631 }
5632 };
5633
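                // Also check dependents of the item's progress collection, if
                // it has one.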
5634 if let Some(id) = item.progress_id() {
5637 let progress_item = scx.catalog.get_item(&id);
5638 check_if_dependents_exist(progress_item.used_by())?;
5639 }
5640 check_if_dependents_exist(item.used_by())?;
5641 }
5642 drop_ids.push(item.id().into());
5643 }
5644 update_privilege_revokes(
5645 SystemObjectId::Object(item.id().into()),
5646 item.privileges(),
5647 &role_ids,
5648 &mut privilege_revokes,
5649 );
5650 }
5651
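    // Schemas.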
5652 for schema in scx.catalog.get_schemas() {
5654 if !schema.id().is_temporary() {
5655 if role_ids.contains(&schema.owner_id()) {
5656 if !cascade {
5657 let non_owned_dependencies: Vec<_> = schema
5658 .item_ids()
5659 .map(|item_id| scx.catalog.get_item(&item_id))
5660 .filter(|item| dependency_prevents_drop(item.item_type().into(), *item))
5661 .filter(|item| !role_ids.contains(&item.owner_id()))
5662 .collect();
5663 if !non_owned_dependencies.is_empty() {
5664 let full_schema_name = scx.catalog.resolve_full_schema_name(schema.name());
5665 sql_bail!(
5666 "schema {} cannot be dropped without CASCADE while it contains non-owned objects",
5667 full_schema_name.to_string().quoted()
5668 );
5669 }
5670 }
5671 drop_ids.push((*schema.database(), *schema.id()).into())
5672 }
5673 update_privilege_revokes(
5674 SystemObjectId::Object((*schema.database(), *schema.id()).into()),
5675 schema.privileges(),
5676 &role_ids,
5677 &mut privilege_revokes,
5678 );
5679 }
5680 }
5681
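    // Databases.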
5682 for database in scx.catalog.get_databases() {
5684 if role_ids.contains(&database.owner_id()) {
5685 if !cascade {
5686 let non_owned_schemas: Vec<_> = database
5687 .schemas()
5688 .into_iter()
5689 .filter(|schema| !role_ids.contains(&schema.owner_id()))
5690 .collect();
5691 if !non_owned_schemas.is_empty() {
5692 sql_bail!(
5693 "database {} cannot be dropped without CASCADE while it contains non-owned schemas",
5694 database.name().quoted(),
5695 );
5696 }
5697 }
5698 drop_ids.push(database.id().into());
5699 }
5700 update_privilege_revokes(
5701 SystemObjectId::Object(database.id().into()),
5702 database.privileges(),
5703 &role_ids,
5704 &mut privilege_revokes,
5705 );
5706 }
5707
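    // System privileges.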
5708 update_privilege_revokes(
5710 SystemObjectId::System,
5711 scx.catalog.get_system_privileges(),
5712 &role_ids,
5713 &mut privilege_revokes,
5714 );
5715
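    // Default privileges.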
5716 for (default_privilege_object, default_privilege_acl_items) in
5717 scx.catalog.get_default_privileges()
5718 {
5719 for default_privilege_acl_item in default_privilege_acl_items {
5720 if role_ids.contains(&default_privilege_object.role_id)
5721 || role_ids.contains(&default_privilege_acl_item.grantee)
5722 {
5723 default_privilege_revokes.push((
5724 default_privilege_object.clone(),
5725 default_privilege_acl_item.clone(),
5726 ));
5727 }
5728 }
5729 }
5730
5731 let drop_ids = scx.catalog.object_dependents(&drop_ids);
5732
5733 let system_ids: Vec<_> = drop_ids.iter().filter(|id| id.is_system()).collect();
5734 if !system_ids.is_empty() {
5735 let mut owners = system_ids
5736 .into_iter()
5737 .filter_map(|object_id| scx.catalog.get_owner_id(object_id))
5738 .collect::<BTreeSet<_>>()
5739 .into_iter()
5740 .map(|role_id| scx.catalog.get_role(&role_id).name().quoted());
5741 sql_bail!(
5742 "cannot drop objects owned by role {} because they are required by the database system",
5743 owners.join(", "),
5744 );
5745 }
5746
5747 Ok(Plan::DropOwned(DropOwnedPlan {
5748 role_ids: role_ids.into_iter().collect(),
5749 drop_ids,
5750 privilege_revokes,
5751 default_privilege_revokes,
5752 }))
5753}
5754
5755fn plan_retain_history_option(
5756 scx: &StatementContext,
5757 retain_history: Option<OptionalDuration>,
5758) -> Result<Option<CompactionWindow>, PlanError> {
5759 if let Some(OptionalDuration(lcw)) = retain_history {
5760 Ok(Some(plan_retain_history(scx, lcw)?))
5761 } else {
5762 Ok(None)
5763 }
5764}
5765
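/// Plans a RETAIN HISTORY duration as a [`CompactionWindow`]. Durations below
/// the default compaction window, as well as `None` (which disables
/// compaction), are gated behind a feature flag.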
5766fn plan_retain_history(
5772 scx: &StatementContext,
5773 lcw: Option<Duration>,
5774) -> Result<CompactionWindow, PlanError> {
5775 scx.require_feature_flag(&vars::ENABLE_LOGICAL_COMPACTION_WINDOW)?;
5776 match lcw {
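        // Zero durations are mapped to `None` during option parsing, so a
        // literal zero reaching this point indicates a bug upstream.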
5777 Some(Duration::ZERO) => Err(PlanError::InvalidOptionValue {
5782 option_name: "RETAIN HISTORY".to_string(),
5783 err: Box::new(PlanError::Unstructured(
5784 "internal error: unexpectedly zero".to_string(),
5785 )),
5786 }),
5787 Some(duration) => {
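            // Values below the default compaction window are only allowed
            // when the unlimited-retain-history feature flag is enabled.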
5788 if duration < DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION
5791 && scx
5792 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5793 .is_err()
5794 {
5795 return Err(PlanError::RetainHistoryLow {
5796 limit: DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION,
5797 });
5798 }
5799 Ok(duration.try_into()?)
5800 }
5801 None => {
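            // Without a duration, compaction is disabled entirely, which also
            // requires the unlimited-retain-history feature flag.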
5804 if scx
5805 .require_feature_flag(&vars::ENABLE_UNLIMITED_RETAIN_HISTORY)
5806 .is_err()
5807 {
5808 Err(PlanError::RetainHistoryRequired)
5809 } else {
5810 Ok(CompactionWindow::DisableCompaction)
5811 }
5812 }
5813 }
5814}
5815
5816generate_extracted_config!(IndexOption, (RetainHistory, OptionalDuration));
5817
5818fn plan_index_options(
5819 scx: &StatementContext,
5820 with_opts: Vec<IndexOption<Aug>>,
5821) -> Result<Vec<crate::plan::IndexOption>, PlanError> {
5822 if !with_opts.is_empty() {
5823 scx.require_feature_flag(&vars::ENABLE_INDEX_OPTIONS)?;
5825 }
5826
5827 let IndexOptionExtracted { retain_history, .. }: IndexOptionExtracted = with_opts.try_into()?;
5828
5829 let mut out = Vec::with_capacity(1);
5830 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5831 out.push(crate::plan::IndexOption::RetainHistory(cw));
5832 }
5833 Ok(out)
5834}
5835
5836generate_extracted_config!(
5837 TableOption,
5838 (PartitionBy, Vec<Ident>),
5839 (RetainHistory, OptionalDuration),
5840 (RedactedTest, String)
5841);
5842
5843fn plan_table_options(
5844 scx: &StatementContext,
5845 desc: &RelationDesc,
5846 with_opts: Vec<TableOption<Aug>>,
5847) -> Result<Vec<crate::plan::TableOption>, PlanError> {
5848 let TableOptionExtracted {
5849 partition_by,
5850 retain_history,
5851 redacted_test,
5852 ..
5853 }: TableOptionExtracted = with_opts.try_into()?;
5854
5855 if let Some(partition_by) = partition_by {
5856 scx.require_feature_flag(&ENABLE_COLLECTION_PARTITION_BY)?;
5857 check_partition_by(desc, partition_by)?;
5858 }
5859
5860 if redacted_test.is_some() {
5861 scx.require_feature_flag(&vars::ENABLE_REDACTED_TEST_OPTION)?;
5862 }
5863
5864 let mut out = Vec::with_capacity(1);
5865 if let Some(cw) = plan_retain_history_option(scx, retain_history)? {
5866 out.push(crate::plan::TableOption::RetainHistory(cw));
5867 }
5868 Ok(out)
5869}
5870
5871pub fn plan_alter_index_options(
5872 scx: &mut StatementContext,
5873 AlterIndexStatement {
5874 index_name,
5875 if_exists,
5876 action,
5877 }: AlterIndexStatement<Aug>,
5878) -> Result<Plan, PlanError> {
5879 let object_type = ObjectType::Index;
5880 match action {
5881 AlterIndexAction::ResetOptions(options) => {
5882 let mut options = options.into_iter();
5883 if let Some(opt) = options.next() {
5884 match opt {
5885 IndexOptionName::RetainHistory => {
5886 if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
5888 }
5889 return alter_retain_history(
5890 scx,
5891 object_type,
5892 if_exists,
5893 UnresolvedObjectName::Item(index_name),
5894 None,
5895 );
5896 }
5897 }
5898 }
5899 sql_bail!("expected option");
5900 }
5901 AlterIndexAction::SetOptions(options) => {
5902 let mut options = options.into_iter();
5903 if let Some(opt) = options.next() {
5904 match opt.name {
5905 IndexOptionName::RetainHistory => {
5906 if options.next().is_some() {
                            sql_bail!("RETAIN HISTORY must be the only option");
5908 }
5909 return alter_retain_history(
5910 scx,
5911 object_type,
5912 if_exists,
5913 UnresolvedObjectName::Item(index_name),
5914 opt.value,
5915 );
5916 }
5917 }
5918 }
5919 sql_bail!("expected option");
5920 }
5921 }
5922}
5923
5924pub fn describe_alter_cluster_set_options(
5925 _: &StatementContext,
5926 _: AlterClusterStatement<Aug>,
5927) -> Result<StatementDesc, PlanError> {
5928 Ok(StatementDesc::new(None))
5929}
5930
pub fn plan_alter_cluster(
    scx: &mut StatementContext,
    AlterClusterStatement {
        name,
        action,
        if_exists,
    }: AlterClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    let cluster = match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => entry,
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type: ObjectType::Cluster,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Cluster,
            }));
        }
    };

    let mut options: PlanClusterOption = Default::default();
    let mut alter_strategy: AlterClusterPlanStrategy = AlterClusterPlanStrategy::None;

    match action {
        AlterClusterAction::SetOptions {
            options: set_options,
            with_options,
        } => {
            let ClusterOptionExtracted {
                availability_zones,
                introspection_debugging,
                introspection_interval,
                managed,
                replicas: replica_defs,
                replication_factor,
                seen: _,
                size,
                disk,
                schedule,
                workload_class,
            }: ClusterOptionExtracted = set_options.try_into()?;

            if !scx.catalog.active_role_id().is_system() && workload_class.is_some() {
                sql_bail!("WORKLOAD CLASS not supported for non-system users");
            }

            match managed.unwrap_or_else(|| cluster.is_managed()) {
                true => {
                    let alter_strategy_extracted =
                        ClusterAlterOptionExtracted::try_from(with_options)?;
                    alter_strategy = AlterClusterPlanStrategy::try_from(alter_strategy_extracted)?;

                    match alter_strategy {
                        AlterClusterPlanStrategy::None => {}
                        _ => {
                            scx.require_feature_flag(
                                &crate::session::vars::ENABLE_ZERO_DOWNTIME_CLUSTER_RECONFIGURATION,
                            )?;
                        }
                    }

                    if replica_defs.is_some() {
                        sql_bail!("REPLICAS not supported for managed clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
                    }

                    if let Some(replication_factor) = replication_factor {
                        if schedule.is_some()
                            && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                        {
                            sql_bail!(
                                "REPLICATION FACTOR cannot be given together with any SCHEDULE other than MANUAL"
                            );
                        }
                        if let Some(current_schedule) = cluster.schedule() {
                            if !matches!(current_schedule, ClusterSchedule::Manual) {
                                sql_bail!(
                                    "REPLICATION FACTOR cannot be set if the cluster SCHEDULE is anything other than MANUAL"
                                );
                            }
                        }

                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count =
                            internal_replica_count + usize::cast_from(replication_factor);

                        // Some storage objects (sources and sinks) do not support
                        // multiple replicas, so reject any change that would leave
                        // such a cluster with more than one replica.
                        if contains_single_replica_objects(scx, cluster)
                            && hypothetical_replica_count > 1
                        {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    } else if alter_strategy.is_some() {
                        // A zero-downtime reconfiguration strategy temporarily
                        // runs a second set of replicas, which is likewise
                        // incompatible with single-replica storage objects.
                        let internal_replica_count =
                            cluster.replicas().iter().filter(|r| r.internal()).count();
                        let hypothetical_replica_count = internal_replica_count * 2;
                        if contains_single_replica_objects(scx, cluster) {
                            return Err(PlanError::CreateReplicaFailStorageObjects {
                                current_replica_count: cluster.replica_ids().iter().count(),
                                internal_replica_count,
                                hypothetical_replica_count,
                            });
                        }
                    }
                }
                false => {
                    if alter_strategy.is_some() {
                        sql_bail!("ALTER... WITH not supported for unmanaged clusters");
                    }
                    if availability_zones.is_some() {
                        sql_bail!("AVAILABILITY ZONES not supported for unmanaged clusters");
                    }
                    if replication_factor.is_some() {
                        sql_bail!("REPLICATION FACTOR not supported for unmanaged clusters");
                    }
                    if introspection_debugging.is_some() {
                        sql_bail!("INTROSPECTION DEBUGGING not supported for unmanaged clusters");
                    }
                    if introspection_interval.is_some() {
                        sql_bail!("INTROSPECTION INTERVAL not supported for unmanaged clusters");
                    }
                    if size.is_some() {
                        sql_bail!("SIZE not supported for unmanaged clusters");
                    }
                    if disk.is_some() {
                        sql_bail!("DISK not supported for unmanaged clusters");
                    }
                    if schedule.is_some()
                        && !matches!(schedule, Some(ClusterScheduleOptionValue::Manual))
                    {
                        sql_bail!(
                            "cluster schedules other than MANUAL are not supported for unmanaged clusters"
                        );
                    }
                    if let Some(current_schedule) = cluster.schedule() {
                        if !matches!(current_schedule, ClusterSchedule::Manual)
                            && schedule.is_none()
                        {
                            sql_bail!(
                                "when switching a cluster to unmanaged, if the managed \
                                cluster's SCHEDULE is anything other than MANUAL, you have to \
                                explicitly set the SCHEDULE to MANUAL"
                            );
                        }
                    }
                }
            }

            let mut replicas = vec![];
            for ReplicaDefinition { name, options } in
                replica_defs.into_iter().flat_map(Vec::into_iter)
            {
                replicas.push((normalize::ident(name), plan_replica_config(scx, options)?));
            }

            if let Some(managed) = managed {
                options.managed = AlterOptionParameter::Set(managed);
            }
            if let Some(replication_factor) = replication_factor {
                options.replication_factor = AlterOptionParameter::Set(replication_factor);
            }
            if let Some(size) = &size {
                options.size = AlterOptionParameter::Set(size.clone());
            }
            if let Some(availability_zones) = availability_zones {
                options.availability_zones = AlterOptionParameter::Set(availability_zones);
            }
            if let Some(introspection_debugging) = introspection_debugging {
                options.introspection_debugging =
                    AlterOptionParameter::Set(introspection_debugging);
            }
            if let Some(introspection_interval) = introspection_interval {
                options.introspection_interval = AlterOptionParameter::Set(introspection_interval);
            }
            if disk.is_some() {
                // The unmanaged arm above rejects DISK, so the cluster is
                // managed here. Fall back to its current size if none was
                // specified in this statement.
                let size = size.as_deref().unwrap_or_else(|| {
                    cluster.managed_size().expect("cluster known to be managed")
                });
                if scx.catalog.is_cluster_size_cc(size) {
                    sql_bail!(
                        "DISK option not supported for modern cluster sizes because disk is always enabled"
                    );
                }

                scx.catalog
                    .add_notice(PlanNotice::ReplicaDiskOptionDeprecated);
            }
            if !replicas.is_empty() {
                options.replicas = AlterOptionParameter::Set(replicas);
            }
            if let Some(schedule) = schedule {
                options.schedule = AlterOptionParameter::Set(plan_cluster_schedule(schedule)?);
            }
            if let Some(workload_class) = workload_class {
                options.workload_class = AlterOptionParameter::Set(workload_class.0);
            }
        }
        AlterClusterAction::ResetOptions(reset_options) => {
            use AlterOptionParameter::Reset;
            use ClusterOptionName::*;

            if !scx.catalog.active_role_id().is_system()
                && reset_options.contains(&WorkloadClass)
            {
                sql_bail!("WORKLOAD CLASS not supported for non-system users");
            }

            for option in reset_options {
                match option {
                    AvailabilityZones => options.availability_zones = Reset,
                    Disk => scx
                        .catalog
                        .add_notice(PlanNotice::ReplicaDiskOptionDeprecated),
                    IntrospectionInterval => options.introspection_interval = Reset,
                    IntrospectionDebugging => options.introspection_debugging = Reset,
                    Managed => options.managed = Reset,
                    Replicas => options.replicas = Reset,
                    ReplicationFactor => options.replication_factor = Reset,
                    Size => options.size = Reset,
                    Schedule => options.schedule = Reset,
                    WorkloadClass => options.workload_class = Reset,
                }
            }
        }
    }
    Ok(Plan::AlterCluster(AlterClusterPlan {
        id: cluster.id(),
        name: cluster.name().to_string(),
        options,
        strategy: alter_strategy,
    }))
}

pub fn describe_alter_set_cluster(
    _: &StatementContext,
    _: AlterSetClusterStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

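/// Plans `ALTER <object> SET CLUSTER <cluster>`, e.g.
/// `ALTER MATERIALIZED VIEW mv SET CLUSTER other`. Only materialized views
/// are currently supported, and moving an object to the cluster it already
/// lives on is a no-op.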
pub fn plan_alter_item_set_cluster(
    scx: &StatementContext,
    AlterSetClusterStatement {
        if_exists,
        set_cluster: in_cluster_name,
        name,
        object_type,
    }: AlterSetClusterStatement<Aug>,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SET_CLUSTER)?;

    let object_type = object_type.into();

    // Check the object type before resolving the name so that unsupported
    // types fail with a clear error.
    match object_type {
        ObjectType::MaterializedView => {}
        ObjectType::Index | ObjectType::Sink | ObjectType::Source => {
            bail_unsupported!(29606, format!("ALTER {object_type} SET CLUSTER"))
        }
        _ => {
            bail_never_supported!(
                format!("ALTER {object_type} SET CLUSTER"),
                "sql/alter-set-cluster/",
                format!("{object_type} has no associated cluster")
            )
        }
    }

    let in_cluster = scx.catalog.get_cluster(in_cluster_name.id);

    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let current_cluster = entry.cluster_id();
            let Some(current_cluster) = current_cluster else {
                sql_bail!("No cluster associated with {name}");
            };

            if current_cluster == in_cluster.id() {
                Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
            } else {
                Ok(Plan::AlterSetCluster(AlterSetClusterPlan {
                    id: entry.id(),
                    set_cluster: in_cluster.id(),
                }))
            }
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_rename(
    _: &StatementContext,
    _: AlterObjectRenameStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

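/// Plans `ALTER <object> RENAME TO`, dispatching to the item-, cluster-,
/// replica-, or schema-specific rename planner based on the object type.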
pub fn plan_alter_object_rename(
    scx: &mut StatementContext,
    AlterObjectRenameStatement {
        name,
        object_type,
        to_item_name,
        if_exists,
    }: AlterObjectRenameStatement,
) -> Result<Plan, PlanError> {
    let object_type = object_type.into();
    match (object_type, name) {
        (
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index
            | ObjectType::Sink
            | ObjectType::Secret
            | ObjectType::Connection,
            UnresolvedObjectName::Item(name),
        ) => plan_alter_item_rename(scx, object_type, name, to_item_name, if_exists),
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name)) => {
            plan_alter_cluster_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::ClusterReplica, UnresolvedObjectName::ClusterReplica(name)) => {
            plan_alter_cluster_replica_rename(scx, object_type, name, to_item_name, if_exists)
        }
        (ObjectType::Schema, UnresolvedObjectName::Schema(name)) => {
            plan_alter_schema_rename(scx, name, to_item_name, if_exists)
        }
        (object_type, name) => {
            unreachable!("parser set the wrong object type '{object_type:?}' for name {name:?}")
        }
    }
}

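/// Plans `ALTER SCHEMA ... RENAME TO`, rejecting names that are already taken
/// and renames of system schemas.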
pub fn plan_alter_schema_rename(
    scx: &mut StatementContext,
    name: UnresolvedSchemaName,
    to_schema_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let Some((db_spec, schema_spec)) = resolve_schema(scx, name.clone(), if_exists)? else {
        let object_type = ObjectType::Schema;
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: name.to_ast_string_simple(),
            object_type,
        });
        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };

    // Make sure the new schema name is not already taken.
    if scx
        .resolve_schema_in_database(&db_spec, &to_schema_name)
        .is_ok()
    {
        return Err(PlanError::Catalog(CatalogError::SchemaAlreadyExists(
            to_schema_name.clone().into_string(),
        )));
    }

    // Prevent renaming system schemas.
    let schema = scx.catalog.get_schema(&db_spec, &schema_spec);
    if schema.id().is_system() {
        bail_never_supported!(format!("renaming the {} schema", schema.name().schema))
    }

    Ok(Plan::AlterSchemaRename(AlterSchemaRenamePlan {
        cur_schema_spec: (db_spec, schema_spec),
        new_schema_name: to_schema_name.into_string(),
    }))
}

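/// Plans `ALTER SCHEMA ... SWAP WITH ...`. `gen_temp_suffix` must produce a
/// suffix for the temporary schema used during the swap; it receives a
/// callback that reports whether a candidate suffix is still free.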
pub fn plan_alter_schema_swap<F>(
    scx: &mut StatementContext,
    name_a: UnresolvedSchemaName,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let schema_a = scx.resolve_schema(name_a.clone())?;

    let db_spec = schema_a.database().clone();
    if matches!(db_spec, ResolvedDatabaseSpecifier::Ambient) {
        sql_bail!("cannot swap schemas that are in the ambient database");
    };
    let schema_b = scx.resolve_schema_in_database(&db_spec, &name_b)?;

    // Prevent swapping system schemas.
    if schema_a.id().is_system() || schema_b.id().is_system() {
        bail_never_supported!("swapping a system schema".to_string())
    }

    // Generate a temporary schema name that is not already in use in this
    // database.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_schema_swap_");
        temp_name.append_lossy(temp_suffix);
        scx.resolve_schema_in_database(&db_spec, &temp_name)
            .is_err()
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_schema_swap_{temp_suffix}");

    Ok(Plan::AlterSchemaSwap(AlterSchemaSwapPlan {
        schema_a_spec: (*schema_a.database(), *schema_a.id()),
        schema_a_name: schema_a.name().schema.to_string(),
        schema_b_spec: (*schema_b.database(), *schema_b.id()),
        schema_b_name: schema_b.name().schema.to_string(),
        name_temp,
    }))
}

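/// Plans `ALTER <item> ... RENAME TO` for catalog items, checking that the
/// proposed name collides with neither an existing item nor an existing type
/// in the same schema.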
pub fn plan_alter_item_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    let resolved = match resolve_item_or_type(scx, object_type, name.clone(), if_exists) {
        Ok(r) => r,
        // Return a more helpful error when `ALTER VIEW` is used on a
        // materialized view.
        Err(PlanError::MismatchedObjectType {
            name,
            is_type: ObjectType::MaterializedView,
            expected_type: ObjectType::View,
        }) => {
            return Err(PlanError::AlterViewOnMaterializedView(name.to_string()));
        }
        e => e?,
    };

    match resolved {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            let proposed_name = QualifiedItemName {
                qualifiers: entry.name().qualifiers.clone(),
                item: to_item_name.clone().into_string(),
            };

            // Types live in a namespace of their own, but some item types
            // still conflict with types, so the new name must be checked
            // against both namespaces.
            let conflicting_type_exists;
            let conflicting_item_exists;
            if item_type == CatalogItemType::Type {
                conflicting_type_exists = scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx
                    .catalog
                    .get_item_by_name(&proposed_name)
                    .map(|item| item.item_type().conflicts_with_type())
                    .unwrap_or(false);
            } else {
                conflicting_type_exists = item_type.conflicts_with_type()
                    && scx.catalog.get_type_by_name(&proposed_name).is_some();
                conflicting_item_exists = scx.catalog.get_item_by_name(&proposed_name).is_some();
            };
            if conflicting_type_exists || conflicting_item_exists {
                sql_bail!("catalog item '{}' already exists", to_item_name);
            }

            Ok(Plan::AlterItemRename(AlterItemRenamePlan {
                id: entry.id(),
                current_full_name: full_name,
                to_name: normalize::ident(to_item_name),
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn plan_alter_cluster_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: Ident,
    to_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster(scx, &name, if_exists)? {
        Some(entry) => Ok(Plan::AlterClusterRename(AlterClusterRenamePlan {
            id: entry.id(),
            name: entry.name().to_string(),
            to_name: normalize::ident(to_name),
        })),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

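/// Plans `ALTER CLUSTER ... SWAP WITH ...`, which exchanges the names of two
/// clusters by way of a temporary name.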
pub fn plan_alter_cluster_swap<F>(
    scx: &mut StatementContext,
    name_a: Ident,
    name_b: Ident,
    gen_temp_suffix: F,
) -> Result<Plan, PlanError>
where
    F: Fn(&dyn Fn(&str) -> bool) -> Result<String, PlanError>,
{
    let cluster_a = scx.resolve_cluster(Some(&name_a))?;
    let cluster_b = scx.resolve_cluster(Some(&name_b))?;

    // Probe availability with the same prefix used for the temporary name
    // below.
    let check = |temp_suffix: &str| {
        let mut temp_name = ident!("mz_cluster_swap_");
        temp_name.append_lossy(temp_suffix);
        match scx.catalog.resolve_cluster(Some(temp_name.as_str())) {
            // The name is unused, so we can take it for the swap.
            Err(CatalogError::UnknownCluster(_)) => true,
            // Either a cluster with this name already exists, or resolution
            // failed for some other reason; try a different suffix.
            Ok(_) | Err(_) => false,
        }
    };
    let temp_suffix = gen_temp_suffix(&check)?;
    let name_temp = format!("mz_cluster_swap_{temp_suffix}");

    Ok(Plan::AlterClusterSwap(AlterClusterSwapPlan {
        id_a: cluster_a.id(),
        id_b: cluster_b.id(),
        name_a: name_a.into_string(),
        name_b: name_b.into_string(),
        name_temp,
    }))
}

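/// Plans `ALTER CLUSTER REPLICA ... RENAME TO`. The replica's cluster must be
/// unmanaged; see `ensure_cluster_is_not_managed`.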
pub fn plan_alter_cluster_replica_rename(
    scx: &mut StatementContext,
    object_type: ObjectType,
    name: QualifiedReplica,
    to_item_name: Ident,
    if_exists: bool,
) -> Result<Plan, PlanError> {
    match resolve_cluster_replica(scx, &name, if_exists)? {
        Some((cluster, replica)) => {
            ensure_cluster_is_not_managed(scx, cluster.id())?;
            Ok(Plan::AlterClusterReplicaRename(
                AlterClusterReplicaRenamePlan {
                    cluster_id: cluster.id(),
                    replica_id: replica,
                    name: QualifiedReplica {
                        cluster: Ident::new(cluster.name())?,
                        replica: name.replica,
                    },
                    to_name: normalize::ident(to_item_name),
                },
            ))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_object_swap(
    _: &StatementContext,
    _: AlterObjectSwapStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

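/// Plans `ALTER ... SWAP WITH ...` for schemas and clusters. The temporary
/// suffix generator below retries a bounded number of random identifiers
/// before giving up.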
pub fn plan_alter_object_swap(
    scx: &mut StatementContext,
    stmt: AlterObjectSwapStatement,
) -> Result<Plan, PlanError> {
    scx.require_feature_flag(&vars::ENABLE_ALTER_SWAP)?;

    let AlterObjectSwapStatement {
        object_type,
        name_a,
        name_b,
    } = stmt;
    let object_type = object_type.into();

    // Generate a temporary suffix, retrying until `check_fn` reports that the
    // resulting name is unused.
    let gen_temp_suffix = |check_fn: &dyn Fn(&str) -> bool| {
        let mut attempts = 0;
        let name_temp = loop {
            attempts += 1;
            if attempts > 10 {
                tracing::warn!("Unable to generate temp id for swapping");
                sql_bail!("unable to swap!");
            }

            let short_id = mz_ore::id_gen::temp_id();
            if check_fn(&short_id) {
                break short_id;
            }
        };

        Ok(name_temp)
    };

    match (object_type, name_a, name_b) {
        (ObjectType::Schema, UnresolvedObjectName::Schema(name_a), name_b) => {
            plan_alter_schema_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (ObjectType::Cluster, UnresolvedObjectName::Cluster(name_a), name_b) => {
            plan_alter_cluster_swap(scx, name_a, name_b, gen_temp_suffix)
        }
        (object_type, _, _) => Err(PlanError::Unsupported {
            feature: format!("ALTER {object_type} .. SWAP WITH ..."),
            discussion_no: None,
        }),
    }
}

pub fn describe_alter_retain_history(
    _: &StatementContext,
    _: AlterRetainHistoryStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_retain_history(
    scx: &StatementContext,
    AlterRetainHistoryStatement {
        object_type,
        if_exists,
        name,
        history,
    }: AlterRetainHistoryStatement<Aug>,
) -> Result<Plan, PlanError> {
    alter_retain_history(scx, object_type.into(), if_exists, name, history)
}

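/// Shared implementation of `ALTER ... SET/RESET (RETAIN HISTORY ...)`, e.g.
/// `ALTER MATERIALIZED VIEW mv SET (RETAIN HISTORY FOR '1hr')`. A `None`
/// history value resets the object to the default compaction window.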
fn alter_retain_history(
    scx: &StatementContext,
    object_type: ObjectType,
    if_exists: bool,
    name: UnresolvedObjectName,
    history: Option<WithOptionValue<Aug>>,
) -> Result<Plan, PlanError> {
    let name = match (object_type, name) {
        (
            // `View` is accepted here only so that a better error can be
            // produced below once the item has been resolved.
            ObjectType::View
            | ObjectType::MaterializedView
            | ObjectType::Table
            | ObjectType::Source
            | ObjectType::Index,
            UnresolvedObjectName::Item(name),
        ) => name,
        (object_type, _) => {
            sql_bail!("{object_type} does not support RETAIN HISTORY")
        }
    };
    match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => {
            let full_name = scx.catalog.resolve_full_name(entry.name());
            let item_type = entry.item_type();

            // Return a more helpful error when `ALTER VIEW` is used on a
            // materialized view; plain views never support RETAIN HISTORY.
            if object_type == ObjectType::View && item_type == CatalogItemType::MaterializedView {
                return Err(PlanError::AlterViewOnMaterializedView(
                    full_name.to_string(),
                ));
            } else if object_type == ObjectType::View {
                sql_bail!("{object_type} does not support RETAIN HISTORY")
            } else if object_type != item_type {
                sql_bail!(
                    "\"{}\" is a {} not a {}",
                    full_name,
                    entry.item_type(),
                    format!("{object_type}").to_lowercase()
                )
            }

            // SET (RETAIN HISTORY FOR ...) carries a value; RESET carries none
            // and falls back to the default compaction window.
            let (value, lcw) = match &history {
                Some(WithOptionValue::RetainHistoryFor(value)) => {
                    let window = OptionalDuration::try_from_value(value.clone())?;
                    (Some(value.clone()), window.0)
                }
                None => (None, Some(DEFAULT_LOGICAL_COMPACTION_WINDOW_DURATION)),
                _ => sql_bail!("unexpected value type for RETAIN HISTORY"),
            };
            let window = plan_retain_history(scx, lcw)?;

            Ok(Plan::AlterRetainHistory(AlterRetainHistoryPlan {
                id: entry.id(),
                value,
                window,
                object_type,
            }))
        }
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_ast_string_simple(),
                object_type,
            });

            Ok(Plan::AlterNoop(AlterNoopPlan { object_type }))
        }
    }
}

pub fn describe_alter_secret_options(
    _: &StatementContext,
    _: AlterSecretStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_secret(
    scx: &mut StatementContext,
    stmt: AlterSecretStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSecretStatement {
        name,
        if_exists,
        value,
    } = stmt;
    let object_type = ObjectType::Secret;
    let id = match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
        Some(entry) => entry.id(),
        None => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: name.to_string(),
                object_type,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        }
    };

    let secret_as = query::plan_secret_as(scx, value)?;

    Ok(Plan::AlterSecret(AlterSecretPlan { id, secret_as }))
}

pub fn describe_alter_connection(
    _: &StatementContext,
    _: AlterConnectionStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(AlterConnectionOption, (Validate, bool));

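/// Plans `ALTER CONNECTION`. `ROTATE KEYS` must appear alone; otherwise the
/// SET/DROP actions are validated against the connection type and bundled
/// into an `AlterOptions` plan.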
pub fn plan_alter_connection(
    scx: &StatementContext,
    stmt: AlterConnectionStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterConnectionStatement {
        name,
        if_exists,
        actions,
        with_options,
    } = stmt;
    let conn_name = normalize::unresolved_item_name(name)?;
    let entry = match scx.catalog.resolve_item(&conn_name) {
        Ok(entry) => entry,
        Err(_) if if_exists => {
            scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                name: conn_name.to_string(),
                object_type: ObjectType::Connection,
            });

            return Ok(Plan::AlterNoop(AlterNoopPlan {
                object_type: ObjectType::Connection,
            }));
        }
        Err(e) => return Err(e.into()),
    };

    let connection = entry.connection()?;

    if actions
        .iter()
        .any(|action| matches!(action, AlterConnectionAction::RotateKeys))
    {
        if actions.len() > 1 {
            sql_bail!("cannot specify any other actions alongside ALTER CONNECTION...ROTATE KEYS");
        }

        if !with_options.is_empty() {
            sql_bail!(
                "ALTER CONNECTION...ROTATE KEYS does not support WITH ({})",
                with_options
                    .iter()
                    .map(|o| o.to_ast_string_simple())
                    .join(", ")
            );
        }

        if !matches!(connection, Connection::Ssh(_)) {
            sql_bail!(
                "{} is not an SSH connection",
                scx.catalog.resolve_full_name(entry.name())
            )
        }

        return Ok(Plan::AlterConnection(AlterConnectionPlan {
            id: entry.id(),
            action: crate::plan::AlterConnectionAction::RotateKeys,
        }));
    }

    let options = AlterConnectionOptionExtracted::try_from(with_options)?;
    if options.validate.is_some() {
        scx.require_feature_flag(&vars::ENABLE_CONNECTION_VALIDATION_SYNTAX)?;
    }

    let validate = match options.validate {
        Some(val) => val,
        None => {
            scx.catalog
                .system_vars()
                .enable_default_connection_validation()
                && connection.validate_by_default()
        }
    };

    let connection_type = match connection {
        Connection::Aws(_) => CreateConnectionType::Aws,
        Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink,
        Connection::Kafka(_) => CreateConnectionType::Kafka,
        Connection::Csr(_) => CreateConnectionType::Csr,
        Connection::Postgres(_) => CreateConnectionType::Postgres,
        Connection::Ssh(_) => CreateConnectionType::Ssh,
        Connection::MySql(_) => CreateConnectionType::MySql,
        Connection::SqlServer(_) => CreateConnectionType::SqlServer,
        Connection::IcebergCatalog(_) => CreateConnectionType::IcebergCatalog,
    };

    // Collect every option name the statement touches, for validation.
    let specified_options: BTreeSet<_> = actions
        .iter()
        .map(|action: &AlterConnectionAction<Aug>| match action {
            AlterConnectionAction::SetOption(option) => option.name.clone(),
            AlterConnectionAction::DropOption(name) => name.clone(),
            AlterConnectionAction::RotateKeys => unreachable!(),
        })
        .collect();

    for invalid in INALTERABLE_OPTIONS {
        if specified_options.contains(invalid) {
            sql_bail!("cannot ALTER {} option {}", connection_type, invalid);
        }
    }

    connection::validate_options_per_connection_type(connection_type, specified_options)?;

    // Partition the actions into SET options and DROP/RESET option names.
    let (set_options_vec, mut drop_options): (Vec<_>, BTreeSet<_>) =
        actions.into_iter().partition_map(|action| match action {
            AlterConnectionAction::SetOption(option) => Either::Left(option),
            AlterConnectionAction::DropOption(name) => Either::Right(name),
            AlterConnectionAction::RotateKeys => unreachable!(),
        });

    let set_options: BTreeMap<_, _> = set_options_vec
        .clone()
        .into_iter()
        .map(|option| (option.name, option.value))
        .collect();

    // Reuse the CREATE CONNECTION extraction machinery to type-check the SET
    // values and to discover which options were both SET and DROPped.
    let connection_options_extracted =
        connection::ConnectionOptionExtracted::try_from(set_options_vec)?;

    let duplicates: Vec<_> = connection_options_extracted
        .seen
        .intersection(&drop_options)
        .collect();

    if !duplicates.is_empty() {
        sql_bail!(
            "cannot both SET and DROP/RESET options {}",
            duplicates
                .iter()
                .map(|option| option.to_string())
                .join(", ")
        )
    }

    for mutually_exclusive_options in MUTUALLY_EXCLUSIVE_SETS {
        let set_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| set_options.contains_key(o))
            .count();
        let drop_options_count = mutually_exclusive_options
            .iter()
            .filter(|o| drop_options.contains(o))
            .count();

        // Disallow mixing SET and DROP/RESET within one mutually exclusive
        // set of options.
        if set_options_count > 0 && drop_options_count > 0 {
            sql_bail!(
                "cannot both SET and DROP/RESET mutually exclusive {} options {}",
                connection_type,
                mutually_exclusive_options
                    .iter()
                    .map(|option| option.to_string())
                    .join(", ")
            )
        }

        // If any option in a mutually exclusive set is touched, drop the
        // entire set so that the newly SET value (if any) is the only one
        // that remains.
        if set_options_count > 0 || drop_options_count > 0 {
            drop_options.extend(mutually_exclusive_options.iter().cloned());
        }
    }

    Ok(Plan::AlterConnection(AlterConnectionPlan {
        id: entry.id(),
        action: crate::plan::AlterConnectionAction::AlterOptions {
            set_options,
            drop_options,
            validate,
        },
    }))
}

pub fn describe_alter_sink(
    _: &StatementContext,
    _: AlterSinkStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

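/// Plans `ALTER SINK`. Changing the sink's input relation works by re-planning
/// the original `CREATE SINK` statement against the new relation with a bumped
/// `VERSION` option.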
pub fn plan_alter_sink(
    scx: &mut StatementContext,
    stmt: AlterSinkStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSinkStatement {
        sink_name,
        if_exists,
        action,
    } = stmt;

    let object_type = ObjectType::Sink;
    let item = resolve_item_or_type(scx, object_type, sink_name.clone(), if_exists)?;

    let Some(item) = item else {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: sink_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    };
    // Always operate on the latest version of the sink.
    let item = item.at_version(RelationVersionSelector::Latest);

    match action {
        AlterSinkAction::ChangeRelation(new_from) => {
            // Re-parse the sink's original CREATE SINK statement...
            let create_sql = item.create_sql();
            let stmts = mz_sql_parser::parser::parse_statements(create_sql)?;
            let [stmt]: [StatementParseResult; 1] = stmts
                .try_into()
                .expect("create sql of sink was not exactly one statement");
            let Statement::CreateSink(mut stmt) = stmt.ast else {
                unreachable!("invalid create SQL for sink item");
            };

            // ...bump the VERSION option past the highest version recorded so
            // far...
            let cur_version = stmt
                .with_options
                .extract_if(.., |o| o.name == CreateSinkOptionName::Version)
                .map(|o| u64::try_from_value(o.value).expect("invalid sink create_sql"))
                .max()
                .unwrap_or(0);
            let new_version = cur_version + 1;
            stmt.with_options.push(CreateSinkOption {
                name: CreateSinkOptionName::Version,
                value: Some(WithOptionValue::Value(Value::Number(
                    new_version.to_string(),
                ))),
            });

            // ...then swap in the new FROM relation and re-plan the statement.
            let (mut stmt, _) = crate::names::resolve(scx.catalog, stmt)?;
            stmt.from = new_from;

            let Plan::CreateSink(plan) = plan_sink(scx, stmt)? else {
                unreachable!("invalid plan for CREATE SINK statement");
            };

            Ok(Plan::AlterSink(AlterSinkPlan {
                item_id: item.id(),
                global_id: item.global_id(),
                sink: plan.sink,
                with_snapshot: plan.with_snapshot,
                in_cluster: plan.in_cluster,
            }))
        }
        AlterSinkAction::SetOptions(_) => bail_unsupported!("ALTER SINK SET options"),
        AlterSinkAction::ResetOptions(_) => bail_unsupported!("ALTER SINK RESET options"),
    }
}

pub fn describe_alter_source(
    _: &StatementContext,
    _: AlterSourceStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

generate_extracted_config!(
    AlterSourceAddSubsourceOption,
    (TextColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (ExcludeColumns, Vec::<UnresolvedItemName>, Default(vec![])),
    (Details, String)
);

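/// Plans `ALTER SOURCE`. Only `RETAIN HISTORY` may be SET or RESET here;
/// subsource actions are either no longer supported or are rewritten away
/// during purification before they reach planning.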
pub fn plan_alter_source(
    scx: &mut StatementContext,
    stmt: AlterSourceStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterSourceStatement {
        source_name,
        if_exists,
        action,
    } = stmt;
    let object_type = ObjectType::Source;

    if resolve_item_or_type(scx, object_type, source_name.clone(), if_exists)?.is_none() {
        scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
            name: source_name.to_string(),
            object_type,
        });

        return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
    }

    match action {
        AlterSourceAction::SetOptions(options) => {
            let mut options = options.into_iter();
            let Some(option) = options.next() else {
                sql_bail!("expected option");
            };
            if option.name == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    option.value,
                );
            }
            sql_bail!(
                "cannot modify the {} of a SOURCE",
                option.name.to_ast_string_simple()
            );
        }
        AlterSourceAction::ResetOptions(reset) => {
            let mut options = reset.into_iter();
            let Some(option) = options.next() else {
                sql_bail!("expected option");
            };
            if option == CreateSourceOptionName::RetainHistory {
                if options.next().is_some() {
                    sql_bail!("RETAIN HISTORY must be the only option");
                }
                return alter_retain_history(
                    scx,
                    object_type,
                    if_exists,
                    UnresolvedObjectName::Item(source_name),
                    None,
                );
            }
            sql_bail!(
                "cannot modify the {} of a SOURCE",
                option.to_ast_string_simple()
            );
        }
        AlterSourceAction::DropSubsources { .. } => {
            sql_bail!("ALTER SOURCE...DROP SUBSOURCE no longer supported; use DROP SOURCE")
        }
        AlterSourceAction::AddSubsources { .. } => {
            unreachable!("ALTER SOURCE...ADD SUBSOURCE must be purified")
        }
        AlterSourceAction::RefreshReferences => {
            unreachable!("ALTER SOURCE...REFRESH REFERENCES must be purified")
        }
    }
}

pub fn describe_alter_system_set(
    _: &StatementContext,
    _: AlterSystemSetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_set(
    _: &StatementContext,
    AlterSystemSetStatement { name, to }: AlterSystemSetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemSet(AlterSystemSetPlan {
        name,
        value: scl::plan_set_variable_to(to)?,
    }))
}

pub fn describe_alter_system_reset(
    _: &StatementContext,
    _: AlterSystemResetStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset(
    _: &StatementContext,
    AlterSystemResetStatement { name }: AlterSystemResetStatement,
) -> Result<Plan, PlanError> {
    let name = name.to_string();
    Ok(Plan::AlterSystemReset(AlterSystemResetPlan { name }))
}

pub fn describe_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_system_reset_all(
    _: &StatementContext,
    _: AlterSystemResetAllStatement,
) -> Result<Plan, PlanError> {
    Ok(Plan::AlterSystemResetAll(AlterSystemResetAllPlan {}))
}

pub fn describe_alter_role(
    _: &StatementContext,
    _: AlterRoleStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

pub fn plan_alter_role(
    _scx: &StatementContext,
    AlterRoleStatement { name, option }: AlterRoleStatement<Aug>,
) -> Result<Plan, PlanError> {
    let option = match option {
        AlterRoleOption::Attributes(attrs) => {
            let attrs = plan_role_attributes(attrs)?;
            PlannedAlterRoleOption::Attributes(attrs)
        }
        AlterRoleOption::Variable(variable) => {
            let var = plan_role_variable(variable)?;
            PlannedAlterRoleOption::Variable(var)
        }
    };

    Ok(Plan::AlterRole(AlterRolePlan {
        id: name.id,
        name: name.name,
        option,
    }))
}

pub fn describe_alter_table_add_column(
    _: &StatementContext,
    _: AlterTableAddColumnStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

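/// Plans `ALTER TABLE ... ADD COLUMN`. The new column is always nullable,
/// since existing rows have no value for it.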
pub fn plan_alter_table_add_column(
    scx: &StatementContext,
    stmt: AlterTableAddColumnStatement<Aug>,
) -> Result<Plan, PlanError> {
    let AlterTableAddColumnStatement {
        if_exists,
        name,
        if_col_not_exist,
        column_name,
        data_type,
    } = stmt;
    let object_type = ObjectType::Table;

    scx.require_feature_flag(&vars::ENABLE_ALTER_TABLE_ADD_COLUMN)?;

    let (relation_id, item_name, desc) =
        match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {
            Some(item) => {
                let item_name = scx.catalog.resolve_full_name(item.name());
                // Always plan against the latest version of the table.
                let item = item.at_version(RelationVersionSelector::Latest);
                let desc = item.desc(&item_name)?.into_owned();
                (item.id(), item_name, desc)
            }
            None => {
                scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist {
                    name: name.to_ast_string_simple(),
                    object_type,
                });
                return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
            }
        };

    let column_name = ColumnName::from(column_name.as_str());
    if desc.get_by_name(&column_name).is_some() {
        if if_col_not_exist {
            scx.catalog.add_notice(PlanNotice::ColumnAlreadyExists {
                column_name: column_name.to_string(),
                object_name: item_name.item,
            });
            return Ok(Plan::AlterNoop(AlterNoopPlan { object_type }));
        } else {
            return Err(PlanError::ColumnAlreadyExists {
                column_name,
                object_name: item_name.item,
            });
        }
    }

    let scalar_type = scalar_type_from_sql(scx, &data_type)?;
    // The new column must be nullable: existing rows have no value for it.
    let column_type = scalar_type.nullable(true);
    // Keep the raw SQL form of the type so it can be recorded in the plan.
    let raw_sql_type = mz_sql_parser::parser::parse_data_type(&data_type.to_ast_string_stable())?;

    Ok(Plan::AlterTableAddColumn(AlterTablePlan {
        relation_id,
        column_name,
        column_type,
        raw_sql_type,
    }))
}

pub fn describe_comment(
    _: &StatementContext,
    _: CommentStatement<Aug>,
) -> Result<StatementDesc, PlanError> {
    Ok(StatementDesc::new(None))
}

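/// Plans `COMMENT ON <object> IS <comment>`, mapping the AST object reference
/// to a `CommentObjectId` plus, for `COMMENT ON COLUMN`, a 1-based column
/// position.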
pub fn plan_comment(
    scx: &mut StatementContext,
    stmt: CommentStatement<Aug>,
) -> Result<Plan, PlanError> {
    const MAX_COMMENT_LENGTH: usize = 1024;

    let CommentStatement { object, comment } = stmt;

    // Reject overly long comments.
    if let Some(c) = &comment {
        if c.len() > MAX_COMMENT_LENGTH {
            return Err(PlanError::CommentTooLong {
                length: c.len(),
                max_size: MAX_COMMENT_LENGTH,
            });
        }
    }

    let (object_id, column_pos) = match &object {
        com_ty @ CommentObjectType::Table { name }
        | com_ty @ CommentObjectType::View { name }
        | com_ty @ CommentObjectType::MaterializedView { name }
        | com_ty @ CommentObjectType::Index { name }
        | com_ty @ CommentObjectType::Func { name }
        | com_ty @ CommentObjectType::Connection { name }
        | com_ty @ CommentObjectType::Source { name }
        | com_ty @ CommentObjectType::Sink { name }
        | com_ty @ CommentObjectType::Secret { name }
        | com_ty @ CommentObjectType::ContinualTask { name } => {
            let item = scx.get_item_by_resolved_name(name)?;
            match (com_ty, item.item_type()) {
                (CommentObjectType::Table { .. }, CatalogItemType::Table) => {
                    (CommentObjectId::Table(item.id()), None)
                }
                (CommentObjectType::View { .. }, CatalogItemType::View) => {
                    (CommentObjectId::View(item.id()), None)
                }
                (CommentObjectType::MaterializedView { .. }, CatalogItemType::MaterializedView) => {
                    (CommentObjectId::MaterializedView(item.id()), None)
                }
                (CommentObjectType::Index { .. }, CatalogItemType::Index) => {
                    (CommentObjectId::Index(item.id()), None)
                }
                (CommentObjectType::Func { .. }, CatalogItemType::Func) => {
                    (CommentObjectId::Func(item.id()), None)
                }
                (CommentObjectType::Connection { .. }, CatalogItemType::Connection) => {
                    (CommentObjectId::Connection(item.id()), None)
                }
                (CommentObjectType::Source { .. }, CatalogItemType::Source) => {
                    (CommentObjectId::Source(item.id()), None)
                }
                (CommentObjectType::Sink { .. }, CatalogItemType::Sink) => {
                    (CommentObjectId::Sink(item.id()), None)
                }
                (CommentObjectType::Secret { .. }, CatalogItemType::Secret) => {
                    (CommentObjectId::Secret(item.id()), None)
                }
                (CommentObjectType::ContinualTask { .. }, CatalogItemType::ContinualTask) => {
                    (CommentObjectId::ContinualTask(item.id()), None)
                }
                (com_ty, cat_ty) => {
                    let expected_type = match com_ty {
                        CommentObjectType::Table { .. } => ObjectType::Table,
                        CommentObjectType::View { .. } => ObjectType::View,
                        CommentObjectType::MaterializedView { .. } => ObjectType::MaterializedView,
                        CommentObjectType::Index { .. } => ObjectType::Index,
                        CommentObjectType::Func { .. } => ObjectType::Func,
                        CommentObjectType::Connection { .. } => ObjectType::Connection,
                        CommentObjectType::Source { .. } => ObjectType::Source,
                        CommentObjectType::Sink { .. } => ObjectType::Sink,
                        CommentObjectType::Secret { .. } => ObjectType::Secret,
                        CommentObjectType::ContinualTask { .. } => ObjectType::ContinualTask,
                        _ => unreachable!("these are the only types we match on"),
                    };

                    return Err(PlanError::InvalidObjectType {
                        expected_type: SystemObjectType::Object(expected_type),
                        actual_type: SystemObjectType::Object(cat_ty.into()),
                        object_name: item.name().item.clone(),
                    });
                }
            }
        }
        CommentObjectType::Type { ty } => match ty {
            ResolvedDataType::AnonymousList(_) | ResolvedDataType::AnonymousMap { .. } => {
                sql_bail!("cannot comment on anonymous list or map type");
            }
            ResolvedDataType::Named { id, modifiers, .. } => {
                if !modifiers.is_empty() {
                    sql_bail!("cannot comment on type with modifiers");
                }
                (CommentObjectId::Type(*id), None)
            }
            ResolvedDataType::Error => unreachable!("should have been caught in name resolution"),
        },
        CommentObjectType::Column { name } => {
            let (item, pos) = scx.get_column_by_resolved_name(name)?;
            match item.item_type() {
                CatalogItemType::Table => (CommentObjectId::Table(item.id()), Some(pos + 1)),
                CatalogItemType::Source => (CommentObjectId::Source(item.id()), Some(pos + 1)),
                CatalogItemType::View => (CommentObjectId::View(item.id()), Some(pos + 1)),
                CatalogItemType::MaterializedView => {
                    (CommentObjectId::MaterializedView(item.id()), Some(pos + 1))
                }
                CatalogItemType::Type => (CommentObjectId::Type(item.id()), Some(pos + 1)),
                r => {
                    return Err(PlanError::Unsupported {
                        feature: format!("Specifying comments on a column of {r}"),
                        discussion_no: None,
                    });
                }
            }
        }
        CommentObjectType::Role { name } => (CommentObjectId::Role(name.id), None),
        CommentObjectType::Database { name } => {
            (CommentObjectId::Database(*name.database_id()), None)
        }
        CommentObjectType::Schema { name } => (
            CommentObjectId::Schema((*name.database_spec(), *name.schema_spec())),
            None,
        ),
        CommentObjectType::Cluster { name } => (CommentObjectId::Cluster(name.id), None),
        CommentObjectType::ClusterReplica { name } => {
            let replica = scx.catalog.resolve_cluster_replica(name)?;
            (
                CommentObjectId::ClusterReplica((replica.cluster_id(), replica.replica_id())),
                None,
            )
        }
        CommentObjectType::NetworkPolicy { name } => {
            (CommentObjectId::NetworkPolicy(name.id), None)
        }
    };


    // Column positions are stored downstream as i32s, so reject positions
    // that do not fit.
    if let Some(p) = column_pos {
        i32::try_from(p).map_err(|_| PlanError::TooManyColumns {
            max_num_columns: MAX_NUM_COLUMNS,
            req_num_columns: p,
        })?;
    }

    Ok(Plan::Comment(CommentPlan {
        object_id,
        sub_component: column_pos,
        comment,
    }))
}

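/// Resolves a cluster by name, returning `Ok(None)` instead of an error when
/// `if_exists` is true and the cluster does not exist.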
pub(crate) fn resolve_cluster<'a>(
    scx: &'a StatementContext,
    name: &'a Ident,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogCluster<'a>>, PlanError> {
    match scx.resolve_cluster(Some(name)) {
        Ok(cluster) => Ok(Some(cluster)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_cluster_replica<'a>(
    scx: &'a StatementContext,
    name: &QualifiedReplica,
    if_exists: bool,
) -> Result<Option<(&'a dyn CatalogCluster<'a>, ReplicaId)>, PlanError> {
    match scx.resolve_cluster(Some(&name.cluster)) {
        Ok(cluster) => match cluster.replica_ids().get(name.replica.as_str()) {
            Some(replica_id) => Ok(Some((cluster, *replica_id))),
            None if if_exists => Ok(None),
            None => Err(sql_err!(
                "CLUSTER {} has no CLUSTER REPLICA named {}",
                cluster.name(),
                name.replica.as_str().quoted(),
            )),
        },
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_database<'a>(
    scx: &'a StatementContext,
    name: &'a UnresolvedDatabaseName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogDatabase>, PlanError> {
    match scx.resolve_database(name) {
        Ok(database) => Ok(Some(database)),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_schema<'a>(
    scx: &'a StatementContext,
    name: UnresolvedSchemaName,
    if_exists: bool,
) -> Result<Option<(ResolvedDatabaseSpecifier, SchemaSpecifier)>, PlanError> {
    match scx.resolve_schema(name) {
        Ok(schema) => Ok(Some((schema.database().clone(), schema.id().clone()))),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e),
    }
}

pub(crate) fn resolve_network_policy<'a>(
    scx: &'a StatementContext,
    name: Ident,
    if_exists: bool,
) -> Result<Option<ResolvedNetworkPolicyName>, PlanError> {
    match scx.catalog.resolve_network_policy(&name.to_string()) {
        Ok(policy) => Ok(Some(ResolvedNetworkPolicyName {
            id: policy.id(),
            name: policy.name().to_string(),
        })),
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

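/// Resolves an item, or a type when `object_type` is `ObjectType::Type`, and
/// verifies that its catalog type matches the object type named in the
/// statement. Returns `Ok(None)` when `if_exists` is true and resolution
/// fails.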
pub(crate) fn resolve_item_or_type<'a>(
    scx: &'a StatementContext,
    object_type: ObjectType,
    name: UnresolvedItemName,
    if_exists: bool,
) -> Result<Option<&'a dyn CatalogItem>, PlanError> {
    let name = normalize::unresolved_item_name(name)?;
    let catalog_item = match object_type {
        ObjectType::Type => scx.catalog.resolve_type(&name),
        _ => scx.catalog.resolve_item(&name),
    };

    match catalog_item {
        Ok(item) => {
            let is_type = ObjectType::from(item.item_type());
            if object_type == is_type {
                Ok(Some(item))
            } else {
                Err(PlanError::MismatchedObjectType {
                    name: scx.catalog.minimal_qualification(item.name()),
                    is_type,
                    expected_type: object_type,
                })
            }
        }
        Err(_) if if_exists => Ok(None),
        Err(e) => Err(e.into()),
    }
}

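/// Returns an error if the identified cluster is managed. Used to reject
/// operations, such as renaming a replica, that are only sensible on
/// unmanaged clusters.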
fn ensure_cluster_is_not_managed(
    scx: &StatementContext,
    cluster_id: ClusterId,
) -> Result<(), PlanError> {
    let cluster = scx.catalog.get_cluster(cluster_id);
    if cluster.is_managed() {
        Err(PlanError::ManagedCluster {
            cluster_name: cluster.name().to_string(),
        })
    } else {
        Ok(())
    }
}