1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50 CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51 SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54 AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55 RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56 Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62 AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63 MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68 SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74 ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75 TransactionAccessMode,
76};
77use crate::catalog::{
78 CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79 RoleAttributesRaw,
80};
81use crate::names::{
82 Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83 ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108 AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109 WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117 AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118 PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
119 SqlServerConfigOptionExtracted,
120};
121pub use statement::{
122 StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
123 resolve_cluster_for_materialized_view,
124};
125
126use self::statement::ddl::ClusterAlterOptionExtracted;
127use self::with_options::TryFromValue;
128
129#[derive(Debug, EnumKind)]
131#[enum_kind(PlanKind)]
132pub enum Plan {
133 CreateConnection(CreateConnectionPlan),
134 CreateDatabase(CreateDatabasePlan),
135 CreateSchema(CreateSchemaPlan),
136 CreateRole(CreateRolePlan),
137 CreateCluster(CreateClusterPlan),
138 CreateClusterReplica(CreateClusterReplicaPlan),
139 CreateSource(CreateSourcePlan),
140 CreateSources(Vec<CreateSourcePlanBundle>),
141 CreateSecret(CreateSecretPlan),
142 CreateSink(CreateSinkPlan),
143 CreateTable(CreateTablePlan),
144 CreateView(CreateViewPlan),
145 CreateMaterializedView(CreateMaterializedViewPlan),
146 CreateNetworkPolicy(CreateNetworkPolicyPlan),
147 CreateIndex(CreateIndexPlan),
148 CreateType(CreateTypePlan),
149 Comment(CommentPlan),
150 DiscardTemp,
151 DiscardAll,
152 DropObjects(DropObjectsPlan),
153 DropOwned(DropOwnedPlan),
154 EmptyQuery,
155 ShowAllVariables,
156 ShowCreate(ShowCreatePlan),
157 ShowColumns(ShowColumnsPlan),
158 ShowVariable(ShowVariablePlan),
159 InspectShard(InspectShardPlan),
160 SetVariable(SetVariablePlan),
161 ResetVariable(ResetVariablePlan),
162 SetTransaction(SetTransactionPlan),
163 StartTransaction(StartTransactionPlan),
164 CommitTransaction(CommitTransactionPlan),
165 AbortTransaction(AbortTransactionPlan),
166 Select(SelectPlan),
167 Subscribe(SubscribePlan),
168 CopyFrom(CopyFromPlan),
169 CopyTo(CopyToPlan),
170 ExplainPlan(ExplainPlanPlan),
171 ExplainPushdown(ExplainPushdownPlan),
172 ExplainTimestamp(ExplainTimestampPlan),
173 ExplainSinkSchema(ExplainSinkSchemaPlan),
174 Insert(InsertPlan),
175 AlterCluster(AlterClusterPlan),
176 AlterClusterSwap(AlterClusterSwapPlan),
177 AlterNoop(AlterNoopPlan),
178 AlterSetCluster(AlterSetClusterPlan),
179 AlterConnection(AlterConnectionPlan),
180 AlterSource(AlterSourcePlan),
181 AlterClusterRename(AlterClusterRenamePlan),
182 AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
183 AlterItemRename(AlterItemRenamePlan),
184 AlterSchemaRename(AlterSchemaRenamePlan),
185 AlterSchemaSwap(AlterSchemaSwapPlan),
186 AlterSecret(AlterSecretPlan),
187 AlterSink(AlterSinkPlan),
188 AlterSystemSet(AlterSystemSetPlan),
189 AlterSystemReset(AlterSystemResetPlan),
190 AlterSystemResetAll(AlterSystemResetAllPlan),
191 AlterRole(AlterRolePlan),
192 AlterOwner(AlterOwnerPlan),
193 AlterTableAddColumn(AlterTablePlan),
194 AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
195 AlterNetworkPolicy(AlterNetworkPolicyPlan),
196 Declare(DeclarePlan),
197 Fetch(FetchPlan),
198 Close(ClosePlan),
199 ReadThenWrite(ReadThenWritePlan),
200 Prepare(PreparePlan),
201 Execute(ExecutePlan),
202 Deallocate(DeallocatePlan),
203 Raise(RaisePlan),
204 GrantRole(GrantRolePlan),
205 RevokeRole(RevokeRolePlan),
206 GrantPrivileges(GrantPrivilegesPlan),
207 RevokePrivileges(RevokePrivilegesPlan),
208 AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
209 ReassignOwned(ReassignOwnedPlan),
210 SideEffectingFunc(SideEffectingFunc),
211 ValidateConnection(ValidateConnectionPlan),
212 AlterRetainHistory(AlterRetainHistoryPlan),
213 AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
214}
215
216impl Plan {
217 pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220 match stmt {
221 StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222 StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223 StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224 StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225 StatementKind::AlterObjectRename => &[
226 PlanKind::AlterClusterRename,
227 PlanKind::AlterClusterReplicaRename,
228 PlanKind::AlterItemRename,
229 PlanKind::AlterSchemaRename,
230 PlanKind::AlterNoop,
231 ],
232 StatementKind::AlterObjectSwap => &[
233 PlanKind::AlterClusterSwap,
234 PlanKind::AlterSchemaSwap,
235 PlanKind::AlterNoop,
236 ],
237 StatementKind::AlterRole => &[PlanKind::AlterRole],
238 StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239 StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240 StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241 StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242 StatementKind::AlterSource => &[
243 PlanKind::AlterNoop,
244 PlanKind::AlterSource,
245 PlanKind::AlterRetainHistory,
246 PlanKind::AlterSourceTimestampInterval,
247 ],
248 StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249 StatementKind::AlterSystemResetAll => {
250 &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251 }
252 StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253 StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254 StatementKind::AlterTableAddColumn => {
255 &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256 }
257 StatementKind::AlterMaterializedViewApplyReplacement => &[
258 PlanKind::AlterNoop,
259 PlanKind::AlterMaterializedViewApplyReplacement,
260 ],
261 StatementKind::Close => &[PlanKind::Close],
262 StatementKind::Comment => &[PlanKind::Comment],
263 StatementKind::Commit => &[PlanKind::CommitTransaction],
264 StatementKind::Copy => &[
265 PlanKind::CopyFrom,
266 PlanKind::Select,
267 PlanKind::Subscribe,
268 PlanKind::CopyTo,
269 ],
270 StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271 StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272 StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273 StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274 StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275 StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276 StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277 StatementKind::CreateRole => &[PlanKind::CreateRole],
278 StatementKind::CreateSchema => &[PlanKind::CreateSchema],
279 StatementKind::CreateSecret => &[PlanKind::CreateSecret],
280 StatementKind::CreateSink => &[PlanKind::CreateSink],
281 StatementKind::CreateSource | StatementKind::CreateSubsource => {
282 &[PlanKind::CreateSource]
283 }
284 StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
285 StatementKind::CreateTable => &[PlanKind::CreateTable],
286 StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
287 StatementKind::CreateType => &[PlanKind::CreateType],
288 StatementKind::CreateView => &[PlanKind::CreateView],
289 StatementKind::Deallocate => &[PlanKind::Deallocate],
290 StatementKind::Declare => &[PlanKind::Declare],
291 StatementKind::Delete => &[PlanKind::ReadThenWrite],
292 StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
293 StatementKind::DropObjects => &[PlanKind::DropObjects],
294 StatementKind::DropOwned => &[PlanKind::DropOwned],
295 StatementKind::Execute => &[PlanKind::Execute],
296 StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
297 StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
298 StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
299 StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
300 StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
301 StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
302 StatementKind::Fetch => &[PlanKind::Fetch],
303 StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
304 StatementKind::GrantRole => &[PlanKind::GrantRole],
305 StatementKind::Insert => &[PlanKind::Insert],
306 StatementKind::Prepare => &[PlanKind::Prepare],
307 StatementKind::Raise => &[PlanKind::Raise],
308 StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
309 StatementKind::ResetVariable => &[PlanKind::ResetVariable],
310 StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
311 StatementKind::RevokeRole => &[PlanKind::RevokeRole],
312 StatementKind::Rollback => &[PlanKind::AbortTransaction],
313 StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
314 StatementKind::SetTransaction => &[PlanKind::SetTransaction],
315 StatementKind::SetVariable => &[PlanKind::SetVariable],
316 StatementKind::Show => &[
317 PlanKind::Select,
318 PlanKind::ShowVariable,
319 PlanKind::ShowCreate,
320 PlanKind::ShowColumns,
321 PlanKind::ShowAllVariables,
322 PlanKind::InspectShard,
323 ],
324 StatementKind::StartTransaction => &[PlanKind::StartTransaction],
325 StatementKind::Subscribe => &[PlanKind::Subscribe],
326 StatementKind::Update => &[PlanKind::ReadThenWrite],
327 StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
328 StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
329 }
330 }
331
332 pub fn name(&self) -> &str {
334 match self {
335 Plan::CreateConnection(_) => "create connection",
336 Plan::CreateDatabase(_) => "create database",
337 Plan::CreateSchema(_) => "create schema",
338 Plan::CreateRole(_) => "create role",
339 Plan::CreateCluster(_) => "create cluster",
340 Plan::CreateClusterReplica(_) => "create cluster replica",
341 Plan::CreateSource(_) => "create source",
342 Plan::CreateSources(_) => "create source",
343 Plan::CreateSecret(_) => "create secret",
344 Plan::CreateSink(_) => "create sink",
345 Plan::CreateTable(_) => "create table",
346 Plan::CreateView(_) => "create view",
347 Plan::CreateMaterializedView(_) => "create materialized view",
348 Plan::CreateIndex(_) => "create index",
349 Plan::CreateType(_) => "create type",
350 Plan::CreateNetworkPolicy(_) => "create network policy",
351 Plan::Comment(_) => "comment",
352 Plan::DiscardTemp => "discard temp",
353 Plan::DiscardAll => "discard all",
354 Plan::DropObjects(plan) => match plan.object_type {
355 ObjectType::Table => "drop table",
356 ObjectType::View => "drop view",
357 ObjectType::MaterializedView => "drop materialized view",
358 ObjectType::Source => "drop source",
359 ObjectType::Sink => "drop sink",
360 ObjectType::Index => "drop index",
361 ObjectType::Type => "drop type",
362 ObjectType::Role => "drop roles",
363 ObjectType::Cluster => "drop clusters",
364 ObjectType::ClusterReplica => "drop cluster replicas",
365 ObjectType::Secret => "drop secret",
366 ObjectType::Connection => "drop connection",
367 ObjectType::Database => "drop database",
368 ObjectType::Schema => "drop schema",
369 ObjectType::Func => "drop function",
370 ObjectType::NetworkPolicy => "drop network policy",
371 },
372 Plan::DropOwned(_) => "drop owned",
373 Plan::EmptyQuery => "do nothing",
374 Plan::ShowAllVariables => "show all variables",
375 Plan::ShowCreate(_) => "show create",
376 Plan::ShowColumns(_) => "show columns",
377 Plan::ShowVariable(_) => "show variable",
378 Plan::InspectShard(_) => "inspect shard",
379 Plan::SetVariable(_) => "set variable",
380 Plan::ResetVariable(_) => "reset variable",
381 Plan::SetTransaction(_) => "set transaction",
382 Plan::StartTransaction(_) => "start transaction",
383 Plan::CommitTransaction(_) => "commit",
384 Plan::AbortTransaction(_) => "abort",
385 Plan::Select(_) => "select",
386 Plan::Subscribe(_) => "subscribe",
387 Plan::CopyFrom(_) => "copy from",
388 Plan::CopyTo(_) => "copy to",
389 Plan::ExplainPlan(_) => "explain plan",
390 Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
391 Plan::ExplainTimestamp(_) => "explain timestamp",
392 Plan::ExplainSinkSchema(_) => "explain schema",
393 Plan::Insert(_) => "insert",
394 Plan::AlterNoop(plan) => match plan.object_type {
395 ObjectType::Table => "alter table",
396 ObjectType::View => "alter view",
397 ObjectType::MaterializedView => "alter materialized view",
398 ObjectType::Source => "alter source",
399 ObjectType::Sink => "alter sink",
400 ObjectType::Index => "alter index",
401 ObjectType::Type => "alter type",
402 ObjectType::Role => "alter role",
403 ObjectType::Cluster => "alter cluster",
404 ObjectType::ClusterReplica => "alter cluster replica",
405 ObjectType::Secret => "alter secret",
406 ObjectType::Connection => "alter connection",
407 ObjectType::Database => "alter database",
408 ObjectType::Schema => "alter schema",
409 ObjectType::Func => "alter function",
410 ObjectType::NetworkPolicy => "alter network policy",
411 },
412 Plan::AlterCluster(_) => "alter cluster",
413 Plan::AlterClusterRename(_) => "alter cluster rename",
414 Plan::AlterClusterSwap(_) => "alter cluster swap",
415 Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
416 Plan::AlterSetCluster(_) => "alter set cluster",
417 Plan::AlterConnection(_) => "alter connection",
418 Plan::AlterSource(_) => "alter source",
419 Plan::AlterItemRename(_) => "rename item",
420 Plan::AlterSchemaRename(_) => "alter rename schema",
421 Plan::AlterSchemaSwap(_) => "alter swap schema",
422 Plan::AlterSecret(_) => "alter secret",
423 Plan::AlterSink(_) => "alter sink",
424 Plan::AlterSystemSet(_) => "alter system",
425 Plan::AlterSystemReset(_) => "alter system",
426 Plan::AlterSystemResetAll(_) => "alter system",
427 Plan::AlterRole(_) => "alter role",
428 Plan::AlterNetworkPolicy(_) => "alter network policy",
429 Plan::AlterOwner(plan) => match plan.object_type {
430 ObjectType::Table => "alter table owner",
431 ObjectType::View => "alter view owner",
432 ObjectType::MaterializedView => "alter materialized view owner",
433 ObjectType::Source => "alter source owner",
434 ObjectType::Sink => "alter sink owner",
435 ObjectType::Index => "alter index owner",
436 ObjectType::Type => "alter type owner",
437 ObjectType::Role => "alter role owner",
438 ObjectType::Cluster => "alter cluster owner",
439 ObjectType::ClusterReplica => "alter cluster replica owner",
440 ObjectType::Secret => "alter secret owner",
441 ObjectType::Connection => "alter connection owner",
442 ObjectType::Database => "alter database owner",
443 ObjectType::Schema => "alter schema owner",
444 ObjectType::Func => "alter function owner",
445 ObjectType::NetworkPolicy => "alter network policy owner",
446 },
447 Plan::AlterTableAddColumn(_) => "alter table add column",
448 Plan::AlterMaterializedViewApplyReplacement(_) => {
449 "alter materialized view apply replacement"
450 }
451 Plan::Declare(_) => "declare",
452 Plan::Fetch(_) => "fetch",
453 Plan::Close(_) => "close",
454 Plan::ReadThenWrite(plan) => match plan.kind {
455 MutationKind::Insert => "insert into select",
456 MutationKind::Update => "update",
457 MutationKind::Delete => "delete",
458 },
459 Plan::Prepare(_) => "prepare",
460 Plan::Execute(_) => "execute",
461 Plan::Deallocate(_) => "deallocate",
462 Plan::Raise(_) => "raise",
463 Plan::GrantRole(_) => "grant role",
464 Plan::RevokeRole(_) => "revoke role",
465 Plan::GrantPrivileges(_) => "grant privilege",
466 Plan::RevokePrivileges(_) => "revoke privilege",
467 Plan::AlterDefaultPrivileges(_) => "alter default privileges",
468 Plan::ReassignOwned(_) => "reassign owned",
469 Plan::SideEffectingFunc(_) => "side effecting func",
470 Plan::ValidateConnection(_) => "validate connection",
471 Plan::AlterRetainHistory(_) => "alter retain history",
472 Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
473 }
474 }
475
476 pub fn allowed_in_read_only(&self) -> bool {
482 match self {
483 Plan::SetVariable(_) => true,
486 Plan::ResetVariable(_) => true,
487 Plan::SetTransaction(_) => true,
488 Plan::StartTransaction(_) => true,
489 Plan::CommitTransaction(_) => true,
490 Plan::AbortTransaction(_) => true,
491 Plan::Select(_) => true,
492 Plan::EmptyQuery => true,
493 Plan::ShowAllVariables => true,
494 Plan::ShowCreate(_) => true,
495 Plan::ShowColumns(_) => true,
496 Plan::ShowVariable(_) => true,
497 Plan::InspectShard(_) => true,
498 Plan::Subscribe(_) => true,
499 Plan::CopyTo(_) => true,
500 Plan::ExplainPlan(_) => true,
501 Plan::ExplainPushdown(_) => true,
502 Plan::ExplainTimestamp(_) => true,
503 Plan::ExplainSinkSchema(_) => true,
504 Plan::ValidateConnection(_) => true,
505 _ => false,
506 }
507 }
508}
509
510#[derive(Debug)]
511pub struct StartTransactionPlan {
512 pub access: Option<TransactionAccessMode>,
513 pub isolation_level: Option<TransactionIsolationLevel>,
514}
515
/// Distinguishes explicit from implicit transactions.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` exactly when `self` is [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        match self {
            Self::Explicit => true,
            Self::Implicit => false,
        }
    }

    /// Returns `true` exactly when `self` is [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        match self {
            Self::Implicit => true,
            Self::Explicit => false,
        }
    }
}
531
532#[derive(Debug)]
533pub struct CommitTransactionPlan {
534 pub transaction_type: TransactionType,
535}
536
537#[derive(Debug)]
538pub struct AbortTransactionPlan {
539 pub transaction_type: TransactionType,
540}
541
/// Payload of `Plan::CreateDatabase`.
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database to create.
    pub name: String,
    /// The `IF NOT EXISTS` flag (presumably: do not fail when the database
    /// already exists — handled by the consumer of this plan).
    pub if_not_exists: bool,
}
547
548#[derive(Debug)]
549pub struct CreateSchemaPlan {
550 pub database_spec: ResolvedDatabaseSpecifier,
551 pub schema_name: String,
552 pub if_not_exists: bool,
553}
554
555#[derive(Debug)]
556pub struct CreateRolePlan {
557 pub name: String,
558 pub attributes: RoleAttributesRaw,
559}
560
561#[derive(Debug, PartialEq, Eq, Clone)]
562pub struct CreateClusterPlan {
563 pub name: String,
564 pub variant: CreateClusterVariant,
565 pub workload_class: Option<String>,
566}
567
568#[derive(Debug, PartialEq, Eq, Clone)]
569pub enum CreateClusterVariant {
570 Managed(CreateClusterManagedPlan),
571 Unmanaged(CreateClusterUnmanagedPlan),
572}
573
574#[derive(Debug, PartialEq, Eq, Clone)]
575pub struct CreateClusterUnmanagedPlan {
576 pub replicas: Vec<(String, ReplicaConfig)>,
577}
578
579#[derive(Debug, PartialEq, Eq, Clone)]
580pub struct CreateClusterManagedPlan {
581 pub replication_factor: u32,
582 pub size: String,
583 pub availability_zones: Vec<String>,
584 pub compute: ComputeReplicaConfig,
585 pub optimizer_feature_overrides: OptimizerFeatureOverrides,
586 pub schedule: ClusterSchedule,
587}
588
589#[derive(Debug)]
590pub struct CreateClusterReplicaPlan {
591 pub cluster_id: ClusterId,
592 pub name: String,
593 pub config: ReplicaConfig,
594}
595
596#[derive(
598 Clone,
599 Copy,
600 Debug,
601 Serialize,
602 Deserialize,
603 PartialOrd,
604 Ord,
605 PartialEq,
606 Eq
607)]
608pub struct ComputeReplicaIntrospectionConfig {
609 pub debugging: bool,
611 pub interval: Duration,
613}
614
615#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
616pub struct ComputeReplicaConfig {
617 pub introspection: Option<ComputeReplicaIntrospectionConfig>,
618}
619
620#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
621pub enum ReplicaConfig {
622 Unorchestrated {
623 storagectl_addrs: Vec<String>,
624 computectl_addrs: Vec<String>,
625 compute: ComputeReplicaConfig,
626 },
627 Orchestrated {
628 size: String,
629 availability_zone: Option<String>,
630 compute: ComputeReplicaConfig,
631 internal: bool,
632 billed_as: Option<String>,
633 },
634}
635
636#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
637pub enum ClusterSchedule {
638 Manual,
640 Refresh { hydration_time_estimate: Duration },
644}
645
646impl Default for ClusterSchedule {
647 fn default() -> Self {
648 ClusterSchedule::Manual
650 }
651}
652
653#[derive(Debug)]
654pub struct CreateSourcePlan {
655 pub name: QualifiedItemName,
656 pub source: Source,
657 pub if_not_exists: bool,
658 pub timeline: Timeline,
659 pub in_cluster: Option<ClusterId>,
661}
662
663#[derive(Clone, Debug, PartialEq, Eq)]
664pub struct SourceReferences {
665 pub updated_at: u64,
666 pub references: Vec<SourceReference>,
667}
668
/// A single named reference with an optional namespace and its columns.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the referenced object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the referenced object.
    pub columns: Vec<String>,
}
677
678#[derive(Debug)]
680pub struct CreateSourcePlanBundle {
681 pub item_id: CatalogItemId,
683 pub global_id: GlobalId,
685 pub plan: CreateSourcePlan,
687 pub resolved_ids: ResolvedIds,
689 pub available_source_references: Option<SourceReferences>,
693}
694
695#[derive(Debug)]
696pub struct CreateConnectionPlan {
697 pub name: QualifiedItemName,
698 pub if_not_exists: bool,
699 pub connection: Connection,
700 pub validate: bool,
701}
702
703#[derive(Debug)]
704pub struct ValidateConnectionPlan {
705 pub id: CatalogItemId,
707 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
709}
710
711#[derive(Debug)]
712pub struct CreateSecretPlan {
713 pub name: QualifiedItemName,
714 pub secret: Secret,
715 pub if_not_exists: bool,
716}
717
718#[derive(Debug)]
719pub struct CreateSinkPlan {
720 pub name: QualifiedItemName,
721 pub sink: Sink,
722 pub with_snapshot: bool,
723 pub if_not_exists: bool,
724 pub in_cluster: ClusterId,
725}
726
727#[derive(Debug)]
728pub struct CreateTablePlan {
729 pub name: QualifiedItemName,
730 pub table: Table,
731 pub if_not_exists: bool,
732}
733
734#[derive(Debug, Clone)]
735pub struct CreateViewPlan {
736 pub name: QualifiedItemName,
737 pub view: View,
738 pub replace: Option<CatalogItemId>,
740 pub drop_ids: Vec<CatalogItemId>,
742 pub if_not_exists: bool,
743 pub ambiguous_columns: bool,
746}
747
748#[derive(Debug, Clone)]
749pub struct CreateMaterializedViewPlan {
750 pub name: QualifiedItemName,
751 pub materialized_view: MaterializedView,
752 pub replace: Option<CatalogItemId>,
754 pub drop_ids: Vec<CatalogItemId>,
756 pub if_not_exists: bool,
757 pub ambiguous_columns: bool,
760}
761
762#[derive(Debug, Clone)]
763pub struct CreateNetworkPolicyPlan {
764 pub name: String,
765 pub rules: Vec<NetworkPolicyRule>,
766}
767
768#[derive(Debug, Clone)]
769pub struct AlterNetworkPolicyPlan {
770 pub id: NetworkPolicyId,
771 pub name: String,
772 pub rules: Vec<NetworkPolicyRule>,
773}
774
775#[derive(Debug, Clone)]
776pub struct CreateIndexPlan {
777 pub name: QualifiedItemName,
778 pub index: Index,
779 pub if_not_exists: bool,
780}
781
782#[derive(Debug)]
783pub struct CreateTypePlan {
784 pub name: QualifiedItemName,
785 pub typ: Type,
786}
787
788#[derive(Debug)]
789pub struct DropObjectsPlan {
790 pub referenced_ids: Vec<ObjectId>,
792 pub drop_ids: Vec<ObjectId>,
794 pub object_type: ObjectType,
797}
798
799#[derive(Debug)]
800pub struct DropOwnedPlan {
801 pub role_ids: Vec<RoleId>,
803 pub drop_ids: Vec<ObjectId>,
805 pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
807 pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
809}
810
/// Payload of `Plan::ShowVariable`.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
815
816#[derive(Debug)]
817pub struct InspectShardPlan {
818 pub id: GlobalId,
820}
821
822#[derive(Debug)]
823pub struct SetVariablePlan {
824 pub name: String,
825 pub value: VariableValue,
826 pub local: bool,
827}
828
/// Value assigned to a variable by a `SetVariablePlan`.
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default.
    Default,
    /// Use the listed values.
    Values(Vec<String>),
}
834
/// Payload of `Plan::ResetVariable`.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
839
840#[derive(Debug)]
841pub struct SetTransactionPlan {
842 pub local: bool,
843 pub modes: Vec<TransactionMode>,
844}
845
846#[derive(Clone, Debug)]
848pub struct SelectPlan {
849 pub select: Option<Box<SelectStatement<Aug>>>,
852 pub source: HirRelationExpr,
854 pub when: QueryWhen,
856 pub finishing: RowSetFinishing,
858 pub copy_to: Option<CopyFormat>,
860}
861
862impl SelectPlan {
863 pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
864 let arity = typ.arity();
865 SelectPlan {
866 select: None,
867 source: HirRelationExpr::Constant { rows, typ },
868 when: QueryWhen::Immediately,
869 finishing: RowSetFinishing::trivial(arity),
870 copy_to: None,
871 }
872 }
873}
874
875#[derive(Debug, Clone)]
876pub enum SubscribeOutput {
877 Diffs,
878 WithinTimestampOrderBy {
879 order_by: Vec<ColumnOrder>,
881 },
882 EnvelopeUpsert {
883 order_by_keys: Vec<ColumnOrder>,
885 },
886 EnvelopeDebezium {
887 order_by_keys: Vec<ColumnOrder>,
889 },
890}
891
892impl SubscribeOutput {
893 pub fn row_order(&self) -> &[ColumnOrder] {
894 match self {
895 SubscribeOutput::Diffs => &[],
896 SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
898 SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
899 SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
900 }
901 }
902}
903
904#[derive(Debug, Clone)]
905pub struct SubscribePlan {
906 pub from: SubscribeFrom,
907 pub with_snapshot: bool,
908 pub when: QueryWhen,
909 pub up_to: Option<Timestamp>,
910 pub copy_to: Option<CopyFormat>,
911 pub emit_progress: bool,
912 pub output: SubscribeOutput,
913}
914
915#[derive(Debug, Clone)]
916pub enum SubscribeFrom {
917 Id(GlobalId),
919 Query {
921 expr: HirRelationExpr,
922 desc: RelationDesc,
923 },
924}
925
926impl SubscribeFrom {
927 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
928 match self {
929 SubscribeFrom::Id(id) => BTreeSet::from([*id]),
930 SubscribeFrom::Query { expr, .. } => expr.depends_on(),
931 }
932 }
933
934 pub fn contains_temporal(&self) -> bool {
935 match self {
936 SubscribeFrom::Id(_) => false,
937 SubscribeFrom::Query { expr, .. } => expr
938 .contains_temporal()
939 .expect("Unexpected error in `visit_scalars` call"),
940 }
941 }
942}
943
944#[derive(Debug)]
945pub struct ShowCreatePlan {
946 pub id: ObjectId,
947 pub row: Row,
948}
949
950#[derive(Debug)]
951pub struct ShowColumnsPlan {
952 pub id: CatalogItemId,
953 pub select_plan: SelectPlan,
954 pub new_resolved_ids: ResolvedIds,
955}
956
957#[derive(Debug)]
958pub struct CopyFromPlan {
959 pub target_id: CatalogItemId,
961 pub target_name: String,
963 pub source: CopyFromSource,
965 pub columns: Vec<ColumnIndex>,
969 pub source_desc: RelationDesc,
971 pub mfp: MapFilterProject,
973 pub params: CopyFormatParams<'static>,
975 pub filter: Option<CopyFromFilter>,
977}
978
979#[derive(Debug)]
980pub enum CopyFromSource {
981 Stdin,
983 Url(HirScalarExpr),
987 AwsS3 {
989 uri: HirScalarExpr,
991 connection: AwsConnection,
993 connection_id: CatalogItemId,
995 },
996}
997
/// Filter carried by a `CopyFromPlan`.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern (syntax defined by the consumer).
    Pattern(String),
}
1003
1004#[derive(Debug, Clone)]
1009pub struct CopyToPlan {
1010 pub select_plan: SelectPlan,
1012 pub desc: RelationDesc,
1013 pub to: HirScalarExpr,
1015 pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
1016 pub connection_id: CatalogItemId,
1018 pub format: S3SinkFormat,
1019 pub max_file_size: u64,
1020}
1021
1022#[derive(Clone, Debug)]
1023pub struct ExplainPlanPlan {
1024 pub stage: ExplainStage,
1025 pub format: ExplainFormat,
1026 pub config: ExplainConfig,
1027 pub explainee: Explainee,
1028}
1029
1030#[derive(Clone, Debug)]
1032pub enum Explainee {
1033 View(CatalogItemId),
1035 MaterializedView(CatalogItemId),
1037 Index(CatalogItemId),
1039 ReplanView(CatalogItemId),
1041 ReplanMaterializedView(CatalogItemId),
1043 ReplanIndex(CatalogItemId),
1045 Statement(ExplaineeStatement),
1047}
1048
1049#[derive(Clone, Debug, EnumKind)]
1051#[enum_kind(ExplaineeStatementKind)]
1052pub enum ExplaineeStatement {
1053 Select {
1055 broken: bool,
1057 plan: plan::SelectPlan,
1058 desc: RelationDesc,
1059 },
1060 CreateView {
1062 broken: bool,
1064 plan: plan::CreateViewPlan,
1065 },
1066 CreateMaterializedView {
1068 broken: bool,
1070 plan: plan::CreateMaterializedViewPlan,
1071 },
1072 CreateIndex {
1074 broken: bool,
1076 plan: plan::CreateIndexPlan,
1077 },
1078 Subscribe {
1080 broken: bool,
1082 plan: plan::SubscribePlan,
1083 },
1084}
1085
1086impl ExplaineeStatement {
1087 pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1088 match self {
1089 Self::Select { plan, .. } => plan.source.depends_on(),
1090 Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1091 Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1092 Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1093 Self::Subscribe { plan, .. } => plan.from.depends_on(),
1094 }
1095 }
1096
1097 pub fn broken(&self) -> bool {
1108 match self {
1109 Self::Select { broken, .. } => *broken,
1110 Self::CreateView { broken, .. } => *broken,
1111 Self::CreateMaterializedView { broken, .. } => *broken,
1112 Self::CreateIndex { broken, .. } => *broken,
1113 Self::Subscribe { broken, .. } => *broken,
1114 }
1115 }
1116}
1117
1118impl ExplaineeStatementKind {
1119 pub fn supports(&self, stage: &ExplainStage) -> bool {
1120 use ExplainStage::*;
1121 match self {
1122 Self::Select => true,
1123 Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1124 Self::CreateMaterializedView => true,
1125 Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1126 Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1129 }
1130 }
1131}
1132
1133impl std::fmt::Display for ExplaineeStatementKind {
1134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1135 match self {
1136 Self::Select => write!(f, "SELECT"),
1137 Self::CreateView => write!(f, "CREATE VIEW"),
1138 Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1139 Self::CreateIndex => write!(f, "CREATE INDEX"),
1140 Self::Subscribe => write!(f, "SUBSCRIBE"),
1141 }
1142 }
1143}
1144
/// Plan payload for an explain-pushdown request; carries only the [`Explainee`] to inspect.
///
/// NOTE(review): what "pushdown" reports is not visible in this chunk.
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    pub explainee: Explainee,
}
1149
/// Plan payload for an explain-timestamp request.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// Output format of the explanation.
    pub format: ExplainFormat,
    /// The query, as an HIR relation expression.
    pub raw_plan: HirRelationExpr,
    /// The timing constraint of the query (see [`QueryWhen`]).
    pub when: QueryWhen,
}
1156
/// Plan payload for explaining a sink's schema.
///
/// NOTE(review): presumably `json_schema` is pre-rendered during planning — confirm.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema, as a JSON string.
    pub json_schema: String,
}
1162
/// Plan carrying a ready-made set of row updates for a catalog item.
///
/// NOTE(review): field semantics are inferred from names and types; confirm against the
/// sequencing code.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// The item the updates apply to.
    pub id: CatalogItemId,
    /// Rows paired with their `Diff`.
    pub updates: Vec<(Row, Diff)>,
    /// Which kind of mutation produced the updates.
    pub kind: MutationKind,
    /// Rows to return, each paired with a non-zero count.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Upper bound on the result size — presumably in bytes; TODO confirm the unit.
    pub max_result_size: u64,
}
1171
/// Plan for an insert into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// The item being inserted into.
    pub id: CatalogItemId,
    /// Relation expression producing the rows to insert.
    pub values: HirRelationExpr,
    /// Scalar expressions making up the `RETURNING` output (inferred from the name).
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1178
/// Plan for a mutation that first reads a selection and then writes based on it.
///
/// NOTE(review): field semantics are inferred from names and types; confirm against the
/// sequencing code.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// The item being mutated.
    pub id: CatalogItemId,
    /// The relation expression that is read.
    pub selection: HirRelationExpr,
    /// Finishing (ordering/limit/projection) applied to the selection.
    pub finishing: RowSetFinishing,
    /// New-value expressions keyed by a `usize` — presumably the column index; confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// The kind of mutation to perform.
    pub kind: MutationKind,
    /// Scalar expressions making up the `RETURNING` output (inferred from the name).
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1188
/// Plan for an `ALTER` that carries nothing but the type of the targeted object.
///
/// NOTE(review): "noop" presumably means no catalog change is made — confirm.
#[derive(Debug)]
pub struct AlterNoopPlan {
    pub object_type: ObjectType,
}
1194
/// Plan for assigning a catalog item to a cluster.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// The item being altered.
    pub id: CatalogItemId,
    /// The cluster to set.
    pub set_cluster: ClusterId,
}
1200
/// Plan for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// The item being altered.
    pub id: CatalogItemId,
    /// The option value as an AST `Value`, if any — presumably `None` on reset; confirm.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// The type of the altered object.
    pub object_type: ObjectType,
}
1208
/// Plan for altering the timestamp interval of a source.
#[derive(Debug)]
pub struct AlterSourceTimestampIntervalPlan {
    /// The source being altered.
    pub id: CatalogItemId,
    /// The option value as an AST `Value`, if any — presumably `None` on reset; confirm.
    pub value: Option<Value>,
    /// The interval to apply.
    pub interval: Duration,
}
1215
/// Three-state description of what an `ALTER` does to a single option.
///
/// The payload type defaults to `String`.
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the given value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1223
/// The action an `ALTER CONNECTION` performs.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change connection options.
    AlterOptions {
        /// Options to set, keyed by option name, each with an optional value.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Whether to validate — presumably the altered connection; confirm.
        validate: bool,
    },
}
1233
/// Plan for `ALTER CONNECTION`: the target connection and the action to perform on it.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    pub id: CatalogItemId,
    pub action: AlterConnectionAction,
}
1239
/// The action an `ALTER SOURCE` performs.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports, described by the plan bundles in `subsources`, with the
    /// given add-subsource options.
    AddSubsourceExports {
        subsources: Vec<CreateSourcePlanBundle>,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Replace the source's references with `references` (inferred from the name).
    RefreshReferences {
        references: SourceReferences,
    },
}
1250
/// Plan for `ALTER SOURCE`.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog ID of the source.
    pub item_id: CatalogItemId,
    /// Global ID of the ingestion — presumably the same source's collection; confirm.
    pub ingestion_id: GlobalId,
    /// The action to perform.
    pub action: AlterSourceAction,
}
1257
/// Plan for `ALTER SINK`.
///
/// NOTE(review): field semantics are inferred from names and types.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog ID of the sink.
    pub item_id: CatalogItemId,
    /// Global ID of the sink.
    pub global_id: GlobalId,
    /// The sink definition.
    pub sink: Sink,
    /// Whether the sink emits a snapshot.
    pub with_snapshot: bool,
    /// The cluster the sink runs in.
    pub in_cluster: ClusterId,
}
1266
/// Plan for `ALTER CLUSTER`.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// The cluster being altered.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// Per-option set/reset/unchanged instructions.
    pub options: PlanClusterOption,
    /// The strategy for the alter (see [`AlterClusterPlanStrategy`]).
    pub strategy: AlterClusterPlanStrategy,
}
1274
/// Plan for renaming a cluster from `name` to `to_name`.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    pub id: ClusterId,
    pub name: String,
    pub to_name: String,
}
1281
/// Plan for renaming a cluster replica from `name` to `to_name`.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica being renamed.
    pub replica_id: ReplicaId,
    /// The replica's current qualified name.
    pub name: QualifiedReplica,
    /// The new name.
    pub to_name: String,
}
1289
/// Plan for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// The item being renamed.
    pub id: CatalogItemId,
    /// The item's current full name.
    pub current_full_name: FullItemName,
    /// The new name.
    pub to_name: String,
    /// The type of the renamed object.
    pub object_type: ObjectType,
}
1297
/// Plan for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers of the schema being renamed.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// The new schema name.
    pub new_schema_name: String,
}
1303
/// Plan for swapping two schemas, `a` and `b`.
///
/// NOTE(review): `name_temp` is presumably an intermediate name used while swapping —
/// confirm against the sequencing code.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    pub schema_a_name: String,
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    pub schema_b_name: String,
    pub name_temp: String,
}
1312
/// Plan for swapping two clusters, `a` and `b`.
///
/// NOTE(review): `name_temp` is presumably an intermediate name used while swapping —
/// confirm against the sequencing code.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    pub id_a: ClusterId,
    pub id_b: ClusterId,
    pub name_a: String,
    pub name_b: String,
    pub name_temp: String,
}
1321
/// Plan for `ALTER SECRET`.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// The secret being altered.
    pub id: CatalogItemId,
    /// Scalar expression for the new secret contents.
    pub secret_as: MirScalarExpr,
}
1327
/// Plan for `ALTER SYSTEM SET`: the variable `name` and the `value` to assign.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    pub name: String,
    pub value: VariableValue,
}
1333
/// Plan for `ALTER SYSTEM RESET` of the single variable `name`.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    pub name: String,
}
1338
/// Plan for `ALTER SYSTEM RESET ALL`; carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1341
/// Plan for `ALTER ROLE`.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// The role being altered.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The planned option change.
    pub option: PlannedAlterRoleOption,
}
1348
/// Plan for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// The object whose owner changes.
    pub id: ObjectId,
    /// The type of that object.
    pub object_type: ObjectType,
    /// The role that becomes the owner.
    pub new_owner: RoleId,
}
1355
/// Plan for an `ALTER TABLE` that concerns a single column.
///
/// NOTE(review): presumably this adds the column — the operation itself is not visible here.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// The relation being altered.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Resolved SQL type of the column.
    pub column_type: SqlColumnType,
    /// The column's data type as written in the AST.
    pub raw_sql_type: RawDataType,
}
1363
/// Plan for applying a replacement to a materialized view.
#[derive(Debug, Clone)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// The materialized view being altered.
    pub id: CatalogItemId,
    /// The item that supplies the replacement.
    pub replacement_id: CatalogItemId,
}
1369
/// Plan for `DECLARE` (cursor declaration).
#[derive(Debug)]
pub struct DeclarePlan {
    /// Name being declared.
    pub name: String,
    /// The unresolved statement the declaration wraps.
    pub stmt: Statement<Raw>,
    /// SQL text — presumably that of `stmt`; confirm.
    pub sql: String,
    /// Parameters supplied with the statement.
    pub params: Params,
}
1377
/// Plan for `FETCH`.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name to fetch from — presumably a cursor; confirm.
    pub name: String,
    /// Optional fetch direction/count.
    pub count: Option<FetchDirection>,
    /// How long execution may wait (see [`ExecuteTimeout`]).
    pub timeout: ExecuteTimeout,
}
1384
/// Plan for `CLOSE` of the named object.
#[derive(Debug)]
pub struct ClosePlan {
    pub name: String,
}
1389
/// Plan for `PREPARE`.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The unresolved statement being prepared.
    pub stmt: Statement<Raw>,
    /// SQL text — presumably that of `stmt`; confirm.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1397
