Skip to main content

mz_sql/
plan.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! SQL planning.
//!
//! SQL planning is the process of taking the abstract syntax tree of a
//! [`Statement`] and turning it into a [`Plan`] that the dataflow layer can
//! execute.
//!
//! Statements must be purified before they can be planned. See the
//! [`pure`](crate::pure) module for details.
18
// Internal module layout.
//
// The entry point for planning is `statement::handle_statement`. That function
// dispatches to a more specific `handle` function for the particular statement
// type. For most statements, this `handle` function is uninteresting and short,
// but anything involving a `SELECT` statement gets complicated. `SELECT`
// queries wind through the functions in the `query` module, starting with
// `plan_root_query` and fanning out based on the contents of the `SELECT`
// statement.
28
29use std::collections::{BTreeMap, BTreeSet};
30use std::num::NonZeroUsize;
31use std::str::FromStr;
32use std::time::Duration;
33
34use chrono::{DateTime, Utc};
35use enum_kinds::EnumKind;
36use ipnet::IpNet;
37use maplit::btreeset;
38use mz_adapter_types::compaction::CompactionWindow;
39use mz_controller_types::{ClusterId, ReplicaId};
40use mz_expr::{CollectionPlan, ColumnOrder, MapFilterProject, MirScalarExpr, RowSetFinishing};
41use mz_ore::now::{self, NOW_ZERO};
42use mz_pgcopy::CopyFormatParams;
43use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
44use mz_repr::explain::{ExplainConfig, ExplainFormat};
45use mz_repr::network_policy_id::NetworkPolicyId;
46use mz_repr::optimize::OptimizerFeatureOverrides;
47use mz_repr::refresh_schedule::RefreshSchedule;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, ColumnIndex, ColumnName, Diff, GlobalId, RelationDesc, ReprColumnType, Row,
51    SqlColumnType, SqlRelationType, SqlScalarType, Timestamp, VersionedRelationDesc,
52};
53use mz_sql_parser::ast::{
54    AlterSourceAddSubsourceOption, ClusterAlterOptionValue, ConnectionOptionName, QualifiedReplica,
55    RawDataType, SelectStatement, TransactionIsolationLevel, TransactionMode, UnresolvedItemName,
56    Value, WithOptionValue,
57};
58use mz_ssh_util::keys::SshKeyPair;
59use mz_storage_types::connections::aws::AwsConnection;
60use mz_storage_types::connections::inline::ReferencedConnection;
61use mz_storage_types::connections::{
62    AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection,
63    MySqlConnection, PostgresConnection, SqlServerConnectionDetails, SshConnection,
64};
65use mz_storage_types::instances::StorageInstanceId;
66use mz_storage_types::sinks::{S3SinkFormat, SinkEnvelope, StorageSinkConnection};
67use mz_storage_types::sources::{
68    SourceDesc, SourceExportDataConfig, SourceExportDetails, Timeline,
69};
70use proptest_derive::Arbitrary;
71use serde::{Deserialize, Serialize};
72
73use crate::ast::{
74    ExplainStage, Expr, FetchDirection, NoticeSeverity, Raw, Statement, StatementKind,
75    TransactionAccessMode,
76};
77use crate::catalog::{
78    CatalogType, DefaultPrivilegeAclItem, DefaultPrivilegeObject, IdReference, ObjectType,
79    RoleAttributesRaw,
80};
81use crate::names::{
82    Aug, CommentObjectId, DependencyIds, FullItemName, ObjectId, QualifiedItemName,
83    ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier, SystemObjectId,
84};
85
86pub(crate) mod error;
87pub(crate) mod explain;
88pub(crate) mod hir;
89pub(crate) mod literal;
90pub(crate) mod lowering;
91pub(crate) mod notice;
92pub(crate) mod plan_utils;
93pub(crate) mod query;
94pub(crate) mod scope;
95pub(crate) mod side_effecting_func;
96pub(crate) mod statement;
97pub(crate) mod transform_ast;
98pub(crate) mod transform_hir;
99pub(crate) mod typeconv;
100pub(crate) mod with_options;
101
102use crate::plan;
103use crate::plan::statement::ddl::ClusterAlterUntilReadyOptionExtracted;
104use crate::plan::with_options::OptionalDuration;
105pub use error::PlanError;
106pub use explain::normalize_subqueries;
107pub use hir::{
108    AggregateExpr, CoercibleScalarExpr, Hir, HirRelationExpr, HirScalarExpr, JoinKind,
109    WindowExprType,
110};
111pub use lowering::Config as HirToMirConfig;
112pub use notice::PlanNotice;
113pub use query::{ExprContext, QueryContext, QueryLifetime};
114pub use scope::Scope;
115pub use side_effecting_func::SideEffectingFunc;
116pub use statement::ddl::{
117    AlterSourceAddSubsourceOptionExtracted, MySqlConfigOptionExtracted, PgConfigOptionExtracted,
118    PlannedAlterRoleOption, PlannedRoleAttributes, PlannedRoleVariable,
119    SqlServerConfigOptionExtracted,
120};
121pub use statement::{
122    StatementClassification, StatementContext, StatementDesc, describe, plan, plan_copy_from,
123    resolve_cluster_for_materialized_view,
124};
125
126use self::statement::ddl::ClusterAlterOptionExtracted;
127use self::with_options::TryFromValue;
128
129/// Instructions for executing a SQL query.
130#[derive(Debug, EnumKind)]
131#[enum_kind(PlanKind)]
132pub enum Plan {
133    CreateConnection(CreateConnectionPlan),
134    CreateDatabase(CreateDatabasePlan),
135    CreateSchema(CreateSchemaPlan),
136    CreateRole(CreateRolePlan),
137    CreateCluster(CreateClusterPlan),
138    CreateClusterReplica(CreateClusterReplicaPlan),
139    CreateSource(CreateSourcePlan),
140    CreateSources(Vec<CreateSourcePlanBundle>),
141    CreateSecret(CreateSecretPlan),
142    CreateSink(CreateSinkPlan),
143    CreateTable(CreateTablePlan),
144    CreateView(CreateViewPlan),
145    CreateMaterializedView(CreateMaterializedViewPlan),
146    CreateNetworkPolicy(CreateNetworkPolicyPlan),
147    CreateIndex(CreateIndexPlan),
148    CreateType(CreateTypePlan),
149    Comment(CommentPlan),
150    DiscardTemp,
151    DiscardAll,
152    DropObjects(DropObjectsPlan),
153    DropOwned(DropOwnedPlan),
154    EmptyQuery,
155    ShowAllVariables,
156    ShowCreate(ShowCreatePlan),
157    ShowColumns(ShowColumnsPlan),
158    ShowVariable(ShowVariablePlan),
159    InspectShard(InspectShardPlan),
160    SetVariable(SetVariablePlan),
161    ResetVariable(ResetVariablePlan),
162    SetTransaction(SetTransactionPlan),
163    StartTransaction(StartTransactionPlan),
164    CommitTransaction(CommitTransactionPlan),
165    AbortTransaction(AbortTransactionPlan),
166    Select(SelectPlan),
167    Subscribe(SubscribePlan),
168    CopyFrom(CopyFromPlan),
169    CopyTo(CopyToPlan),
170    ExplainPlan(ExplainPlanPlan),
171    ExplainPushdown(ExplainPushdownPlan),
172    ExplainTimestamp(ExplainTimestampPlan),
173    ExplainSinkSchema(ExplainSinkSchemaPlan),
174    Insert(InsertPlan),
175    AlterCluster(AlterClusterPlan),
176    AlterClusterSwap(AlterClusterSwapPlan),
177    AlterNoop(AlterNoopPlan),
178    AlterSetCluster(AlterSetClusterPlan),
179    AlterConnection(AlterConnectionPlan),
180    AlterSource(AlterSourcePlan),
181    AlterClusterRename(AlterClusterRenamePlan),
182    AlterClusterReplicaRename(AlterClusterReplicaRenamePlan),
183    AlterItemRename(AlterItemRenamePlan),
184    AlterSchemaRename(AlterSchemaRenamePlan),
185    AlterSchemaSwap(AlterSchemaSwapPlan),
186    AlterSecret(AlterSecretPlan),
187    AlterSink(AlterSinkPlan),
188    AlterSystemSet(AlterSystemSetPlan),
189    AlterSystemReset(AlterSystemResetPlan),
190    AlterSystemResetAll(AlterSystemResetAllPlan),
191    AlterRole(AlterRolePlan),
192    AlterOwner(AlterOwnerPlan),
193    AlterTableAddColumn(AlterTablePlan),
194    AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan),
195    AlterNetworkPolicy(AlterNetworkPolicyPlan),
196    Declare(DeclarePlan),
197    Fetch(FetchPlan),
198    Close(ClosePlan),
199    ReadThenWrite(ReadThenWritePlan),
200    Prepare(PreparePlan),
201    Execute(ExecutePlan),
202    Deallocate(DeallocatePlan),
203    Raise(RaisePlan),
204    GrantRole(GrantRolePlan),
205    RevokeRole(RevokeRolePlan),
206    GrantPrivileges(GrantPrivilegesPlan),
207    RevokePrivileges(RevokePrivilegesPlan),
208    AlterDefaultPrivileges(AlterDefaultPrivilegesPlan),
209    ReassignOwned(ReassignOwnedPlan),
210    SideEffectingFunc(SideEffectingFunc),
211    ValidateConnection(ValidateConnectionPlan),
212    AlterRetainHistory(AlterRetainHistoryPlan),
213    AlterSourceTimestampInterval(AlterSourceTimestampIntervalPlan),
214}
215
216impl Plan {
217    /// Expresses which [`StatementKind`] can generate which set of
218    /// [`PlanKind`].
219    pub fn generated_from(stmt: &StatementKind) -> &'static [PlanKind] {
220        match stmt {
221            StatementKind::AlterCluster => &[PlanKind::AlterNoop, PlanKind::AlterCluster],
222            StatementKind::AlterConnection => &[PlanKind::AlterNoop, PlanKind::AlterConnection],
223            StatementKind::AlterDefaultPrivileges => &[PlanKind::AlterDefaultPrivileges],
224            StatementKind::AlterIndex => &[PlanKind::AlterRetainHistory, PlanKind::AlterNoop],
225            StatementKind::AlterObjectRename => &[
226                PlanKind::AlterClusterRename,
227                PlanKind::AlterClusterReplicaRename,
228                PlanKind::AlterItemRename,
229                PlanKind::AlterSchemaRename,
230                PlanKind::AlterNoop,
231            ],
232            StatementKind::AlterObjectSwap => &[
233                PlanKind::AlterClusterSwap,
234                PlanKind::AlterSchemaSwap,
235                PlanKind::AlterNoop,
236            ],
237            StatementKind::AlterRole => &[PlanKind::AlterRole],
238            StatementKind::AlterNetworkPolicy => &[PlanKind::AlterNetworkPolicy],
239            StatementKind::AlterSecret => &[PlanKind::AlterNoop, PlanKind::AlterSecret],
240            StatementKind::AlterSetCluster => &[PlanKind::AlterNoop, PlanKind::AlterSetCluster],
241            StatementKind::AlterSink => &[PlanKind::AlterNoop, PlanKind::AlterSink],
242            StatementKind::AlterSource => &[
243                PlanKind::AlterNoop,
244                PlanKind::AlterSource,
245                PlanKind::AlterRetainHistory,
246                PlanKind::AlterSourceTimestampInterval,
247            ],
248            StatementKind::AlterSystemReset => &[PlanKind::AlterNoop, PlanKind::AlterSystemReset],
249            StatementKind::AlterSystemResetAll => {
250                &[PlanKind::AlterNoop, PlanKind::AlterSystemResetAll]
251            }
252            StatementKind::AlterSystemSet => &[PlanKind::AlterNoop, PlanKind::AlterSystemSet],
253            StatementKind::AlterOwner => &[PlanKind::AlterNoop, PlanKind::AlterOwner],
254            StatementKind::AlterTableAddColumn => {
255                &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn]
256            }
257            StatementKind::AlterMaterializedViewApplyReplacement => &[
258                PlanKind::AlterNoop,
259                PlanKind::AlterMaterializedViewApplyReplacement,
260            ],
261            StatementKind::Close => &[PlanKind::Close],
262            StatementKind::Comment => &[PlanKind::Comment],
263            StatementKind::Commit => &[PlanKind::CommitTransaction],
264            StatementKind::Copy => &[
265                PlanKind::CopyFrom,
266                PlanKind::Select,
267                PlanKind::Subscribe,
268                PlanKind::CopyTo,
269            ],
270            StatementKind::CreateCluster => &[PlanKind::CreateCluster],
271            StatementKind::CreateClusterReplica => &[PlanKind::CreateClusterReplica],
272            StatementKind::CreateConnection => &[PlanKind::CreateConnection],
273            StatementKind::CreateDatabase => &[PlanKind::CreateDatabase],
274            StatementKind::CreateIndex => &[PlanKind::CreateIndex],
275            StatementKind::CreateNetworkPolicy => &[PlanKind::CreateNetworkPolicy],
276            StatementKind::CreateMaterializedView => &[PlanKind::CreateMaterializedView],
277            StatementKind::CreateRole => &[PlanKind::CreateRole],
278            StatementKind::CreateSchema => &[PlanKind::CreateSchema],
279            StatementKind::CreateSecret => &[PlanKind::CreateSecret],
280            StatementKind::CreateSink => &[PlanKind::CreateSink],
281            StatementKind::CreateSource | StatementKind::CreateSubsource => {
282                &[PlanKind::CreateSource]
283            }
284            StatementKind::CreateWebhookSource => &[PlanKind::CreateSource, PlanKind::CreateTable],
285            StatementKind::CreateTable => &[PlanKind::CreateTable],
286            StatementKind::CreateTableFromSource => &[PlanKind::CreateTable],
287            StatementKind::CreateType => &[PlanKind::CreateType],
288            StatementKind::CreateView => &[PlanKind::CreateView],
289            StatementKind::Deallocate => &[PlanKind::Deallocate],
290            StatementKind::Declare => &[PlanKind::Declare],
291            StatementKind::Delete => &[PlanKind::ReadThenWrite],
292            StatementKind::Discard => &[PlanKind::DiscardAll, PlanKind::DiscardTemp],
293            StatementKind::DropObjects => &[PlanKind::DropObjects],
294            StatementKind::DropOwned => &[PlanKind::DropOwned],
295            StatementKind::Execute => &[PlanKind::Execute],
296            StatementKind::ExplainPlan => &[PlanKind::ExplainPlan],
297            StatementKind::ExplainPushdown => &[PlanKind::ExplainPushdown],
298            StatementKind::ExplainAnalyzeObject => &[PlanKind::Select],
299            StatementKind::ExplainAnalyzeCluster => &[PlanKind::Select],
300            StatementKind::ExplainTimestamp => &[PlanKind::ExplainTimestamp],
301            StatementKind::ExplainSinkSchema => &[PlanKind::ExplainSinkSchema],
302            StatementKind::Fetch => &[PlanKind::Fetch],
303            StatementKind::GrantPrivileges => &[PlanKind::GrantPrivileges],
304            StatementKind::GrantRole => &[PlanKind::GrantRole],
305            StatementKind::Insert => &[PlanKind::Insert],
306            StatementKind::Prepare => &[PlanKind::Prepare],
307            StatementKind::Raise => &[PlanKind::Raise],
308            StatementKind::ReassignOwned => &[PlanKind::ReassignOwned],
309            StatementKind::ResetVariable => &[PlanKind::ResetVariable],
310            StatementKind::RevokePrivileges => &[PlanKind::RevokePrivileges],
311            StatementKind::RevokeRole => &[PlanKind::RevokeRole],
312            StatementKind::Rollback => &[PlanKind::AbortTransaction],
313            StatementKind::Select => &[PlanKind::Select, PlanKind::SideEffectingFunc],
314            StatementKind::SetTransaction => &[PlanKind::SetTransaction],
315            StatementKind::SetVariable => &[PlanKind::SetVariable],
316            StatementKind::Show => &[
317                PlanKind::Select,
318                PlanKind::ShowVariable,
319                PlanKind::ShowCreate,
320                PlanKind::ShowColumns,
321                PlanKind::ShowAllVariables,
322                PlanKind::InspectShard,
323            ],
324            StatementKind::StartTransaction => &[PlanKind::StartTransaction],
325            StatementKind::Subscribe => &[PlanKind::Subscribe],
326            StatementKind::Update => &[PlanKind::ReadThenWrite],
327            StatementKind::ValidateConnection => &[PlanKind::ValidateConnection],
328            StatementKind::AlterRetainHistory => &[PlanKind::AlterRetainHistory],
329            StatementKind::ExecuteUnitTest => &[],
330        }
331    }
332
333    /// Returns a human readable name of the plan. Meant for use in messages sent back to a user.
334    pub fn name(&self) -> &str {
335        match self {
336            Plan::CreateConnection(_) => "create connection",
337            Plan::CreateDatabase(_) => "create database",
338            Plan::CreateSchema(_) => "create schema",
339            Plan::CreateRole(_) => "create role",
340            Plan::CreateCluster(_) => "create cluster",
341            Plan::CreateClusterReplica(_) => "create cluster replica",
342            Plan::CreateSource(_) => "create source",
343            Plan::CreateSources(_) => "create source",
344            Plan::CreateSecret(_) => "create secret",
345            Plan::CreateSink(_) => "create sink",
346            Plan::CreateTable(_) => "create table",
347            Plan::CreateView(_) => "create view",
348            Plan::CreateMaterializedView(_) => "create materialized view",
349            Plan::CreateIndex(_) => "create index",
350            Plan::CreateType(_) => "create type",
351            Plan::CreateNetworkPolicy(_) => "create network policy",
352            Plan::Comment(_) => "comment",
353            Plan::DiscardTemp => "discard temp",
354            Plan::DiscardAll => "discard all",
355            Plan::DropObjects(plan) => match plan.object_type {
356                ObjectType::Table => "drop table",
357                ObjectType::View => "drop view",
358                ObjectType::MaterializedView => "drop materialized view",
359                ObjectType::Source => "drop source",
360                ObjectType::Sink => "drop sink",
361                ObjectType::Index => "drop index",
362                ObjectType::Type => "drop type",
363                ObjectType::Role => "drop roles",
364                ObjectType::Cluster => "drop clusters",
365                ObjectType::ClusterReplica => "drop cluster replicas",
366                ObjectType::Secret => "drop secret",
367                ObjectType::Connection => "drop connection",
368                ObjectType::Database => "drop database",
369                ObjectType::Schema => "drop schema",
370                ObjectType::Func => "drop function",
371                ObjectType::NetworkPolicy => "drop network policy",
372            },
373            Plan::DropOwned(_) => "drop owned",
374            Plan::EmptyQuery => "do nothing",
375            Plan::ShowAllVariables => "show all variables",
376            Plan::ShowCreate(_) => "show create",
377            Plan::ShowColumns(_) => "show columns",
378            Plan::ShowVariable(_) => "show variable",
379            Plan::InspectShard(_) => "inspect shard",
380            Plan::SetVariable(_) => "set variable",
381            Plan::ResetVariable(_) => "reset variable",
382            Plan::SetTransaction(_) => "set transaction",
383            Plan::StartTransaction(_) => "start transaction",
384            Plan::CommitTransaction(_) => "commit",
385            Plan::AbortTransaction(_) => "abort",
386            Plan::Select(_) => "select",
387            Plan::Subscribe(_) => "subscribe",
388            Plan::CopyFrom(_) => "copy from",
389            Plan::CopyTo(_) => "copy to",
390            Plan::ExplainPlan(_) => "explain plan",
391            Plan::ExplainPushdown(_) => "EXPLAIN FILTER PUSHDOWN",
392            Plan::ExplainTimestamp(_) => "explain timestamp",
393            Plan::ExplainSinkSchema(_) => "explain schema",
394            Plan::Insert(_) => "insert",
395            Plan::AlterNoop(plan) => match plan.object_type {
396                ObjectType::Table => "alter table",
397                ObjectType::View => "alter view",
398                ObjectType::MaterializedView => "alter materialized view",
399                ObjectType::Source => "alter source",
400                ObjectType::Sink => "alter sink",
401                ObjectType::Index => "alter index",
402                ObjectType::Type => "alter type",
403                ObjectType::Role => "alter role",
404                ObjectType::Cluster => "alter cluster",
405                ObjectType::ClusterReplica => "alter cluster replica",
406                ObjectType::Secret => "alter secret",
407                ObjectType::Connection => "alter connection",
408                ObjectType::Database => "alter database",
409                ObjectType::Schema => "alter schema",
410                ObjectType::Func => "alter function",
411                ObjectType::NetworkPolicy => "alter network policy",
412            },
413            Plan::AlterCluster(_) => "alter cluster",
414            Plan::AlterClusterRename(_) => "alter cluster rename",
415            Plan::AlterClusterSwap(_) => "alter cluster swap",
416            Plan::AlterClusterReplicaRename(_) => "alter cluster replica rename",
417            Plan::AlterSetCluster(_) => "alter set cluster",
418            Plan::AlterConnection(_) => "alter connection",
419            Plan::AlterSource(_) => "alter source",
420            Plan::AlterItemRename(_) => "rename item",
421            Plan::AlterSchemaRename(_) => "alter rename schema",
422            Plan::AlterSchemaSwap(_) => "alter swap schema",
423            Plan::AlterSecret(_) => "alter secret",
424            Plan::AlterSink(_) => "alter sink",
425            Plan::AlterSystemSet(_) => "alter system",
426            Plan::AlterSystemReset(_) => "alter system",
427            Plan::AlterSystemResetAll(_) => "alter system",
428            Plan::AlterRole(_) => "alter role",
429            Plan::AlterNetworkPolicy(_) => "alter network policy",
430            Plan::AlterOwner(plan) => match plan.object_type {
431                ObjectType::Table => "alter table owner",
432                ObjectType::View => "alter view owner",
433                ObjectType::MaterializedView => "alter materialized view owner",
434                ObjectType::Source => "alter source owner",
435                ObjectType::Sink => "alter sink owner",
436                ObjectType::Index => "alter index owner",
437                ObjectType::Type => "alter type owner",
438                ObjectType::Role => "alter role owner",
439                ObjectType::Cluster => "alter cluster owner",
440                ObjectType::ClusterReplica => "alter cluster replica owner",
441                ObjectType::Secret => "alter secret owner",
442                ObjectType::Connection => "alter connection owner",
443                ObjectType::Database => "alter database owner",
444                ObjectType::Schema => "alter schema owner",
445                ObjectType::Func => "alter function owner",
446                ObjectType::NetworkPolicy => "alter network policy owner",
447            },
448            Plan::AlterTableAddColumn(_) => "alter table add column",
449            Plan::AlterMaterializedViewApplyReplacement(_) => {
450                "alter materialized view apply replacement"
451            }
452            Plan::Declare(_) => "declare",
453            Plan::Fetch(_) => "fetch",
454            Plan::Close(_) => "close",
455            Plan::ReadThenWrite(plan) => match plan.kind {
456                MutationKind::Insert => "insert into select",
457                MutationKind::Update => "update",
458                MutationKind::Delete => "delete",
459            },
460            Plan::Prepare(_) => "prepare",
461            Plan::Execute(_) => "execute",
462            Plan::Deallocate(_) => "deallocate",
463            Plan::Raise(_) => "raise",
464            Plan::GrantRole(_) => "grant role",
465            Plan::RevokeRole(_) => "revoke role",
466            Plan::GrantPrivileges(_) => "grant privilege",
467            Plan::RevokePrivileges(_) => "revoke privilege",
468            Plan::AlterDefaultPrivileges(_) => "alter default privileges",
469            Plan::ReassignOwned(_) => "reassign owned",
470            Plan::SideEffectingFunc(_) => "side effecting func",
471            Plan::ValidateConnection(_) => "validate connection",
472            Plan::AlterRetainHistory(_) => "alter retain history",
473            Plan::AlterSourceTimestampInterval(_) => "alter source timestamp interval",
474        }
475    }
476
477    /// Returns `true` iff this `Plan` is allowed to be executed in read-only
478    /// mode.
479    ///
480    /// We use an explicit allow-list, to avoid future additions automatically
481    /// falling into the `true` category.
482    pub fn allowed_in_read_only(&self) -> bool {
483        match self {
484            // These two set non-durable session variables, so are okay in
485            // read-only mode.
486            Plan::SetVariable(_) => true,
487            Plan::ResetVariable(_) => true,
488            Plan::SetTransaction(_) => true,
489            Plan::StartTransaction(_) => true,
490            Plan::CommitTransaction(_) => true,
491            Plan::AbortTransaction(_) => true,
492            Plan::Select(_) => true,
493            Plan::EmptyQuery => true,
494            Plan::ShowAllVariables => true,
495            Plan::ShowCreate(_) => true,
496            Plan::ShowColumns(_) => true,
497            Plan::ShowVariable(_) => true,
498            Plan::InspectShard(_) => true,
499            Plan::Subscribe(_) => true,
500            Plan::CopyTo(_) => true,
501            Plan::ExplainPlan(_) => true,
502            Plan::ExplainPushdown(_) => true,
503            Plan::ExplainTimestamp(_) => true,
504            Plan::ExplainSinkSchema(_) => true,
505            Plan::ValidateConnection(_) => true,
506            _ => false,
507        }
508    }
509}
510
511#[derive(Debug)]
512pub struct StartTransactionPlan {
513    pub access: Option<TransactionAccessMode>,
514    pub isolation_level: Option<TransactionIsolationLevel>,
515}
516
/// The kind of transaction a [`CommitTransactionPlan`] or
/// [`AbortTransactionPlan`] refers to.
#[derive(Debug)]
pub enum TransactionType {
    Explicit,
    Implicit,
}

