1use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{
41    CollectionPlan, ColumnOrder, MapFilterProject, MirRelationExpr, MirScalarExpr, RowSetFinishing,
42};
43use mz_ore::now::{self, NOW_ZERO};
44use mz_pgcopy::CopyFormatParams;
45use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
46use mz_repr::explain::{ExplainConfig, ExplainFormat};
47use mz_repr::network_policy_id::NetworkPolicyId;
48use mz_repr::optimize::OptimizerFeatureOverrides;
49use mz_repr::refresh_schedule::RefreshSchedule;
50use mz_repr::role_id::RoleId;
51use mz_repr::{
52    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, Row, SqlColumnType,
53    SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
54};
55use mz_sql_parser::ast::{
56    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
57    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
58    Value, WithOptionValue,
59};
60use mz_ssh_util::keys::SshKeyPair;
61use mz_storage_types::connections::aws::AwsConnection;
62use mz_storage_types::connections::inline::ReferencedConnection;
63use mz_storage_types::connections::{
64    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
65    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
66};
67use mz_storage_types::instances::StorageInstanceId;
68use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
69use mz_storage_types::sources::{
70    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
71};
72use proptest_derive::Arbitrary;
73use serde::{Deserialize, Serialize};
74
75use crate::ast::{
76    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
77    TransactionAccessMode,
78};
79use crate::catalog::{
80    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
81    RoleAttributesRaw,
82};
83use crate::names::{
84    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
85    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
86};
87
88pub(crate) mod error;
89pub(crate) mod explain;
90pub(crate) mod hir;
91pub(crate) mod literal;
92pub(crate) mod lowering;
93pub(crate) mod notice;
94pub(crate) mod plan_utils;
95pub(crate) mod query;
96pub(crate) mod scope;
97pub(crate) mod side_effecting_func;
98pub(crate) mod statement;
99pub(crate) mod transform_ast;
100pub(crate) mod transform_hir;
101pub(crate) mod typeconv;
102pub(crate) mod with_options;
103
104use crate::plan;
105use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
106use crate::plan::with_options::OptionalDuration;
107pub use error::PlanError;
108pub use explain::normalize_subqueries;
109pub use hir::{
110    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
111    WindowExprType,
112};
113pub use lowering::Config as HirToMirConfig;
114pub use notice::PlanNotice;
115pub use query::{ExprContext, QueryContext, QueryLifetime};
116pub use scope::Scope;
117pub use side_effecting_func::SideEffectingFunc;
118pub use statement::ddl::{
119    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
120    PlannedAlterRoleOption, PlannedRoleVariable, SqlServerConfigOptionExtracted,
121};
122pub use statement::{
123    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
124    resolve_cluster_for_materialized_view,
125};
126
127use self::statement::ddl::ClusterAlterOptionExtracted;
128use self::with_options::TryFromValue;
129
130#[derive(Debug, EnumKind)]
132#[enum_kind(PlanKind)]
133pub enum Plan {
134    CreateConnection(CreateConnectionPlan),
135    CreateDatabase(CreateDatabasePlan),
136    CreateSchema(CreateSchemaPlan),
137    CreateRole(CreateRolePlan),
138    CreateCluster(CreateClusterPlan),
139    CreateClusterReplica(CreateClusterReplicaPlan),
140    CreateSource(CreateSourcePlan),
141    CreateSources(Vec<CreateSourcePlanBundle>),
142    CreateSecret(CreateSecretPlan),
143    CreateSink(CreateSinkPlan),
144    CreateTable(CreateTablePlan),
145    CreateView(CreateViewPlan),
146    CreateMaterializedView(CreateMaterializedViewPlan),
147    CreateContinualTask(CreateContinualTaskPlan),
148    CreateNetworkPolicy(CreateNetworkPolicyPlan),
149    CreateIndex(CreateIndexPlan),
150    CreateType(CreateTypePlan),
151    Comment(CommentPlan),
152    DiscardTemp,
153    DiscardAll,
154    DropObjects(DropObjectsPlan),
155    DropOwned(DropOwnedPlan),
156    EmptyQuery,
157    ShowAllVariables,
158    ShowCreate(ShowCreatePlan),
159    ShowColumns(ShowColumnsPlan),
160    ShowVariable(ShowVariablePlan),
161    InspectShard(InspectShardPlan),
162    SetVariable(SetVariablePlan),
163    ResetVariable(ResetVariablePlan),
164    SetTransaction(SetTransactionPlan),
165    StartTransaction(StartTransactionPlan),
166    CommitTransaction(CommitTransactionPlan),
167    AbortTransaction(AbortTransactionPlan),
168    Select(SelectPlan),
169    Subscribe(SubscribePlan),
170    CopyFrom(CopyFromPlan),
171    CopyTo(CopyToPlan),
172    ExplainPlan(ExplainPlanPlan),
173    ExplainPushdown(ExplainPushdownPlan),
174    ExplainTimestamp(ExplainTimestampPlan),
175    ExplainSinkSchema(ExplainSinkSchemaPlan),
176    Insert(InsertPlan),
177    AlterCluster(AlterClusterPlan),
178    AlterClusterSwap(AlterClusterSwapPlan),
179    AlterNoop(AlterNoopPlan),
180    AlterSetCluster(AlterSetClusterPlan),
181    AlterConnection(AlterConnectionPlan),
182    AlterSource(AlterSourcePlan),
183    AlterClusterRename(AlterClusterRenamePlan),
184    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
185    AlterItemRename(AlterItemRenamePlan),
186    AlterSchemaRename(AlterSchemaRenamePlan),
187    AlterSchemaSwap(AlterSchemaSwapPlan),
188    AlterSecret(AlterSecretPlan),
189    AlterSink(AlterSinkPlan),
190    AlterSystemSet(AlterSystemSetPlan),
191    AlterSystemReset(AlterSystemResetPlan),
192    AlterSystemResetAll(AlterSystemResetAllPlan),
193    AlterRole(AlterRolePlan),
194    AlterOwner(AlterOwnerPlan),
195    AlterTableAddColumn(AlterTablePlan),
196    AlterNetworkPolicy(AlterNetworkPolicyPlan),
197    Declare(DeclarePlan),
198    Fetch(FetchPlan),
199    Close(ClosePlan),
200    ReadThenWrite(ReadThenWritePlan),
201    Prepare(PreparePlan),
202    Execute(ExecutePlan),
203    Deallocate(DeallocatePlan),
204    Raise(RaisePlan),
205    GrantRole(GrantRolePlan),
206    RevokeRole(RevokeRolePlan),
207    GrantPrivileges(GrantPrivilegesPlan),
208    RevokePrivileges(RevokePrivilegesPlan),
209    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
210    ReassignOwned(ReassignOwnedPlan),
211    SideEffectingFunc(SideEffectingFunc),
212    ValidateConnection(ValidateConnectionPlan),
213    AlterRetainHistory(AlterRetainHistoryPlan),
214}
215
216impl Plan {
217    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246            ],
247            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
248            StatementKind::AlterSystemResetAll => {
249                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
250            }
251            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
252            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
253            StatementKind::AlterTableAddColumn => {
254                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
255            }
256            StatementKind::Close => &[PlanKind::Close],
257            StatementKind::Comment => &[PlanKind::Comment],
258            StatementKind::Commit => &[PlanKind::CommitTransaction],
259            StatementKind::Copy => &[
260                PlanKind::CopyFrom,
261                PlanKind::Select,
262                PlanKind::Subscribe,
263                PlanKind::CopyTo,
264            ],
265            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
266            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
267            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
268            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
269            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
270            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
271            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
272            StatementKind::CreateContinualTask => &[PlanKind::CreateContinualTask],
273            StatementKind::CreateRole => &[PlanKind::CreateRole],
274            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
275            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
276            StatementKind::CreateSink => &[PlanKind::CreateSink],
277            StatementKind::CreateSource | StatementKind::CreateSubsource => {
278                &[PlanKind::CreateSource]
279            }
280            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
281            StatementKind::CreateTable => &[PlanKind::CreateTable],
282            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
283            StatementKind::CreateType => &[PlanKind::CreateType],
284            StatementKind::CreateView => &[PlanKind::CreateView],
285            StatementKind::Deallocate => &[PlanKind::Deallocate],
286            StatementKind::Declare => &[PlanKind::Declare],
287            StatementKind::Delete => &[PlanKind::ReadThenWrite],
288            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
289            StatementKind::DropObjects => &[PlanKind::DropObjects],
290            StatementKind::DropOwned => &[PlanKind::DropOwned],
291            StatementKind::Execute => &[PlanKind::Execute],
292            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
293            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
294            StatementKind::ExplainAnalyze => &[PlanKind::Select],
295            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
296            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
297            StatementKind::Fetch => &[PlanKind::Fetch],
298            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
299            StatementKind::GrantRole => &[PlanKind::GrantRole],
300            StatementKind::Insert => &[PlanKind::Insert],
301            StatementKind::Prepare => &[PlanKind::Prepare],
302            StatementKind::Raise => &[PlanKind::Raise],
303            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
304            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
305            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
306            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
307            StatementKind::Rollback => &[PlanKind::AbortTransaction],
308            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
309            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
310            StatementKind::SetVariable => &[PlanKind::SetVariable],
311            StatementKind::Show => &[
312                PlanKind::Select,
313                PlanKind::ShowVariable,
314                PlanKind::ShowCreate,
315                PlanKind::ShowColumns,
316                PlanKind::ShowAllVariables,
317                PlanKind::InspectShard,
318            ],
319            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
320            StatementKind::Subscribe => &[PlanKind::Subscribe],
321            StatementKind::Update => &[PlanKind::ReadThenWrite],
322            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
323            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
324        }
325    }
326
327    pub fn name(&self) -> &str {
329        match self {
330            Plan::CreateConnection(_) => "create connection",
331            Plan::CreateDatabase(_) => "create database",
332            Plan::CreateSchema(_) => "create schema",
333            Plan::CreateRole(_) => "create role",
334            Plan::CreateCluster(_) => "create cluster",
335            Plan::CreateClusterReplica(_) => "create cluster replica",
336            Plan::CreateSource(_) => "create source",
337            Plan::CreateSources(_) => "create source",
338            Plan::CreateSecret(_) => "create secret",
339            Plan::CreateSink(_) => "create sink",
340            Plan::CreateTable(_) => "create table",
341            Plan::CreateView(_) => "create view",
342            Plan::CreateMaterializedView(_) => "create materialized view",
343            Plan::CreateContinualTask(_) => "create continual task",
344            Plan::CreateIndex(_) => "create index",
345            Plan::CreateType(_) => "create type",
346            Plan::CreateNetworkPolicy(_) => "create network policy",
347            Plan::Comment(_) => "comment",
348            Plan::DiscardTemp => "discard temp",
349            Plan::DiscardAll => "discard all",
350            Plan::DropObjects(plan) => match plan.object_type {
351                ObjectType::Table => "drop table",
352                ObjectType::View => "drop view",
353                ObjectType::MaterializedView => "drop materialized view",
354                ObjectType::Source => "drop source",
355                ObjectType::Sink => "drop sink",
356                ObjectType::Index => "drop index",
357                ObjectType::Type => "drop type",
358                ObjectType::Role => "drop roles",
359                ObjectType::Cluster => "drop clusters",
360                ObjectType::ClusterReplica => "drop cluster replicas",
361                ObjectType::Secret => "drop secret",
362                ObjectType::Connection => "drop connection",
363                ObjectType::Database => "drop database",
364                ObjectType::Schema => "drop schema",
365                ObjectType::Func => "drop function",
366                ObjectType::ContinualTask => "drop continual task",
367                ObjectType::NetworkPolicy => "drop network policy",
368            },
369            Plan::DropOwned(_) => "drop owned",
370            Plan::EmptyQuery => "do nothing",
371            Plan::ShowAllVariables => "show all variables",
372            Plan::ShowCreate(_) => "show create",
373            Plan::ShowColumns(_) => "show columns",
374            Plan::ShowVariable(_) => "show variable",
375            Plan::InspectShard(_) => "inspect shard",
376            Plan::SetVariable(_) => "set variable",
377            Plan::ResetVariable(_) => "reset variable",
378            Plan::SetTransaction(_) => "set transaction",
379            Plan::StartTransaction(_) => "start transaction",
380            Plan::CommitTransaction(_) => "commit",
381            Plan::AbortTransaction(_) => "abort",
382            Plan::Select(_) => "select",
383            Plan::Subscribe(_) => "subscribe",
384            Plan::CopyFrom(_) => "copy from",
385            Plan::CopyTo(_) => "copy to",
386            Plan::ExplainPlan(_) => "explain plan",
387            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
388            Plan::ExplainTimestamp(_) => "explain timestamp",
389            Plan::ExplainSinkSchema(_) => "explain schema",
390            Plan::Insert(_) => "insert",
391            Plan::AlterNoop(plan) => match plan.object_type {
392                ObjectType::Table => "alter table",
393                ObjectType::View => "alter view",
394                ObjectType::MaterializedView => "alter materialized view",
395                ObjectType::Source => "alter source",
396                ObjectType::Sink => "alter sink",
397                ObjectType::Index => "alter index",
398                ObjectType::Type => "alter type",
399                ObjectType::Role => "alter role",
400                ObjectType::Cluster => "alter cluster",
401                ObjectType::ClusterReplica => "alter cluster replica",
402                ObjectType::Secret => "alter secret",
403                ObjectType::Connection => "alter connection",
404                ObjectType::Database => "alter database",
405                ObjectType::Schema => "alter schema",
406                ObjectType::Func => "alter function",
407                ObjectType::ContinualTask => "alter continual task",
408                ObjectType::NetworkPolicy => "alter network policy",
409            },
410            Plan::AlterCluster(_) => "alter cluster",
411            Plan::AlterClusterRename(_) => "alter cluster rename",
412            Plan::AlterClusterSwap(_) => "alter cluster swap",
413            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
414            Plan::AlterSetCluster(_) => "alter set cluster",
415            Plan::AlterConnection(_) => "alter connection",
416            Plan::AlterSource(_) => "alter source",
417            Plan::AlterItemRename(_) => "rename item",
418            Plan::AlterSchemaRename(_) => "alter rename schema",
419            Plan::AlterSchemaSwap(_) => "alter swap schema",
420            Plan::AlterSecret(_) => "alter secret",
421            Plan::AlterSink(_) => "alter sink",
422            Plan::AlterSystemSet(_) => "alter system",
423            Plan::AlterSystemReset(_) => "alter system",
424            Plan::AlterSystemResetAll(_) => "alter system",
425            Plan::AlterRole(_) => "alter role",
426            Plan::AlterNetworkPolicy(_) => "alter network policy",
427            Plan::AlterOwner(plan) => match plan.object_type {
428                ObjectType::Table => "alter table owner",
429                ObjectType::View => "alter view owner",
430                ObjectType::MaterializedView => "alter materialized view owner",
431                ObjectType::Source => "alter source owner",
432                ObjectType::Sink => "alter sink owner",
433                ObjectType::Index => "alter index owner",
434                ObjectType::Type => "alter type owner",
435                ObjectType::Role => "alter role owner",
436                ObjectType::Cluster => "alter cluster owner",
437                ObjectType::ClusterReplica => "alter cluster replica owner",
438                ObjectType::Secret => "alter secret owner",
439                ObjectType::Connection => "alter connection owner",
440                ObjectType::Database => "alter database owner",
441                ObjectType::Schema => "alter schema owner",
442                ObjectType::Func => "alter function owner",
443                ObjectType::ContinualTask => "alter continual task owner",
444                ObjectType::NetworkPolicy => "alter network policy owner",
445            },
446            Plan::AlterTableAddColumn(_) => "alter table add column",
447            Plan::Declare(_) => "declare",
448            Plan::Fetch(_) => "fetch",
449            Plan::Close(_) => "close",
450            Plan::ReadThenWrite(plan) => match plan.kind {
451                MutationKind::Insert => "insert into select",
452                MutationKind::Update => "update",
453                MutationKind::Delete => "delete",
454            },
455            Plan::Prepare(_) => "prepare",
456            Plan::Execute(_) => "execute",
457            Plan::Deallocate(_) => "deallocate",
458            Plan::Raise(_) => "raise",
459            Plan::GrantRole(_) => "grant role",
460            Plan::RevokeRole(_) => "revoke role",
461            Plan::GrantPrivileges(_) => "grant privilege",
462            Plan::RevokePrivileges(_) => "revoke privilege",
463            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
464            Plan::ReassignOwned(_) => "reassign owned",
465            Plan::SideEffectingFunc(_) => "side effecting func",
466            Plan::ValidateConnection(_) => "validate connection",
467            Plan::AlterRetainHistory(_) => "alter retain history",
468        }
469    }
470
471    pub fn allowed_in_read_only(&self) -> bool {
477        match self {
478            Plan::SetVariable(_) => true,
481            Plan::ResetVariable(_) => true,
482            Plan::SetTransaction(_) => true,
483            Plan::StartTransaction(_) => true,
484            Plan::CommitTransaction(_) => true,
485            Plan::AbortTransaction(_) => true,
486            Plan::Select(_) => true,
487            Plan::EmptyQuery => true,
488            Plan::ShowAllVariables => true,
489            Plan::ShowCreate(_) => true,
490            Plan::ShowColumns(_) => true,
491            Plan::ShowVariable(_) => true,
492            Plan::InspectShard(_) => true,
493            Plan::Subscribe(_) => true,
494            Plan::CopyTo(_) => true,
495            Plan::ExplainPlan(_) => true,
496            Plan::ExplainPushdown(_) => true,
497            Plan::ExplainTimestamp(_) => true,
498            Plan::ExplainSinkSchema(_) => true,
499            Plan::ValidateConnection(_) => true,
500            _ => false,
501        }
502    }
503}
504
505#[derive(Debug)]
506pub struct StartTransactionPlan {
507    pub access: Option<TransactionAccessMode>,
508    pub isolation_level: Option<TransactionIsolationLevel>,
509}
510
511#[derive(Debug)]
512pub enum TransactionType {
513    Explicit,
514    Implicit,
515}
516
517impl TransactionType {
518    pub fn is_explicit(&self) -> bool {
519        matches!(self, TransactionType::Explicit)
520    }
521
522    pub fn is_implicit(&self) -> bool {
523        matches!(self, TransactionType::Implicit)
524    }
525}
526
527#[derive(Debug)]
528pub struct CommitTransactionPlan {
529    pub transaction_type: TransactionType,
530}
531
532#[derive(Debug)]
533pub struct AbortTransactionPlan {
534    pub transaction_type: TransactionType,
535}
536
537#[derive(Debug)]
538pub struct CreateDatabasePlan {
539    pub name: String,
540    pub if_not_exists: bool,
541}
542
543#[derive(Debug)]
544pub struct CreateSchemaPlan {
545    pub database_spec: ResolvedDatabaseSpecifier,
546    pub schema_name: String,
547    pub if_not_exists: bool,
548}
549
550#[derive(Debug)]
551pub struct CreateRolePlan {
552    pub name: String,
553    pub attributes: RoleAttributesRaw,
554}
555
556#[derive(Debug, PartialEq, Eq, Clone)]
557pub struct CreateClusterPlan {
558    pub name: String,
559    pub variant: CreateClusterVariant,
560    pub workload_class: Option<String>,
561}
562
563#[derive(Debug, PartialEq, Eq, Clone)]
564pub enum CreateClusterVariant {
565    Managed(CreateClusterManagedPlan),
566    Unmanaged(CreateClusterUnmanagedPlan),
567}
568
569#[derive(Debug, PartialEq, Eq, Clone)]
570pub struct CreateClusterUnmanagedPlan {
571    pub replicas: Vec<(String, ReplicaConfig)>,
572}
573
574#[derive(Debug, PartialEq, Eq, Clone)]
575pub struct CreateClusterManagedPlan {
576    pub replication_factor: u32,
577    pub size: String,
578    pub availability_zones: Vec<String>,
579    pub compute: ComputeReplicaConfig,
580    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
581    pub schedule: ClusterSchedule,
582}
583
584#[derive(Debug)]
585pub struct CreateClusterReplicaPlan {
586    pub cluster_id: ClusterId,
587    pub name: String,
588    pub config: ReplicaConfig,
589}
590
591#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq)]
593pub struct ComputeReplicaIntrospectionConfig {
594    pub debugging: bool,
596    pub interval: Duration,
598}
599
600#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
601pub struct ComputeReplicaConfig {
602    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
603}
604
605#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
606pub enum ReplicaConfig {
607    Unorchestrated {
608        storagectl_addrs: Vec<String>,
609        computectl_addrs: Vec<String>,
610        compute: ComputeReplicaConfig,
611    },
612    Orchestrated {
613        size: String,
614        availability_zone: Option<String>,
615        compute: ComputeReplicaConfig,
616        internal: bool,
617        billed_as: Option<String>,
618    },
619}
620
621#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
622pub enum ClusterSchedule {
623    Manual,
625    Refresh { hydration_time_estimate: Duration },
629}
630
631impl Default for ClusterSchedule {
632    fn default() -> Self {
633        ClusterSchedule::Manual
635    }
636}
637
638#[derive(Debug)]
639pub struct CreateSourcePlan {
640    pub name: QualifiedItemName,
641    pub source: Source,
642    pub if_not_exists: bool,
643    pub timeline: Timeline,
644    pub in_cluster: Option<ClusterId>,
646}
647
648#[derive(Clone, Debug, PartialEq, Eq)]
649pub struct SourceReferences {
650    pub updated_at: u64,
651    pub references: Vec<SourceReference>,
652}
653
654#[derive(Clone, Debug, PartialEq, Eq)]
657pub struct SourceReference {
658    pub name: String,
659    pub namespace: Option<String>,
660    pub columns: Vec<String>,
661}
662
663#[derive(Debug)]
665pub struct CreateSourcePlanBundle {
666    pub item_id: CatalogItemId,
668    pub global_id: GlobalId,
670    pub plan: CreateSourcePlan,
672    pub resolved_ids: ResolvedIds,
674    pub available_source_references: Option<SourceReferences>,
678}
679
680#[derive(Debug)]
681pub struct CreateConnectionPlan {
682    pub name: QualifiedItemName,
683    pub if_not_exists: bool,
684    pub connection: Connection,
685    pub validate: bool,
686}
687
688#[derive(Debug)]
689pub struct ValidateConnectionPlan {
690    pub id: CatalogItemId,
692    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
694}
695
696#[derive(Debug)]
697pub struct CreateSecretPlan {
698    pub name: QualifiedItemName,
699    pub secret: Secret,
700    pub if_not_exists: bool,
701}
702
703#[derive(Debug)]
704pub struct CreateSinkPlan {
705    pub name: QualifiedItemName,
706    pub sink: Sink,
707    pub with_snapshot: bool,
708    pub if_not_exists: bool,
709    pub in_cluster: ClusterId,
710}
711
712#[derive(Debug)]
713pub struct CreateTablePlan {
714    pub name: QualifiedItemName,
715    pub table: Table,
716    pub if_not_exists: bool,
717}
718
719#[derive(Debug, Clone)]
720pub struct CreateViewPlan {
721    pub name: QualifiedItemName,
722    pub view: View,
723    pub replace: Option<CatalogItemId>,
725    pub drop_ids: Vec<CatalogItemId>,
727    pub if_not_exists: bool,
728    pub ambiguous_columns: bool,
731}
732
733#[derive(Debug, Clone)]
734pub struct CreateMaterializedViewPlan {
735    pub name: QualifiedItemName,
736    pub materialized_view: MaterializedView,
737    pub replace: Option<CatalogItemId>,
739    pub drop_ids: Vec<CatalogItemId>,
741    pub if_not_exists: bool,
742    pub ambiguous_columns: bool,
745}
746
747#[derive(Debug, Clone)]
748pub struct CreateContinualTaskPlan {
749    pub name: QualifiedItemName,
750    pub placeholder_id: Option<mz_expr::LocalId>,
753    pub desc: RelationDesc,
754    pub input_id: GlobalId,
756    pub with_snapshot: bool,
757    pub continual_task: MaterializedView,
759}
760
761#[derive(Debug, Clone)]
762pub struct CreateNetworkPolicyPlan {
763    pub name: String,
764    pub rules: Vec<NetworkPolicyRule>,
765}
766
767#[derive(Debug, Clone)]
768pub struct AlterNetworkPolicyPlan {
769    pub id: NetworkPolicyId,
770    pub name: String,
771    pub rules: Vec<NetworkPolicyRule>,
772}
773
774#[derive(Debug, Clone)]
775pub struct CreateIndexPlan {
776    pub name: QualifiedItemName,
777    pub index: Index,
778    pub if_not_exists: bool,
779}
780
781#[derive(Debug)]
782pub struct CreateTypePlan {
783    pub name: QualifiedItemName,
784    pub typ: Type,
785}
786
787#[derive(Debug)]
788pub struct DropObjectsPlan {
789    pub referenced_ids: Vec<ObjectId>,
791    pub drop_ids: Vec<ObjectId>,
793    pub object_type: ObjectType,
796}
797
798#[derive(Debug)]
799pub struct DropOwnedPlan {
800    pub role_ids: Vec<RoleId>,
802    pub drop_ids: Vec<ObjectId>,
804    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
806    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
808}
809
810#[derive(Debug)]
811pub struct ShowVariablePlan {
812    pub name: String,
813}
814
815#[derive(Debug)]
816pub struct InspectShardPlan {
817    pub id: GlobalId,
819}
820
821#[derive(Debug)]
822pub struct SetVariablePlan {
823    pub name: String,
824    pub value: VariableValue,
825    pub local: bool,
826}
827
828#[derive(Debug)]
829pub enum VariableValue {
830    Default,
831    Values(Vec<String>),
832}
833
834#[derive(Debug)]
835pub struct ResetVariablePlan {
836    pub name: String,
837}
838
839#[derive(Debug)]
840pub struct SetTransactionPlan {
841    pub local: bool,
842    pub modes: Vec<TransactionMode>,
843}
844
845#[derive(Clone, Debug)]
847pub struct SelectPlan {
848    pub select: Option<Box<SelectStatement<Aug>>>,
851    pub source: HirRelationExpr,
853    pub when: QueryWhen,
855    pub finishing: RowSetFinishing,
857    pub copy_to: Option<CopyFormat>,
859}
860
861impl SelectPlan {
862    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
863        let arity = typ.arity();
864        SelectPlan {
865            select: None,
866            source: HirRelationExpr::Constant { rows, typ },
867            when: QueryWhen::Immediately,
868            finishing: RowSetFinishing::trivial(arity),
869            copy_to: None,
870        }
871    }
872}
873
874#[derive(Debug)]
875pub enum SubscribeOutput {
876    Diffs,
877    WithinTimestampOrderBy {
878        order_by: Vec<ColumnOrder>,
880    },
881    EnvelopeUpsert {
882        order_by_keys: Vec<ColumnOrder>,
884    },
885    EnvelopeDebezium {
886        order_by_keys: Vec<ColumnOrder>,
888    },
889}
890
891#[derive(Debug)]
892pub struct SubscribePlan {
893    pub from: SubscribeFrom,
894    pub with_snapshot: bool,
895    pub when: QueryWhen,
896    pub up_to: Option<Timestamp>,
897    pub copy_to: Option<CopyFormat>,
898    pub emit_progress: bool,
899    pub output: SubscribeOutput,
900}
901
902#[derive(Debug, Clone)]
903pub enum SubscribeFrom {
904    Id(GlobalId),
906    Query {
908        expr: MirRelationExpr,
909        desc: RelationDesc,
910    },
911}
912
913impl SubscribeFrom {
914    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
915        match self {
916            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
917            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
918        }
919    }
920
921    pub fn contains_temporal(&self) -> bool {
922        match self {
923            SubscribeFrom::Id(_) => false,
924            SubscribeFrom::Query { expr, .. } => expr.contains_temporal(),
925        }
926    }
927}
928
929#[derive(Debug)]
930pub struct ShowCreatePlan {
931    pub id: ObjectId,
932    pub row: Row,
933}
934
935#[derive(Debug)]
936pub struct ShowColumnsPlan {
937    pub id: CatalogItemId,
938    pub select_plan: SelectPlan,
939    pub new_resolved_ids: ResolvedIds,
940}
941
942#[derive(Debug)]
943pub struct CopyFromPlan {
944    pub target_id: CatalogItemId,
946    pub target_name: String,
948    pub source: CopyFromSource,
950    pub columns: Vec<ColumnIndex>,
954    pub source_desc: RelationDesc,
956    pub mfp: MapFilterProject,
958    pub params: CopyFormatParams<'static>,
960    pub filter: Option<CopyFromFilter>,
962}
963
964#[derive(Debug)]
965pub enum CopyFromSource {
966    Stdin,
968    Url(HirScalarExpr),
972    AwsS3 {
974        uri: HirScalarExpr,
976        connection: AwsConnection,
978        connection_id: CatalogItemId,
980    },
981}
982
983#[derive(Debug)]
984pub enum CopyFromFilter {
985    Files(Vec<String>),
986    Pattern(String),
987}
988
989#[derive(Debug, Clone)]
990pub struct CopyToPlan {
991    pub select_plan: SelectPlan,
993    pub desc: RelationDesc,
994    pub to: HirScalarExpr,
996    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
997    pub connection_id: CatalogItemId,
999    pub format: S3SinkFormat,
1000    pub max_file_size: u64,
1001}
1002
1003#[derive(Clone, Debug)]
1004pub struct ExplainPlanPlan {
1005    pub stage: ExplainStage,
1006    pub format: ExplainFormat,
1007    pub config: ExplainConfig,
1008    pub explainee: Explainee,
1009}
1010
1011#[derive(Clone, Debug)]
1013pub enum Explainee {
1014    View(CatalogItemId),
1016    MaterializedView(CatalogItemId),
1018    Index(CatalogItemId),
1020    ReplanView(CatalogItemId),
1022    ReplanMaterializedView(CatalogItemId),
1024    ReplanIndex(CatalogItemId),
1026    Statement(ExplaineeStatement),
1028}
1029
1030#[derive(Clone, Debug, EnumKind)]
1032#[enum_kind(ExplaineeStatementKind)]
1033pub enum ExplaineeStatement {
1034    Select {
1036        broken: bool,
1038        plan: plan::SelectPlan,
1039        desc: RelationDesc,
1040    },
1041    CreateView {
1043        broken: bool,
1045        plan: plan::CreateViewPlan,
1046    },
1047    CreateMaterializedView {
1049        broken: bool,
1051        plan: plan::CreateMaterializedViewPlan,
1052    },
1053    CreateIndex {
1055        broken: bool,
1057        plan: plan::CreateIndexPlan,
1058    },
1059}
1060
1061impl ExplaineeStatement {
1062    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1063        match self {
1064            Self::Select { plan, .. } => plan.source.depends_on(),
1065            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1066            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1067            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1068        }
1069    }
1070
1071    pub fn broken(&self) -> bool {
1082        match self {
1083            Self::Select { broken, .. } => *broken,
1084            Self::CreateView { broken, .. } => *broken,
1085            Self::CreateMaterializedView { broken, .. } => *broken,
1086            Self::CreateIndex { broken, .. } => *broken,
1087        }
1088    }
1089}
1090
1091impl ExplaineeStatementKind {
1092    pub fn supports(&self, stage: &ExplainStage) -> bool {
1093        use ExplainStage::*;
1094        match self {
1095            Self::Select => true,
1096            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1097            Self::CreateMaterializedView => true,
1098            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1099        }
1100    }
1101}
1102
1103impl std::fmt::Display for ExplaineeStatementKind {
1104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1105        match self {
1106            Self::Select => write!(f, "SELECT"),
1107            Self::CreateView => write!(f, "CREATE VIEW"),
1108            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1109            Self::CreateIndex => write!(f, "CREATE INDEX"),
1110        }
1111    }
1112}
1113
1114#[derive(Clone, Debug)]
1115pub struct ExplainPushdownPlan {
1116    pub explainee: Explainee,
1117}
1118
1119#[derive(Clone, Debug)]
1120pub struct ExplainTimestampPlan {
1121    pub format: ExplainFormat,
1122    pub raw_plan: HirRelationExpr,
1123    pub when: QueryWhen,
1124}
1125
1126#[derive(Debug)]
1127pub struct ExplainSinkSchemaPlan {
1128    pub sink_from: GlobalId,
1129    pub json_schema: String,
1130}
1131
1132#[derive(Debug)]
1133pub struct SendDiffsPlan {
1134    pub id: CatalogItemId,
1135    pub updates: Vec<(Row, Diff)>,
1136    pub kind: MutationKind,
1137    pub returning: Vec<(Row, NonZeroUsize)>,
1138    pub max_result_size: u64,
1139}
1140
1141#[derive(Debug)]
1142pub struct InsertPlan {
1143    pub id: CatalogItemId,
1144    pub values: HirRelationExpr,
1145    pub returning: Vec<mz_expr::MirScalarExpr>,
1146}
1147
1148#[derive(Debug)]
1149pub struct ReadThenWritePlan {
1150    pub id: CatalogItemId,
1151    pub selection: HirRelationExpr,
1152    pub finishing: RowSetFinishing,
1153    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
1154    pub kind: MutationKind,
1155    pub returning: Vec<mz_expr::MirScalarExpr>,
1156}
1157
1158#[derive(Debug)]
1160pub struct AlterNoopPlan {
1161    pub object_type: ObjectType,
1162}
1163
1164#[derive(Debug)]
1165pub struct AlterSetClusterPlan {
1166    pub id: CatalogItemId,
1167    pub set_cluster: ClusterId,
1168}
1169
1170#[derive(Debug)]
1171pub struct AlterRetainHistoryPlan {
1172    pub id: CatalogItemId,
1173    pub value: Option<Value>,
1174    pub window: CompactionWindow,
1175    pub object_type: ObjectType,
1176}
1177
1178#[derive(Debug, Clone)]
1179
1180pub enum AlterOptionParameter<T = String> {
1181    Set(T),
1182    Reset,
1183    Unchanged,
1184}
1185
1186#[derive(Debug)]
1187pub enum AlterConnectionAction {
1188    RotateKeys,
1189    AlterOptions {
1190        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
1191        drop_options: BTreeSet<ConnectionOptionName>,
1192        validate: bool,
1193    },
1194}
1195
1196#[derive(Debug)]
1197pub struct AlterConnectionPlan {
1198    pub id: CatalogItemId,
1199    pub action: AlterConnectionAction,
1200}
1201
1202#[derive(Debug)]
1203pub enum AlterSourceAction {
1204    AddSubsourceExports {
1205        subsources: Vec<CreateSourcePlanBundle>,
1206        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1207    },
1208    RefreshReferences {
1209        references: SourceReferences,
1210    },
1211}
1212
1213#[derive(Debug)]
1214pub struct AlterSourcePlan {
1215    pub item_id: CatalogItemId,
1216    pub ingestion_id: GlobalId,
1217    pub action: AlterSourceAction,
1218}
1219
1220#[derive(Debug, Clone)]
1221pub struct AlterSinkPlan {
1222    pub item_id: CatalogItemId,
1223    pub global_id: GlobalId,
1224    pub sink: Sink,
1225    pub with_snapshot: bool,
1226    pub in_cluster: ClusterId,
1227}
1228
1229#[derive(Debug, Clone)]
1230pub struct AlterClusterPlan {
1231    pub id: ClusterId,
1232    pub name: String,
1233    pub options: PlanClusterOption,
1234    pub strategy: AlterClusterPlanStrategy,
1235}
1236
1237#[derive(Debug)]
1238pub struct AlterClusterRenamePlan {
1239    pub id: ClusterId,
1240    pub name: String,
1241    pub to_name: String,
1242}
1243
1244#[derive(Debug)]
1245pub struct AlterClusterReplicaRenamePlan {
1246    pub cluster_id: ClusterId,
1247    pub replica_id: ReplicaId,
1248    pub name: QualifiedReplica,
1249    pub to_name: String,
1250}
1251
1252#[derive(Debug)]
1253pub struct AlterItemRenamePlan {
1254    pub id: CatalogItemId,
1255    pub current_full_name: FullItemName,
1256    pub to_name: String,
1257    pub object_type: ObjectType,
1258}
1259
1260#[derive(Debug)]
1261pub struct AlterSchemaRenamePlan {
1262    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1263    pub new_schema_name: String,
1264}
1265
1266#[derive(Debug)]
1267pub struct AlterSchemaSwapPlan {
1268    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1269    pub schema_a_name: String,
1270    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
1271    pub schema_b_name: String,
1272    pub name_temp: String,
1273}
1274
1275#[derive(Debug)]
1276pub struct AlterClusterSwapPlan {
1277    pub id_a: ClusterId,
1278    pub id_b: ClusterId,
1279    pub name_a: String,
1280    pub name_b: String,
1281    pub name_temp: String,
1282}
1283
1284#[derive(Debug)]
1285pub struct AlterSecretPlan {
1286    pub id: CatalogItemId,
1287    pub secret_as: MirScalarExpr,
1288}
1289
1290#[derive(Debug)]
1291pub struct AlterSystemSetPlan {
1292    pub name: String,
1293    pub value: VariableValue,
1294}
1295
1296#[derive(Debug)]
1297pub struct AlterSystemResetPlan {
1298    pub name: String,
1299}
1300
1301#[derive(Debug)]
1302pub struct AlterSystemResetAllPlan {}
1303
1304#[derive(Debug)]
1305pub struct AlterRolePlan {
1306    pub id: RoleId,
1307    pub name: String,
1308    pub option: PlannedAlterRoleOption,
1309}
1310
1311#[derive(Debug)]
1312pub struct AlterOwnerPlan {
1313    pub id: ObjectId,
1314    pub object_type: ObjectType,
1315    pub new_owner: RoleId,
1316}
1317
1318#[derive(Debug)]
1319pub struct AlterTablePlan {
1320    pub relation_id: CatalogItemId,
1321    pub column_name: ColumnName,
1322    pub column_type: SqlColumnType,
1323    pub raw_sql_type: RawDataType,
1324}
1325
1326#[derive(Debug)]
1327pub struct DeclarePlan {
1328    pub name: String,
1329    pub stmt: Statement<Raw>,
1330    pub sql: String,
1331    pub params: Params,
1332}
1333
1334#[derive(Debug)]
1335pub struct FetchPlan {
1336    pub name: String,
1337    pub count: Option<FetchDirection>,
1338    pub timeout: ExecuteTimeout,
1339}
1340
1341#[derive(Debug)]
1342pub struct ClosePlan {
1343    pub name: String,
1344}
1345
1346#[derive(Debug)]
1347pub struct PreparePlan {
1348    pub name: String,
1349    pub stmt: Statement<Raw>,
1350    pub sql: String,
1351    pub desc: StatementDesc,
1352}
1353
1354#[derive(Debug)]
1355pub struct ExecutePlan {
1356    pub name: String,
1357    pub params: Params,
1358}
1359
1360#[derive(Debug)]
1361pub struct DeallocatePlan {
1362    pub name: Option<String>,
1363}
1364
1365#[derive(Debug)]
1366pub struct RaisePlan {
1367    pub severity: NoticeSeverity,
1368}
1369
1370#[derive(Debug)]
1371pub struct GrantRolePlan {
1372    pub role_ids: Vec<RoleId>,
1374    pub member_ids: Vec<RoleId>,
1376    pub grantor_id: RoleId,
1378}
1379
1380#[derive(Debug)]
1381pub struct RevokeRolePlan {
1382    pub role_ids: Vec<RoleId>,
1384    pub member_ids: Vec<RoleId>,
1386    pub grantor_id: RoleId,
1388}
1389
1390#[derive(Debug)]
1391pub struct UpdatePrivilege {
1392    pub acl_mode: AclMode,
1394    pub target_id: SystemObjectId,
1396    pub grantor: RoleId,
1398}
1399
1400#[derive(Debug)]
1401pub struct GrantPrivilegesPlan {
1402    pub update_privileges: Vec<UpdatePrivilege>,
1404    pub grantees: Vec<RoleId>,
1406}
1407
1408#[derive(Debug)]
1409pub struct RevokePrivilegesPlan {
1410    pub update_privileges: Vec<UpdatePrivilege>,
1412    pub revokees: Vec<RoleId>,
1414}
1415#[derive(Debug)]
1416pub struct AlterDefaultPrivilegesPlan {
1417    pub privilege_objects: Vec<DefaultPrivilegeObject>,
1419    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
1421    pub is_grant: bool,
1423}
1424
1425#[derive(Debug)]
1426pub struct ReassignOwnedPlan {
1427    pub old_roles: Vec<RoleId>,
1429    pub new_role: RoleId,
1431    pub reassign_ids: Vec<ObjectId>,
1433}
1434
1435#[derive(Debug)]
1436pub struct CommentPlan {
1437    pub object_id: CommentObjectId,
1439    pub sub_component: Option<usize>,
1443    pub comment: Option<String>,
1445}
1446
1447#[derive(Clone, Debug)]
1448pub enum TableDataSource {
1449    TableWrites { defaults: Vec<Expr<Aug>> },
1451
1452    DataSource {
1455        desc: DataSourceDesc,
1456        timeline: Timeline,
1457    },
1458}
1459
1460#[derive(Clone, Debug)]
1461pub struct Table {
1462    pub create_sql: String,
1463    pub desc: VersionedRelationDesc,
1464    pub temporary: bool,
1465    pub compaction_window: Option<CompactionWindow>,
1466    pub data_source: TableDataSource,
1467}
1468
1469#[derive(Clone, Debug)]
1470pub struct Source {
1471    pub create_sql: String,
1472    pub data_source: DataSourceDesc,
1473    pub desc: RelationDesc,
1474    pub compaction_window: Option<CompactionWindow>,
1475}
1476
1477#[derive(Debug, Clone)]
1478pub enum DataSourceDesc {
1479    Ingestion(SourceDesc<ReferencedConnection>),
1481    OldSyntaxIngestion {
1483        desc: SourceDesc<ReferencedConnection>,
1484        progress_subsource: CatalogItemId,
1487        data_config: SourceExportDataConfig<ReferencedConnection>,
1488        details: SourceExportDetails,
1489    },
1490    IngestionExport {
1493        ingestion_id: CatalogItemId,
1494        external_reference: UnresolvedItemName,
1495        details: SourceExportDetails,
1496        data_config: SourceExportDataConfig<ReferencedConnection>,
1497    },
1498    Progress,
1500    Webhook {
1502        validate_using: Option<WebhookValidation>,
1503        body_format: WebhookBodyFormat,
1504        headers: WebhookHeaders,
1505        cluster_id: Option<StorageInstanceId>,
1507    },
1508}
1509
1510#[derive(Clone, Debug, Serialize)]
1511pub struct WebhookValidation {
1512    pub expression: MirScalarExpr,
1514    pub relation_desc: RelationDesc,
1516    pub bodies: Vec<(usize, bool)>,
1518    pub headers: Vec<(usize, bool)>,
1520    pub secrets: Vec<WebhookValidationSecret>,
1522}
1523
1524impl WebhookValidation {
1525    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1526
1527    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1532        let WebhookValidation {
1533            expression,
1534            relation_desc,
1535            ..
1536        } = self;
1537
1538        let mut expression_ = expression.clone();
1540        let desc_ = relation_desc.clone();
1541        let reduce_task = mz_ore::task::spawn_blocking(
1542            || "webhook-validation-reduce",
1543            move || {
1544                expression_.reduce(&desc_.typ().column_types);
1545                expression_
1546            },
1547        );
1548
1549        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1550            Ok(Ok(reduced_expr)) => {
1551                *expression = reduced_expr;
1552                Ok(())
1553            }
1554            Ok(Err(_)) => Err("joining task"),
1555            Err(_) => Err("timeout"),
1556        }
1557    }
1558}
1559
1560#[derive(Clone, Debug, Default, Serialize)]
1561pub struct WebhookHeaders {
1562    pub header_column: Option<WebhookHeaderFilters>,
1564    pub mapped_headers: BTreeMap<usize, (String, bool)>,
1566}
1567
1568impl WebhookHeaders {
1569    pub fn num_columns(&self) -> usize {
1571        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1572        let mapped_headers = self.mapped_headers.len();
1573
1574        header_column + mapped_headers
1575    }
1576}
1577
1578#[derive(Clone, Debug, Default, Serialize)]
1579pub struct WebhookHeaderFilters {
1580    pub block: BTreeSet<String>,
1581    pub allow: BTreeSet<String>,
1582}
1583
1584#[derive(Copy, Clone, Debug, Serialize, Arbitrary)]
1585pub enum WebhookBodyFormat {
1586    Json { array: bool },
1587    Bytes,
1588    Text,
1589}
1590
1591impl From<WebhookBodyFormat> for SqlScalarType {
1592    fn from(value: WebhookBodyFormat) -> Self {
1593        match value {
1594            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1595            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1596            WebhookBodyFormat::Text => SqlScalarType::String,
1597        }
1598    }
1599}
1600
1601#[derive(Clone, Debug, Serialize)]
1602pub struct WebhookValidationSecret {
1603    pub id: CatalogItemId,
1605    pub column_idx: usize,
1607    pub use_bytes: bool,
1609}
1610
1611#[derive(Clone, Debug)]
1612pub struct Connection {
1613    pub create_sql: String,
1614    pub details: ConnectionDetails,
1615}
1616
1617#[derive(Clone, Debug, Serialize)]
1618pub enum ConnectionDetails {
1619    Kafka(KafkaConnection<ReferencedConnection>),
1620    Csr(CsrConnection<ReferencedConnection>),
1621    Postgres(PostgresConnection<ReferencedConnection>),
1622    Ssh {
1623        connection: SshConnection,
1624        key_1: SshKey,
1625        key_2: SshKey,
1626    },
1627    Aws(AwsConnection),
1628    AwsPrivatelink(AwsPrivatelinkConnection),
1629    MySql(MySqlConnection<ReferencedConnection>),
1630    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
1631    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
1632}
1633
1634impl ConnectionDetails {
1635    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1636        match self {
1637            ConnectionDetails::Kafka(c) => {
1638                mz_storage_types::connections::Connection::Kafka(c.clone())
1639            }
1640            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1641            ConnectionDetails::Postgres(c) => {
1642                mz_storage_types::connections::Connection::Postgres(c.clone())
1643            }
1644            ConnectionDetails::Ssh { connection, .. } => {
1645                mz_storage_types::connections::Connection::Ssh(connection.clone())
1646            }
1647            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1648            ConnectionDetails::AwsPrivatelink(c) => {
1649                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1650            }
1651            ConnectionDetails::MySql(c) => {
1652                mz_storage_types::connections::Connection::MySql(c.clone())
1653            }
1654            ConnectionDetails::SqlServer(c) => {
1655                mz_storage_types::connections::Connection::SqlServer(c.clone())
1656            }
1657            ConnectionDetails::IcebergCatalog(c) => {
1658                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1659            }
1660        }
1661    }
1662}
1663
1664#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1665pub struct NetworkPolicyRule {
1666    pub name: String,
1667    pub action: NetworkPolicyRuleAction,
1668    pub address: PolicyAddress,
1669    pub direction: NetworkPolicyRuleDirection,
1670}
1671
1672#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1673pub enum NetworkPolicyRuleAction {
1674    Allow,
1675}
1676
1677impl std::fmt::Display for NetworkPolicyRuleAction {
1678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1679        match self {
1680            Self::Allow => write!(f, "allow"),
1681        }
1682    }
1683}
1684impl TryFrom<&str> for NetworkPolicyRuleAction {
1685    type Error = PlanError;
1686    fn try_from(value: &str) -> Result<Self, Self::Error> {
1687        match value.to_uppercase().as_str() {
1688            "ALLOW" => Ok(Self::Allow),
1689            _ => Err(PlanError::Unstructured(
1690                "Allow is the only valid option".into(),
1691            )),
1692        }
1693    }
1694}
1695
1696#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
1697pub enum NetworkPolicyRuleDirection {
1698    Ingress,
1699}
1700impl std::fmt::Display for NetworkPolicyRuleDirection {
1701    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1702        match self {
1703            Self::Ingress => write!(f, "ingress"),
1704        }
1705    }
1706}
1707impl TryFrom<&str> for NetworkPolicyRuleDirection {
1708    type Error = PlanError;
1709    fn try_from(value: &str) -> Result<Self, Self::Error> {
1710        match value.to_uppercase().as_str() {
1711            "INGRESS" => Ok(Self::Ingress),
1712            _ => Err(PlanError::Unstructured(
1713                "Ingress is the only valid option".into(),
1714            )),
1715        }
1716    }
1717}
1718
1719#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
1720pub struct PolicyAddress(pub IpNet);
1721impl std::fmt::Display for PolicyAddress {
1722    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1723        write!(f, "{}", &self.0.to_string())
1724    }
1725}
1726impl From<String> for PolicyAddress {
1727    fn from(value: String) -> Self {
1728        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1729    }
1730}
1731impl TryFrom<&str> for PolicyAddress {
1732    type Error = PlanError;
1733    fn try_from(value: &str) -> Result<Self, Self::Error> {
1734        let net = IpNet::from_str(value)
1735            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1736        Ok(Self(net))
1737    }
1738}
1739
1740impl Serialize for PolicyAddress {
1741    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1742    where
1743        S: serde::Serializer,
1744    {
1745        serializer.serialize_str(&format!("{}", &self.0))
1746    }
1747}
1748
1749#[derive(Clone, Debug, Serialize)]
1750pub enum SshKey {
1751    PublicOnly(String),
1752    Both(SshKeyPair),
1753}
1754
1755impl SshKey {
1756    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1757        match self {
1758            SshKey::PublicOnly(_) => None,
1759            SshKey::Both(key_pair) => Some(key_pair),
1760        }
1761    }
1762
1763    pub fn public_key(&self) -> String {
1764        match self {
1765            SshKey::PublicOnly(s) => s.into(),
1766            SshKey::Both(p) => p.ssh_public_key(),
1767        }
1768    }
1769}
1770
1771#[derive(Clone, Debug)]
1772pub struct Secret {
1773    pub create_sql: String,
1774    pub secret_as: MirScalarExpr,
1775}
1776
1777#[derive(Clone, Debug)]
1778pub struct Sink {
1779    pub create_sql: String,
1781    pub from: GlobalId,
1783    pub connection: StorageSinkConnection<ReferencedConnection>,
1785    pub envelope: SinkEnvelope,
1787    pub version: u64,
1788}
1789
1790#[derive(Clone, Debug)]
1791pub struct View {
1792    pub create_sql: String,
1794    pub expr: HirRelationExpr,
1796    pub dependencies: DependencyIds,
1798    pub column_names: Vec<ColumnName>,
1800    pub temporary: bool,
1802}
1803
1804#[derive(Clone, Debug)]
1805pub struct MaterializedView {
1806    pub create_sql: String,
1808    pub expr: HirRelationExpr,
1810    pub dependencies: DependencyIds,
1812    pub column_names: Vec<ColumnName>,
1814    pub cluster_id: ClusterId,
1816    pub non_null_assertions: Vec<usize>,
1817    pub compaction_window: Option<CompactionWindow>,
1818    pub refresh_schedule: Option<RefreshSchedule>,
1819    pub as_of: Option<Timestamp>,
1820}
1821
1822#[derive(Clone, Debug)]
1823pub struct Index {
1824    pub create_sql: String,
1826    pub on: GlobalId,
1828    pub keys: Vec<mz_expr::MirScalarExpr>,
1829    pub compaction_window: Option<CompactionWindow>,
1830    pub cluster_id: ClusterId,
1831}
1832
1833#[derive(Clone, Debug)]
1834pub struct Type {
1835    pub create_sql: String,
1836    pub inner: CatalogType<IdReference>,
1837}
1838
1839#[derive(Deserialize, Clone, Debug, PartialEq)]
1841pub enum QueryWhen {
1842    Immediately,
1845    FreshestTableWrite,
1848    AtTimestamp(Timestamp),
1853    AtLeastTimestamp(Timestamp),
1856}
1857
1858impl QueryWhen {
1859    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1861        match self {
1862            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1863            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1864        }
1865    }
1866    pub fn constrains_upper(&self) -> bool {
1870        match self {
1871            QueryWhen::AtTimestamp(_) => true,
1872            QueryWhen::AtLeastTimestamp(_)
1873            | QueryWhen::Immediately
1874            | QueryWhen::FreshestTableWrite => false,
1875        }
1876    }
1877    pub fn advance_to_since(&self) -> bool {
1879        match self {
1880            QueryWhen::Immediately
1881            | QueryWhen::AtLeastTimestamp(_)
1882            | QueryWhen::FreshestTableWrite => true,
1883            QueryWhen::AtTimestamp(_) => false,
1884        }
1885    }
1886    pub fn can_advance_to_upper(&self) -> bool {
1888        match self {
1889            QueryWhen::Immediately => true,
1890            QueryWhen::FreshestTableWrite
1891            | QueryWhen::AtTimestamp(_)
1892            | QueryWhen::AtLeastTimestamp(_) => false,
1893        }
1894    }
1895
1896    pub fn can_advance_to_timeline_ts(&self) -> bool {
1898        match self {
1899            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1900            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1901        }
1902    }
1903    pub fn must_advance_to_timeline_ts(&self) -> bool {
1905        match self {
1906            QueryWhen::FreshestTableWrite => true,
1907            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1908                false
1909            }
1910        }
1911    }
1912    pub fn is_transactional(&self) -> bool {
1914        match self {
1915            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1916            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1917        }
1918    }
1919}
1920
1921#[derive(Debug, Copy, Clone)]
1922pub enum MutationKind {
1923    Insert,
1924    Update,
1925    Delete,
1926}
1927
1928#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
1929pub enum CopyFormat {
1930    Text,
1931    Csv,
1932    Binary,
1933    Parquet,
1934}
1935
1936#[derive(Debug, Copy, Clone)]
1937pub enum ExecuteTimeout {
1938    None,
1939    Seconds(f64),
1940    WaitOnce,
1941}
1942
1943#[derive(Clone, Debug)]
1944pub enum IndexOption {
1945    RetainHistory(CompactionWindow),
1947}
1948
1949#[derive(Clone, Debug)]
1950pub enum TableOption {
1951    RetainHistory(CompactionWindow),
1953}
1954
1955#[derive(Clone, Debug)]
1956pub struct PlanClusterOption {
1957    pub availability_zones: AlterOptionParameter<Vec<String>>,
1958    pub introspection_debugging: AlterOptionParameter<bool>,
1959    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
1960    pub managed: AlterOptionParameter<bool>,
1961    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
1962    pub replication_factor: AlterOptionParameter<u32>,
1963    pub size: AlterOptionParameter,
1964    pub schedule: AlterOptionParameter<ClusterSchedule>,
1965    pub workload_class: AlterOptionParameter<Option<String>>,
1966}
1967
1968impl Default for PlanClusterOption {
1969    fn default() -> Self {
1970        Self {
1971            availability_zones: AlterOptionParameter::Unchanged,
1972            introspection_debugging: AlterOptionParameter::Unchanged,
1973            introspection_interval: AlterOptionParameter::Unchanged,
1974            managed: AlterOptionParameter::Unchanged,
1975            replicas: AlterOptionParameter::Unchanged,
1976            replication_factor: AlterOptionParameter::Unchanged,
1977            size: AlterOptionParameter::Unchanged,
1978            schedule: AlterOptionParameter::Unchanged,
1979            workload_class: AlterOptionParameter::Unchanged,
1980        }
1981    }
1982}
1983
1984#[derive(Clone, Debug, PartialEq, Eq)]
1985pub enum AlterClusterPlanStrategy {
1986    None,
1987    For(Duration),
1988    UntilReady {
1989        on_timeout: OnTimeoutAction,
1990        timeout: Duration,
1991    },
1992}
1993
1994#[derive(Clone, Debug, PartialEq, Eq)]
1995pub enum OnTimeoutAction {
1996    Commit,
1997    Rollback,
1998}
1999
2000impl Default for OnTimeoutAction {
2001    fn default() -> Self {
2002        Self::Commit
2003    }
2004}
2005
2006impl TryFrom<&str> for OnTimeoutAction {
2007    type Error = PlanError;
2008    fn try_from(value: &str) -> Result<Self, Self::Error> {
2009        match value.to_uppercase().as_str() {
2010            "COMMIT" => Ok(Self::Commit),
2011            "ROLLBACK" => Ok(Self::Rollback),
2012            _ => Err(PlanError::Unstructured(
2013                "Valid options are COMMIT, ROLLBACK".into(),
2014            )),
2015        }
2016    }
2017}
2018
2019impl AlterClusterPlanStrategy {
2020    pub fn is_none(&self) -> bool {
2021        matches!(self, Self::None)
2022    }
2023    pub fn is_some(&self) -> bool {
2024        !matches!(self, Self::None)
2025    }
2026}
2027
2028impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2029    type Error = PlanError;
2030
2031    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2032        Ok(match value.wait {
2033            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2034            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2035                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2036                Self::UntilReady {
2037                    timeout: match extracted.timeout {
2038                        Some(d) => d,
2039                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2040                    },
2041                    on_timeout: match extracted.on_timeout {
2042                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2043                            PlanError::InvalidOptionValue {
2044                                option_name: "ON TIMEOUT".into(),
2045                                err: Box::new(e),
2046                            }
2047                        })?,
2048                        None => OnTimeoutAction::default(),
2049                    },
2050                }
2051            }
2052            None => Self::None,
2053        })
2054    }
2055}
2056
2057#[derive(Debug, Clone)]
2059pub struct Params {
2060    pub datums: Row,
2062    pub execute_types: Vec<SqlScalarType>,
2064    pub expected_types: Vec<SqlScalarType>,
2066}
2067
2068impl Params {
2069    pub fn empty() -> Params {
2071        Params {
2072            datums: Row::pack_slice(&[]),
2073            execute_types: vec![],
2074            expected_types: vec![],
2075        }
2076    }
2077}
2078
2079#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Copy)]
2081pub struct PlanContext {
2082    pub wall_time: DateTime<Utc>,
2083    pub ignore_if_exists_errors: bool,
2084}
2085
2086impl PlanContext {
2087    pub fn new(wall_time: DateTime<Utc>) -> Self {
2088        Self {
2089            wall_time,
2090            ignore_if_exists_errors: false,
2091        }
2092    }
2093
2094    pub fn zero() -> Self {
2098        PlanContext {
2099            wall_time: now::to_datetime(NOW_ZERO()),
2100            ignore_if_exists_errors: false,
2101        }
2102    }
2103
2104    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2105        self.ignore_if_exists_errors = value;
2106        self
2107    }
2108}