/// Plan for `EXECUTE`: the name to execute and the parameters to execute it with.
#[derive(Debug)]
pub struct ExecutePlan {
    pub name: String,
    pub params: Params,
}
1403
/// Plan for `DEALLOCATE`.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// The name to deallocate — presumably `None` means "all"; confirm.
    pub name: Option<String>,
}
1408
/// Plan for `RAISE`, carrying the severity of the notice.
#[derive(Debug)]
pub struct RaisePlan {
    pub severity: NoticeSeverity,
}
1413
/// Plan for granting role membership.
///
/// NOTE(review): field roles are inferred from their names.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles being granted.
    pub role_ids: Vec<RoleId>,
    /// The roles that receive membership.
    pub member_ids: Vec<RoleId>,
    /// The role performing the grant.
    pub grantor_id: RoleId,
}
1423
/// Plan for revoking role membership.
///
/// NOTE(review): field roles are inferred from their names.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles being revoked.
    pub role_ids: Vec<RoleId>,
    /// The roles that lose membership.
    pub member_ids: Vec<RoleId>,
    /// The role recorded as grantor.
    pub grantor_id: RoleId,
}
1433
/// A single privilege change; used by [`GrantPrivilegesPlan`] and [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges, as an ACL mode.
    pub acl_mode: AclMode,
    /// The object the privileges apply to.
    pub target_id: SystemObjectId,
    /// The role recorded as grantor.
    pub grantor: RoleId,
}
1443
/// Plan for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles receiving the privileges.
    pub grantees: Vec<RoleId>,
}
1451
/// Plan for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// The privilege changes to apply.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles losing the privileges.
    pub revokees: Vec<RoleId>,
}
/// Plan for `ALTER DEFAULT PRIVILEGES`.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// The default-privilege objects affected.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The ACL items to apply to them.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// `true` for a grant — presumably `false` means revoke; confirm.
    pub is_grant: bool,
}
1468
/// Plan for `REASSIGN OWNED`.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose objects are reassigned.
    pub old_roles: Vec<RoleId>,
    /// The role that receives ownership.
    pub new_role: RoleId,
    /// The objects to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1478