impl TransactionType {
    /// Returns `true` iff `self` is [`TransactionType::Explicit`].
    pub fn is_explicit(&self) -> bool {
        matches!(self, TransactionType::Explicit)
    }

    /// Returns `true` iff `self` is [`TransactionType::Implicit`].
    pub fn is_implicit(&self) -> bool {
        matches!(self, TransactionType::Implicit)
    }
}
532
533#[derive(Debug)]
534pub struct CommitTransactionPlan {
535    pub transaction_type: TransactionType,
536}
537
538#[derive(Debug)]
539pub struct AbortTransactionPlan {
540    pub transaction_type: TransactionType,
541}
542
/// Payload of [`Plan::CreateDatabase`].
#[derive(Debug)]
pub struct CreateDatabasePlan {
    /// Name of the database.
    pub name: String,
    /// NOTE(review): presumably mirrors an `IF NOT EXISTS` clause, i.e. an
    /// existing database is not an error — confirm against the sequencer.
    pub if_not_exists: bool,
}
548
549#[derive(Debug)]
550pub struct CreateSchemaPlan {
551    pub database_spec: ResolvedDatabaseSpecifier,
552    pub schema_name: String,
553    pub if_not_exists: bool,
554}
555
556#[derive(Debug)]
557pub struct CreateRolePlan {
558    pub name: String,
559    pub attributes: RoleAttributesRaw,
560}
561
562#[derive(Debug, PartialEq, Eq, Clone)]
563pub struct CreateClusterPlan {
564    pub name: String,
565    pub variant: CreateClusterVariant,
566    pub workload_class: Option<String>,
567}
568
569#[derive(Debug, PartialEq, Eq, Clone)]
570pub enum CreateClusterVariant {
571    Managed(CreateClusterManagedPlan),
572    Unmanaged(CreateClusterUnmanagedPlan),
573}
574
575#[derive(Debug, PartialEq, Eq, Clone)]
576pub struct CreateClusterUnmanagedPlan {
577    pub replicas: Vec<(String, ReplicaConfig)>,
578}
579
580#[derive(Debug, PartialEq, Eq, Clone)]
581pub struct CreateClusterManagedPlan {
582    pub replication_factor: u32,
583    pub size: String,
584    pub availability_zones: Vec<String>,
585    pub compute: ComputeReplicaConfig,
586    pub optimizer_feature_overrides: OptimizerFeatureOverrides,
587    pub schedule: ClusterSchedule,
588}
589
590#[derive(Debug)]
591pub struct CreateClusterReplicaPlan {
592    pub cluster_id: ClusterId,
593    pub name: String,
594    pub config: ReplicaConfig,
595}
596
597/// Configuration of introspection for a cluster replica.
598#[derive(
599    Clone,
600    Copy,
601    Debug,
602    Serialize,
603    Deserialize,
604    PartialOrd,
605    Ord,
606    PartialEq,
607    Eq
608)]
609pub struct ComputeReplicaIntrospectionConfig {
610    /// Whether to introspect the introspection.
611    pub debugging: bool,
612    /// The interval at which to introspect.
613    pub interval: Duration,
614}
615
616#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
617pub struct ComputeReplicaConfig {
618    pub introspection: Option<ComputeReplicaIntrospectionConfig>,
619}
620
621#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
622pub enum ReplicaConfig {
623    Unorchestrated {
624        storagectl_addrs: Vec<String>,
625        computectl_addrs: Vec<String>,
626        compute: ComputeReplicaConfig,
627    },
628    Orchestrated {
629        size: String,
630        availability_zone: Option<String>,
631        compute: ComputeReplicaConfig,
632        internal: bool,
633        billed_as: Option<String>,
634    },
635}
636
637#[derive(Clone, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)]
638pub enum ClusterSchedule {
639    /// The system won't automatically turn the cluster On or Off.
640    Manual,
641    /// The cluster will be On when a REFRESH materialized view on it needs to refresh.
642    /// `hydration_time_estimate` determines how much time before a refresh to turn the
643    /// cluster On, so that it can rehydrate already before the refresh time.
644    Refresh { hydration_time_estimate: Duration },
645}
646
647impl Default for ClusterSchedule {
648    fn default() -> Self {
649        // (Has to be consistent with `impl Default for ClusterScheduleOptionValue`.)
650        ClusterSchedule::Manual
651    }
652}
653
654#[derive(Debug)]
655pub struct CreateSourcePlan {
656    pub name: QualifiedItemName,
657    pub source: Source,
658    pub if_not_exists: bool,
659    pub timeline: Timeline,
660    // None for subsources, which run on the parent cluster.
661    pub in_cluster: Option<ClusterId>,
662}
663
664#[derive(Clone, Debug, PartialEq, Eq)]
665pub struct SourceReferences {
666    pub updated_at: u64,
667    pub references: Vec<SourceReference>,
668}
669
/// An available external reference for a source and if possible to retrieve,
/// any column names it contains.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SourceReference {
    /// Name of the referenced external object.
    pub name: String,
    /// Namespace of the referenced object, if it has one.
    pub namespace: Option<String>,
    /// Column names of the referenced object; empty when none are recorded.
    pub columns: Vec<String>,
}
678
679/// A [`CreateSourcePlan`] and the metadata necessary to sequence it.
680#[derive(Debug)]
681pub struct CreateSourcePlanBundle {
682    /// ID of this source in the Catalog.
683    pub item_id: CatalogItemId,
684    /// ID used to reference this source from outside the catalog, e.g. compute.
685    pub global_id: GlobalId,
686    /// Details of the source to create.
687    pub plan: CreateSourcePlan,
688    /// Other catalog objects that are referenced by this source, determined at name resolution.
689    pub resolved_ids: ResolvedIds,
690    /// All the available upstream references for this source.
691    /// Populated for top-level sources that can contain subsources/tables
692    /// and used during sequencing to populate the appropriate catalog fields.
693    pub available_source_references: Option<SourceReferences>,
694}
695
696#[derive(Debug)]
697pub struct CreateConnectionPlan {
698    pub name: QualifiedItemName,
699    pub if_not_exists: bool,
700    pub connection: Connection,
701    pub validate: bool,
702}
703
704#[derive(Debug)]
705pub struct ValidateConnectionPlan {
706    /// ID of the connection in the Catalog.
707    pub id: CatalogItemId,
708    /// The connection to validate.
709    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
710}
711
712#[derive(Debug)]
713pub struct CreateSecretPlan {
714    pub name: QualifiedItemName,
715    pub secret: Secret,
716    pub if_not_exists: bool,
717}
718
719#[derive(Debug)]
720pub struct CreateSinkPlan {
721    pub name: QualifiedItemName,
722    pub sink: Sink,
723    pub with_snapshot: bool,
724    pub if_not_exists: bool,
725    pub in_cluster: ClusterId,
726}
727
/// Plan for creating a table.
#[derive(Debug)]
pub struct CreateTablePlan {
    /// Qualified name of the table to create.
    pub name: QualifiedItemName,
    /// The planned table.
    pub table: Table,
    /// Whether the statement specified `IF NOT EXISTS`.
    pub if_not_exists: bool,
}
734
/// Plan for creating a view.
#[derive(Debug, Clone)]
pub struct CreateViewPlan {
    /// Qualified name of the view to create.
    pub name: QualifiedItemName,
    /// The planned view.
    pub view: View,
    /// The Catalog object that this view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// Whether the statement specified `IF NOT EXISTS`.
    pub if_not_exists: bool,
    /// True if the view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
748
/// Plan for creating a materialized view.
#[derive(Debug, Clone)]
pub struct CreateMaterializedViewPlan {
    /// Qualified name of the materialized view to create.
    pub name: QualifiedItemName,
    /// The planned materialized view.
    pub materialized_view: MaterializedView,
    /// The Catalog object that this materialized view is replacing, if any.
    pub replace: Option<CatalogItemId>,
    /// The Catalog objects that need to be dropped. This includes `replace` and any dependents.
    pub drop_ids: Vec<CatalogItemId>,
    /// Whether the statement specified `IF NOT EXISTS`.
    pub if_not_exists: bool,
    /// True if the materialized view contains an expression that can make the exact column list
    /// ambiguous. For example `NATURAL JOIN` or `SELECT *`.
    pub ambiguous_columns: bool,
}
762
/// Plan for creating a network policy.
#[derive(Debug, Clone)]
pub struct CreateNetworkPolicyPlan {
    /// Name of the network policy to create.
    pub name: String,
    /// The rules that make up the policy.
    pub rules: Vec<NetworkPolicyRule>,
}
768
/// Plan for altering an existing network policy.
#[derive(Debug, Clone)]
pub struct AlterNetworkPolicyPlan {
    /// ID of the network policy to alter.
    pub id: NetworkPolicyId,
    /// Name of the network policy.
    pub name: String,
    /// The rules carried by this plan.
    // NOTE(review): presumably these replace the policy's existing rules; confirm in sequencing.
    pub rules: Vec<NetworkPolicyRule>,
}
775
/// Plan for creating an index.
#[derive(Debug, Clone)]
pub struct CreateIndexPlan {
    /// Qualified name of the index to create.
    pub name: QualifiedItemName,
    /// The planned index.
    pub index: Index,
    /// Whether the statement specified `IF NOT EXISTS`.
    pub if_not_exists: bool,
}
782
/// Plan for creating a type.
#[derive(Debug)]
pub struct CreateTypePlan {
    /// Qualified name of the type to create.
    pub name: QualifiedItemName,
    /// The planned type.
    pub typ: Type,
}
788
/// Plan for dropping a set of objects.
#[derive(Debug)]
pub struct DropObjectsPlan {
    /// The IDs of only the objects directly referenced in the `DROP` statement.
    pub referenced_ids: Vec<ObjectId>,
    /// All object IDs to drop. Includes `referenced_ids` and all descendants.
    pub drop_ids: Vec<ObjectId>,
    /// The type of object that was dropped explicitly in the `DROP` statement. `drop_ids` may
    /// contain objects of different types due to `CASCADE`.
    pub object_type: ObjectType,
}
799
/// Plan for dropping the objects owned by a set of roles, together with the privileges and
/// default privileges to revoke.
#[derive(Debug)]
pub struct DropOwnedPlan {
    /// The role IDs that own the objects.
    pub role_ids: Vec<RoleId>,
    /// All object IDs to drop.
    pub drop_ids: Vec<ObjectId>,
    /// The privileges to revoke.
    pub privilege_revokes: Vec<(SystemObjectId, MzAclItem)>,
    /// The default privileges to revoke.
    pub default_privilege_revokes: Vec<(DefaultPrivilegeObject, DefaultPrivilegeAclItem)>,
}
811
/// Plan for showing the value of a variable.
#[derive(Debug)]
pub struct ShowVariablePlan {
    /// Name of the variable to show.
    pub name: String,
}
816
/// Plan for inspecting the shard behind a storage collection.
#[derive(Debug)]
pub struct InspectShardPlan {
    /// ID of the storage collection to inspect.
    pub id: GlobalId,
}
822
/// Plan for setting a variable.
#[derive(Debug)]
pub struct SetVariablePlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign.
    pub value: VariableValue,
    /// Whether the assignment is local.
    // NOTE(review): presumably mirrors `SET LOCAL` (transaction-scoped); confirm.
    pub local: bool,
}
829
/// The value assigned to a variable by a plan such as [`SetVariablePlan`].
#[derive(Debug)]
pub enum VariableValue {
    /// Use the variable's default value.
    Default,
    /// Use the given values, carried as strings.
    Values(Vec<String>),
}
835
/// Plan for resetting a variable.
#[derive(Debug)]
pub struct ResetVariablePlan {
    /// Name of the variable to reset.
    pub name: String,
}
840
/// Plan for setting transaction modes.
#[derive(Debug)]
pub struct SetTransactionPlan {
    /// Whether the setting is local.
    // NOTE(review): presumably distinguishes the current transaction from session defaults;
    // confirm against the planner.
    pub local: bool,
    /// The transaction modes to apply.
    pub modes: Vec<TransactionMode>,
}
846
/// A plan for select statements.
#[derive(Clone, Debug)]
pub struct SelectPlan {
    /// The `SELECT` statement itself. Used for explain/notices, but not otherwise
    /// load-bearing. Boxed to save stack space.
    ///
    /// `None` when the plan was not built from a statement, as in [`SelectPlan::immediate`].
    pub select: Option<Box<SelectStatement<Aug>>>,
    /// The plan as a HIR.
    pub source: HirRelationExpr,
    /// At what time should this select happen?
    pub when: QueryWhen,
    /// Instructions how to form the result set.
    pub finishing: RowSetFinishing,
    /// For `COPY TO STDOUT`, the format to use.
    pub copy_to: Option<CopyFormat>,
}
862
863impl SelectPlan {
864    pub fn immediate(rows: Vec<Row>, typ: SqlRelationType) -> Self {
865        let arity = typ.arity();
866        SelectPlan {
867            select: None,
868            source: HirRelationExpr::Constant { rows, typ },
869            when: QueryWhen::Immediately,
870            finishing: RowSetFinishing::trivial(arity),
871            copy_to: None,
872        }
873    }
874}
875
/// The output mode of a subscription, along with any ordering it carries.
#[derive(Debug, Clone)]
pub enum SubscribeOutput {
    /// Diff output; carries no ordering.
    Diffs,
    /// Output ordered by `order_by` within each timestamp.
    WithinTimestampOrderBy {
        /// We pretend that mz_diff is prepended to the normal columns, making it index 0
        order_by: Vec<ColumnOrder>,
    },
    /// Upsert-envelope output.
    EnvelopeUpsert {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
    /// Debezium-envelope output.
    EnvelopeDebezium {
        /// Order by with just keys
        order_by_keys: Vec<ColumnOrder>,
    },
}
892
893impl SubscribeOutput {
894    pub fn row_order(&self) -> &[ColumnOrder] {
895        match self {
896            SubscribeOutput::Diffs => &[],
897            // This ordering prepends the diff, so its `order_by` field cannot be applied to rows.
898            SubscribeOutput::WithinTimestampOrderBy { .. } => &[],
899            SubscribeOutput::EnvelopeUpsert { order_by_keys } => order_by_keys,
900            SubscribeOutput::EnvelopeDebezium { order_by_keys } => order_by_keys,
901        }
902    }
903}
904
/// Plan for a subscription.
#[derive(Debug, Clone)]
pub struct SubscribePlan {
    /// What to subscribe to: an existing collection or a query.
    pub from: SubscribeFrom,
    /// Snapshot flag for the subscription.
    // NOTE(review): presumably controls whether the initial contents are emitted; confirm.
    pub with_snapshot: bool,
    /// At what time should the subscription start?
    pub when: QueryWhen,
    /// Optional upper timestamp for the subscription.
    // NOTE(review): whether this bound is inclusive is not visible here; confirm.
    pub up_to: Option<Timestamp>,
    /// Copy format to use for the output, if any.
    pub copy_to: Option<CopyFormat>,
    /// Whether progress information should be emitted.
    pub emit_progress: bool,
    /// The output mode of the subscription.
    pub output: SubscribeOutput,
}
915
/// The relation a subscription reads from.
#[derive(Debug, Clone)]
pub enum SubscribeFrom {
    /// ID of the collection to subscribe to.
    Id(GlobalId),
    /// Query to subscribe to.
    Query {
        /// The query, as HIR.
        expr: HirRelationExpr,
        /// Relation description accompanying `expr`.
        desc: RelationDesc,
    },
}
926
927impl SubscribeFrom {
928    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
929        match self {
930            SubscribeFrom::Id(id) => BTreeSet::from([*id]),
931            SubscribeFrom::Query { expr, .. } => expr.depends_on(),
932        }
933    }
934
935    pub fn contains_temporal(&self) -> bool {
936        match self {
937            SubscribeFrom::Id(_) => false,
938            SubscribeFrom::Query { expr, .. } => expr
939                .contains_temporal()
940                .expect("Unexpected error in `visit_scalars` call"),
941        }
942    }
943}
944
/// Plan for showing how an object was created.
#[derive(Debug)]
pub struct ShowCreatePlan {
    /// ID of the object being shown.
    pub id: ObjectId,
    /// The row carried by this plan.
    // NOTE(review): presumably the row returned to the client; confirm.
    pub row: Row,
}
950
/// Plan for showing the columns of a catalog item.
#[derive(Debug)]
pub struct ShowColumnsPlan {
    /// ID of the item whose columns are shown.
    pub id: CatalogItemId,
    /// The select plan carried by this plan.
    // NOTE(review): presumably the query that produces the column listing; confirm.
    pub select_plan: SelectPlan,
    /// Resolved IDs accompanying `select_plan`.
    // NOTE(review): presumably replaces the IDs resolved for the original statement; confirm.
    pub new_resolved_ids: ResolvedIds,
}
957
/// Plan for copying data from a [`CopyFromSource`] into a table.
#[derive(Debug)]
pub struct CopyFromPlan {
    /// Table we're copying into.
    pub target_id: CatalogItemId,
    /// Human-readable full name of the target table.
    pub target_name: String,
    /// Source we're copying data from.
    pub source: CopyFromSource,
    /// How input columns map to those on the destination table.
    ///
    /// TODO(cf2): Remove this field in favor of the mfp.
    pub columns: Vec<ColumnIndex>,
    /// [`RelationDesc`] describing the input data.
    pub source_desc: RelationDesc,
    /// Changes the shape of the input data to match the destination table.
    pub mfp: MapFilterProject,
    /// Format specific params for copying the input data.
    pub params: CopyFormatParams<'static>,
    /// Filter for the source files we're copying from, e.g. an S3 prefix.
    pub filter: Option<CopyFromFilter>,
}
979
/// Where the data for a [`CopyFromPlan`] comes from.
#[derive(Debug)]
pub enum CopyFromSource {
    /// Copying from a file local to the user, transmitted via pgwire.
    Stdin,
    /// A remote resource, e.g. HTTP file.
    ///
    /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
    Url(HirScalarExpr),
    /// A file in an S3 bucket.
    AwsS3 {
        /// Expression that evaluates to the file we want to copy.
        uri: HirScalarExpr,
        /// Details for how we connect to AWS S3.
        connection: AwsConnection,
        /// ID of the connection object.
        connection_id: CatalogItemId,
    },
}
998
/// Selects which source files a [`CopyFromPlan`] copies from.
#[derive(Debug)]
pub enum CopyFromFilter {
    /// An explicit list of files.
    Files(Vec<String>),
    /// A pattern to match files against.
    // NOTE(review): the pattern syntax is not visible here; confirm.
    Pattern(String),
}
1004
/// `COPY TO S3`
///
/// (This is a completely different thing from `COPY TO STDOUT`. That is a `Plan::Select` with
/// `copy_to` set.)
#[derive(Debug, Clone)]
pub struct CopyToPlan {
    /// The select query plan whose data will be copied to destination uri.
    pub select_plan: SelectPlan,
    /// Relation description accompanying `select_plan`.
    // NOTE(review): presumably describes the copied rows; confirm.
    pub desc: RelationDesc,
    /// The scalar expression to be resolved to get the destination uri.
    pub to: HirScalarExpr,
    /// The connection to use for the copy.
    pub connection: mz_storage_types::connections::Connection<ReferencedConnection>,
    /// The ID of the connection.
    pub connection_id: CatalogItemId,
    /// The format of the written output.
    pub format: S3SinkFormat,
    /// Maximum size of an output file.
    // NOTE(review): presumably in bytes; confirm units.
    pub max_file_size: u64,
}
1022
/// Plan for explaining the plan of an [`Explainee`].
#[derive(Clone, Debug)]
pub struct ExplainPlanPlan {
    /// The stage to explain.
    pub stage: ExplainStage,
    /// The format of the explain output.
    pub format: ExplainFormat,
    /// Configuration for the explain output.
    pub config: ExplainConfig,
    /// The object or statement to explain.
    pub explainee: Explainee,
}
1030
/// The type of object to be explained
#[derive(Clone, Debug)]
pub enum Explainee {
    /// Lookup and explain a plan saved for an existing view.
    View(CatalogItemId),
    /// Lookup and explain a plan saved for an existing materialized view.
    MaterializedView(CatalogItemId),
    /// Lookup and explain a plan saved for an existing index.
    Index(CatalogItemId),
    /// Replan an existing view.
    ReplanView(CatalogItemId),
    /// Replan an existing materialized view.
    ReplanMaterializedView(CatalogItemId),
    /// Replan an existing index.
    ReplanIndex(CatalogItemId),
    /// A SQL statement.
    Statement(ExplaineeStatement),
}
1049
/// Explainee types that are statements.
///
/// The `EnumKind` derive also generates the field-less [`ExplaineeStatementKind`] enum.
#[derive(Clone, Debug, EnumKind)]
#[enum_kind(ExplaineeStatementKind)]
pub enum ExplaineeStatement {
    /// The object to be explained is a SELECT statement.
    Select {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::SelectPlan,
        /// Relation description accompanying `plan`.
        desc: RelationDesc,
    },
    /// The object to be explained is a CREATE VIEW.
    CreateView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::CreateViewPlan,
    },
    /// The object to be explained is a CREATE MATERIALIZED VIEW.
    CreateMaterializedView {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::CreateMaterializedViewPlan,
    },
    /// The object to be explained is a CREATE INDEX.
    CreateIndex {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::CreateIndexPlan,
    },
    /// The object to be explained is a SUBSCRIBE statement.
    Subscribe {
        /// Broken flag (see [`ExplaineeStatement::broken()`]).
        broken: bool,
        /// The planned statement.
        plan: plan::SubscribePlan,
    },
}
1086
1087impl ExplaineeStatement {
1088    pub fn depends_on(&self) -> BTreeSet<GlobalId> {
1089        match self {
1090            Self::Select { plan, .. } => plan.source.depends_on(),
1091            Self::CreateView { plan, .. } => plan.view.expr.depends_on(),
1092            Self::CreateMaterializedView { plan, .. } => plan.materialized_view.expr.depends_on(),
1093            Self::CreateIndex { plan, .. } => btreeset! {plan.index.on},
1094            Self::Subscribe { plan, .. } => plan.from.depends_on(),
1095        }
1096    }
1097
1098    /// Statements that have their `broken` flag set are expected to cause a
1099    /// panic in the optimizer code. In this case:
1100    ///
1101    /// 1. The optimizer pipeline execution will stop, but the panic will be
1102    ///    intercepted and will not propagate to the caller. The partial
1103    ///    optimizer trace collected until this point will be available.
1104    /// 2. The optimizer trace tracing subscriber will delegate regular tracing
1105    ///    spans and events to the default subscriber.
1106    ///
1107    /// This is useful when debugging queries that cause panics.
1108    pub fn broken(&self) -> bool {
1109        match self {
1110            Self::Select { broken, .. } => *broken,
1111            Self::CreateView { broken, .. } => *broken,
1112            Self::CreateMaterializedView { broken, .. } => *broken,
1113            Self::CreateIndex { broken, .. } => *broken,
1114            Self::Subscribe { broken, .. } => *broken,
1115        }
1116    }
1117}
1118
1119impl ExplaineeStatementKind {
1120    pub fn supports(&self, stage: &ExplainStage) -> bool {
1121        use ExplainStage::*;
1122        match self {
1123            Self::Select => true,
1124            Self::CreateView => ![GlobalPlan, PhysicalPlan].contains(stage),
1125            Self::CreateMaterializedView => true,
1126            Self::CreateIndex => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1127            // SUBSCRIBE doesn't support RAW, DECORRELATED, or LOCAL stages because
1128            // it takes MIR directly rather than going through HIR lowering.
1129            Self::Subscribe => ![RawPlan, DecorrelatedPlan, LocalPlan].contains(stage),
1130        }
1131    }
1132}
1133
1134impl std::fmt::Display for ExplaineeStatementKind {
1135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1136        match self {
1137            Self::Select => write!(f, "SELECT"),
1138            Self::CreateView => write!(f, "CREATE VIEW"),
1139            Self::CreateMaterializedView => write!(f, "CREATE MATERIALIZED VIEW"),
1140            Self::CreateIndex => write!(f, "CREATE INDEX"),
1141            Self::Subscribe => write!(f, "SUBSCRIBE"),
1142        }
1143    }
1144}
1145
/// Plan for explaining pushdown for an [`Explainee`].
#[derive(Clone, Debug)]
pub struct ExplainPushdownPlan {
    /// The object or statement to explain.
    pub explainee: Explainee,
}
1150
/// Plan for explaining the timestamp of a query.
#[derive(Clone, Debug)]
pub struct ExplainTimestampPlan {
    /// The format of the explain output.
    pub format: ExplainFormat,
    /// The query, as HIR.
    pub raw_plan: HirRelationExpr,
    /// At what time should the query happen?
    pub when: QueryWhen,
}
1157
/// Plan for explaining the schema of a sink.
#[derive(Debug)]
pub struct ExplainSinkSchemaPlan {
    /// ID of the collection the sink reads from.
    pub sink_from: GlobalId,
    /// The schema, rendered as a JSON string.
    pub json_schema: String,
}
1163
/// Plan for sending a set of row updates to a catalog item.
#[derive(Debug)]
pub struct SendDiffsPlan {
    /// ID of the item receiving the updates.
    pub id: CatalogItemId,
    /// The updates, each a row paired with its diff.
    pub updates: Vec<(Row, Diff)>,
    /// The kind of mutation these updates represent.
    pub kind: MutationKind,
    /// Rows to return, each paired with a non-zero count.
    // NOTE(review): presumably the `RETURNING` rows and their multiplicities; confirm.
    pub returning: Vec<(Row, NonZeroUsize)>,
    /// Limit on the size of the result.
    // NOTE(review): presumably in bytes; confirm units and where it is enforced.
    pub max_result_size: u64,
}
1172
/// Plan for inserting rows into a catalog item.
#[derive(Debug)]
pub struct InsertPlan {
    /// ID of the item to insert into.
    pub id: CatalogItemId,
    /// Expression producing the rows to insert.
    pub values: HirRelationExpr,
    /// Expressions to evaluate for the `RETURNING` clause.
    // NOTE(review): which row context these are evaluated in is not visible here; confirm.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1179
/// Plan for a mutation that first reads rows and then writes changes based on them.
#[derive(Debug)]
pub struct ReadThenWritePlan {
    /// ID of the item to mutate.
    pub id: CatalogItemId,
    /// Expression selecting the rows to read.
    pub selection: HirRelationExpr,
    /// Instructions how to form the result set of the read.
    pub finishing: RowSetFinishing,
    /// Assignment expressions keyed by `usize`.
    // NOTE(review): presumably keyed by column index of the target relation; confirm.
    pub assignments: BTreeMap<usize, mz_expr::MirScalarExpr>,
    /// The kind of mutation being performed.
    pub kind: MutationKind,
    /// Expressions to evaluate for the `RETURNING` clause.
    pub returning: Vec<mz_expr::MirScalarExpr>,
}
1189
/// Generated by `ALTER ... IF EXISTS` if the named object did not exist.
#[derive(Debug)]
pub struct AlterNoopPlan {
    /// The type of object named in the statement.
    pub object_type: ObjectType,
}
1195
/// Plan for changing the cluster of a catalog item.
#[derive(Debug)]
pub struct AlterSetClusterPlan {
    /// ID of the item to alter.
    pub id: CatalogItemId,
    /// ID of the cluster to set.
    pub set_cluster: ClusterId,
}
1201
/// Plan for altering the retain-history setting of a catalog item.
#[derive(Debug)]
pub struct AlterRetainHistoryPlan {
    /// ID of the item to alter.
    pub id: CatalogItemId,
    /// The option value, if one was given.
    // NOTE(review): presumably `None` corresponds to a reset; confirm.
    pub value: Option<Value>,
    /// The compaction window to apply.
    pub window: CompactionWindow,
    /// The type of the object being altered.
    pub object_type: ObjectType,
}
1209
/// Plan for altering the timestamp interval of a source.
#[derive(Debug)]
pub struct AlterSourceTimestampIntervalPlan {
    /// ID of the source to alter.
    pub id: CatalogItemId,
    /// The option value, if one was given.
    // NOTE(review): presumably `None` corresponds to a reset; confirm.
    pub value: Option<Value>,
    /// The timestamp interval to apply.
    pub interval: Duration,
}
1216
/// The change requested for a single option of type `T` by an `ALTER` statement.
#[derive(Debug, Clone)]
pub enum AlterOptionParameter<T = String> {
    /// Set the option to the contained value.
    Set(T),
    /// Reset the option.
    Reset,
    /// Leave the option as it is.
    Unchanged,
}
1224
/// The action an [`AlterConnectionPlan`] performs.
#[derive(Debug)]
pub enum AlterConnectionAction {
    /// Rotate the connection's keys.
    RotateKeys,
    /// Change the connection's options.
    AlterOptions {
        /// Options to set, keyed by option name, with their optional values.
        set_options: BTreeMap<ConnectionOptionName, Option<WithOptionValue<Aug>>>,
        /// Names of the options to drop.
        drop_options: BTreeSet<ConnectionOptionName>,
        /// Whether the altered connection should be validated.
        validate: bool,
    },
}
1234
/// Plan for altering a connection.
#[derive(Debug)]
pub struct AlterConnectionPlan {
    /// ID of the connection to alter.
    pub id: CatalogItemId,
    /// The alteration to perform.
    pub action: AlterConnectionAction,
}
1240
/// The action an [`AlterSourcePlan`] performs.
#[derive(Debug)]
pub enum AlterSourceAction {
    /// Add subsource exports to the source.
    AddSubsourceExports {
        /// Plans for the subsources to create.
        subsources: Vec<CreateSourcePlanBundle>,
        /// Options given with the statement.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    },
    /// Refresh the source's references.
    RefreshReferences {
        /// The references to record.
        references: SourceReferences,
    },
}
1251
/// Plan for altering a source.
#[derive(Debug)]
pub struct AlterSourcePlan {
    /// Catalog ID of the source to alter.
    pub item_id: CatalogItemId,
    /// Global ID of the ingestion.
    // NOTE(review): presumably the ingestion backing `item_id`; confirm.
    pub ingestion_id: GlobalId,
    /// The alteration to perform.
    pub action: AlterSourceAction,
}
1258
/// Plan for altering a sink.
#[derive(Debug, Clone)]
pub struct AlterSinkPlan {
    /// Catalog ID of the sink to alter.
    pub item_id: CatalogItemId,
    /// Global ID of the sink.
    pub global_id: GlobalId,
    /// The planned sink.
    pub sink: Sink,
    /// Snapshot flag for the sink.
    // NOTE(review): presumably controls whether existing data is emitted; confirm.
    pub with_snapshot: bool,
    /// ID of the cluster the sink runs in.
    pub in_cluster: ClusterId,
}
1267
/// Plan for altering a cluster.
#[derive(Debug, Clone)]
pub struct AlterClusterPlan {
    /// ID of the cluster to alter.
    pub id: ClusterId,
    /// Name of the cluster.
    pub name: String,
    /// The cluster options carried by this plan.
    pub options: PlanClusterOption,
    /// The strategy to use when applying the alteration.
    pub strategy: AlterClusterPlanStrategy,
}
1275
/// Plan for renaming a cluster.
#[derive(Debug)]
pub struct AlterClusterRenamePlan {
    /// ID of the cluster to rename.
    pub id: ClusterId,
    /// Current name of the cluster.
    pub name: String,
    /// New name for the cluster.
    pub to_name: String,
}
1282
/// Plan for renaming a cluster replica.
#[derive(Debug)]
pub struct AlterClusterReplicaRenamePlan {
    /// ID of the cluster that contains the replica.
    pub cluster_id: ClusterId,
    /// ID of the replica to rename.
    pub replica_id: ReplicaId,
    /// Current qualified name of the replica.
    pub name: QualifiedReplica,
    /// New name for the replica.
    pub to_name: String,
}
1290
/// Plan for renaming a catalog item.
#[derive(Debug)]
pub struct AlterItemRenamePlan {
    /// ID of the item to rename.
    pub id: CatalogItemId,
    /// Current full name of the item.
    pub current_full_name: FullItemName,
    /// New name for the item.
    pub to_name: String,
    /// The type of the item being renamed.
    pub object_type: ObjectType,
}
1298
/// Plan for renaming a schema.
#[derive(Debug)]
pub struct AlterSchemaRenamePlan {
    /// Database and schema specifiers identifying the schema to rename.
    pub cur_schema_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// New name for the schema.
    pub new_schema_name: String,
}
1304
/// Plan for swapping two schemas.
#[derive(Debug)]
pub struct AlterSchemaSwapPlan {
    /// Database and schema specifiers identifying schema A.
    pub schema_a_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema A.
    pub schema_a_name: String,
    /// Database and schema specifiers identifying schema B.
    pub schema_b_spec: (ResolvedDatabaseSpecifier, SchemaSpecifier),
    /// Name of schema B.
    pub schema_b_name: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name while swapping; confirm.
    pub name_temp: String,
}
1313
/// Plan for swapping two clusters.
#[derive(Debug)]
pub struct AlterClusterSwapPlan {
    /// ID of cluster A.
    pub id_a: ClusterId,
    /// ID of cluster B.
    pub id_b: ClusterId,
    /// Name of cluster A.
    pub name_a: String,
    /// Name of cluster B.
    pub name_b: String,
    /// A temporary name.
    // NOTE(review): presumably used as the intermediate name while swapping; confirm.
    pub name_temp: String,
}
1322
/// Plan for altering a secret.
#[derive(Debug)]
pub struct AlterSecretPlan {
    /// ID of the secret to alter.
    pub id: CatalogItemId,
    /// Expression that produces the secret's new contents.
    pub secret_as: MirScalarExpr,
}
1328
/// Plan for setting a system variable.
#[derive(Debug)]
pub struct AlterSystemSetPlan {
    /// Name of the variable to set.
    pub name: String,
    /// The value to assign.
    pub value: VariableValue,
}
1334
/// Plan for resetting a system variable.
#[derive(Debug)]
pub struct AlterSystemResetPlan {
    /// Name of the variable to reset.
    pub name: String,
}
1339
/// Plan for resetting all system variables. Carries no data.
#[derive(Debug)]
pub struct AlterSystemResetAllPlan {}
1342
/// Plan for altering a role.
#[derive(Debug)]
pub struct AlterRolePlan {
    /// ID of the role to alter.
    pub id: RoleId,
    /// Name of the role.
    pub name: String,
    /// The planned alteration.
    pub option: PlannedAlterRoleOption,
}
1349
/// Plan for changing the owner of an object.
#[derive(Debug)]
pub struct AlterOwnerPlan {
    /// ID of the object whose owner changes.
    pub id: ObjectId,
    /// The type of the object.
    pub object_type: ObjectType,
    /// ID of the role that becomes the owner.
    pub new_owner: RoleId,
}
1356
/// Plan for altering a table with respect to a single column.
// NOTE(review): the fields suggest this adds a column; confirm against the planner.
#[derive(Debug)]
pub struct AlterTablePlan {
    /// ID of the relation to alter.
    pub relation_id: CatalogItemId,
    /// Name of the column.
    pub column_name: ColumnName,
    /// Planned type of the column.
    pub column_type: SqlColumnType,
    /// The column's data type as written in the SQL statement.
    pub raw_sql_type: RawDataType,
}
1364
/// Plan for applying a replacement to a materialized view.
#[derive(Debug, Clone)]
pub struct AlterMaterializedViewApplyReplacementPlan {
    /// ID of the materialized view being altered.
    pub id: CatalogItemId,
    /// ID of the replacement to apply.
    pub replacement_id: CatalogItemId,
}
1370
/// Plan for declaring a named statement with bound parameters.
// NOTE(review): presumably `DECLARE` of a cursor; confirm.
#[derive(Debug)]
pub struct DeclarePlan {
    /// The declared name.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Parameters for the statement.
    pub params: Params,
}
1378
/// Plan for fetching from a named object.
// NOTE(review): presumably a cursor created by `DECLARE`; confirm.
#[derive(Debug)]
pub struct FetchPlan {
    /// Name of the object to fetch from.
    pub name: String,
    /// The requested fetch direction/count, if one was given.
    pub count: Option<FetchDirection>,
    /// Timeout behavior for the fetch.
    pub timeout: ExecuteTimeout,
}
1385
/// Plan for closing a named object.
// NOTE(review): presumably a cursor; confirm.
#[derive(Debug)]
pub struct ClosePlan {
    /// Name of the object to close.
    pub name: String,
}
1390
/// Plan for preparing a named statement.
#[derive(Debug)]
pub struct PreparePlan {
    /// Name of the prepared statement.
    pub name: String,
    /// The raw (unresolved) statement.
    pub stmt: Statement<Raw>,
    /// SQL text accompanying `stmt`.
    pub sql: String,
    /// Description of the statement.
    pub desc: StatementDesc,
}
1398
/// Plan for executing a named statement.
#[derive(Debug)]
pub struct ExecutePlan {
    /// Name of the statement to execute.
    pub name: String,
    /// Parameters to execute the statement with.
    pub params: Params,
}
1404
/// Plan for deallocating prepared statements.
#[derive(Debug)]
pub struct DeallocatePlan {
    /// Name of the statement to deallocate, if one was given.
    // NOTE(review): presumably `None` means "deallocate all"; confirm.
    pub name: Option<String>,
}
1409
/// Plan for raising a notice.
#[derive(Debug)]
pub struct RaisePlan {
    /// Severity of the notice to raise.
    pub severity: NoticeSeverity,
}
1414
/// Plan for granting role membership.
#[derive(Debug)]
pub struct GrantRolePlan {
    /// The roles that are gaining members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be added to `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that granted the membership.
    pub grantor_id: RoleId,
}
1424
/// Plan for revoking role membership.
#[derive(Debug)]
pub struct RevokeRolePlan {
    /// The roles that are losing members.
    pub role_ids: Vec<RoleId>,
    /// The roles that will be removed from `role_ids`.
    pub member_ids: Vec<RoleId>,
    /// The role that revoked the membership.
    pub grantor_id: RoleId,
}
1434
/// A single privilege change on one object, shared by [`GrantPrivilegesPlan`] and
/// [`RevokePrivilegesPlan`].
#[derive(Debug)]
pub struct UpdatePrivilege {
    /// The privileges being granted/revoked on an object.
    pub acl_mode: AclMode,
    /// The ID of the object receiving privileges.
    pub target_id: SystemObjectId,
    /// The role that is granting the privileges.
    pub grantor: RoleId,
}
1444
/// Plan for granting privileges.
#[derive(Debug)]
pub struct GrantPrivilegesPlan {
    /// Description of each privilege being granted.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will be granted the privileges.
    pub grantees: Vec<RoleId>,
}
1452
/// Plan for revoking privileges.
#[derive(Debug)]
pub struct RevokePrivilegesPlan {
    /// Description of each privilege being revoked.
    pub update_privileges: Vec<UpdatePrivilege>,
    /// The roles that will have privileges revoked.
    pub revokees: Vec<RoleId>,
}
/// Plan for altering default privileges.
#[derive(Debug)]
pub struct AlterDefaultPrivilegesPlan {
    /// Description of objects that match this default privilege.
    pub privilege_objects: Vec<DefaultPrivilegeObject>,
    /// The privilege to be granted/revoked from the matching objects.
    pub privilege_acl_items: Vec<DefaultPrivilegeAclItem>,
    /// Whether this is a grant or revoke.
    pub is_grant: bool,
}
1469
/// Plan for reassigning the objects owned by a set of roles to a new owner.
#[derive(Debug)]
pub struct ReassignOwnedPlan {
    /// The roles whose owned objects are being reassigned.
    pub old_roles: Vec<RoleId>,
    /// The new owner of the objects.
    pub new_role: RoleId,
    /// All object IDs to reassign.
    pub reassign_ids: Vec<ObjectId>,
}
1479
/// Plan for setting or clearing the comment on an object or one of its sub-components.
#[derive(Debug)]
pub struct CommentPlan {
    /// The object that this comment is associated with.
    pub object_id: CommentObjectId,
    /// A sub-component of the object that this comment is associated with, e.g. a column.
    ///
    /// TODO(parkmycar): <https://github.com/MaterializeInc/database-issues/issues/6711>.
    pub sub_component: Option<usize>,
    /// The comment itself. If `None` that indicates we should clear the existing comment.
    pub comment: Option<String>,
}
1491
/// Where a [`Table`] gets its data from.
#[derive(Clone, Debug)]
pub enum TableDataSource {
    /// The table owns data created via INSERT/UPDATE/DELETE statements.
    // NOTE(review): `defaults` presumably holds one default expression per column; confirm.
    TableWrites { defaults: Vec<Expr<Aug>> },