/// Plan for `COMMENT ON`.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object being commented on.
    pub object_id: CommentObjectId,
    /// Optional sub-component of the object — presumably a column position; confirm.
    pub sub_component: Option<usize>,
    /// The comment text — presumably `None` removes the comment; confirm.
    pub comment: Option<String>,
}
1490
/// Where the contents of a [`Table`] come from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table is written to directly; `defaults` holds one expression per entry —
    /// presumably the column defaults; confirm.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table is populated from a data source, described by `desc`, on `timeline`.
    DataSource {
        desc: DataSourceDesc,
        timeline: Timeline,
    },
}
1503
/// Planner-level description of a table.
#[derive(Clone, Debug)]
pub struct Table {
    /// The SQL that creates the table.
    pub create_sql: String,
    /// The (versioned) relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table's data comes from.
    pub data_source: TableDataSource,
}
1512
/// Planner-level description of a source.
#[derive(Clone, Debug)]
pub struct Source {
    /// The SQL that creates the source.
    pub create_sql: String,
    /// What kind of data source this is.
    pub data_source: DataSourceDesc,
    /// The relation description of the source.
    pub desc: RelationDesc,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
}
1520
/// Describes the kind of data source behind a [`Source`] or a source-fed [`Table`].
///
/// NOTE(review): the variant descriptions are inferred from names and payload types; the
/// code that interprets them is outside this chunk.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// An ingestion described by a `SourceDesc`.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// An ingestion declared with the old syntax, which additionally carries its progress
    /// subsource and its own export configuration.
    OldSyntaxIngestion {
        desc: SourceDesc<ReferencedConnection>,
        progress_subsource: CatalogItemId,
        data_config: SourceExportDataConfig<ReferencedConnection>,
        details: SourceExportDetails,
    },
    /// An export of the ingestion `ingestion_id`, identified upstream by
    /// `external_reference`.
    IngestionExport {
        ingestion_id: CatalogItemId,
        external_reference: UnresolvedItemName,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// A progress collection.
    Progress,
    /// A webhook source.
    Webhook {
        /// Optional request validation.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Which headers are exposed, and how.
        headers: WebhookHeaders,
        /// Optional storage cluster for the webhook.
        cluster_id: Option<StorageInstanceId>,
    },
}
1553
/// Validation configuration for a webhook source.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidation {
    /// The validation expression.
    pub expression: MirScalarExpr,
    /// Relation description whose column types `expression` is reduced against (see
    /// `reduce_expression`).
    pub relation_desc: RelationDesc,
    /// `(usize, bool)` pairs for request bodies — presumably (column index, provide as
    /// bytes); TODO confirm.
    pub bodies: Vec<(usize, bool)>,
    /// `(usize, bool)` pairs for request headers — presumably (column index, provide as
    /// bytes); TODO confirm.
    pub headers: Vec<(usize, bool)>,
    /// Secrets made available to the expression.
    pub secrets: Vec<WebhookValidationSecret>,
}
1567
1568impl WebhookValidation {
1569 const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1570
1571 pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1576 let WebhookValidation {
1577 expression,
1578 relation_desc,
1579 ..
1580 } = self;
1581
1582 let mut expression_ = expression.clone();
1584 let desc_ = relation_desc.clone();
1585 let reduce_task = mz_ore::task::spawn_blocking(
1586 || "webhook-validation-reduce",
1587 move || {
1588 let repr_col_types: Vec<ReprColumnType> = desc_
1589 .typ()
1590 .column_types
1591 .iter()
1592 .map(ReprColumnType::from)
1593 .collect();
1594 expression_.reduce(&repr_col_types);
1595 expression_
1596 },
1597 );
1598
1599 match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1600 Ok(reduced_expr) => {
1601 *expression = reduced_expr;
1602 Ok(())
1603 }
1604 Err(_) => Err("timeout"),
1605 }
1606 }
1607}
1608
/// Describes which request headers a webhook source exposes as columns.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaders {
    /// When present, contributes one column (see `num_columns`), with the given filters.
    pub header_column: Option<WebhookHeaderFilters>,
    /// Individually mapped headers; each entry contributes one column. Keyed by a `usize`
    /// with a `(String, bool)` value — presumably (column index) -> (header name, provide
    /// as bytes); TODO confirm.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1616
1617impl WebhookHeaders {
1618 pub fn num_columns(&self) -> usize {
1620 let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1621 let mapped_headers = self.mapped_headers.len();
1622
1623 header_column + mapped_headers
1624 }
1625}
1626
/// Header-name filters for a webhook's header column.
///
/// NOTE(review): how `block` and `allow` interact is not visible in this chunk.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names to block.
    pub block: BTreeSet<String>,
    /// Header names to allow.
    pub allow: BTreeSet<String>,
}
1632
/// The format of a webhook request body.
///
/// Converts into a `SqlScalarType` through the `From` impl: `Json` -> `Jsonb`,
/// `Bytes` -> `Bytes`, `Text` -> `String`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// JSON body. NOTE(review): `array` presumably controls handling of a top-level JSON
    /// array — confirm.
    Json { array: bool },
    /// Raw bytes.
    Bytes,
    /// Text.
    Text,
}
1639
1640impl From<WebhookBodyFormat> for SqlScalarType {
1641 fn from(value: WebhookBodyFormat) -> Self {
1642 match value {
1643 WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1644 WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1645 WebhookBodyFormat::Text => SqlScalarType::String,
1646 }
1647 }
1648}
1649
/// A secret referenced by a webhook validation expression.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidationSecret {
    /// Catalog ID of the secret.
    pub id: CatalogItemId,
    /// Column index associated with the secret — presumably its position in the
    /// validation relation; confirm.
    pub column_idx: usize,
    /// Whether to provide the secret as bytes (inferred from the name; confirm).
    pub use_bytes: bool,
}
1659
/// Planner-level description of a connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// The SQL that creates the connection.
    pub create_sql: String,
    /// Type-specific connection details.
    pub details: ConnectionDetails,
}
1665
/// Type-specific details of a [`Connection`].
///
/// Each variant wraps the matching `mz_storage_types` connection type;
/// `ConnectionDetails::to_connection` converts to the storage-level enum.
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    Kafka(KafkaConnection<ReferencedConnection>),
    Csr(CsrConnection<ReferencedConnection>),
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection plus two keys. Only `connection` survives `to_connection`.
    /// NOTE(review): the reason for keeping two keys is not visible here — presumably key
    /// rotation; confirm.
    Ssh {
        connection: SshConnection,
        key_1: SshKey,
        key_2: SshKey,
    },
    Aws(AwsConnection),
    AwsPrivatelink(AwsPrivatelinkConnection),
    MySql(MySqlConnection<ReferencedConnection>),
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1682
1683impl ConnectionDetails {
1684 pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1685 match self {
1686 ConnectionDetails::Kafka(c) => {
1687 mz_storage_types::connections::Connection::Kafka(c.clone())
1688 }
1689 ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1690 ConnectionDetails::Postgres(c) => {
1691 mz_storage_types::connections::Connection::Postgres(c.clone())
1692 }
1693 ConnectionDetails::Ssh { connection, .. } => {
1694 mz_storage_types::connections::Connection::Ssh(connection.clone())
1695 }
1696 ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1697 ConnectionDetails::AwsPrivatelink(c) => {
1698 mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1699 }
1700 ConnectionDetails::MySql(c) => {
1701 mz_storage_types::connections::Connection::MySql(c.clone())
1702 }
1703 ConnectionDetails::SqlServer(c) => {
1704 mz_storage_types::connections::Connection::SqlServer(c.clone())
1705 }
1706 ConnectionDetails::IcebergCatalog(c) => {
1707 mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1708 }
1709 }
1710 }
1711}
1712
/// A single rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic.
    pub action: NetworkPolicyRuleAction,
    /// The network (CIDR) the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to.
    pub direction: NetworkPolicyRuleDirection,
}
1720
/// The action of a [`NetworkPolicyRule`]. `Allow` is currently the only variant.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Hash
)]
pub enum NetworkPolicyRuleAction {
    Allow,
}
1735
1736impl std::fmt::Display for NetworkPolicyRuleAction {
1737 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1738 match self {
1739 Self::Allow => write!(f, "allow"),
1740 }
1741 }
1742}
1743impl TryFrom<&str> for NetworkPolicyRuleAction {
1744 type Error = PlanError;
1745 fn try_from(value: &str) -> Result<Self, Self::Error> {
1746 match value.to_uppercase().as_str() {
1747 "ALLOW" => Ok(Self::Allow),
1748 _ => Err(PlanError::Unstructured(
1749 "Allow is the only valid option".into(),
1750 )),
1751 }
1752 }
1753}
1754
/// The traffic direction of a [`NetworkPolicyRule`]. `Ingress` is currently the only
/// variant.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    Ingress,
}
1759impl std::fmt::Display for NetworkPolicyRuleDirection {
1760 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1761 match self {
1762 Self::Ingress => write!(f, "ingress"),
1763 }
1764 }
1765}
1766impl TryFrom<&str> for NetworkPolicyRuleDirection {
1767 type Error = PlanError;
1768 fn try_from(value: &str) -> Result<Self, Self::Error> {
1769 match value.to_uppercase().as_str() {
1770 "INGRESS" => Ok(Self::Ingress),
1771 _ => Err(PlanError::Unstructured(
1772 "Ingress is the only valid option".into(),
1773 )),
1774 }
1775 }
1776}
1777
/// The network a [`NetworkPolicyRule`] applies to; a newtype around `IpNet`.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1780impl std::fmt::Display for PolicyAddress {
1781 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1782 write!(f, "{}", &self.0.to_string())
1783 }
1784}
1785impl From<String> for PolicyAddress {
1786 fn from(value: String) -> Self {
1787 Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1788 }
1789}
1790impl TryFrom<&str> for PolicyAddress {
1791 type Error = PlanError;
1792 fn try_from(value: &str) -> Result<Self, Self::Error> {
1793 let net = IpNet::from_str(value)
1794 .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1795 Ok(Self(net))
1796 }
1797}
1798
1799impl Serialize for PolicyAddress {
1800 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1801 where
1802 S: serde::Serializer,
1803 {
1804 serializer.serialize_str(&format!("{}", &self.0))
1805 }
1806}
1807
/// An SSH key of which either only the public half or the full key pair is known.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, as a string.
    PublicOnly(String),
    /// The full key pair.
    Both(SshKeyPair),
}
1813
1814impl SshKey {
1815 pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1816 match self {
1817 SshKey::PublicOnly(_) => None,
1818 SshKey::Both(key_pair) => Some(key_pair),
1819 }
1820 }
1821
1822 pub fn public_key(&self) -> String {
1823 match self {
1824 SshKey::PublicOnly(s) => s.into(),
1825 SshKey::Both(p) => p.ssh_public_key(),
1826 }
1827 }
1828}
1829
/// Planner-level description of a secret.
#[derive(Clone, Debug)]
pub struct Secret {
    /// The SQL that creates the secret.
    pub create_sql: String,
    /// Scalar expression for the secret's contents.
    pub secret_as: MirScalarExpr,
}
1835
/// Planner-level description of a sink.
#[derive(Clone, Debug)]
pub struct Sink {
    /// The SQL that creates the sink.
    pub create_sql: String,
    /// The collection the sink reads from.
    pub from: GlobalId,
    /// The external connection the sink writes to.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    /// The envelope of the sink.
    pub envelope: SinkEnvelope,
    /// Version of the sink — NOTE(review): what bumps it is not visible here.
    pub version: u64,
    /// Optional commit interval.
    pub commit_interval: Option<Duration>,
}
1849
/// Planner-level description of a view.
#[derive(Clone, Debug)]
pub struct View {
    /// The SQL that creates the view.
    pub create_sql: String,
    /// The view's query, as an HIR relation expression.
    pub expr: HirRelationExpr,
    /// The IDs the view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// Whether the view is temporary.
    pub temporary: bool,
}
1863
/// Planner-level description of a materialized view.
///
/// NOTE(review): field descriptions are inferred from names and types.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// The SQL that creates the materialized view.
    pub create_sql: String,
    /// The view's query, as an HIR relation expression.
    pub expr: HirRelationExpr,
    /// The IDs the view depends on.
    pub dependencies: DependencyIds,
    /// Names of the view's columns.
    pub column_names: Vec<ColumnName>,
    /// The item this materialized view is meant to replace, if any.
    pub replacement_target: Option<CatalogItemId>,
    /// The cluster that maintains the view.
    pub cluster_id: ClusterId,
    /// Optional specific replica to target.
    pub target_replica: Option<ReplicaId>,
    /// `usize` values identifying columns asserted to be non-null — presumably column
    /// indexes; confirm.
    pub non_null_assertions: Vec<usize>,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional explicit as-of timestamp.
    pub as_of: Option<Timestamp>,
}
1884
/// Planner-level description of an index.
#[derive(Clone, Debug)]
pub struct Index {
    /// The SQL that creates the index.
    pub create_sql: String,
    /// The collection the index is built on.
    pub on: GlobalId,
    /// The key expressions of the index.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Optional compaction window.
    pub compaction_window: Option<CompactionWindow>,
    /// The cluster that maintains the index.
    pub cluster_id: ClusterId,
}
1895
/// Planner-level description of a user-defined type.
#[derive(Clone, Debug)]
pub struct Type {
    /// The SQL that creates the type.
    pub create_sql: String,
    /// The catalog-level type definition.
    pub inner: CatalogType<IdReference>,
}
1901
/// Timing constraint of a query.
///
/// The accessor methods on this type spell out how each variant constrains timestamp
/// selection. NOTE(review): the variant descriptions below are inferred from names plus
/// those accessors.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// No explicit timestamp; the only variant for which `can_advance_to_upper` is true.
    Immediately,
    /// No explicit timestamp; the only variant for which `must_advance_to_timeline_ts` is
    /// true.
    FreshestTableWrite,
    /// An explicit timestamp; the only variant that constrains the upper bound
    /// (`constrains_upper`).
    AtTimestamp(Timestamp),
    /// An explicit timestamp acting as a lower bound (inferred from the name).
    AtLeastTimestamp(Timestamp),
}
1920
1921impl QueryWhen {
1922 pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1924 match self {
1925 QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1926 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1927 }
1928 }
1929 pub fn constrains_upper(&self) -> bool {
1933 match self {
1934 QueryWhen::AtTimestamp(_) => true,
1935 QueryWhen::AtLeastTimestamp(_)
1936 | QueryWhen::Immediately
1937 | QueryWhen::FreshestTableWrite => false,
1938 }
1939 }
1940 pub fn advance_to_since(&self) -> bool {
1942 match self {
1943 QueryWhen::Immediately
1944 | QueryWhen::AtLeastTimestamp(_)
1945 | QueryWhen::FreshestTableWrite => true,
1946 QueryWhen::AtTimestamp(_) => false,
1947 }
1948 }
1949 pub fn can_advance_to_upper(&self) -> bool {
1951 match self {
1952 QueryWhen::Immediately => true,
1953 QueryWhen::FreshestTableWrite
1954 | QueryWhen::AtTimestamp(_)
1955 | QueryWhen::AtLeastTimestamp(_) => false,
1956 }
1957 }
1958
1959 pub fn can_advance_to_timeline_ts(&self) -> bool {
1961 match self {
1962 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1963 QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1964 }
1965 }
1966 pub fn must_advance_to_timeline_ts(&self) -> bool {
1968 match self {
1969 QueryWhen::FreshestTableWrite => true,
1970 QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1971 false
1972 }
1973 }
1974 }
1975 pub fn is_transactional(&self) -> bool {
1977 match self {
1978 QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1979 QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1980 }
1981 }
1982}
1983
/// The kind of row mutation a plan performs.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    Insert,
    Update,
    Delete,
}
1990
/// Data formats for `COPY`.
///
/// The derived ordering follows declaration order.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    Text,
    Csv,
    Binary,
    Parquet,
}
1998
/// How long an execution (see [`FetchPlan`]) may wait.
///
/// NOTE(review): the precise semantics of `None` versus `WaitOnce` are not visible here.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout.
    None,
    /// A timeout in (fractional) seconds.
    Seconds(f64),
    /// Wait once.
    WaitOnce,
}
2005
/// Options that can be set on an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// The retain-history setting, as a compaction window.
    RetainHistory(CompactionWindow),
}
2011
/// Options that can be set on a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// The retain-history setting, as a compaction window.
    RetainHistory(CompactionWindow),
}
2017
/// The per-option instructions of an `ALTER CLUSTER`.
///
/// Every field is an [`AlterOptionParameter`] saying whether that option is set, reset, or
/// left unchanged; `Default` yields `Unchanged` for all of them.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    pub introspection_debugging: AlterOptionParameter<bool>,
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    pub managed: AlterOptionParameter<bool>,
    /// Replicas as (name, config) pairs.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    pub replication_factor: AlterOptionParameter<u32>,
    /// Uses the default payload type of `AlterOptionParameter`, i.e. `String`.
    pub size: AlterOptionParameter,
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    pub workload_class: AlterOptionParameter<Option<String>>,
}
2030
2031impl Default for PlanClusterOption {
2032 fn default() -> Self {
2033 Self {
2034 availability_zones: AlterOptionParameter::Unchanged,
2035 introspection_debugging: AlterOptionParameter::Unchanged,
2036 introspection_interval: AlterOptionParameter::Unchanged,
2037 managed: AlterOptionParameter::Unchanged,
2038 replicas: AlterOptionParameter::Unchanged,
2039 replication_factor: AlterOptionParameter::Unchanged,
2040 size: AlterOptionParameter::Unchanged,
2041 schedule: AlterOptionParameter::Unchanged,
2042 workload_class: AlterOptionParameter::Unchanged,
2043 }
2044 }
2045}
2046
/// The strategy of an `ALTER CLUSTER`, built from the statement's wait option (see the
/// `TryFrom<ClusterAlterOptionExtracted>` impl).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No wait option was given.
    None,
    /// Wait for the given duration.
    For(Duration),
    /// Wait until ready, for at most `timeout`; `on_timeout` says what happens when the
    /// timeout elapses.
    UntilReady {
        on_timeout: OnTimeoutAction,
        timeout: Duration,
    },
}
2056
/// What to do when an `UNTIL READY` wait times out.
///
/// The default is [`OnTimeoutAction::Commit`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit on timeout (the default).
    #[default]
    Commit,
    /// Roll back on timeout.
    Rollback,
}
2068
2069impl TryFrom<&str> for OnTimeoutAction {
2070 type Error = PlanError;
2071 fn try_from(value: &str) -> Result<Self, Self::Error> {
2072 match value.to_uppercase().as_str() {
2073 "COMMIT" => Ok(Self::Commit),
2074 "ROLLBACK" => Ok(Self::Rollback),
2075 _ => Err(PlanError::Unstructured(
2076 "Valid options are COMMIT, ROLLBACK".into(),
2077 )),
2078 }
2079 }
2080}
2081
2082impl AlterClusterPlanStrategy {
2083 pub fn is_none(&self) -> bool {
2084 matches!(self, Self::None)
2085 }
2086 pub fn is_some(&self) -> bool {
2087 !matches!(self, Self::None)
2088 }
2089}
2090
2091impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2092 type Error = PlanError;
2093
2094 fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2095 Ok(match value.wait {
2096 Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2097 Some(ClusterAlterOptionValue::UntilReady(options)) => {
2098 let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2099 Self::UntilReady {
2100 timeout: match extracted.timeout {
2101 Some(d) => d,
2102 None => Err(PlanError::UntilReadyTimeoutRequired)?,
2103 },
2104 on_timeout: match extracted.on_timeout {
2105 Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2106 PlanError::InvalidOptionValue {
2107 option_name: "ON TIMEOUT".into(),
2108 err: Box::new(e),
2109 }
2110 })?,
2111 None => OnTimeoutAction::default(),
2112 },
2113 }
2114 }
2115 None => Self::None,
2116 })
2117 }
2118}
2119
/// Parameter values bound to a statement, with their types.
///
/// NOTE(review): the distinction between the two type lists is inferred from their names.
#[derive(Debug, Clone)]
pub struct Params {
    /// The parameter values, packed into a single `Row`.
    pub datums: Row,
    /// The types the parameters were supplied with at execute time.
    pub execute_types: Vec<SqlScalarType>,
    /// The types the statement expects.
    pub expected_types: Vec<SqlScalarType>,
}
2130
2131impl Params {
2132 pub fn empty() -> Params {
2134 Params {
2135 datums: Row::pack_slice(&[]),
2136 execute_types: vec![],
2137 expected_types: vec![],
2138 }
2139 }
2140}
2141
/// Context supplied to planning.
#[derive(
    Ord,
    PartialOrd,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Copy
)]
pub struct PlanContext {
    /// The wall-clock time associated with planning.
    pub wall_time: DateTime<Utc>,
    /// Flag that is `false` unless set through `with_ignore_if_exists_errors`.
    /// NOTE(review): which errors it suppresses is not visible in this chunk.
    pub ignore_if_exists_errors: bool,
}
2159
2160impl PlanContext {
2161 pub fn new(wall_time: DateTime<Utc>) -> Self {
2162 Self {
2163 wall_time,
2164 ignore_if_exists_errors: false,
2165 }
2166 }
2167
2168 pub fn zero() -> Self {
2172 PlanContext {
2173 wall_time: now::to_datetime(NOW_ZERO()),
2174 ignore_if_exists_errors: false,
2175 }
2176 }
2177
2178 pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2179 self.ignore_if_exists_errors = value;
2180 self
2181 }
2182}