    /// The table receives its data from the identified `DataSourceDesc`.
    /// This table type does not support INSERT/UPDATE/DELETE statements.
    DataSource {
        /// Description of the data source.
        desc: DataSourceDesc,
        /// Timeline associated with the data source.
        timeline: Timeline,
    },
}
1504
/// A planned table.
#[derive(Clone, Debug)]
pub struct Table {
    /// The SQL text that creates this table.
    pub create_sql: String,
    /// Versioned relation description of the table.
    pub desc: VersionedRelationDesc,
    /// Whether the table is temporary.
    pub temporary: bool,
    /// Compaction window for the table, if one was specified.
    pub compaction_window: Option<CompactionWindow>,
    /// Where the table gets its data from.
    pub data_source: TableDataSource,
}
1513
/// A planned source.
#[derive(Clone, Debug)]
pub struct Source {
    /// The SQL text that creates this source.
    pub create_sql: String,
    /// Where the source gets its data from.
    pub data_source: DataSourceDesc,
    /// Relation description of the source.
    pub desc: RelationDesc,
    /// Compaction window for the source, if one was specified.
    pub compaction_window: Option<CompactionWindow>,
}
1521
/// Describes where a source (or a table backed by a data source) receives its data from.
#[derive(Debug, Clone)]
pub enum DataSourceDesc {
    /// Receives data from an external system.
    Ingestion(SourceDesc<ReferencedConnection>),
    /// Receives data from an external system, declared using the old source syntax.
    OldSyntaxIngestion {
        /// Description of the ingestion.
        desc: SourceDesc<ReferencedConnection>,
        // If we're dealing with an old syntax ingestion the progress id will be some other collection
        // and the ingestion itself will have the data from a default external reference
        progress_subsource: CatalogItemId,
        /// Data configuration of the ingestion's own export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
        /// Details of the ingestion's own export.
        details: SourceExportDetails,
    },
    /// This source receives its data from the identified ingestion,
    /// specifically the output identified by `external_reference`.
    IngestionExport {
        /// ID of the ingestion providing the data.
        ingestion_id: CatalogItemId,
        /// Name identifying the ingestion output to read.
        external_reference: UnresolvedItemName,
        /// Details of the export.
        details: SourceExportDetails,
        /// Data configuration of the export.
        data_config: SourceExportDataConfig<ReferencedConnection>,
    },
    /// Receives data from the source's reclocking/remapping operations.
    Progress,
    /// Receives data from HTTP post requests.
    Webhook {
        /// How to validate incoming requests, if validation is configured.
        validate_using: Option<WebhookValidation>,
        /// Format of the request body.
        body_format: WebhookBodyFormat,
        /// Which request headers to expose, and how.
        headers: WebhookHeaders,
        /// Only `Some` when created via `CREATE TABLE ... FROM WEBHOOK`.
        cluster_id: Option<StorageInstanceId>,
    },
}
1554
/// How requests to a webhook source are validated.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidation {
    /// The expression used to validate a request.
    pub expression: MirScalarExpr,
    /// Description of the source that will be created.
    pub relation_desc: RelationDesc,
    /// The column index to provide the request body and whether to provide it as bytes.
    pub bodies: Vec<(usize, bool)>,
    /// The column index to provide the request headers and whether to provide the values as bytes.
    pub headers: Vec<(usize, bool)>,
    /// Any secrets that are used in that validation.
    pub secrets: Vec<WebhookValidationSecret>,
}
1568
1569impl WebhookValidation {
1570    const MAX_REDUCE_TIME: Duration = Duration::from_secs(60);
1571
1572    /// Attempt to reduce the internal [`MirScalarExpr`] into a simpler expression.
1573    ///
1574    /// The reduction happens on a separate thread, we also only wait for
1575    /// `WebhookValidation::MAX_REDUCE_TIME` before timing out and returning an error.
1576    pub async fn reduce_expression(&mut self) -> Result<(), &'static str> {
1577        let WebhookValidation {
1578            expression,
1579            relation_desc,
1580            ..
1581        } = self;
1582
1583        // On a different thread, attempt to reduce the expression.
1584        let mut expression_ = expression.clone();
1585        let desc_ = relation_desc.clone();
1586        let reduce_task = mz_ore::task::spawn_blocking(
1587            || "webhook-validation-reduce",
1588            move || {
1589                let repr_col_types: Vec<ReprColumnType> = desc_
1590                    .typ()
1591                    .column_types
1592                    .iter()
1593                    .map(ReprColumnType::from)
1594                    .collect();
1595                expression_.reduce(&repr_col_types);
1596                expression_
1597            },
1598        );
1599
1600        match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await {
1601            Ok(reduced_expr) => {
1602                *expression = reduced_expr;
1603                Ok(())
1604            }
1605            Err(_) => Err("timeout"),
1606        }
1607    }
1608}
1609
/// Which request headers a webhook source exposes as columns.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaders {
    /// Optionally include a column named `headers` whose content is possibly filtered.
    pub header_column: Option<WebhookHeaderFilters>,
    /// The column index to provide the specific request header, and whether to provide it as bytes.
    pub mapped_headers: BTreeMap<usize, (String, bool)>,
}
1617
1618impl WebhookHeaders {
1619    /// Returns the number of columns needed to represent our headers.
1620    pub fn num_columns(&self) -> usize {
1621        let header_column = self.header_column.as_ref().map(|_| 1).unwrap_or(0);
1622        let mapped_headers = self.mapped_headers.len();
1623
1624        header_column + mapped_headers
1625    }
1626}
1627
/// Filters applied to the content of a webhook source's `headers` column.
// NOTE(review): how `block` and `allow` interact when both are non-empty is not visible here.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
pub struct WebhookHeaderFilters {
    /// Header names to block.
    pub block: BTreeSet<String>,
    /// Header names to allow.
    pub allow: BTreeSet<String>,
}
1633
/// The format of a webhook request body.
///
/// The `From<WebhookBodyFormat>` impl for `SqlScalarType` gives the scalar type used for each.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Arbitrary)]
pub enum WebhookBodyFormat {
    /// The body is JSON.
    // NOTE(review): `array` presumably indicates a JSON array body is expanded; confirm.
    Json { array: bool },
    /// The body is raw bytes.
    Bytes,
    /// The body is text.
    Text,
}
1640
1641impl From<WebhookBodyFormat> for SqlScalarType {
1642    fn from(value: WebhookBodyFormat) -> Self {
1643        match value {
1644            WebhookBodyFormat::Json { .. } => SqlScalarType::Jsonb,
1645            WebhookBodyFormat::Bytes => SqlScalarType::Bytes,
1646            WebhookBodyFormat::Text => SqlScalarType::String,
1647        }
1648    }
1649}
1650
/// A secret referenced by a [`WebhookValidation`] expression.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct WebhookValidationSecret {
    /// Identifies the secret by [`CatalogItemId`].
    pub id: CatalogItemId,
    /// Column index for the expression context that this secret was originally evaluated in.
    pub column_idx: usize,
    /// Whether or not this secret should be provided to the expression as Bytes or a String.
    pub use_bytes: bool,
}
1660
/// A planned connection.
#[derive(Clone, Debug)]
pub struct Connection {
    /// The SQL text that creates this connection.
    pub create_sql: String,
    /// The kind-specific details of the connection.
    pub details: ConnectionDetails,
}
1666
/// The kind-specific details of a [`Connection`].
#[derive(Clone, Debug, Serialize)]
pub enum ConnectionDetails {
    /// A Kafka connection.
    Kafka(KafkaConnection<ReferencedConnection>),
    /// A Confluent Schema Registry connection.
    Csr(CsrConnection<ReferencedConnection>),
    /// A PostgreSQL connection.
    Postgres(PostgresConnection<ReferencedConnection>),
    /// An SSH connection together with two keys.
    // NOTE(review): presumably two keys are kept to support key rotation; confirm.
    Ssh {
        connection: SshConnection,
        key_1: SshKey,
        key_2: SshKey,
    },
    /// An AWS connection.
    Aws(AwsConnection),
    /// An AWS PrivateLink connection.
    AwsPrivatelink(AwsPrivatelinkConnection),
    /// A MySQL connection.
    MySql(MySqlConnection<ReferencedConnection>),
    /// A SQL Server connection.
    SqlServer(SqlServerConnectionDetails<ReferencedConnection>),
    /// An Iceberg catalog connection.
    IcebergCatalog(IcebergCatalogConnection<ReferencedConnection>),
}
1683
1684impl ConnectionDetails {
1685    pub fn to_connection(&self) -> mz_storage_types::connections::Connection<ReferencedConnection> {
1686        match self {
1687            ConnectionDetails::Kafka(c) => {
1688                mz_storage_types::connections::Connection::Kafka(c.clone())
1689            }
1690            ConnectionDetails::Csr(c) => mz_storage_types::connections::Connection::Csr(c.clone()),
1691            ConnectionDetails::Postgres(c) => {
1692                mz_storage_types::connections::Connection::Postgres(c.clone())
1693            }
1694            ConnectionDetails::Ssh { connection, .. } => {
1695                mz_storage_types::connections::Connection::Ssh(connection.clone())
1696            }
1697            ConnectionDetails::Aws(c) => mz_storage_types::connections::Connection::Aws(c.clone()),
1698            ConnectionDetails::AwsPrivatelink(c) => {
1699                mz_storage_types::connections::Connection::AwsPrivatelink(c.clone())
1700            }
1701            ConnectionDetails::MySql(c) => {
1702                mz_storage_types::connections::Connection::MySql(c.clone())
1703            }
1704            ConnectionDetails::SqlServer(c) => {
1705                mz_storage_types::connections::Connection::SqlServer(c.clone())
1706            }
1707            ConnectionDetails::IcebergCatalog(c) => {
1708                mz_storage_types::connections::Connection::IcebergCatalog(c.clone())
1709            }
1710        }
1711    }
1712}
1713
/// A single named rule of a network policy.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct NetworkPolicyRule {
    /// Name of the rule.
    pub name: String,
    /// What the rule does with matching traffic (currently only `Allow` exists).
    pub action: NetworkPolicyRuleAction,
    /// The IP network (CIDR) the rule applies to.
    pub address: PolicyAddress,
    /// The traffic direction the rule applies to (currently only `Ingress` exists).
    pub direction: NetworkPolicyRuleDirection,
}
1721
/// The action a [`NetworkPolicyRule`] takes.
///
/// Displays as lowercase (`allow`) and is parsed case-insensitively from a
/// string via `TryFrom<&str>`.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Ord,
    PartialOrd,
    Hash
)]
pub enum NetworkPolicyRuleAction {
    /// Allow the matching traffic. This is the only action available.
    Allow,
}
1736
1737impl std::fmt::Display for NetworkPolicyRuleAction {
1738    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1739        match self {
1740            Self::Allow => write!(f, "allow"),
1741        }
1742    }
1743}
1744impl TryFrom<&str> for NetworkPolicyRuleAction {
1745    type Error = PlanError;
1746    fn try_from(value: &str) -> Result<Self, Self::Error> {
1747        match value.to_uppercase().as_str() {
1748            "ALLOW" => Ok(Self::Allow),
1749            _ => Err(PlanError::Unstructured(
1750                "Allow is the only valid option".into(),
1751            )),
1752        }
1753    }
1754}
1755
/// The traffic direction a [`NetworkPolicyRule`] applies to.
///
/// Displays as lowercase (`ingress`) and is parsed from a string via
/// `TryFrom<&str>`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub enum NetworkPolicyRuleDirection {
    /// Incoming traffic. This is the only direction available.
    Ingress,
}
1760impl std::fmt::Display for NetworkPolicyRuleDirection {
1761    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1762        match self {
1763            Self::Ingress => write!(f, "ingress"),
1764        }
1765    }
1766}
1767impl TryFrom<&str> for NetworkPolicyRuleDirection {
1768    type Error = PlanError;
1769    fn try_from(value: &str) -> Result<Self, Self::Error> {
1770        match value.to_uppercase().as_str() {
1771            "INGRESS" => Ok(Self::Ingress),
1772            _ => Err(PlanError::Unstructured(
1773                "Ingress is the only valid option".into(),
1774            )),
1775        }
1776    }
1777}
1778
/// The address of a [`NetworkPolicyRule`]: a newtype around an [`IpNet`].
///
/// Displays and serializes as the string form of the wrapped network. It can
/// be built fallibly from a `&str`, or from a `String` with a conversion that
/// panics on invalid input.
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct PolicyAddress(pub IpNet);
1781impl std::fmt::Display for PolicyAddress {
1782    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1783        write!(f, "{}", &self.0.to_string())
1784    }
1785}
1786impl From<String> for PolicyAddress {
1787    fn from(value: String) -> Self {
1788        Self(IpNet::from_str(&value).expect("expected value to be IpNet"))
1789    }
1790}
1791impl TryFrom<&str> for PolicyAddress {
1792    type Error = PlanError;
1793    fn try_from(value: &str) -> Result<Self, Self::Error> {
1794        let net = IpNet::from_str(value)
1795            .map_err(|_| PlanError::Unstructured("Value must be valid IPV4 or IPV6 CIDR".into()))?;
1796        Ok(Self(net))
1797    }
1798}
1799
1800impl Serialize for PolicyAddress {
1801    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1802    where
1803        S: serde::Serializer,
1804    {
1805        serializer.serialize_str(&format!("{}", &self.0))
1806    }
1807}
1808
/// An SSH key for which either only the public half or the full key pair is held.
#[derive(Clone, Debug, Serialize)]
pub enum SshKey {
    /// Only the public key, as a string.
    PublicOnly(String),
    /// The complete key pair.
    Both(SshKeyPair),
}
1814
1815impl SshKey {
1816    pub fn as_key_pair(&self) -> Option<&SshKeyPair> {
1817        match self {
1818            SshKey::PublicOnly(_) => None,
1819            SshKey::Both(key_pair) => Some(key_pair),
1820        }
1821    }
1822
1823    pub fn public_key(&self) -> String {
1824        match self {
1825            SshKey::PublicOnly(s) => s.into(),
1826            SshKey::Both(p) => p.ssh_public_key(),
1827        }
1828    }
1829}
1830
/// A planned secret.
#[derive(Clone, Debug)]
pub struct Secret {
    /// The `CREATE` SQL text associated with this secret.
    pub create_sql: String,
    /// Scalar expression for the secret's contents.
    /// NOTE(review): presumably evaluated to produce the secret value — confirm.
    pub secret_as: MirScalarExpr,
}
1836
/// A planned sink: the collection it reads from and the external connection
/// it writes to.
#[derive(Clone, Debug)]
pub struct Sink {
    /// Parse-able SQL that is stored durably and defines this sink.
    pub create_sql: String,
    /// Collection we read into this sink.
    pub from: GlobalId,
    /// Type of connection to the external service we sink into.
    pub connection: StorageSinkConnection<ReferencedConnection>,
    // TODO(guswynn): this probably should just be in the `connection`.
    /// Envelope used for the data written by this sink.
    pub envelope: SinkEnvelope,
    /// Version number of this sink.
    /// NOTE(review): what increments it is not visible here — confirm.
    pub version: u64,
    /// Optional commit interval.
    /// NOTE(review): presumably how often the sink commits its output — confirm.
    pub commit_interval: Option<Duration>,
}
1850
/// A planned (non-materialized) view.
#[derive(Clone, Debug)]
pub struct View {
    /// Parse-able SQL that is stored durably and defines this view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// If this view is created in the temporary schema, e.g. `CREATE TEMPORARY ...`.
    pub temporary: bool,
}
1864
/// A planned materialized view.
#[derive(Clone, Debug)]
pub struct MaterializedView {
    /// Parse-able SQL that is stored durably and defines this materialized view.
    pub create_sql: String,
    /// Unoptimized high-level expression from parsing the `create_sql`.
    pub expr: HirRelationExpr,
    /// All of the catalog objects that are referenced by this materialized view, according to the `expr`.
    pub dependencies: DependencyIds,
    /// Columns of this view.
    pub column_names: Vec<ColumnName>,
    /// Optional catalog item associated with a replacement.
    /// NOTE(review): presumably the item this materialized view is meant to
    /// replace — confirm.
    pub replacement_target: Option<CatalogItemId>,
    /// Cluster this materialized view will get installed on.
    pub cluster_id: ClusterId,
    /// If set, only install this materialized view's dataflow on the specified replica.
    pub target_replica: Option<ReplicaId>,
    /// NOTE(review): presumably indexes of the columns asserted to be
    /// non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// Optional compaction window for this materialized view.
    pub compaction_window: Option<CompactionWindow>,
    /// Optional refresh schedule for this materialized view.
    pub refresh_schedule: Option<RefreshSchedule>,
    /// Optional timestamp associated with this materialized view.
    /// NOTE(review): presumably the `AS OF` it was planned with — confirm.
    pub as_of: Option<Timestamp>,
}
1885
/// A planned index.
#[derive(Clone, Debug)]
pub struct Index {
    /// Parse-able SQL that is stored durably and defines this index.
    pub create_sql: String,
    /// Collection this index is on top of.
    pub on: GlobalId,
    /// Key expressions of this index.
    pub keys: Vec<mz_expr::MirScalarExpr>,
    /// Optional compaction window for this index.
    pub compaction_window: Option<CompactionWindow>,
    /// Cluster associated with this index.
    /// NOTE(review): presumably the cluster the index is installed on — confirm.
    pub cluster_id: ClusterId,
}
1896
/// A planned user-defined type.
#[derive(Clone, Debug)]
pub struct Type {
    /// The `CREATE` SQL text associated with this type.
    pub create_sql: String,
    /// The catalog-level description of the type.
    pub inner: CatalogType<IdReference>,
}
1902
/// Specifies when a `Peek` or `Subscribe` should occur.
///
/// The methods on this type translate each variant into the individual
/// constraints that apply when choosing a timestamp.
#[derive(Deserialize, Clone, Debug, PartialEq)]
pub enum QueryWhen {
    /// The peek should occur at the latest possible timestamp that allows the
    /// peek to complete immediately.
    Immediately,
    /// The peek should occur at a timestamp that allows the peek to see all
    /// data written to tables within Materialize.
    FreshestTableWrite,
    /// The peek should occur at the timestamp described by the specified
    /// expression.
    ///
    /// The expression may have any type.
    ///
    /// NOTE(review): the payload here is a concrete `Timestamp`; the
    /// "expression" wording may predate that — confirm.
    AtTimestamp(Timestamp),
    /// Same as Immediately, but will also advance to at least the specified
    /// expression.
    AtLeastTimestamp(Timestamp),
}
1921
1922impl QueryWhen {
1923    /// Returns a timestamp to which the candidate must be advanced.
1924    pub fn advance_to_timestamp(&self) -> Option<Timestamp> {
1925        match self {
1926            QueryWhen::AtTimestamp(t) | QueryWhen::AtLeastTimestamp(t) => Some(t.clone()),
1927            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => None,
1928        }
1929    }
1930    /// Returns whether the candidate's upper bound is constrained.
1931    /// This is only true for `AtTimestamp` since it is the only variant that
1932    /// specifies a timestamp.
1933    pub fn constrains_upper(&self) -> bool {
1934        match self {
1935            QueryWhen::AtTimestamp(_) => true,
1936            QueryWhen::AtLeastTimestamp(_)
1937            | QueryWhen::Immediately
1938            | QueryWhen::FreshestTableWrite => false,
1939        }
1940    }
1941    /// Returns whether the candidate must be advanced to the since.
1942    pub fn advance_to_since(&self) -> bool {
1943        match self {
1944            QueryWhen::Immediately
1945            | QueryWhen::AtLeastTimestamp(_)
1946            | QueryWhen::FreshestTableWrite => true,
1947            QueryWhen::AtTimestamp(_) => false,
1948        }
1949    }
1950    /// Returns whether the candidate can be advanced to the upper.
1951    pub fn can_advance_to_upper(&self) -> bool {
1952        match self {
1953            QueryWhen::Immediately => true,
1954            QueryWhen::FreshestTableWrite
1955            | QueryWhen::AtTimestamp(_)
1956            | QueryWhen::AtLeastTimestamp(_) => false,
1957        }
1958    }
1959
1960    /// Returns whether the candidate can be advanced to the timeline's timestamp.
1961    pub fn can_advance_to_timeline_ts(&self) -> bool {
1962        match self {
1963            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1964            QueryWhen::AtTimestamp(_) | QueryWhen::AtLeastTimestamp(_) => false,
1965        }
1966    }
1967    /// Returns whether the candidate must be advanced to the timeline's timestamp.
1968    pub fn must_advance_to_timeline_ts(&self) -> bool {
1969        match self {
1970            QueryWhen::FreshestTableWrite => true,
1971            QueryWhen::Immediately | QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => {
1972                false
1973            }
1974        }
1975    }
1976    /// Returns whether the selected timestamp should be tracked within the current transaction.
1977    pub fn is_transactional(&self) -> bool {
1978        match self {
1979            QueryWhen::Immediately | QueryWhen::FreshestTableWrite => true,
1980            QueryWhen::AtLeastTimestamp(_) | QueryWhen::AtTimestamp(_) => false,
1981        }
1982    }
1983}
1984
/// The kind of data-modifying operation being planned.
#[derive(Debug, Copy, Clone)]
pub enum MutationKind {
    /// Rows are inserted.
    Insert,
    /// Rows are updated.
    Update,
    /// Rows are deleted.
    Delete,
}
1991
/// The data formats a `COPY` can use.
///
/// NOTE(review): which formats are valid for which `COPY` direction is decided
/// elsewhere — confirm before relying on a variant.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum CopyFormat {
    /// Text format.
    Text,
    /// CSV format.
    Csv,
    /// Binary format.
    Binary,
    /// Parquet format.
    Parquet,
}
1999
/// Timeout behavior for an execution.
#[derive(Debug, Copy, Clone)]
pub enum ExecuteTimeout {
    /// No timeout.
    None,
    /// A timeout expressed in (possibly fractional) seconds.
    Seconds(f64),
    /// NOTE(review): exact semantics are not visible here; presumably waits
    /// for a single round of results — confirm.
    WaitOnce,
}
2006
/// An option that can be set on an index.
#[derive(Clone, Debug)]
pub enum IndexOption {
    /// Configures the logical compaction window for an index.
    RetainHistory(CompactionWindow),
}
2012
/// An option that can be set on a table.
#[derive(Clone, Debug)]
pub enum TableOption {
    /// Configures the logical compaction window for a table.
    RetainHistory(CompactionWindow),
}
2018
/// The set of cluster options carried by a plan.
///
/// Every field is an `AlterOptionParameter`, and the `Default` implementation
/// sets each one to `AlterOptionParameter::Unchanged`.
#[derive(Clone, Debug)]
pub struct PlanClusterOption {
    /// Availability zones option.
    pub availability_zones: AlterOptionParameter<Vec<String>>,
    /// Introspection debugging option.
    pub introspection_debugging: AlterOptionParameter<bool>,
    /// Introspection interval option.
    pub introspection_interval: AlterOptionParameter<OptionalDuration>,
    /// Managed option.
    pub managed: AlterOptionParameter<bool>,
    /// Replicas option: pairs of a string (presumably the replica name —
    /// confirm) and its configuration.
    pub replicas: AlterOptionParameter<Vec<(String, ReplicaConfig)>>,
    /// Replication factor option.
    pub replication_factor: AlterOptionParameter<u32>,
    /// Size option; uses the default type parameter of `AlterOptionParameter`.
    pub size: AlterOptionParameter,
    /// Schedule option.
    pub schedule: AlterOptionParameter<ClusterSchedule>,
    /// Workload class option; the inner `Option` allows an absent class.
    pub workload_class: AlterOptionParameter<Option<String>>,
}
2031
2032impl Default for PlanClusterOption {
2033    fn default() -> Self {
2034        Self {
2035            availability_zones: AlterOptionParameter::Unchanged,
2036            introspection_debugging: AlterOptionParameter::Unchanged,
2037            introspection_interval: AlterOptionParameter::Unchanged,
2038            managed: AlterOptionParameter::Unchanged,
2039            replicas: AlterOptionParameter::Unchanged,
2040            replication_factor: AlterOptionParameter::Unchanged,
2041            size: AlterOptionParameter::Unchanged,
2042            schedule: AlterOptionParameter::Unchanged,
2043            workload_class: AlterOptionParameter::Unchanged,
2044        }
2045    }
2046}
2047
/// The strategy selected by the wait option of a cluster alteration.
///
/// Built from `ClusterAlterOptionExtracted` via `TryFrom`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlterClusterPlanStrategy {
    /// No wait option was given.
    None,
    /// Wait option with a fixed duration.
    For(Duration),
    /// Wait-until-ready option.
    UntilReady {
        /// What to do when `timeout` elapses; defaults to `Commit` when not specified.
        on_timeout: OnTimeoutAction,
        /// The timeout; required when this strategy is used.
        timeout: Duration,
    },
}
2057
/// The action taken when an `UNTIL READY` wait times out.
///
/// The default is [`OnTimeoutAction::Commit`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum OnTimeoutAction {
    /// Commit when the timeout elapses. This is the default.
    #[default]
    Commit,
    /// Roll back when the timeout elapses.
    Rollback,
}
2069
2070impl TryFrom<&str> for OnTimeoutAction {
2071    type Error = PlanError;
2072    fn try_from(value: &str) -> Result<Self, Self::Error> {
2073        match value.to_uppercase().as_str() {
2074            "COMMIT" => Ok(Self::Commit),
2075            "ROLLBACK" => Ok(Self::Rollback),
2076            _ => Err(PlanError::Unstructured(
2077                "Valid options are COMMIT, ROLLBACK".into(),
2078            )),
2079        }
2080    }
2081}
2082
2083impl AlterClusterPlanStrategy {
2084    pub fn is_none(&self) -> bool {
2085        matches!(self, Self::None)
2086    }
2087    pub fn is_some(&self) -> bool {
2088        !matches!(self, Self::None)
2089    }
2090}
2091
2092impl TryFrom<ClusterAlterOptionExtracted> for AlterClusterPlanStrategy {
2093    type Error = PlanError;
2094
2095    fn try_from(value: ClusterAlterOptionExtracted) -> Result<Self, Self::Error> {
2096        Ok(match value.wait {
2097            Some(ClusterAlterOptionValue::For(d)) => Self::For(Duration::try_from_value(d)?),
2098            Some(ClusterAlterOptionValue::UntilReady(options)) => {
2099                let extracted = ClusterAlterUntilReadyOptionExtracted::try_from(options)?;
2100                Self::UntilReady {
2101                    timeout: match extracted.timeout {
2102                        Some(d) => d,
2103                        None => Err(PlanError::UntilReadyTimeoutRequired)?,
2104                    },
2105                    on_timeout: match extracted.on_timeout {
2106                        Some(v) => OnTimeoutAction::try_from(v.as_str()).map_err(|e| {
2107                            PlanError::InvalidOptionValue {
2108                                option_name: "ON TIMEOUT".into(),
2109                                err: Box::new(e),
2110                            }
2111                        })?,
2112                        None => OnTimeoutAction::default(),
2113                    },
2114                }
2115            }
2116            None => Self::None,
2117        })
2118    }
2119}
2120
/// A vector of values to which parameter references should be bound.
///
/// Alongside the values, both the types supplied at `EXECUTE` time and the
/// types the prepared statement expects are recorded.
#[derive(Debug, Clone)]
pub struct Params {
    /// The datums that were provided in the EXECUTE statement.
    pub datums: Row,
    /// The types of the datums provided in the EXECUTE statement.
    pub execute_types: Vec<SqlScalarType>,
    /// The types that the prepared statement expects based on its definition.
    pub expected_types: Vec<SqlScalarType>,
}
2131
2132impl Params {
2133    /// Returns a `Params` with no parameters.
2134    pub fn empty() -> Params {
2135        Params {
2136            datums: Row::pack_slice(&[]),
2137            execute_types: vec![],
2138            expected_types: vec![],
2139        }
2140    }
2141}
2142
/// Controls planning of a SQL query.
#[derive(
    Ord,
    PartialOrd,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash,
    Copy
)]
pub struct PlanContext {
    /// The wall-clock time associated with this planning context.
    pub wall_time: DateTime<Utc>,
    /// Flag for ignoring "if exists" errors; `false` unless set through
    /// `with_ignore_if_exists_errors`.
    /// NOTE(review): the exact errors affected are decided by the planner
    /// code that reads this flag — confirm there.
    pub ignore_if_exists_errors: bool,
}
2160
2161impl PlanContext {
2162    pub fn new(wall_time: DateTime<Utc>) -> Self {
2163        Self {
2164            wall_time,
2165            ignore_if_exists_errors: false,
2166        }
2167    }
2168
2169    /// Return a PlanContext with zero values. This should only be used when
2170    /// planning is required but unused (like in `plan_create_table()`) or in
2171    /// tests.
2172    pub fn zero() -> Self {
2173        PlanContext {
2174            wall_time: now::to_datetime(NOW_ZERO()),
2175            ignore_if_exists_errors: false,
2176        }
2177    }
2178
2179    pub fn with_ignore_if_exists_errors(mut self, value: bool) -> Self {
2180        self.ignore_if_exists_errors = value;
2181        self
2182    }
2183}